欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

node連接kafka2.0實(shí)現(xiàn)方法示例

 更新時(shí)間:2023年05月26日 09:14:18   作者:他強(qiáng)任他強(qiáng)03  
這篇文章主要介紹了node連接kafka2.0,nodejs連接kafka2.0的實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了kafka2.0的功能、原理、以及node.js連接kafka2.0的具體實(shí)現(xiàn)技巧,需要的朋友可以參考下

Kafka是由Apache軟件基金會開發(fā)的一個(gè)開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)

node.js使用Kafka需要安裝的npm包:https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart

npm i wisrtoni40-confluent-schema --save

procedurer.ts文件

import { HighLevelProducer, KafkaClient } from 'kafka-node';
 import { v4 as uuidv4 } from 'uuid';
 import {
   ConfluentAvroStrategy,
   ConfluentMultiRegistry,
   ConfluentPubResolveStrategy,
 } from 'wisrtoni40-confluent-schema';
 /**
  * -----------------------------------------------------------------------------
  * Config
  * -----------------------------------------------------------------------------
  */
 const kafkaHost = '你的kafka host';
 const topic = '你的topic';
 const registryHost =
   '你的kafka注冊host';
 /**
  * -----------------------------------------------------------------------------
  * Kafka Client and Producer
  * -----------------------------------------------------------------------------
  */
 const kafkaClient = new KafkaClient({
   kafkaHost,
   clientId: uuidv4(),
   connectTimeout: 60000,
   requestTimeout: 60000,
   connectRetryOptions: {
     retries: 5,
     factor: 0,
     minTimeout: 1000,
     maxTimeout: 1000,
     randomize: false,
   },
   sasl: {
     mechanism: 'plain',
     username: '你的kafka用戶名',
     password: '你的kafka密碼',
   },
 });
 const producer = new HighLevelProducer(kafkaClient, {
   requireAcks: 1,
   ackTimeoutMs: 100,
 });
 /**
  * -----------------------------------------------------------------------------
  * Confluent Resolver
  * -----------------------------------------------------------------------------
  */
 const schemaRegistry = new ConfluentMultiRegistry(registryHost);
 const avro = new ConfluentAvroStrategy();
 const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);
 /**
  * -----------------------------------------------------------------------------
  * Produce
  * -----------------------------------------------------------------------------
  */
 (async () => {
   const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142'
   };
   const processedData = await resolver.resolve(data);
   producer.send([{ topic, messages: processedData }], (error, result) => {
     if (error) {
       console.error(error);
     } else {
       console.log(result);
     }
   });
 })();

procedurer.js文件

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");
var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注冊host';
const kafkaClient = new kafka_node_1.KafkaClient({
  kafkaHost,
  clientId: (0, uuid_1.v4)(),
  connectTimeout: 60000,
  requestTimeout: 60000,
  connectRetryOptions: {
    retries: 5,
    factor: 0,
    minTimeout: 1000,
    maxTimeout: 1000,
    randomize: false,
  },
  sasl: {
    mechanism: 'plain',
    username: '你的kafka用戶名',
    password: '你的kafka密碼',
  },
});
const producer = new kafka_node_1.HighLevelProducer(kafkaClient, {
  requireAcks: 1,
  ackTimeoutMs: 100,
});
const schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
const avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
const resolver = new wisrtoni40_confluent_schema_1.ConfluentPubResolveStrategy(schemaRegistry, avro, topic);
(async () => {
  const data = {
    evt_dt: 1664446229425,
    evt_type: 'tower_unload',
    plant: 'F110',
    machineName: 'TOWER_01',
    errorCode: '',
    description: '',
    result: 'OK',
    evt_ns: 'wmy.dx',
    evt_tp: 'tower.error',
    evt_pid: 'TOWER_01',
    evt_pubBy: 'nifi.11142'
  };
  const processedData = await resolver.resolve(data);
  producer.send([{ topic, messages: processedData }], (error, result) => {
    if (error) {
      console.error(error);
    } else {
      console.log(result);
    }
  });
})();

consumer.ts文件

import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {
  ConfluentAvroStrategy,
  ConfluentMultiRegistry,
  ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';
/**
 * -----------------------------------------------------------------------------
 * Config
 * -----------------------------------------------------------------------------
 */
const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost =
  '你的kafka注冊host';
/**
 * -----------------------------------------------------------------------------
 * Kafka Consumer
 * -----------------------------------------------------------------------------
 */
const consumer = new ConsumerGroup(
  {
    kafkaHost,
    groupId: uuidv4(),
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    encoding: 'buffer',
    fromOffset: 'latest',
    outOfRangeOffset: 'latest',
    sasl: {
      mechanism: 'plain',
      username: '你的kafka用戶名',
      password: '你的kafka密碼',
    },
  },
  topic,
);
/**
 * -----------------------------------------------------------------------------
 * Confluent Resolver
 * -----------------------------------------------------------------------------
 */
const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);
/**
 * -----------------------------------------------------------------------------
 * Consume
 * -----------------------------------------------------------------------------
 */
