JS 實(shí)現(xiàn)請(qǐng)求調(diào)度器
前言:JS 天然支持并行請(qǐng)求,但與此同時(shí)會(huì)帶來一些問題,比如會(huì)造成目標(biāo)服務(wù)器壓力過大,所以本文引入“請(qǐng)求調(diào)度器”來節(jié)制并發(fā)度。
TLDR; 直接跳轉(zhuǎn)『抽象和復(fù)用』章節(jié)。
為了獲取一批互不依賴的資源,通常從性能考慮可以用 Promise.all(arrayOfPromises)來并發(fā)執(zhí)行。比如我們已有 100 個(gè)應(yīng)用的 id,需求是聚合所有應(yīng)用的 PV,我們通常會(huì)這么寫:
const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';
// fetch 函數(shù)發(fā)送 HTTP 請(qǐng)求,返回 Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);
Promise.all(appPromises)
// 通過 reduce 做累加
.then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
.catch((error) => console.log(error));
上面的代碼在應(yīng)用個(gè)數(shù)不多的情況下,可以運(yùn)行正常。當(dāng)應(yīng)用個(gè)數(shù)達(dá)到成千上萬時(shí),對(duì)支持并發(fā)數(shù)不是很好的系統(tǒng),你的「壓測」會(huì)把第三放服務(wù)器搞掛,暫時(shí)無法響應(yīng)請(qǐng)求:
<html> <head><title>502 Bad Gateway</title></head> <body bgcolor="white"> <center><h1>502 Bad Gateway</h1></center> <hr><center>nginx/1.10.1</center> </body> </html>
如何解決呢?
一個(gè)很自然的想法是,既然不支持這么多的并發(fā)請(qǐng)求,那就分割成幾大塊,每塊為一個(gè) chunk,chunk 內(nèi)部的請(qǐng)求依然并發(fā),但塊的大?。?code>chunkSize)限制在系統(tǒng)支持的最大并發(fā)數(shù)以內(nèi)。前一個(gè) chunk 結(jié)束后一個(gè) chunk 才能繼續(xù)執(zhí)行,也就是說 chunk 內(nèi)部的請(qǐng)求是并發(fā)的,但 chunk 之間是串行的。思路其實(shí)很簡單,寫起來卻有一定難度??偨Y(jié)起來三個(gè)操作:分塊、串行、聚合
難點(diǎn)在如何串行執(zhí)行 Promise,Promise 僅提供了并行(Promise.all)功能,并沒有提供串行功能。我們從簡單的三個(gè)請(qǐng)求開始,看如何實(shí)現(xiàn),啟發(fā)式解決問題(heuristic)。
// task1, task2, task3 是三個(gè)返回 Promise 的工廠函數(shù),模擬我們的異步請(qǐng)求
const task1 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(1);
console.log('task1 executed');
}, 1000);
});
const task2 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(2);
console.log('task2 executed');
}, 1000);
});
const task3 = () => new Promise((resolve) => {
setTimeout(() => {
resolve(3);
console.log('task3 executed');
}, 1000);
});
// 聚合結(jié)果
let result = 0;
const resultPromise = [task1, task2, task3].reduce((current, next) =>
current.then((number) => {
console.log('resolved with number', number); // task2, task3 的 Promise 將在這里被 resolve
result += number;
return next();
}),
Promise.resolve(0)) // 聚合初始值
.then(function(last) {
console.log('The last promise resolved with number', last); // task3 的 Promise 在這里被 resolve
result += last;
console.log('all executed with result', result);
return Promise.resolve(result);
});
運(yùn)行結(jié)果如圖 1:

