node kafkajs

由 夕空 撰写于  2022年4月6日
const { Kafka } = require('kafkajs');
// Shared Kafka state. All three are assigned inside the readFile callback
// below and read later by kafkasend().
var kafka, producer, kafkajson;

// Read kafka-topic.json, build the Kafka client from it, create the producer
// and start a consumer that hands every received message to kafkasend().
fs.readFile(nwPath.resolve(nwDir, '../kafka-topic.json'), function (err, data) {
  if (err) {
    // Tell the user the configuration file is missing, then abort.
    layer.open({
      content: '缺少kafka-topic.json文件!'
      , btn: '关闭'
    });
    throw err;
  }

  // Files saved as UTF-8 by Notepad start with a byte-order mark (EF BB BF),
  // which makes JSON.parse fail; drop it before parsing.
  if (data[0] === 0xEF && data[1] === 0xBB && data[2] === 0xBF) {
    data = data.slice(3);
  }
  kafkajson = JSON.parse(data.toString());

  kafka = new Kafka({
    clientId: 'my-app',
    requestTimeout: 25000,
    connectionTimeout: 30000,
    authenticationTimeout: 30000,
    retry: {
      initialRetryTime: 3000,
      retries: 0
    },
    brokers: [kafkajson.messageServerUri]
  });

  // Sending side: created before the consumer starts so kafkasend() always
  // finds a producer when the first message arrives.
  producer = kafka.producer();

  // Receiving side.
  const consumer = kafka.consumer({ groupId: 'test-group' });
  consumer.connect()
    .then(() => consumer.subscribe({ topic: kafkajson.receiveTopic, fromBeginning: true }))
    // Return the run() promise so a failure here reaches the catch below
    // instead of becoming an unhandled rejection.
    .then(() => consumer.run({
      eachMessage: ({ message }) => {
        // Records with a null value have no payload to forward.
        if (message.value == null) {
          return;
        }
        kafkasend(message.value.toString());
      },
    }))
    .catch((e) => {
      console.log("CATCH:", e);
    });
});


/**
 * Send one or more messages to the topic named by kafkajson.sendTopic using
 * the shared producer, then disconnect and reload the page.
 *
 * @param {string|Buffer|Object|Array<string|Buffer|Object>} params
 *   Payload to send. A string or Buffer is wrapped as `{ value: payload }`;
 *   an object is passed through as an already-formed message; an array is
 *   treated as a list of such entries.
 * @returns {Promise<void>} Settles once the send attempt has finished.
 *   Failures are logged and not rethrown.
 */
function kafkasend(params) {
  console.log("发送kafka");

  // producer.send() expects `messages` to be an array of message objects,
  // so a bare string/Buffer payload has to be wrapped first.
  const toMessage = (item) =>
    (typeof item === 'string' || Buffer.isBuffer(item) ? { value: item } : item);
  const messages = (Array.isArray(params) ? params : [params]).map(toMessage);

  return producer.connect()
    .then(() => producer.send({
      topic: kafkajson.sendTopic,
      messages,
    }))
    // Wait for the disconnect before reloading; a failed disconnect is logged
    // but must not prevent the reload that follows a successful send.
    .then(() => producer.disconnect().catch((err) => {
      console.log("发送错误", err);
    }))
    .then(() => {
      console.log('hello');
      location.reload(); // refresh the page
    })
    .catch((err) => {
      // Log the error itself so the failure can be diagnosed.
      console.log("发送错误", err);
    });
}



声明:星耀夕空|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - node kafkajs


欢迎光顾我的小站!