欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

KOA+egg.js集成kafka消息隊列的示例

 更新時間:2018年11月09日 10:23:13   作者:KnoveZ  
這篇文章主要介紹了KOA+egg.js集成kafka消息隊列的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

Egg.js : 基于KOA2的企業(yè)級框架

Kafka:高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)

本文章將集成egg + kafka + mysql 的日志系統(tǒng)例子

系統(tǒng)要求:日志記錄,通過kafka進行消息隊列控制

思路圖:

這里消費者和生產(chǎn)者都由日志系統(tǒng)提供

λ.1 環(huán)境準(zhǔn)備

①Kafka

官網(wǎng)下載kafka后,解壓

啟動zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka server

這里config/server.properties中將num.partitions=5,我們設(shè)置5個partitions

bin/kafka-server-start.sh config/server.properties

② egg + mysql

根據(jù)腳手架搭建好egg,再多安裝kafka-node,egg-mysql

mysql 用戶名root 密碼123456

λ.2 集成

1、根目錄新建app.js,這個文件在每次項目加載時候都會運作

'use strict';
 
const kafka = require('kafka-node');
 
module.exports = app => {
 app.beforeStart(async () => {
 const ctx = app.createAnonymousContext();
 
 const Producer = kafka.Producer;
 const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
 const producer = new Producer(client, app.config.producerConfig);
 
 producer.on('error', function(err) {
  console.error('ERROR: [Producer] ' + err);
 });
 
 app.producer = producer;
 
 const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
  autoCommit: false,
 });
 
 consumer.on('message', async function(message) {
  try {
  await ctx.service.log.insert(JSON.parse(message.value));
  consumer.commit(true, (err, data) => {
   console.error('commit:', err, data);
  });
  } catch (error) {
  console.error('ERROR: [GetMessage] ', message, error);
  }
 });
 
 consumer.on('error', function(err) {
  console.error('ERROR: [Consumer] ' + err);
 });
 });
};

上述代碼新建了生產(chǎn)者、消費者。

生產(chǎn)者新建后加載進app全局對象。我們將在請求時候生產(chǎn)消息。這里只是先新建實例

消費者獲取消息將訪問service層的insert方法(數(shù)據(jù)庫插入數(shù)據(jù))。

具體參數(shù)可以參考kafka-node官方API,往下看會有生產(chǎn)者和消費者的配置參數(shù)。

2、controller · log.js

這里獲取到了producer,并傳往service層

'use strict';
 
const Controller = require('egg').Controller;
 
class LogController extends Controller {
 /**
 * @description Kafka控制日志信息流
 * @host /log/notice
 * @method POST
 * @param {Log} log 日志信息
 */
 async notice() {
 const producer = this.ctx.app.producer;
 const Response = new this.ctx.app.Response();
 
 const requestBody = this.ctx.request.body;
 const backInfo = await this.ctx.service.log.send(producer, requestBody);
 this.ctx.body = Response.success(backInfo);
 }
}
 
module.exports = LogController;

3、service · log.js

這里有一個send方法,這里調(diào)用了producer.send ,進行生產(chǎn)者生產(chǎn)

insert方法則是數(shù)據(jù)庫插入數(shù)據(jù)

'use strict';
 
const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');
 
class LogService extends Service {
 async send(producer, params) {
 const payloads = [
  {
  topic: this.ctx.app.config.topic,
  messages: JSON.stringify(params),
  },
 ];
 
 producer.send(payloads, function(err, data) {
  console.log('send : ', data);
 });
 
 return 'success';
 }
 async insert(message) {
 try {
  const logDB = this.ctx.app.mysql.get('log');
  const ip = this.ctx.ip;
 
  const Logs = this.ctx.model.Log.build({
  id: uuidv1(),
  type: message.type || '',
  level: message.level || 0,
  operator: message.operator || '',
  content: message.content || '',
  ip,
  user_agent: message.user_agent || '',
  error_stack: message.error_stack || '',
  url: message.url || '',
  request: message.request || '',
  response: message.response || '',
  created_at: new Date(),
  updated_at: new Date(),
  });
 
  const result = await logDB.insert('logs', Logs.dataValues);
 
  if (result.affectedRows === 1) {
  console.log(`SUCEESS: [Insert ${message.type}]`);
  } else console.error('ERROR: [Insert DB] ', result);
 } catch (error) {
  console.error('ERROR: [Insert] ', message, error);
 }
 }
}
 