代碼解析:我們想要的效果,直觀展示其實(shí)是 fn1().then(() => fn2()).then(() => fn3())。上面代碼能讓一組 Promise 按順序執(zhí)行的關(guān)鍵之處就在 reduce 這個(gè)“引擎”在一步步推動(dòng) Promise 工廠函數(shù)的執(zhí)行。
難點(diǎn)解決了,我們看看最終代碼:
/**
* 模擬 HTTP 請(qǐng)求
* @param {String} url
* @return {Promise}
*/
function fetch(url) {
console.log(`Fetching ${url}`);
return new Promise((resolve) => {
setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
});
}
const urlPrefix = 'http://opensearch.example.com/api/apps';
const aggregator = {
/**
* 入口方法,開啟定時(shí)任務(wù)
*
* @return {Promise}
*/
start() {
return this.fetchAppIds()
.then(ids => this.fetchAppsSerially(ids, 2))
.then(apps => this.sumPv(apps))
.catch(error => console.error(error));
},
/**
* 獲取所有應(yīng)用的 ID
*
* @private
*
* @return {Promise}
*/
fetchAppIds() {
return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
},
promiseFactory(ids) {
return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
},
/**
* 獲取所有應(yīng)用的詳情
*
* 一次并發(fā)請(qǐng)求 `concurrency` 個(gè)應(yīng)用,稱為一個(gè) chunk
* 前一個(gè) `chunk` 并發(fā)完成后一個(gè)才繼續(xù),直至所有應(yīng)用獲取完畢
*
* @private
*
* @param {[Number]} ids
* @param {Number} concurrency 一次并發(fā)的請(qǐng)求數(shù)量
* @return {[Object]} 所有應(yīng)用的信息
*/
fetchAppsSerially(ids, concurrency = 100) {
// 分塊
let chunkOfIds = ids.splice(0, concurrency);
const tasks = [];
while (chunkOfIds.length !== 0) {
tasks.push(this.promiseFactory(chunkOfIds));
chunkOfIds = ids.splice(0, concurrency);
}
// 按塊順序執(zhí)行
const result = [];
return tasks.reduce((current, next) => current.then((chunkOfApps) => {
console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
result.push(...chunkOfApps); // 拍扁數(shù)組
return next();
}), Promise.resolve([]))
.then((lastchunkOfApps) => {
console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
result.push(...lastchunkOfApps); // 再次拍扁它
console.info('All chunks has been executed with result', result);
return result;
});
},
/**
* 聚合所有應(yīng)用的 PV
*
* @private
*
* @param {[]} apps
* @return {[type]} [description]
*/
sumPv(apps) {
const initial = { pv: 0 };
return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
}
};
// 開始運(yùn)行
aggregator.start().then(console.log);
運(yùn)行結(jié)果如圖 2:

