!> IoT Hub 应用实际开发过程中的一些注意细节
资源:
// Service-side SDK module; Registry and Device are taken from its exports.
const iothub = require('azure-iothub');
const { Registry, Device } = iothub;

// Registry client built from the hub connection string (replace the placeholder).
const registry = Registry.fromConnectionString('[connectionString]');

// Device identity to register (replace the placeholder id).
const device = new Device(null);
device.deviceId = '[deviceId]';
/**
 * Registry callback that prints a device identity.
 *
 * Logs the error (if any) instead of dropping it, then prints the full
 * device record as JSON followed by its id and, when present, its primary
 * symmetric key.
 *
 * @param {*} err - Error reported by the registry call, or a falsy value.
 * @param {object} [deviceInfo] - Device description returned by the registry.
 * @param {*} [res] - Transport response; unused, kept for the callback signature.
 */
function printDeviceInfo(err, deviceInfo, res) {
  if (err) {
    // Previously ignored: a failed lookup produced no output at all.
    console.log(`Device registry error: ${err.toString()}`);
  }
  if (!deviceInfo) {
    return;
  }

  console.log(JSON.stringify(deviceInfo, null, 2));
  console.log(`Device id: ${deviceInfo.deviceId}`);

  // The symmetric key may be absent from the record; do not throw inside
  // the callback when it is.
  const authentication = deviceInfo.authentication;
  const symmetricKey = authentication && authentication.symmetricKey;
  if (symmetricKey) {
    console.log(`Device key: ${symmetricKey.primaryKey}`);
  }
}
// To delete the device: registry.delete(deviceId, (err, deviceInfo, res) => {});

// Try to create the device; when creation reports an error, look the device
// up by id instead and print whatever the registry returns.
registry.create(device, (createError, createdDevice, response) => {
  if (createError) {
    registry.get(device.deviceId, printDeviceInfo);
  }

  if (createdDevice) {
    printDeviceInfo(createError, createdDevice, response);
  }
});
const { clientFromConnectionString } = require('azure-iot-device-mqtt');
const { Message } = require('azure-iot-device');

// Device connection string: fill in the bracketed placeholders
// (host name, device id, shared access key) before running.
const connectionString = 'HostName=[修改连接主机];DeviceId=[deviceID];SharedAccessKey=[连接密钥]';

// Device client created from the connection string above.
const client = clientFromConnectionString(connectionString);
/**
 * Build a completion callback that logs the outcome of an operation.
 *
 * @param {string} op - Label used as the prefix of every log line.
 * @returns {function(*, *): void} Callback taking (err, res); logs the error
 *   text when `err` is truthy and the result's constructor name when `res`
 *   is truthy.
 */
function printResultFor(op) {
  return (err, res) => {
    if (err) {
      console.log(`${op} error: ${err.toString()}`);
    }
    if (res) {
      console.log(`${op} status: ${res.constructor.name}`);
    }
  };
}
/**
 * Callback for client.open(): reports a connection failure, or starts
 * sending one simulated wind-speed message per second.
 *
 * @param {*} err - Truthy when the connection attempt failed.
 */
const connectCallback = (err) => {
  if (err) {
    console.log(`Could not connect: ${err}`);
    return;
  }

  console.log('Client connected');

  // Build and send a single telemetry message with a random wind speed
  // in the range [10, 14).
  const sendReading = () => {
    const windSpeed = 10 + (Math.random() * 4);
    const payload = JSON.stringify({ deviceId: 'myFirstNodeDevice', windSpeed });
    const message = new Message(payload);
    console.log(`Sending message: ${message.getData()}`);
    client.sendEvent(message, printResultFor('send'));
  };

  // Send a message to the IoT Hub every second.
  setInterval(sendReading, 1000);
};
// Open the device connection; connectCallback logs a failure or starts the send loop.
client.open(connectCallback);
const { Client: EventHubClient } = require('azure-event-hubs');

