本章主要讲解:
首先需要分别创建 IOT Hub 和 EventHubs.
注意这里, 状态
初始不显示, 首次消息通讯成功后再来这里看应该会变.
系统可能会存在这样的Bug, 怎么也收不到消息. 这里的状态就会一直不显示:
像这样的情况发生的时候, 删除路由,删除终结点,删除事件中心, 重头再来.
仔细看上图, 两个路由的查询条件一样, 但分别放进 test
/test2
两个不同的终结点.
// eventhub.js
const EventHubClient = require('azure-event-hubs').Client;
const getPartitionIds = async (client) => {
const partitionIds = await client.getPartitionIds();
return partitionIds;
};
const defaultErrorHandler = async (err) => {
console.error(err);
};
const defaultMessageHandler = (message) => {
console.log(message.body);
console.log(message.body.toString());
};
const createReceiver = async ({ connStr = '', messageHandler = defaultMessageHandler } = {}, errorHandler = defaultErrorHandler) => {
const client = EventHubClient.fromConnectionString(connStr);
await client.open();
const partitionIds = await getPartitionIds(client);
partitionIds.forEach(async (partitionId) => {
const receiver = await client.createReceiver('$Default', partitionId, { startAfterTime: Date.now() });
receiver.on('errorReceived', async (err) => {
if (err.transport && err.transport.name === 'AmqpProtocolError') {
console.log(`Restart #${partitionId}`);
await createReceiver({ connStr, errorHandler, messageHandler });
} else {
await errorHandler(err);
}
});
receiver.on('message', messageHandler);
});
};
module.exports = createReceiver;
// server.js
const receiver = require('./eventhub');
(async () => {
await receiver({
connStr: 'test事件中心的连接',
messageHandler: (msg) => {
console.log('test');
console.log(msg.body);
}
});
await receiver({
connStr: 'test2事件中心的连接',
messageHandler: (msg) => {
console.log('test2');
console.log(msg.body);
}
});
})();
参考官方的示例: https://github.com/azure/azure-iot-sdk-node/blob/master/device/samples/simple_sample_device.js#L44
注意第44行位置, 下面添加一行, 加入路由属性
message.properties.add('route', 'tt');