consumer.on('message', async msg => {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});

comsumer.js文件

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");
var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注冊host';
var consumer = new kafka_node_1.ConsumerGroup({
  kafkaHost: kafkaHost,
  groupId: (0, uuid_1.v4)(),
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  encoding: 'buffer',
  fromOffset: 'latest',
  outOfRangeOffset: 'latest',
  sasl: {
    mechanism: 'plain',
    username: '你的kafka用戶名',
    password: '你的kafka密碼'
  }
}, topic);
var schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
var avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
var resolver = new wisrtoni40_confluent_schema_1.ConfluentSubResolveStrategy(schemaRegistry, avro);
consumer.on('message', async function (msg) {
  const result = await resolver.resolve(msg.value);
  console.log(msg.offset);
  console.log(result);
});

附:kafka官網(wǎng): https://kafka.apache.org/

相關(guān)文章

  • nodejs連接ftp上傳下載實(shí)現(xiàn)方法詳解【附:踩坑記錄】

    nodejs連接ftp上傳下載實(shí)現(xiàn)方法詳解【附:踩坑記錄】

    這篇文章主要介紹了nodejs連接ftp上傳下載實(shí)現(xiàn)方法,結(jié)合實(shí)例形式詳細(xì)分析了node.js使用ftp模塊實(shí)現(xiàn)針對ftp上傳、下載相關(guān)操作的方法,并附帶記錄了傳輸速度慢的解決方法,需要的朋友可以參考下
    2023-04-04
  • Node中文件斷點(diǎn)續(xù)傳原理和方法總結(jié)

    Node中文件斷點(diǎn)續(xù)傳原理和方法總結(jié)

    在之前做過一個(gè)小項(xiàng)目,涉及到了文件上傳,在大文件上面使用了斷點(diǎn)續(xù)傳,降低了服務(wù)器方面的壓力,現(xiàn)在小編把Node中文件斷點(diǎn)續(xù)傳原理和方法總結(jié)分享給大家,感興趣的朋友一起看看吧
    2022-01-01
  • node.js Sequelize實(shí)現(xiàn)單實(shí)例字段或批量自增、自減

    node.js Sequelize實(shí)現(xiàn)單實(shí)例字段或批量自增、自減

    Sequelize 可以實(shí)現(xiàn)針對單個(gè)實(shí)例的一或多個(gè)字段的自增、自減操作,也可以對符合條件的數(shù)據(jù)進(jìn)行批量的自增、自減操作。單個(gè)實(shí)例字段的自增、自減可以利用Instance的相應(yīng)方法實(shí)現(xiàn),而批量自增、自減則需要借助sequelize提供的字面量方法實(shí)現(xiàn)。下面來看看詳細(xì)的介紹吧。
    2016-12-12
  • Windows下安裝Bun像Node或Deno的現(xiàn)代JS運(yùn)行時(shí)

    Windows下安裝Bun像Node或Deno的現(xiàn)代JS運(yùn)行時(shí)

    這篇文章主要為大家介紹了一款像Node或Deno的現(xiàn)代JavaScript運(yùn)行時(shí)的bun在Windows下安裝過程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • node微信開發(fā)之獲取access_token+自定義菜單

    node微信開發(fā)之獲取access_token+自定義菜單

    這篇文章主要介紹了node微信開發(fā)之獲取access_token+自定義菜單,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • 詳解如何使用Node.js實(shí)現(xiàn)熱重載頁面

    詳解如何使用Node.js實(shí)現(xiàn)熱重載頁面

    這篇文章主要介紹了詳解如何使用Node.js實(shí)現(xiàn)熱重載頁面,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • Webpack 實(shí)現(xiàn) Node.js 代碼熱替換

    Webpack 實(shí)現(xiàn) Node.js 代碼熱替換

    Webpack有一個(gè)很實(shí)用的功能叫做熱替換(Hot-replace),尤其是結(jié)合React Hot Loader插件,開發(fā)過程中都不需要刷新瀏覽器,任何前端代碼的更改都會實(shí)時(shí)的在瀏覽器中表現(xiàn)出來。
    2015-10-10
  • 快速解決brew安裝特定版本flow的問題

    快速解決brew安裝特定版本flow的問題

    今天小編就為大家分享一篇快速解決brew安裝特定版本flow的問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-05-05
  • npm查看鏡像源與切換鏡像源方法詳解

    npm查看鏡像源與切換鏡像源方法詳解

    這篇文章主要為大家介紹了npm查看鏡像源與切換鏡像源方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-06-06
  • node.js中的fs.realpath方法使用說明

    node.js中的fs.realpath方法使用說明

    這篇文章主要介紹了node.js中的fs.realpath方法使用說明,本文介紹了fs.realpath的方法說明、語法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下
    2014-12-12

最新評論