KOA+egg.js集成kafka消息隊(duì)列的示例
Egg.js : 基于KOA2的企業(yè)級(jí)框架
Kafka:高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)
本文章將集成egg + kafka + mysql 的日志系統(tǒng)例子
系統(tǒng)要求:日志記錄,通過kafka進(jìn)行消息隊(duì)列控制
思路圖:

這里消費(fèi)者和生產(chǎn)者都由日志系統(tǒng)提供
λ.1 環(huán)境準(zhǔn)備
①Kafka
官網(wǎng)下載kafka后,解壓
啟動(dòng)zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動(dòng)Kafka server
這里config/server.properties中將num.partitions=5,我們?cè)O(shè)置5個(gè)partitions
bin/kafka-server-start.sh config/server.properties
② egg + mysql
根據(jù)腳手架搭建好egg,再多安裝kafka-node,egg-mysql
mysql 用戶名root 密碼123456
λ.2 集成
1、根目錄新建app.js,這個(gè)文件在每次項(xiàng)目加載時(shí)候都會(huì)運(yùn)作
'use strict';
const kafka = require('kafka-node');
module.exports = app => {
app.beforeStart(async () => {
const ctx = app.createAnonymousContext();
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
const producer = new Producer(client, app.config.producerConfig);
producer.on('error', function(err) {
console.error('ERROR: [Producer] ' + err);
});
app.producer = producer;
const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
autoCommit: false,
});
consumer.on('message', async function(message) {
try {
await ctx.service.log.insert(JSON.parse(message.value));
consumer.commit(true, (err, data) => {
console.error('commit:', err, data);
});
} catch (error) {
console.error('ERROR: [GetMessage] ', message, error);
}
});
consumer.on('error', function(err) {
console.error('ERROR: [Consumer] ' + err);
});
});
};
上述代碼新建了生產(chǎn)者、消費(fèi)者。
生產(chǎn)者新建后加載進(jìn)app全局對(duì)象。我們將在請(qǐng)求時(shí)候生產(chǎn)消息。這里只是先新建實(shí)例
消費(fèi)者獲取消息將訪問service層的insert方法(數(shù)據(jù)庫插入數(shù)據(jù))。
具體參數(shù)可以參考kafka-node官方API,往下看會(huì)有生產(chǎn)者和消費(fèi)者的配置參數(shù)。
2、controller · log.js
這里獲取到了producer,并傳往service層
'use strict';
const Controller = require('egg').Controller;
class LogController extends Controller {
/**
* @description Kafka控制日志信息流
* @host /log/notice
* @method POST
* @param {Log} log 日志信息
*/
async notice() {
const producer = this.ctx.app.producer;
const Response = new this.ctx.app.Response();
const requestBody = this.ctx.request.body;
const backInfo = await this.ctx.service.log.send(producer, requestBody);
this.ctx.body = Response.success(backInfo);
}
}
module.exports = LogController;
3、service · log.js
這里有一個(gè)send方法,這里調(diào)用了producer.send ,進(jìn)行生產(chǎn)者生產(chǎn)
insert方法則是數(shù)據(jù)庫插入數(shù)據(jù)
'use strict';
const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');
class LogService extends Service {
async send(producer, params) {
const payloads = [
{
topic: this.ctx.app.config.topic,
messages: JSON.stringify(params),
},
];
producer.send(payloads, function(err, data) {
console.log('send : ', data);
});
return 'success';
}
async insert(message) {
try {
const logDB = this.ctx.app.mysql.get('log');
const ip = this.ctx.ip;
const Logs = this.ctx.model.Log.build({
id: uuidv1(),
type: message.type || '',
level: message.level || 0,
operator: message.operator || '',
content: message.content || '',
ip,
user_agent: message.user_agent || '',
error_stack: message.error_stack || '',
url: message.url || '',
request: message.request || '',
response: message.response || '',
created_at: new Date(),
updated_at: new Date(),
});
const result = await logDB.insert('logs', Logs.dataValues);
if (result.affectedRows === 1) {
console.log(`SUCEESS: [Insert ${message.type}]`);
} else console.error('ERROR: [Insert DB] ', result);
} catch (error) {
console.error('ERROR: [Insert] ', message, error);
}
}
}
module.exports = LogService;
4、config · config.default.js
一些上述代碼用到的配置參數(shù)具體在這里,注這里開了5個(gè)partition。
'use strict';
module.exports = appInfo => {
const config = (exports = {});
const topic = 'logAction_p5';
// add your config here
config.middleware = [];
config.security = {
csrf: {
enable: false,
},
};
// mysql database configuration
config.mysql = {
clients: {
basic: {
host: 'localhost',
port: '3306',
user: 'root',
password: '123456',
database: 'merchants_basic',
},
log: {
host: 'localhost',
port: '3306',
user: 'root',
password: '123456',
database: 'merchants_log',
},
},
default: {},
app: true,
agent: false,
};
// sequelize config
config.sequelize = {
dialect: 'mysql',
database: 'merchants_log',
host: 'localhost',
port: '3306',
username: 'root',
password: '123456',
dialectOptions: {
requestTimeout: 999999,
},
pool: {
acquire: 999999,
},
};
// kafka config
config.kafkaHost = 'localhost:9092';
config.topic = topic;
config.producerConfig = {
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 1,
};
config.consumerTopics = [
{ topic, partition: 0 },
{ topic, partition: 1 },
{ topic, partition: 2 },
{ topic, partition: 3 },
{ topic, partition: 4 },
];
return config;
};
5、實(shí)體類:
mode · log.js
這里使用了 Sequelize
'use strict';
module.exports = app => {
const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
const Log = app.model.define('log', {
/**
* UUID
*/
id: { type: STRING(36), primaryKey: true },
/**
* 日志類型
*/
type: STRING(100),
/**
* 優(yōu)先等級(jí)(數(shù)字越高,優(yōu)先級(jí)越高)
*/
level: INTEGER,
/**
* 操作者
*/
operator: STRING(50),
/**
* 日志內(nèi)容
*/
content: TEXT,
/**
* IP
*/
ip: STRING(36),
/**
* 當(dāng)前用戶代理信息
*/
user_agent: STRING(150),
/**
* 錯(cuò)誤堆棧
*/
error_stack: TEXT,
/**
* URL
*/
url: STRING(255),
/**
* 請(qǐng)求對(duì)象
*/
request: TEXT,
/**
* 響應(yīng)對(duì)象
*/
response: TEXT,
/**
* 創(chuàng)建時(shí)間
*/
created_at: DATE,
/**
* 更新時(shí)間
*/
updated_at: DATE,
});
return Log;
};
6、測(cè)試Python腳本:
import requests
from multiprocessing import Pool
from threading import Thread
from multiprocessing import Process
def loop():
t = 1000
while t:
url = "http://localhost:7001/log/notice"
payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
headers = {
'Content-Type': "application/json",
'Cache-Control': "no-cache"
}
response = requests.request("POST", url, data=payload, headers=headers)
print(response.text)
if __name__ == '__main__':
for i in range(10):
t = Thread(target=loop)
t.start()
7、建表語句:
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for logs -- ---------------------------- DROP TABLE IF EXISTS `logs`; CREATE TABLE `logs` ( `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志類型', `level` int(11) NULL DEFAULT NULL COMMENT '優(yōu)先等級(jí)(數(shù)字越高,優(yōu)先級(jí)越高)', `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人', `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息', `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP', `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '當(dāng)前用戶代理信息', `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '錯(cuò)誤堆棧', `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '當(dāng)前URL', `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '請(qǐng)求對(duì)象', `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '響應(yīng)對(duì)象', `created_at` datetime(0) NULL DEFAULT NULL COMMENT '創(chuàng)建時(shí)間', `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新時(shí)間', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
λ.3 后話
網(wǎng)上類似資料甚少,啃各種文檔,探尋技術(shù)實(shí)現(xiàn)方式
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
node.js中的querystring.parse方法使用說明
這篇文章主要介紹了node.js中的querystring.parse方法使用說明,本文介紹了querystring.parse的方法說明、語法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下2014-12-12
NodeJS學(xué)習(xí)筆記之Module的簡(jiǎn)介
模塊是Node.js 應(yīng)用程序的基本組成部分,文件和模塊是一一對(duì)應(yīng)的。換言之,一個(gè) Node.js 文件就是一個(gè)模塊,這個(gè)文件可能是JavaScript 代碼、JSON 或者編譯過的C/C++ 擴(kuò)展。2017-03-03
Node中文件斷點(diǎn)續(xù)傳原理和方法總結(jié)
在之前做過一個(gè)小項(xiàng)目,涉及到了文件上傳,在大文件上面使用了斷點(diǎn)續(xù)傳,降低了服務(wù)器方面的壓力,現(xiàn)在小編把Node中文件斷點(diǎn)續(xù)傳原理和方法總結(jié)分享給大家,感興趣的朋友一起看看吧2022-01-01
Node登錄權(quán)限驗(yàn)證token驗(yàn)證實(shí)現(xiàn)的方法示例
這篇文章主要介紹了Node登錄權(quán)限驗(yàn)證token驗(yàn)證實(shí)現(xiàn)的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05

