欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Node.js + Redis Sorted Set實現(xiàn)任務(wù)隊列

 更新時間:2016年09月19日 08:39:54   投稿:hebedich  
本文給大家分享的是使用Node.js + Redis Sorted Set實現(xiàn)任務(wù)隊列的方法示例,非常的實用,有需要的小伙伴可以參考下

需求:功能 A 需要調(diào)用第三方 API 獲取數(shù)據(jù),而第三方 API 自身是異步處理方式,在調(diào)用后會返回數(shù)據(jù)與狀態(tài) { data: "查詢結(jié)果", "status": "正在異步處理中" } ,這樣就需要間隔一段時間后再去調(diào)用第三方 API 獲取數(shù)據(jù)。為了用戶在使用功能 A 時不會因為第三方 API 正在異步處理中而必須等待,將用戶請求加入任務(wù)隊列中,返回部分數(shù)據(jù)并關(guān)閉請求。然后定時從任務(wù)隊列里中取出任務(wù)調(diào)用第三方 API,若返回狀態(tài)為”異步處理中“,將該任務(wù)再次加入任務(wù)隊列,若返回狀態(tài)為”已處理完畢“,將返回數(shù)據(jù)入庫。

根據(jù)以上問題,想到使用 Node.js + Redis sorted set 來實現(xiàn)任務(wù)隊列。Node.js 實現(xiàn)自身應(yīng)用 API 用來接受用戶請求,合并數(shù)據(jù)庫已存數(shù)據(jù)與 API 返回的部分數(shù)據(jù)返回給用戶,并將任務(wù)加入到任務(wù)隊列中。利用 Node.js child process 與 cron 定時從任務(wù)隊列中取出任務(wù)執(zhí)行。

在設(shè)計任務(wù)隊列的過程中需要考慮到的幾個問題

  • 并行執(zhí)行多個任務(wù)
  • 任務(wù)唯一性
  • 任務(wù)成功或失敗后的處理

針對以上問題的解決方案

  • 并行執(zhí)行多個任務(wù)利用 Promise.all 來實現(xiàn)
  • 任務(wù)唯一性利用 Redis sorted set 來實現(xiàn)。使用時間戳作為分值可以實現(xiàn)將 sorted set 作為 list 來使用,在加入任務(wù)時判斷任務(wù)是否已經(jīng)存在,在取出任務(wù)執(zhí)行時將該任務(wù)分值設(shè)置為 0,每次取出分值大于 0 的任務(wù)來執(zhí)行,可以避免重復(fù)執(zhí)行任務(wù)。
  • 執(zhí)行任務(wù)成功后刪除任務(wù),執(zhí)行任務(wù)失敗后將任務(wù)分值更新為當前時間時間戳,這樣就可以將失敗的任務(wù)重新加入任務(wù)隊列尾部

示例代碼

// remote_api.js 模擬第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
  setTimeout(() => {
    let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請求
    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
  }, 3000);
});

app.listen('9001', () => {
  console.log('API 服務(wù)監(jiān)聽端口:9001');
});
// producer.js 自身應(yīng)用 API,用來接受用戶請求并將任務(wù)加入任務(wù)隊列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
  // 先判斷任務(wù)是否已經(jīng)存在,存在:跳過,不存在:加入任務(wù)隊列
  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
    if (error) {
      console.log(error);
    } else {
      if (task) {
        console.log('任務(wù)已存在,不新增相同任務(wù)');
        callback(null, task);
      } else {
        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, result);
          }
        });
      }
    }
  });
}

app.get('/', (req, res) => {
  let taskName = req.query['task-name'];
  addTaskToQueue(taskName, (error, result) => {
    if (error) {
      console.log(error);
    } else {
      res.status(200).send('正在查詢中......');
    }
  });
});

app.listen(9002, () => {
  console.log('生產(chǎn)者服務(wù)監(jiān)聽端口:9002');
});
// consumer.js 定時獲取任務(wù)并執(zhí)行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行執(zhí)行任務(wù)數(shù)量

function getTasksFromQueue(callback) {
  // 獲取多個任務(wù)
  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
    } else {
      // 將任務(wù)分值設(shè)置為 0,表示正在處理
      if (tasks.length > 0) {
        let tmp = [];
        tasks.forEach((task) => {
          tmp.push(0);
          tmp.push(task);
        });
        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, tasks)
          }
        });
      }
    }
  });
}

