node.js aedes 服务端mqtt

由 夕空 撰写于  2024年4月28日

服务端:

npm i aedes

const aedes = require('aedes')({
heartbeatInterval: 60000, //60s发送一次心跳包
connectTimeout: 120000, //如果与服务器120s没有收到连接客户端发过来的心跳包,则视为连接断开
})
const mqttServer = require('net').createServer(aedes.handle)
const port = 1883

mqttServer.listen(port, function () {
console.log('mqttServer started and listening on port ', port)
});

// 身份验证
aedes.authenticate = function (client, username, password, callback) {
callback(null, (username === 'user' && password.toString() === '123456')); //第一个可以是错误实例,第二个是是否同意连接
}

// 客户端连接
aedes.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id);
});

// 客户端断开,当有客户端主动断开连接或者服务器120s内没收到某个客户端的心跳包就会触发该回调
aedes.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id);
});

//订阅主题。该函数第一个参数是要订阅的主题; 第二个是用于处理收到的消息的函,它接受两个参数:packet 和 callback。packet 是一个 AedesPublishPacket 对象,表示收到的消息;callback 是一个函数,用于在消息处理完成后通知 aedes 服务器;第三个参数是订阅成功的回调函数
aedes.subscribe("msg", function (packet, callback) {
console.log("Server收到订阅消息:", packet.payload.toString());
callback();
}, () => {
console.log("订阅msg成功");
});

//处理收到的消息,我们订阅所有主题收到的消息都可以通过这个事件获取(我们可以把订阅收到消息的处理函数写在上面订阅主题函数的第二个参数里面,或者统一写在下面)
aedes.on("publish", async function (packet, client){
//packet.topic表示哪个主题,packet.payload是收到的数据,是一串二进制数据,我们需要用.toString()将它转化为字符串
if(packet.topic === 'msg'){
console.log("Received message:", packet.payload.toString(), client.id);
}
})
// 错误触发
aedes.on('clientError', function (client, error) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id);
});
// 连接错误
aedes.on('connectionError', function (client, error) {
console.log('connectionError: MQTT支持版本:3.1 & 3.1.1');
});

//发布主题
setInterval(()=>{ //两秒发布一次
aedes.publish({
topic: "text", //发布主题
payload: "yes", //消息内容
qos: 1, //MQTT消息的服务质量(quality of service)。服务质量是1,这意味着这个消息需要至少一次确认(ACK)才能被认为是传输成功
retain: false, // MQTT消息的保留标志(retain flag),它用于控制消息是否应该被保留在MQTT服务器上,以便新的订阅者可以接收到它。保留标志是false,这意味着这个消息不应该被保留
cmd: "publish", // MQTT消息的命令(command),它用于控制消息的类型。命令是"publish",这意味着这个消息是一个发布消息
dup: false //判断消息是否是重复的
}, (err) => { //发布失败的回调
if (err) {
console.log('发布失败')
}
});
},2000)


客户端:

npm i mqtt
const mqtt = require("mqtt");

const client = mqtt.connect('mqtt://127.0.0.1:1883', {
username: "user",
password: '123456',
clientId: 'mqtt_client123'
});

// 连接成功
client.on("connect", function () {
console.log("服务器连接成功");
console.log(client.options.clientId);
client.subscribe("text/#", { qos: 1 }); // 订阅text消息。/# 表示订阅text下的所有主题
});

client.on("message", function (top, message) {
console.log("当前topic:", top, "client收到:",message.toString());
});

client.on("error", function (err) {
console.log("连接失败:", err);
});

//发布消息
client.publish("text/subset", '订阅主题子消息');


参考:https://blog.csdn.net/qczxl520/article/details/115165285
https://blog.csdn.net/m0_61665835/article/details/130586059

声明:星耀夕空|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - node.js aedes 服务端mqtt


欢迎光顾我的小站!