Node.js + Redis Sorted Set實現(xiàn)任務(wù)隊列
需求:功能 A 需要調(diào)用第三方 API 獲取數(shù)據(jù),而第三方 API 自身是異步處理方式,在調(diào)用后會返回數(shù)據(jù)與狀態(tài) { data: "查詢結(jié)果", "status": "正在異步處理中" } ,這樣就需要間隔一段時間后再去調(diào)用第三方 API 獲取數(shù)據(jù)。為了用戶在使用功能 A 時不會因為第三方 API 正在異步處理中而必須等待,將用戶請求加入任務(wù)隊列中,返回部分數(shù)據(jù)并關(guān)閉請求。然后定時從任務(wù)隊列里中取出任務(wù)調(diào)用第三方 API,若返回狀態(tài)為”異步處理中“,將該任務(wù)再次加入任務(wù)隊列,若返回狀態(tài)為”已處理完畢“,將返回數(shù)據(jù)入庫。
根據(jù)以上問題,想到使用 Node.js + Redis sorted set 來實現(xiàn)任務(wù)隊列。Node.js 實現(xiàn)自身應(yīng)用 API 用來接受用戶請求,合并數(shù)據(jù)庫已存數(shù)據(jù)與 API 返回的部分數(shù)據(jù)返回給用戶,并將任務(wù)加入到任務(wù)隊列中。利用 Node.js child process 與 cron 定時從任務(wù)隊列中取出任務(wù)執(zhí)行。
在設(shè)計任務(wù)隊列的過程中需要考慮到的幾個問題
- 并行執(zhí)行多個任務(wù)
- 任務(wù)唯一性
- 任務(wù)成功或失敗后的處理
針對以上問題的解決方案
- 并行執(zhí)行多個任務(wù)利用 Promise.all 來實現(xiàn)
- 任務(wù)唯一性利用 Redis sorted set 來實現(xiàn)。使用時間戳作為分值可以實現(xiàn)將 sorted set 作為 list 來使用,在加入任務(wù)時判斷任務(wù)是否已經(jīng)存在,在取出任務(wù)執(zhí)行時將該任務(wù)分值設(shè)置為 0,每次取出分值大于 0 的任務(wù)來執(zhí)行,可以避免重復(fù)執(zhí)行任務(wù)。
- 執(zhí)行任務(wù)成功后刪除任務(wù),執(zhí)行任務(wù)失敗后將任務(wù)分值更新為當前時間時間戳,這樣就可以將失敗的任務(wù)重新加入任務(wù)隊列尾部
示例代碼
// remote_api.js 模擬第三方 API 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請求 res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] }); }, 3000); }); app.listen('9001', () => { console.log('API 服務(wù)監(jiān)聽端口:9001'); }); // producer.js 自身應(yīng)用 API,用來接受用戶請求并將任務(wù)加入任務(wù)隊列 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // 先判斷任務(wù)是否已經(jīng)存在,存在:跳過,不存在:加入任務(wù)隊列 redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { if (error) { console.log(error); } else { if (task) { console.log('任務(wù)已存在,不新增相同任務(wù)'); callback(null, task); } else { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { res.status(200).send('正在查詢中......'); } }); }); app.listen(9002, () => { console.log('生產(chǎn)者服務(wù)監(jiān)聽端口:9002'); }); // consumer.js 定時獲取任務(wù)并執(zhí)行 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule'); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // 并行執(zhí)行任務(wù)數(shù)量 function getTasksFromQueue(callback) { // 獲取多個任務(wù) redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { if (error) { callback(error); } else { // 將任務(wù)分值設(shè)置為 0,表示正在處理 if (tasks.length > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { if (error) { callback(error); } else { callback(null, tasks) } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }) } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; request(requestOptions, (error, response, body) => { if (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error) => { if (error) { console.log(error); } else { } }); } else { try { body = typeof body !== 'object' ? JSON.parse(body) : body; } catch (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); return; } if (body.status !== 200) { resolve('failed'); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); } else { resolve('succeed'); removeSucceedTaskFromQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); } } }); }); } // 定時,每隔 5 秒獲取新的任務(wù)來執(zhí)行 let job = schedule.scheduleJob('*/5 * * * * *', () => { console.log('獲取新任務(wù)'); getTasksFromQueue((error, tasks) => { if (error) { console.log(error); } else { if (tasks.length > 0) { console.log(tasks); Promise.all(tasks.map(execTask)) .then((results) => { console.log(results); }) .catch((error) => { console.log(error); }); } } }); });
相關(guān)文章
Nodejs 構(gòu)建Cluster集群多線程Worker threads
這篇文章主要為大家介紹了Nodejs 構(gòu)建Cluster集群多線程Worker threads示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10Nodejs express框架一個工程中同時使用ejs模版和jade模版
這篇文章主要介紹了Nodejs express框架一個工程中同時使用ejs模版和jade模版 的相關(guān)資料,需要的朋友可以參考下2015-12-12node如何實現(xiàn)cmd彈窗交互之inquirer
這篇文章主要介紹了node如何實現(xiàn)cmd彈窗交互之inquirer問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-10-10