Connecting Node.js to Kafka 2.0: implementation examples
Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system capable of handling all of the user action-stream data on a website.
The npm package used in this article: https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart
npm i wisrtoni40-confluent-schema --save
The examples below also import kafka-node and uuid, so install those two packages as well.
producer.ts:
```typescript
import { HighLevelProducer, KafkaClient } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentPubResolveStrategy,
} from 'wisrtoni40-confluent-schema';

/**
 * -----------------------------------------------------------------------------
 * Config
 * -----------------------------------------------------------------------------
 */
const kafkaHost = 'your kafka host';
const topic = 'your topic';
const registryHost = 'your schema registry host';

/**
 * -----------------------------------------------------------------------------
 * Kafka Client and Producer
 * -----------------------------------------------------------------------------
 */
const kafkaClient = new KafkaClient({
  kafkaHost,
  clientId: uuidv4(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password',
  },
});

const producer = new HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});

/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);

/**
 * -----------------------------------------------------------------------------
 * Produce
 * -----------------------------------------------------------------------------
 */
(async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142',
  };
  const processedData = await resolver.resolve(data);
  producer.send([{ topic, messages: processedData }], (error, result) => {
    if (error) {
      console.error(error);
    } else {
      console.log(result);
    }
  });
})();
```
producer.js:
```javascript
var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");

var kafkaHost = 'your kafka host';
var topic = 'your topic';
var registryHost = 'your schema registry host';

const kafkaClient = new kafka_node_1.KafkaClient({
  kafkaHost,
  clientId: (0, uuid_1.v4)(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password',
  },
});
const producer = new kafka_node_1.HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});

const schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
const avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
const resolver = new wisrtoni40_confluent_schema_1.ConfluentPubResolveStrategy(schemaRegistry, avro, topic);

(async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142',
  };
  const processedData = await resolver.resolve(data);
  producer.send([{ topic, messages: processedData }], (error, result) => {
    if (error) {
      console.error(error);
    } else {
      console.log(result);
    }
  });
})();
```
consumer.ts:
```typescript
import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';

/**
 * -----------------------------------------------------------------------------
 * Config
 * -----------------------------------------------------------------------------
 */
const kafkaHost = 'your kafka host';
const topic = 'your topic';
const registryHost = 'your schema registry host';

/**
 * -----------------------------------------------------------------------------
 * Kafka Consumer
 * -----------------------------------------------------------------------------
 */
const consumer = new ConsumerGroup(
  {
    kafkaHost,
    groupId: uuidv4(),
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    encoding: 'buffer',
    fromOffset: 'latest',
    outOfRangeOffset: 'latest',
    sasl: {
      mechanism: 'plain',
      username: 'your kafka username',
      password: 'your kafka password',
    },
  },
  topic,
);

/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);

/**
 * -----------------------------------------------------------------------------
 * Consume
 * -----------------------------------------------------------------------------
 */
consumer.on('message', async msg => {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});
```
consumer.js:
```javascript
var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");

var kafkaHost = 'your kafka host';
var topic = 'your topic';
var registryHost = 'your schema registry host';

var consumer = new kafka_node_1.ConsumerGroup({
  kafkaHost: kafkaHost,
  groupId: (0, uuid_1.v4)(),
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  encoding: 'buffer',
  fromOffset: 'latest',
  outOfRangeOffset: 'latest',
  sasl: {
    mechanism: 'plain',
    username: 'your kafka username',
    password: 'your kafka password',
  },
}, topic);

var schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
var avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
var resolver = new wisrtoni40_confluent_schema_1.ConfluentSubResolveStrategy(schemaRegistry, avro);

consumer.on('message', async function (msg) {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});
```
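The consumer sets `encoding: 'buffer'` because `resolver.resolve` must see the raw bytes: messages produced through a Confluent schema registry are framed with a magic byte `0x00` and a 4-byte big-endian schema ID before the Avro-encoded body. This is Confluent's standard wire format; the parser below is an illustrative sketch of that framing, not an API of wisrtoni40-confluent-schema:

```javascript
// Sketch of Confluent's wire format: [0x00][schema id, 4 bytes BE][avro body].
// The resolver uses the schema ID to fetch the right schema from the registry.
function parseConfluentFrame(buf) {
  if (buf.length < 5 || buf[0] !== 0) {
    throw new Error('not a Confluent-framed message');
  }
  return {
    schemaId: buf.readUInt32BE(1), // registry schema ID
    payload: buf.subarray(5),      // raw Avro-encoded body
  };
}

// Build a fake frame: magic byte, schema ID 42, then a payload.
const frame = Buffer.concat([
  Buffer.from([0, 0, 0, 0, 42]),
  Buffer.from('avro-bytes'),
]);
console.log(parseConfluentFrame(frame).schemaId); // 42
```

If `encoding` were left at its string default, this 5-byte prefix would be mangled and the Avro payload could not be decoded.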
Appendix: the official Kafka website: https://kafka.apache.org/