module.exports = LogService;

4、config · config.default.js

一些上述代碼用到的配置參數(shù)具體在這里,注這里開了5個partition。

'use strict';
 
module.exports = appInfo => {
 const config = (exports = {});
 
 const topic = 'logAction_p5';
 
 // add your config here
 config.middleware = [];
 
 config.security = {
 csrf: {
  enable: false,
 },
 };
 
 // mysql database configuration
 config.mysql = {
 clients: {
  basic: {
  host: 'localhost',
  port: '3306',
  user: 'root',
  password: '123456',
  database: 'merchants_basic',
  },
  log: {
  host: 'localhost',
  port: '3306',
  user: 'root',
  password: '123456',
  database: 'merchants_log',
  },
 },
 default: {},
 app: true,
 agent: false,
 };
 
 // sequelize config
 config.sequelize = {
 dialect: 'mysql',
 database: 'merchants_log',
 host: 'localhost',
 port: '3306',
 username: 'root',
 password: '123456',
 dialectOptions: {
  requestTimeout: 999999,
 },
 pool: {
  acquire: 999999,
 },
 };
 
 // kafka config
 config.kafkaHost = 'localhost:9092';
 
 config.topic = topic;
 
 config.producerConfig = {
 // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
 partitionerType: 1,
 };
 
 config.consumerTopics = [
 { topic, partition: 0 },
 { topic, partition: 1 },
 { topic, partition: 2 },
 { topic, partition: 3 },
 { topic, partition: 4 },
 ];
 
 return config;
};

5、實體類:

mode · log.js

這里使用了 Sequelize

'use strict';
 
module.exports = app => {
 const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
 
 const Log = app.model.define('log', {
 /**
  * UUID
  */
 id: { type: STRING(36), primaryKey: true },
 /**
  * 日志類型
  */
 type: STRING(100),
 /**
  * 優(yōu)先等級(數(shù)字越高,優(yōu)先級越高)
  */
 level: INTEGER,
 /**
  * 操作者
  */
 operator: STRING(50),
 /**
  * 日志內(nèi)容
  */
 content: TEXT,
 /**
  * IP
  */
 ip: STRING(36),
 /**
  * 當(dāng)前用戶代理信息
  */
 user_agent: STRING(150),
 /**
  * 錯誤堆棧
  */
 error_stack: TEXT,
 /**
  * URL
  */
 url: STRING(255),
 /**
  * 請求對象
  */
 request: TEXT,
 /**
  * 響應(yīng)對象
  */
 response: TEXT,
 /**
  * 創(chuàng)建時間
  */
 created_at: DATE,
 /**
  * 更新時間
  */
 updated_at: DATE,
 });
 
 return Log;
};

6、測試Python腳本:

import requests
 
from multiprocessing import Pool
from threading import Thread
 
from multiprocessing import Process
 
 
def loop():
 t = 1000
 while t:
  url = "http://localhost:7001/log/notice"
 
  payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
  headers = {
  'Content-Type': "application/json",
  'Cache-Control': "no-cache"
  }
 
  response = requests.request("POST", url, data=payload, headers=headers)
 
  print(response.text)
 
if __name__ == '__main__':
 for i in range(10):
  t = Thread(target=loop)
  t.start()

7、建表語句:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
 
