在Nodejs中實現(xiàn)一個緩存系統(tǒng)的方法詳解
在數(shù)據(jù)庫查詢遇到瓶頸時,我們通??梢圆捎镁彺鎭硖嵘樵兯俣?,同時緩解數(shù)據(jù)庫壓力。常用的緩存數(shù)據(jù)庫有Redis、Memcached等。在一些簡單場景中,我們也可以自己實現(xiàn)一個緩存系統(tǒng),避免使用額外的緩存中間件。這篇文章將帶你一步步實現(xiàn)一個完善的緩存系統(tǒng),它將包含過期清除、數(shù)據(jù)克隆、事件、大小限制、多級緩存等功能。
一個最簡單的緩存
class Cache { constructor() { this.cache = new Map(); } get = (key) => { return this.data[key]; }; set = (key, value) => { this.data[key] = value; }; del = (key) => { delete this.data[key]; }; }
我們使用Map結(jié)構(gòu)來保存數(shù)據(jù),使用方式也很簡單:
const cache = new Cache(); cache.set("a", "aaa"); cache.get("a") // aaa
添加過期時間
接下來我們嘗試為緩存設(shè)置一個過期時間。在獲取數(shù)據(jù)時,如果數(shù)據(jù)已經(jīng)過期了,則清除它。
class Cache { constructor(options) { this.cache = new Map(); this.options = Object.assign( { stdTTL: 0, // 緩存有效期,單位為s。為0表示永不過期 }, options ); } get = (key) => { const ret = this.cache.get(key); if (ret && this._check(key, ret)) { return ret.v; } else { return void 0; } }; set = (key, value, ttl) => { ttl = ttl ?? this.options.stdTTL; // 設(shè)置緩存的過期時間 this.cache.set(key, { v: value, t: ttl === 0 ? 0 : Date.now() + ttl * 1000 }); }; del = (key) => { this.cache.delete(key); }; // 檢查緩存是否過期,過期則刪除 _check = (key, data) => { if (data.t !== 0 && data.t < Date.now()) { this.del(key); return false; } return true; }; } module.exports = Cache;
我們寫個用例來測試一下:
const cache = new Cache({ stdTTL: 1 }); // 默認緩存1s cache.set("a", "aaa"); console.log(cache.get("a")); // 輸出: aaa setTimeout(() => { console.log(cache.get("a")); // 輸出: undefined }, 2000);
可見,超過有效期后,再次獲取時數(shù)據(jù)就不存在了。
私有屬性
前面的代碼中我們用_
開頭來標明私有屬性,我們也可以通過Symbol來實現(xiàn),像下面這樣:
const LENGTH = Symbol("length"); class Cache { constructor(options) { this[LENGTH] = options.length; } get length() { return this[LENGTH]; } }
Symbols 在 for...in 迭代中不可枚舉。另外,Object.getOwnPropertyNames() 不會返回 symbol 對象的屬性,但是你能使用 Object.getOwnPropertySymbols() 得到它們。
const cache = new Cache({ length: 100 }); Object.keys(cache); // [] Object.getOwnPropertySymbols(cache); // [Symbol(length)]
定期清除過期緩存
之前只會在get時判斷緩存是否過期,然而如果不對某個key進行g(shù)et操作,則過期緩存永遠不會被清除,導致無效的緩存堆積。接下來我們要實現(xiàn)定期自動清除過期緩存的功能。
class Cache { constructor(options) { this.cache = new Map(); this.options = Object.assign( { stdTTL: 0, // 緩存有效期,單位為s。為0表示永不過期 checkperiod: 600, // 定時檢查過期緩存,單位為s。小于0則不檢查 }, options ); this._checkData(); } get = (key) => { const ret = this.cache.get(key); if (ret && this._check(key, ret)) { return ret.v; } else { return void 0; } }; set = (key, value, ttl) => { ttl = ttl ?? this.options.stdTTL; this.cache.set(key, { v: value, t: Date.now() + ttl * 1000 }); }; del = (key) => { this.cache.delete(key); }; // 檢查是否過期,過期則刪除 _check = (key, data) => { if (data.t !== 0 && data.t < Date.now()) { this.del(key); return false; } return true; }; _checkData = () => { for (const [key, value] of this.cache.entries()) { this._check(key, value); } if (this.options.checkperiod > 0) { const timeout = setTimeout( this._checkData, this.options.checkperiod * 1000 ); // https://nodejs.org/api/timers.html#timeoutunref // 清除事件循環(huán)對timeout的引用。如果事件循環(huán)中不存在其他活躍事件,則直接退出進程 if (timeout.unref != null) { timeout.unref(); } } }; } module.exports = Cache;
我們添加了一個checkperiod
的參數(shù),同時在初始化時開啟了定時檢查過期緩存的邏輯。這里使用了timeout.unref()
來清除清除事件循環(huán)對timeout的引用,這樣如果事件循環(huán)中不存在其他活躍事件了,就可以直接退出。
const timeout = setTimeout( this._checkData, this.options.checkperiod * 1000 ); // https://nodejs.org/api/timers.html#timeoutunref // 清除事件循環(huán)對timeout的引用。如果事件循環(huán)中不存在其他活躍事件,則直接退出進程 if (timeout.unref != null) { timeout.unref(); }
克隆數(shù)據(jù)
當我們嘗試在緩存中存入對象數(shù)據(jù)時,我們可能會遇到下面的問題:
const cache = new Cache(); const data = { val: 100 }; cache.set("data", data); data.val = 101; cache.get("data") // { val: 101 }
由于緩存中保存的是引用,可能導致緩存內(nèi)容被意外的更改,這就讓人不太放心的。為了用起來沒有顧慮,我們需要支持一下數(shù)據(jù)的克隆,也就是深拷貝。
const cloneDeep = require("lodash.clonedeep"); class Cache { constructor(options) { this.cache = new Map(); this.options = Object.assign( { stdTTL: 0, // 緩存有效期,單位為s checkperiod: 600, // 定時檢查過期緩存,單位為s useClones: true, // 是否使用clone }, options ); this._checkData(); } get = (key) => { const ret = this.cache.get(key); if (ret && this._check(key, ret)) { return this._unwrap(ret); } else { return void 0; } }; set = (key, value, ttl) => { this.cache.set(key, this._wrap(value, ttl)); }; del = (key) => { this.cache.delete(key); }; // 檢查是否過期,過期則刪除 _check = (key, data) => { if (data.t !== 0 && data.t < Date.now()) { this.del(key); return false; } return true; }; _checkData = () => { for (const [key, value] of this.cache.entries()) { this._check(key, value); } if (this.options.checkperiod > 0) { const timeout = setTimeout( this._checkData, this.options.checkperiod * 1000 ); if (timeout.unref != null) { timeout.unref(); } } }; _unwrap = (value) => { return this.options.useClones ? cloneDeep(value.v) : value.v; }; _wrap = (value, ttl) => { ttl = ttl ?? this.options.stdTTL; return { t: ttl === 0 ? 0 : Date.now() + ttl * 1000, v: this.options.useClones ? cloneDeep(value) : value, }; }; }
我們使用lodash.clonedeep來實現(xiàn)深拷貝,同時添加了一個useClones
的參數(shù)來設(shè)置是否需要克隆數(shù)據(jù)。需要注意,在對象較大時使用深拷貝是比較消耗時間的。我們可以根據(jù)實際情況來決定是否需要使用克隆,或?qū)崿F(xiàn)更高效的拷貝方法。
添加事件
有時我們需要在緩存數(shù)據(jù)過期時執(zhí)行某些邏輯,所以我們可以在緩存上添加事件。我們需要使用到EventEmitter
類。
const { EventEmitter } = require("node:events"); const cloneDeep = require("lodash.clonedeep"); class Cache extends EventEmitter { constructor(options) { super(); this.cache = new Map(); this.options = Object.assign( { stdTTL: 0, // 緩存有效期,單位為s checkperiod: 600, // 定時檢查過期緩存,單位為s useClones: true, // 是否使用clone }, options ); this._checkData(); } get = (key) => { const ret = this.cache.get(key); if (ret && this._check(key, ret)) { return this._unwrap(ret); } else { return void 0; } }; set = (key, value, ttl) => { this.cache.set(key, this._wrap(value, ttl)); this.emit("set", key, value); }; del = (key) => { this.cache.delete(key); this.emit("del", key, oldVal.v); }; // 檢查是否過期,過期則刪除 _check = (key, data) => { if (data.t !== 0 && data.t < Date.now()) { this.emit("expired", key, data.v); this.del(key); return false; } return true; }; _checkData = () => { for (const [key, value] of this.cache.entries()) { this._check(key, value); } if (this.options.checkperiod > 0) { const timeout = setTimeout( this._checkData, this.options.checkperiod * 1000 ); if (timeout.unref != null) { timeout.unref(); } } }; _unwrap = (value) => { return this.options.useClones ? cloneDeep(value.v) : value.v; }; _wrap = (value, ttl) => { ttl = ttl ?? this.options.stdTTL; return { t: ttl === 0 ? 0 : Date.now() + ttl * 1000, v: this.options.useClones ? cloneDeep(value) : value, }; }; } module.exports = Cache;
繼承EventEmitter
類后,我們只需在判斷數(shù)據(jù)過期時通過this.emit()
觸發(fā)事件即可。如下:
this.emit("expired", key, value);
這樣使用緩存時就能監(jiān)聽過期事件了。
const cache = new Cache({ stdTTL: 1 }); cache.on("expired", (key ,value) => { // ... })
到這里,我們基本上就實現(xiàn)了node-cache庫的核心邏輯了。
限制緩存大小?。?/h2>
稍等,我們似乎忽略了一個重要的點。在高并發(fā)請求下,如果緩存激增,則內(nèi)存會有被耗盡的風險。無論如何,緩存只是用來優(yōu)化的,它不能影響主程序的正常運行。所以,限制緩存大小至關(guān)重要!
我們需要在緩存超過最大限制時自動清理緩存,一個常用的清除算法就是LRU,即清除最近最少使用的那部分數(shù)據(jù)。這里使用了yallist來實現(xiàn)LRU隊列,方案如下:
- LRU隊列里的首部保存最近使用的數(shù)據(jù),最近最少使用的數(shù)據(jù)則會移動到隊尾。在緩存超過最大限制時,優(yōu)先移除隊列尾部數(shù)據(jù)。
- 執(zhí)行g(shù)et/set操作時,將此數(shù)據(jù)節(jié)點移動/插入到隊首。
- 緩存超過最大限制時,移除隊尾數(shù)據(jù)。
const { EventEmitter } = require("node:events"); const clone = require("clone"); const Yallist = require("yallist"); class Cache extends EventEmitter { constructor(options) { super(); this.options = Object.assign( { stdTTL: 0, // 緩存有效期,單位為s checkperiod: 600, // 定時檢查過期緩存,單位為s useClones: true, // 是否使用clone lengthCalculator: () => 1, // 計算長度 maxLength: 1000, }, options ); this._length = 0; this._lruList = new Yallist(); this._cache = new Map(); this._checkData(); } get length() { return this._length; } get data() { return Array.from(this._cache).reduce((obj, [key, node]) => { return { ...obj, [key]: node.value.v }; }, {}); } get = (key) => { const node = this._cache.get(key); if (node && this._check(node)) { this._lruList.unshiftNode(node); // 移動到隊首 return this._unwrap(node.value); } else { return void 0; } }; set = (key, value, ttl) => { const { lengthCalculator, maxLength } = this.options; const len = lengthCalculator(value, key); // 元素本身超過最大長度,設(shè)置失敗 if (len > maxLength) { return false; } if (this._cache.has(key)) { const node = this._cache.get(key); const item = node.value; item.v = value; this._length += len - item.l; item.l = len; this.get(node); // 更新lru } else { const item = this._wrap(key, value, ttl, len); this._lruList.unshift(item); // 插入到隊首 this._cache.set(key, this._lruList.head); this._length += len; } this._trim(); this.emit("set", key, value); return true; }; del = (key) => { if (!this._cache.has(key)) { return false; } const node = this._cache.get(key); this._del(node); }; _del = (node) => { const item = node.value; this._length -= item.l; this._cache.delete(item.k); this._lruList.removeNode(node); this.emit("del", item.k, item.v); }; // 檢查是否過期,過期則刪除 _check = (node) => { const item = node.value; if (item.t !== 0 && item.t < Date.now()) { this.emit("expired", item.k, item.v); this._del(node); return false; } return true; }; _checkData = () => { for (const node of this._cache) { this._check(node); } if (this.options.checkperiod > 0) { const timeout = setTimeout( this._checkData, this.options.checkperiod * 1000 ); if (timeout.unref != null) { timeout.unref(); } } }; _unwrap = (item) => { return this.options.useClones ? clone(item.v) : item.v; }; _wrap = (key, value, ttl, length) => { ttl = ttl ?? this.options.stdTTL; return { k: key, v: this.options.useClones ? clone(value) : value, t: ttl === 0 ? 0 : Date.now() + ttl * 1000, l: length, }; }; _trim = () => { const { maxLength } = this.options; let walker = this._lruList.tail; while (this._length > maxLength && walker !== null) { // 刪除隊尾元素 const prev = walker.prev; this._del(walker); walker = prev; } }; }
代碼中還增加了兩個額外的配置選項:
options = { lengthCalculator: () => 1, // 計算長度 maxLength: 1000, // 緩存最大長度 }
lengthCalculator
支持我們自定義數(shù)據(jù)長度的計算方式。默認情況下maxLength
指的就是緩存數(shù)據(jù)的數(shù)量。然而在遇到Buffer類型的數(shù)據(jù)時,我們可能希望限制最大的字節(jié)數(shù),那么就可以像下面這樣定義:
const cache = new Cache({ maxLength: 500, lengthCalculator: (value) => { return value.length; }, }); const data = Buffer.alloc(100); cache.set("data", data); console.log(cache.length); // 100
這一部分的代碼就是參考社區(qū)中的lru-cache實現(xiàn)的。
多級緩存
如果應(yīng)用本身已經(jīng)依賴了數(shù)據(jù)庫的話,我們不妨再加一層數(shù)據(jù)庫緩存,來實現(xiàn)多級緩存:將內(nèi)存作為一級緩存(容量小,速度快),將數(shù)據(jù)庫作為二級緩存(容量大,速度慢) 。有兩個優(yōu)點:
- 能夠存儲的緩存數(shù)據(jù)大大增加。雖然數(shù)據(jù)庫緩存查詢速度比內(nèi)存慢,但相比原始查詢還是要快得多的。
- 重啟應(yīng)用時能夠從數(shù)據(jù)庫恢復緩存。
通過下面的方法可以實現(xiàn)一個多級緩存:
function multiCaching(caches) { return { get: async (key) => { let value, i; for (i = 0; i < caches.length; i++) { try { value = await caches[i].get(key); if (value !== undefined) break; } catch (e) {} } // 如果上層緩存沒查到,下層緩存查到了,需要同時更新上層緩存 if (value !== undefined && i > 0) { Promise.all( caches.slice(0, i).map((cache) => cache.set(key, value)) ).then(); } return value; }, set: async (key, value) => { await Promise.all(caches.map((cache) => cache.set(key, value))); }, del: async (key) => { await Promise.all(caches.map((cache) => cache.del(key))); }, }; } const multiCache = multiCaching([memoryCache, dbCache]); multiCache.set(key, value)
dbCache
對數(shù)據(jù)量大小不是那么敏感,我們可以在執(zhí)行g(shù)et/set操作時設(shè)置數(shù)據(jù)的最近使用時間,并在某個時刻清除最近未使用數(shù)據(jù),比如在每天的凌晨自動清除超過30天未使用的數(shù)據(jù)。
另外我們還需要在初始化時加載數(shù)據(jù)庫緩存到內(nèi)存中,比如按最近使用時間倒序返回3000條數(shù)據(jù),并存儲到內(nèi)存緩存中。
參考
以上就是在Nodejs中實現(xiàn)一個緩存系統(tǒng)的方法詳解的詳細內(nèi)容,更多關(guān)于Nodejs實現(xiàn)緩存系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
教你用Node.js與Express建立一個GraphQL服務(wù)器
GraphQL是一種通過強類型查詢語言構(gòu)建api的新方法,下面這篇文章主要給大家介紹了關(guān)于用Node.js與Express建立一個GraphQL服務(wù)器的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2022-12-12node.js express捕獲全局異常的三種方法實例分析
這篇文章主要介紹了node.js express捕獲全局異常的三種方法,結(jié)合實例形式簡單分析了node.js express捕獲全局異常的常見操作方法與使用注意事項,需要的朋友可以參考下2019-12-12