// Hub-level connection string (iothubowner policy): fill in the bracketed
// host name and access key before running.
const connectionString = 'HostName=[修改连接主机];SharedAccessKeyName=iothubowner;SharedAccessKey=[修改连接密钥]';
/**
 * Log the `message` of an error object.
 *
 * @param {{message: *}} err - Error (or error-like object) to report.
 */
const printError = ({ message }) => {
  console.log(message);
};
/**
 * Dump a received message: a header line, the body as JSON, then every own
 * property name with its value, followed by an empty line.
 *
 * @param {object} message - Received message object.
 */
const printMessage = (message) => {
  console.log('Message received: ');
  console.log(JSON.stringify(message.body));

  // List every own property so the message structure is visible in the output.
  for (const propertyName of Object.getOwnPropertyNames(message)) {
    console.log(propertyName, message[propertyName]);
  }

  console.log('');
};
// Event Hubs client for the endpoint described by connectionString.
const client = EventHubClient.fromConnectionString(connectionString);

// Open the client, list the partition ids, and attach one receiver per
// partition that only reads messages enqueued after "now".
//
// The per-partition promises are combined with Promise.all so that a failed
// createReceiver() call rejects the chain and reaches the final .catch().
// Returning the bare array of promises would resolve the chain immediately
// and leave those rejections unhandled.
client.open()
  .then(client.getPartitionIds.bind(client))
  .then(partitionIds => Promise.all(partitionIds.map(partitionId => client
    .createReceiver('$Default', partitionId, { startAfterTime: Date.now() })
    .then((receiver) => {
      console.log(`Created partition receiver: ${partitionId}`);
      receiver.on('errorReceived', printError);
      receiver.on('message', printMessage);
    }))))
  .catch(printError);
注意:
- 发送端通过 `message.properties` 设置的属性,在接收到的消息体中是 `message.applicationProperties`
- 可通过 `startAfterTime` 或 `startAfterOffset` 参数来决定启动时间, 可用记录上一次消息的 `offset`/`enqueued-time` 来保证中断重启后能够继续从未处理的消息开始读取
- `message` 包含的属性如下:[ 'partitionKey',
'body',
'enqueuedTimeUtc',
'offset',
'properties',
'applicationProperties',
'sequenceNumber',
'annotations',
'systemProperties' ]
消息体示例:
Message received:
partitionKey undefined
body { deviceId: 'myFirstNodeDevice', windSpeed: 10.51685587945142 }
enqueuedTimeUtc 2017-06-13T01:21:02.519Z
offset 73240
properties undefined
applicationProperties { asdf: 'asdfz' }
sequenceNumber 182
annotations { 'x-opt-sequence-number': 182,
'x-opt-offset': '73240',
'x-opt-enqueued-time': 2017-06-13T01:21:02.519Z,
'iothub-connection-device-id': 'myFirstNodeDevice',
'iothub-connection-auth-method': '{ "scope": "device", "type": "sas", "issuer": "iothub" }',
'iothub-connection-auth-generation-id': 'xxxxxxx',
'iothub-enqueuedtime': 2017-06-13T01:21:02.786Z,
'iothub-message-source': 'Telemetry' }
systemProperties undefined
点击进入已创建的实体
不要从别处获得连接字符串,因为可能无法连接。最终获得的连接字符串应当包含 `EntityPath` 字段,类似:
Endpoint=sb://xxxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=iothubroutes_xxxx;SharedAccessKey=xxxx;EntityPath=xxxx
将 Event Hubs 里的事件关联到 IoT Hub
const { clientFromConnectionString } = require('azure-iot-device-mqtt');
const { Message } = require('azure-iot-device');

// Device connection string: fill in the bracketed placeholders
// (host name, device id, shared access key) before running.
const connectionString = 'HostName=[修改连接主机];DeviceId=[deviceID];SharedAccessKey=[连接密钥]';