抽象和復(fù)用
目的達(dá)到了,因具備通用性,下面開始抽象成一個(gè)模式以便復(fù)用。
串行
先模擬一個(gè) http get 請(qǐng)求。
/**
* mocked http get.
* @param {string} url
* @returns {{ url: string; delay: number; }}
*/
function httpGet(url) {
const delay = Math.random() * 1000;
console.info('GET', url);
return new Promise((resolve) => {
setTimeout(() => {
resolve({
url,
delay,
at: Date.now()
})
}, delay);
})
}
串行執(zhí)行一批請(qǐng)求。
const ids = [1, 2, 3, 4, 5, 6, 7];
// 批量請(qǐng)求函數(shù),注意是 delay 執(zhí)行的『函數(shù)』對(duì)了,否則會(huì)立即將請(qǐng)求發(fā)送出去,達(dá)不到串行的目的
const httpGetters = ids.map(id =>
() => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);
// 串行執(zhí)行之
const tasks = await httpGetters.reduce((acc, cur) => {
return acc.then(cur);
// 簡寫,等價(jià)于
// return acc.then(() => cur());
}, Promise.resolve());
tasks.then(() => {
console.log('done');
});
注意觀察控制臺(tái)輸出,應(yīng)該串行輸出以下內(nèi)容:
GET https://jsonplaceholder.typicode.com/posts/1 GET https://jsonplaceholder.typicode.com/posts/2 GET https://jsonplaceholder.typicode.com/posts/3 GET https://jsonplaceholder.typicode.com/posts/4 GET https://jsonplaceholder.typicode.com/posts/5 GET https://jsonplaceholder.typicode.com/posts/6 GET https://jsonplaceholder.typicode.com/posts/7
分段串行,段中并行
重點(diǎn)來了。本文的請(qǐng)求調(diào)度器實(shí)現(xiàn)
/**
* Schedule promises.
* @param {Array<(...arg: any[]) => Promise<any>>} factories
* @param {number} concurrency
*/
function schedulePromises(factories, concurrency) {
/**
* chunk
* @param {any[]} arr
* @param {number} size
* @returns {Array<any[]>}
*/
const chunk = (arr, size = 1) => {
return arr.reduce((acc, cur, idx) => {
const modulo = idx % size;
if (modulo === 0) {
acc[acc.length] = [cur];
} else {
acc[acc.length - 1].push(cur);
}
return acc;
}, [])
};
const chunks = chunk(factories, concurrency);
let resps = [];
return chunks.reduce(
(acc, cur) => {
return acc
.then(() => {
console.log('---');
return Promise.all(cur.map(f => f()));
})
.then((intermediateResponses) => {
resps.push(...intermediateResponses);
return resps;
})
},
Promise.resolve()
);
}
測試下,執(zhí)行調(diào)度器:
// 分段串行,段中并行
schedulePromises(httpGetters, 3).then((resps) => {
console.log('resps:', resps);
});
控制臺(tái)輸出:
---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7
resps: [
{
"url": "https://jsonplaceholder.typicode.com/posts/1",
"delay": 733.010980640727,
"at": 1615131322163
},
{
"url": "https://jsonplaceholder.typicode.com/posts/2",
"delay": 594.5056229848931,
"at": 1615131322024
},
{
"url": "https://jsonplaceholder.typicode.com/posts/3",
"delay": 738.8230109146299,
"at": 1615131322168
},
{
"url": "https://jsonplaceholder.typicode.com/posts/4",
"delay": 525.4604386109747,
"at": 1615131322698
},
{
"url": "https://jsonplaceholder.typicode.com/posts/5",
"delay": 29.086379722201183,
"at": 1615131322201
},
{
"url": "https://jsonplaceholder.typicode.com/posts/6",
"delay": 592.2345027398272,
"at": 1615131322765
},
{
"url": "https://jsonplaceholder.typicode.com/posts/7",
"delay": 513.0684467560949,
"at": 1615131323284
}
]
總結(jié)
- 如果并發(fā)請(qǐng)求的數(shù)量太大,可以考慮分塊串行,塊中請(qǐng)求并發(fā)。
- 問題看似復(fù)雜,不放先簡化之,然后一步步推導(dǎo)出關(guān)鍵點(diǎn),最后抽象,就能找到解決方案。
- 本文的精髓在于使用
reduce作為串行推動(dòng)的引擎,故掌握其對(duì)我們?nèi)粘i_發(fā)遇到的迷局破解可提供新思路,reduce精通見上篇 你終于用 Reduce 了 🎉。
以上就是JS 實(shí)現(xiàn)請(qǐng)求調(diào)度器的詳細(xì)內(nèi)容,更多關(guān)于JS 請(qǐng)求調(diào)度器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
細(xì)說webpack源碼之compile流程-rules參數(shù)處理技巧(2)
這篇文章主要介紹了webpack源碼之compile流程-rules參數(shù)處理技巧的相關(guān)知識(shí),需要的朋友參考下吧2017-12-12
動(dòng)態(tài)加載圖片路徑 保持JavaScript控件的相對(duì)獨(dú)立性
根據(jù)新界面的要求,需要一部分圖片來增強(qiáng)日期控件的美觀性??紤]到既要實(shí)現(xiàn)加載圖表的目標(biāo),又要保持控件的獨(dú)立性以便將來的移植。2010-09-09
javascript動(dòng)畫效果打開/關(guān)閉層
用javascript實(shí)現(xiàn)打開層和關(guān)閉層的效果,原理不錯(cuò),學(xué)習(xí),記錄好2008-06-06
詳解JavaScript UTC時(shí)間轉(zhuǎn)換方法
這篇文章主要介紹了JavaScript UTC時(shí)間轉(zhuǎn)換方法,介紹了本地時(shí)間到UTC時(shí)間的轉(zhuǎn)換、UTC日期到本地日期的轉(zhuǎn)換,感興趣的小伙伴們可以參考一下2016-01-01
JavaScript實(shí)現(xiàn)字符串轉(zhuǎn)數(shù)組的6種方法總結(jié)
數(shù)組是?JavaScript?中最強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),我們常常通過將字符串轉(zhuǎn)換為數(shù)組來解決許多算法。本文為大家總結(jié)了6個(gè)JS字符串轉(zhuǎn)數(shù)組的方法,希望對(duì)你有所幫助2022-09-09
javascript RadioButtonList獲取選中值
js獲取RadioButtonList值的代碼。2009-04-04
使用js實(shí)現(xiàn)將后臺(tái)傳入的json數(shù)據(jù)放在前臺(tái)顯示
今天小編就為大家分享一篇使用js實(shí)現(xiàn)將后臺(tái)傳入的json數(shù)據(jù)放在前臺(tái)顯示,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-08-08
Javascript點(diǎn)擊按鈕隨機(jī)改變數(shù)字與其顏色
這篇文章主要介紹了Javascript點(diǎn)擊按鈕隨機(jī)改變數(shù)字和其字體的顏色,實(shí)現(xiàn)后的效果很不錯(cuò),具有一定的參考價(jià)值,有需要的可以參考借鑒,下面來一起看看。2016-09-09