-- ----------------------------
-- Table structure for logs
-- ----------------------------
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs` (
 `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
 `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志類型',
 `level` int(11) NULL DEFAULT NULL COMMENT '優(yōu)先等級(數(shù)字越高,優(yōu)先級越高)',
 `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人',
 `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息',
 `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP',
 `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '當(dāng)前用戶代理信息',
 `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '錯誤堆棧',
 `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '當(dāng)前URL',
 `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '請求對象',
 `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '響應(yīng)對象',
 `created_at` datetime(0) NULL DEFAULT NULL COMMENT '創(chuàng)建時間',
 `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新時間',
 PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
 
SET FOREIGN_KEY_CHECKS = 1;

λ.3 后話

網(wǎng)上類似資料甚少,啃各種文檔,探尋技術(shù)實現(xiàn)方式

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Node.js的基本知識簡單匯總

    Node.js的基本知識簡單匯總

    本文主要給大家簡單介紹了Node.js的基本知識,包括概念、特點、歷史、案例的相關(guān)資料,需要的朋友可以參考下
    2016-09-09
  • nodejs用gulp管理前端文件方法

    nodejs用gulp管理前端文件方法

    本篇文章給大家分享了nodejs用gulp管理前端文件的步驟方法以及優(yōu)缺點分析,有興趣的朋友參考下。
    2018-06-06
  • node.js中的querystring.parse方法使用說明

    node.js中的querystring.parse方法使用說明

    這篇文章主要介紹了node.js中的querystring.parse方法使用說明,本文介紹了querystring.parse的方法說明、語法、接收參數(shù)、使用實例和實現(xiàn)源碼,需要的朋友可以參考下
    2014-12-12
  • Node.js圖片驗證碼識別功能

    Node.js圖片驗證碼識別功能

    現(xiàn)在越來越多的網(wǎng)站采取圖片驗證碼,防止機器惡意向服務(wù)端發(fā)送請求,但是常規(guī)的圖片驗證碼也不是非常安全了,有非常多第三方庫可以對圖片上的數(shù)字文字等進行識別,下面小編給大家分享Node.js圖片驗證碼識別功能實現(xiàn),感興趣的朋友一起看看吧
    2024-04-04
  • 詳解nodejs通過響應(yīng)回寫的方式渲染頁面資源

    詳解nodejs通過響應(yīng)回寫的方式渲染頁面資源

    本篇文章主要介紹了詳解nodejs通過響應(yīng)回寫的方式渲染頁面資源,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • NodeJS學(xué)習(xí)筆記之Module的簡介

    NodeJS學(xué)習(xí)筆記之Module的簡介

    模塊是Node.js 應(yīng)用程序的基本組成部分,文件和模塊是一一對應(yīng)的。換言之,一個 Node.js 文件就是一個模塊,這個文件可能是JavaScript 代碼、JSON 或者編譯過的C/C++ 擴展。
    2017-03-03
  • Node中文件斷點續(xù)傳原理和方法總結(jié)

    Node中文件斷點續(xù)傳原理和方法總結(jié)

    在之前做過一個小項目,涉及到了文件上傳,在大文件上面使用了斷點續(xù)傳,降低了服務(wù)器方面的壓力,現(xiàn)在小編把Node中文件斷點續(xù)傳原理和方法總結(jié)分享給大家,感興趣的朋友一起看看吧
    2022-01-01
  • nodejs搭建本地http服務(wù)器教程

    nodejs搭建本地http服務(wù)器教程

    本篇文章主要介紹了nodejs搭建本地http服務(wù)器教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • Node登錄權(quán)限驗證token驗證實現(xiàn)的方法示例

    Node登錄權(quán)限驗證token驗證實現(xiàn)的方法示例

    這篇文章主要介紹了Node登錄權(quán)限驗證token驗證實現(xiàn)的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • Node的stream數(shù)據(jù)流你了解嗎

    Node的stream數(shù)據(jù)流你了解嗎

    這篇文章主要為大家詳細介紹了Node的stream數(shù)據(jù)流,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-02-02

最新評論