// Device client created from the connection string above.
const client = clientFromConnectionString(connectionString);
/**
 * Build a completion callback that logs the outcome of an operation.
 *
 * @param {string} op - Label used as the prefix of every log line.
 * @returns {function(*, *): void} Callback taking (err, res); logs the error
 *   text when `err` is truthy and the result's constructor name when `res`
 *   is truthy.
 */
function printResultFor(op) {
  return (err, res) => {
    if (err) {
      console.log(`${op} error: ${err.toString()}`);
    }
    if (res) {
      console.log(`${op} status: ${res.constructor.name}`);
    }
  };
}
/**
 * Callback for client.open(): reports a connection failure, or starts
 * sending one simulated wind-speed message per second, randomly tagging
 * messages with the `route` property.
 *
 * @param {*} err - Truthy when the connection attempt failed.
 */
const connectCallback = (err) => {
  if (err) {
    console.log(`Could not connect: ${err}`);
    return;
  }

  console.log('Client connected');

  // Build and send a single telemetry message with a random wind speed
  // in the range [10, 14).
  const sendReading = () => {
    const windSpeed = 10 + (Math.random() * 4);
    const payload = JSON.stringify({ deviceId: 'myFirstNodeDevice', windSpeed });
    const message = new Message(payload);

    // Randomly tag the message with route=test, so that messages are sent
    // either to the route or to the default events.
    const useRoute = Math.round(Math.random()) === 1;
    if (useRoute) {
      message.properties.add('route', 'test');
    }

    console.log(`Sending message: ${message.getData()}`);
    client.sendEvent(message, printResultFor('send'));
  };

  // Send a message to the IoT Hub every second.
  setInterval(sendReading, 1000);
};
// Open the device connection; connectCallback logs a failure or starts the send loop.
client.open(connectCallback);
无需修改,直接启动
复制 IoT Hub 侦听源码,修改连接字符串:
const { Client: EventHubClient } = require('azure-event-hubs');

// The hub-level form used by the IoT Hub listener was:
//   'HostName=[your hub host];SharedAccessKeyName=iothubowner;SharedAccessKey=[your access key]'
// Here the Event Hubs entity connection string is used instead; fill in the
// bracketed placeholders (endpoint, policy name, key, entity) before running.
const connectionString = 'Endpoint=[sb://修改连接主机.servicebus.chinacloudapi.cn/];SharedAccessKeyName=[修改连接策略];SharedAccessKey=[x修改连接密钥];EntityPath=[事件实体]';
/**
 * Log the `message` of an error object.
 *
 * @param {{message: *}} err - Error (or error-like object) to report.
 */
const printError = ({ message }) => {
  console.log(message);
};
/**
 * Dump a received message: a header line, the body as JSON, the whole
 * message object, then an empty line.
 *
 * @param {object} message - Received message object.
 */
const printMessage = (message) => {
  console.log('Message received: ');

  const serializedBody = JSON.stringify(message.body);
  console.log(serializedBody);

  console.log(message);
  console.log('');
};
// Event Hubs client for the entity described by connectionString.
const client = EventHubClient.fromConnectionString(connectionString);

// Open the client, list the partition ids, and attach one receiver per
// partition that only reads messages enqueued after "now".
//
// The per-partition promises are combined with Promise.all so that a failed
// createReceiver() call rejects the chain and reaches the final .catch().
// Returning the bare array of promises would resolve the chain immediately
// and leave those rejections unhandled.
client.open()
  .then(client.getPartitionIds.bind(client))
  .then(partitionIds => Promise.all(partitionIds.map(partitionId => client
    .createReceiver('$Default', partitionId, { startAfterTime: Date.now() })
    .then((receiver) => {
      console.log(`Created partition receiver: ${partitionId}`);
      receiver.on('errorReceived', printError);
      receiver.on('message', printMessage);
    }))))
  .catch(printError);
至此,完成路由转发.