function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  });
}

function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  })
}

function execTask(taskName) {
  return new Promise((resolve, reject) => {
    let requestOptions = {
      'url': 'http://127.0.0.1:9001',
      'method': 'GET',
      'timeout': 5000
    };
    request(requestOptions, (error, response, body) => {
      if (error) {
        resolve('failed');
        console.log(error);
        addFailedTaskToQueue(taskName, (error) => {
          if (error) {
            console.log(error);
          } else {

          }
        });
      } else {
        try {
          body = typeof body !== 'object' ? JSON.parse(body) : body;
        } catch (error) {
          resolve('failed');
          console.log(error);
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
          return;
        }
        if (body.status !== 200) {
          resolve('failed');
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        } else {
          resolve('succeed');
          removeSucceedTaskFromQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        }
      }
    });
  });
}

// 定時,每隔 5 秒獲取新的任務(wù)來執(zhí)行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('獲取新任務(wù)');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
    } else {
      if (tasks.length > 0) {
        console.log(tasks);

        Promise.all(tasks.map(execTask))
        .then((results) => {
          console.log(results);
        })
        .catch((error) => {
          console.log(error);
        });
        
      }
    }
  });
});

相關(guān)文章

  • 一文詳解NPM如何換源

    一文詳解NPM如何換源

    在每一次的實際開發(fā)過程中我們都會下載相關(guān)的依賴包,最官方的是 npm,但是該服務(wù)器對于國內(nèi)開發(fā)者來說,下載起來是比較慢的,所以我們需要換源,下面這篇文章主要給大家介紹了關(guān)于NPM如何換源的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • Nodejs 構(gòu)建Cluster集群多線程Worker threads

    Nodejs 構(gòu)建Cluster集群多線程Worker threads

    這篇文章主要為大家介紹了Nodejs 構(gòu)建Cluster集群多線程Worker threads示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • Nodejs express框架一個工程中同時使用ejs模版和jade模版

    Nodejs express框架一個工程中同時使用ejs模版和jade模版

    這篇文章主要介紹了Nodejs express框架一個工程中同時使用ejs模版和jade模版 的相關(guān)資料,需要的朋友可以參考下
    2015-12-12
  • node如何實現(xiàn)cmd彈窗交互之inquirer

    node如何實現(xiàn)cmd彈窗交互之inquirer

    這篇文章主要介紹了node如何實現(xiàn)cmd彈窗交互之inquirer問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • node刪除、復(fù)制文件或文件夾示例代碼

    node刪除、復(fù)制文件或文件夾示例代碼

    這篇文章主要給大家介紹了關(guān)于node刪除、復(fù)制文件或文件夾的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用node具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-08-08
  • Node.js中Swagger的使用指南詳解

    Node.js中Swagger的使用指南詳解

    Swagger(目前用OpenAPI?Specification代替)是一個用于設(shè)計、構(gòu)建、記錄和使用REST?API的強大工具,本文將探討使用Swagger的一些關(guān)鍵技巧,需要的可以參考一下
    2024-01-01
  • nodejs實現(xiàn)用戶登錄路由功能

    nodejs實現(xiàn)用戶登錄路由功能

    這篇文章主要介紹了nodejs中實現(xiàn)用戶登錄路由功能,本文通過實例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-05-05
  • NodeJS和瀏覽器中this關(guān)鍵字的不同之處

    NodeJS和瀏覽器中this關(guān)鍵字的不同之處

    這篇文章主要給大家介紹了關(guān)于NodeJS和瀏覽器中this關(guān)鍵字不同的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-03-03
  • node.js下when.js 的異步編程實踐

    node.js下when.js 的異步編程實踐

    這篇文章主要介紹了node.js下when.js 的異步編程實踐,需要的朋友可以參考下
    2014-12-12
  • nvm管理node版本的詳細圖文教程

    nvm管理node版本的詳細圖文教程

    nvm全英文也叫node.js version management,是一個nodejs的版本管理工具,下面這篇文章主要給大家介紹了關(guān)于nvm管理node版本的詳細圖文教程,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2022-12-12

最新評論