在Nodejs中實現(xiàn)一個緩存系統(tǒng)的方法詳解
在數(shù)據(jù)庫查詢遇到瓶頸時,我們通常可以采用緩存來提升查詢速度,同時緩解數(shù)據(jù)庫壓力。常用的緩存數(shù)據(jù)庫有Redis、Memcached等。在一些簡單場景中,我們也可以自己實現(xiàn)一個緩存系統(tǒng),避免使用額外的緩存中間件。這篇文章將帶你一步步實現(xiàn)一個完善的緩存系統(tǒng),它將包含過期清除、數(shù)據(jù)克隆、事件、大小限制、多級緩存等功能。
一個最簡單的緩存
class Cache {
constructor() {
this.cache = new Map();
}
get = (key) => {
return this.data[key];
};
set = (key, value) => {
this.data[key] = value;
};
del = (key) => {
delete this.data[key];
};
}
我們使用Map結(jié)構(gòu)來保存數(shù)據(jù),使用方式也很簡單:
const cache = new Cache();
cache.set("a", "aaa");
cache.get("a") // aaa
添加過期時間
接下來我們嘗試為緩存設(shè)置一個過期時間。在獲取數(shù)據(jù)時,如果數(shù)據(jù)已經(jīng)過期了,則清除它。
class Cache {
constructor(options) {
this.cache = new Map();
this.options = Object.assign(
{
stdTTL: 0, // 緩存有效期,單位為s。為0表示永不過期
},
options
);
}
get = (key) => {
const ret = this.cache.get(key);
if (ret && this._check(key, ret)) {
return ret.v;
} else {
return void 0;
}
};
set = (key, value, ttl) => {
ttl = ttl ?? this.options.stdTTL;
// 設(shè)置緩存的過期時間
this.cache.set(key, { v: value, t: ttl === 0 ? 0 : Date.now() + ttl * 1000 });
};
del = (key) => {
this.cache.delete(key);
};
// 檢查緩存是否過期,過期則刪除
_check = (key, data) => {
if (data.t !== 0 && data.t < Date.now()) {
this.del(key);
return false;
}
return true;
};
}
module.exports = Cache;
我們寫個用例來測試一下:
const cache = new Cache({ stdTTL: 1 }); // 默認(rèn)緩存1s
cache.set("a", "aaa");
console.log(cache.get("a")); // 輸出: aaa
setTimeout(() => {
console.log(cache.get("a")); // 輸出: undefined
}, 2000);
可見,超過有效期后,再次獲取時數(shù)據(jù)就不存在了。
私有屬性
前面的代碼中我們用_開頭來標(biāo)明私有屬性,我們也可以通過Symbol來實現(xiàn),像下面這樣:
const LENGTH = Symbol("length");
class Cache {
constructor(options) {
this[LENGTH] = options.length;
}
get length() {
return this[LENGTH];
}
}
Symbols 在 for...in 迭代中不可枚舉。另外,Object.getOwnPropertyNames() 不會返回 symbol 對象的屬性,但是你能使用 Object.getOwnPropertySymbols() 得到它們。
const cache = new Cache({ length: 100 });
Object.keys(cache); // []
Object.getOwnPropertySymbols(cache); // [Symbol(length)]
定期清除過期緩存
之前只會在get時判斷緩存是否過期,然而如果不對某個key進行g(shù)et操作,則過期緩存永遠不會被清除,導(dǎo)致無效的緩存堆積。接下來我們要實現(xiàn)定期自動清除過期緩存的功能。
class Cache {
constructor(options) {
this.cache = new Map();
this.options = Object.assign(
{
stdTTL: 0, // 緩存有效期,單位為s。為0表示永不過期
checkperiod: 600, // 定時檢查過期緩存,單位為s。小于0則不檢查
},
options
);
this._checkData();
}
get = (key) => {
const ret = this.cache.get(key);
if (ret && this._check(key, ret)) {
return ret.v;
} else {
return void 0;
}
};
set = (key, value, ttl) => {
ttl = ttl ?? this.options.stdTTL;
this.cache.set(key, { v: value, t: Date.now() + ttl * 1000 });
};
del = (key) => {
this.cache.delete(key);
};
// 檢查是否過期,過期則刪除
_check = (key, data) => {
if (data.t !== 0 && data.t < Date.now()) {
this.del(key);
return false;
}
return true;
};
_checkData = () => {
for (const [key, value] of this.cache.entries()) {
this._check(key, value);
}
if (this.options.checkperiod > 0) {
const timeout = setTimeout(
this._checkData,
this.options.checkperiod * 1000
);
// https://nodejs.org/api/timers.html#timeoutunref
// 清除事件循環(huán)對timeout的引用。如果事件循環(huán)中不存在其他活躍事件,則直接退出進程
if (timeout.unref != null) {
timeout.unref();
}
}
};
}
module.exports = Cache;
我們添加了一個checkperiod的參數(shù),同時在初始化時開啟了定時檢查過期緩存的邏輯。這里使用了timeout.unref()來清除清除事件循環(huán)對timeout的引用,這樣如果事件循環(huán)中不存在其他活躍事件了,就可以直接退出。
const timeout = setTimeout(
this._checkData,
this.options.checkperiod * 1000
);
// https://nodejs.org/api/timers.html#timeoutunref
// 清除事件循環(huán)對timeout的引用。如果事件循環(huán)中不存在其他活躍事件,則直接退出進程
if (timeout.unref != null) {
timeout.unref();
}
克隆數(shù)據(jù)
當(dāng)我們嘗試在緩存中存入對象數(shù)據(jù)時,我們可能會遇到下面的問題:
const cache = new Cache();
const data = { val: 100 };
cache.set("data", data);
data.val = 101;
cache.get("data") // { val: 101 }
由于緩存中保存的是引用,可能導(dǎo)致緩存內(nèi)容被意外的更改,這就讓人不太放心的。為了用起來沒有顧慮,我們需要支持一下數(shù)據(jù)的克隆,也就是深拷貝。
const cloneDeep = require("lodash.clonedeep");
class Cache {
constructor(options) {
this.cache = new Map();
this.options = Object.assign(
{
stdTTL: 0, // 緩存有效期,單位為s
checkperiod: 600, // 定時檢查過期緩存,單位為s
useClones: true, // 是否使用clone
},
options
);
this._checkData();
}
get = (key) => {
const ret = this.cache.get(key);
if (ret && this._check(key, ret)) {
return this._unwrap(ret);
} else {
return void 0;
}
};
set = (key, value, ttl) => {
this.cache.set(key, this._wrap(value, ttl));
};
del = (key) => {
this.cache.delete(key);
};
// 檢查是否過期,過期則刪除
_check = (key, data) => {
if (data.t !== 0 && data.t < Date.now()) {
this.del(key);
return false;
}
return true;
};
_checkData = () => {
for (const [key, value] of this.cache.entries()) {
this._check(key, value);
}
if (this.options.checkperiod > 0) {
const timeout = setTimeout(
this._checkData,
this.options.checkperiod * 1000
);
if (timeout.unref != null) {
timeout.unref();
}
}
};
_unwrap = (value) => {
return this.options.useClones ? cloneDeep(value.v) : value.v;
};
_wrap = (value, ttl) => {
ttl = ttl ?? this.options.stdTTL;
return {
t: ttl === 0 ? 0 : Date.now() + ttl * 1000,
v: this.options.useClones ? cloneDeep(value) : value,
};
};
}
我們使用lodash.clonedeep來實現(xiàn)深拷貝,同時添加了一個useClones的參數(shù)來設(shè)置是否需要克隆數(shù)據(jù)。需要注意,在對象較大時使用深拷貝是比較消耗時間的。我們可以根據(jù)實際情況來決定是否需要使用克隆,或?qū)崿F(xiàn)更高效的拷貝方法。
添加事件
有時我們需要在緩存數(shù)據(jù)過期時執(zhí)行某些邏輯,所以我們可以在緩存上添加事件。我們需要使用到EventEmitter類。
const { EventEmitter } = require("node:events");
const cloneDeep = require("lodash.clonedeep");
class Cache extends EventEmitter {
constructor(options) {
super();
this.cache = new Map();
this.options = Object.assign(
{
stdTTL: 0, // 緩存有效期,單位為s
checkperiod: 600, // 定時檢查過期緩存,單位為s
useClones: true, // 是否使用clone
},
options
);
this._checkData();
}
get = (key) => {
const ret = this.cache.get(key);
if (ret && this._check(key, ret)) {
return this._unwrap(ret);
} else {
return void 0;
}
};
set = (key, value, ttl) => {
this.cache.set(key, this._wrap(value, ttl));
this.emit("set", key, value);
};
del = (key) => {
this.cache.delete(key);
this.emit("del", key, oldVal.v);
};
// 檢查是否過期,過期則刪除
_check = (key, data) => {
if (data.t !== 0 && data.t < Date.now()) {
this.emit("expired", key, data.v);
this.del(key);
return false;
}
return true;
};
_checkData = () => {
for (const [key, value] of this.cache.entries()) {
this._check(key, value);
}
if (this.options.checkperiod > 0) {
const timeout = setTimeout(
this._checkData,
this.options.checkperiod * 1000
);
if (timeout.unref != null) {
timeout.unref();
}
}
};
_unwrap = (value) => {
return this.options.useClones ? cloneDeep(value.v) : value.v;
};
_wrap = (value, ttl) => {
ttl = ttl ?? this.options.stdTTL;
return {
t: ttl === 0 ? 0 : Date.now() + ttl * 1000,
v: this.options.useClones ? cloneDeep(value) : value,
};
};
}
module.exports = Cache;
繼承EventEmitter類后,我們只需在判斷數(shù)據(jù)過期時通過this.emit()觸發(fā)事件即可。如下:
this.emit("expired", key, value);
這樣使用緩存時就能監(jiān)聽過期事件了。
const cache = new Cache({ stdTTL: 1 });
cache.on("expired", (key ,value) => {
// ...
})
到這里,我們基本上就實現(xiàn)了node-cache庫的核心邏輯了。
限制緩存大?。。?/h2>
稍等,我們似乎忽略了一個重要的點。在高并發(fā)請求下,如果緩存激增,則內(nèi)存會有被耗盡的風(fēng)險。無論如何,緩存只是用來優(yōu)化的,它不能影響主程序的正常運行。所以,限制緩存大小至關(guān)重要!
我們需要在緩存超過最大限制時自動清理緩存,一個常用的清除算法就是LRU,即清除最近最少使用的那部分?jǐn)?shù)據(jù)。這里使用了yallist來實現(xiàn)LRU隊列,方案如下:
- LRU隊列里的首部保存最近使用的數(shù)據(jù),最近最少使用的數(shù)據(jù)則會移動到隊尾。在緩存超過最大限制時,優(yōu)先移除隊列尾部數(shù)據(jù)。
- 執(zhí)行g(shù)et/set操作時,將此數(shù)據(jù)節(jié)點移動/插入到隊首。
- 緩存超過最大限制時,移除隊尾數(shù)據(jù)。
const { EventEmitter } = require("node:events");
const clone = require("clone");
const Yallist = require("yallist");
class Cache extends EventEmitter {
constructor(options) {
super();
this.options = Object.assign(
{
stdTTL: 0, // 緩存有效期,單位為s
checkperiod: 600, // 定時檢查過期緩存,單位為s
useClones: true, // 是否使用clone
lengthCalculator: () => 1, // 計算長度
maxLength: 1000,
},
options
);
this._length = 0;
this._lruList = new Yallist();
this._cache = new Map();
this._checkData();
}
get length() {
return this._length;
}
get data() {
return Array.from(this._cache).reduce((obj, [key, node]) => {
return { ...obj, [key]: node.value.v };
}, {});
}
get = (key) => {
const node = this._cache.get(key);
if (node && this._check(node)) {
this._lruList.unshiftNode(node); // 移動到隊首
return this._unwrap(node.value);
} else {
return void 0;
}
};
set = (key, value, ttl) => {
const { lengthCalculator, maxLength } = this.options;
const len = lengthCalculator(value, key);
// 元素本身超過最大長度,設(shè)置失敗
if (len > maxLength) {
return false;
}
if (this._cache.has(key)) {
const node = this._cache.get(key);
const item = node.value;
item.v = value;
this._length += len - item.l;
item.l = len;
this.get(node); // 更新lru
} else {
const item = this._wrap(key, value, ttl, len);
this._lruList.unshift(item); // 插入到隊首
this._cache.set(key, this._lruList.head);
this._length += len;
}
this._trim();
this.emit("set", key, value);
return true;
};
del = (key) => {
if (!this._cache.has(key)) {
return false;
}
const node = this._cache.get(key);
this._del(node);
};
_del = (node) => {
const item = node.value;
this._length -= item.l;
this._cache.delete(item.k);
this._lruList.removeNode(node);
this.emit("del", item.k, item.v);
};
// 檢查是否過期,過期則刪除
_check = (node) => {
const item = node.value;
if (item.t !== 0 && item.t < Date.now()) {
this.emit("expired", item.k, item.v);
this._del(node);
return false;
}
return true;
};
_checkData = () => {
for (const node of this._cache) {
this._check(node);
}
if (this.options.checkperiod > 0) {
const timeout = setTimeout(
this._checkData,
this.options.checkperiod * 1000
);
if (timeout.unref != null) {
timeout.unref();
}
}
};
_unwrap = (item) => {
return this.options.useClones ? clone(item.v) : item.v;
};
_wrap = (key, value, ttl, length) => {
ttl = ttl ?? this.options.stdTTL;
return {
k: key,
v: this.options.useClones ? clone(value) : value,
t: ttl === 0 ? 0 : Date.now() + ttl * 1000,
l: length,
};
};
_trim = () => {
const { maxLength } = this.options;
let walker = this._lruList.tail;
while (this._length > maxLength && walker !== null) {
// 刪除隊尾元素
const prev = walker.prev;
this._del(walker);
walker = prev;
}
};
}
代碼中還增加了兩個額外的配置選項:
options = {
lengthCalculator: () => 1, // 計算長度
maxLength: 1000, // 緩存最大長度
}
lengthCalculator支持我們自定義數(shù)據(jù)長度的計算方式。默認(rèn)情況下maxLength指的就是緩存數(shù)據(jù)的數(shù)量。然而在遇到Buffer類型的數(shù)據(jù)時,我們可能希望限制最大的字節(jié)數(shù),那么就可以像下面這樣定義:
const cache = new Cache({
maxLength: 500,
lengthCalculator: (value) => {
return value.length;
},
});
const data = Buffer.alloc(100);
cache.set("data", data);
console.log(cache.length); // 100
這一部分的代碼就是參考社區(qū)中的lru-cache實現(xiàn)的。
多級緩存
如果應(yīng)用本身已經(jīng)依賴了數(shù)據(jù)庫的話,我們不妨再加一層數(shù)據(jù)庫緩存,來實現(xiàn)多級緩存:將內(nèi)存作為一級緩存(容量小,速度快),將數(shù)據(jù)庫作為二級緩存(容量大,速度慢) 。有兩個優(yōu)點:
- 能夠存儲的緩存數(shù)據(jù)大大增加。雖然數(shù)據(jù)庫緩存查詢速度比內(nèi)存慢,但相比原始查詢還是要快得多的。
- 重啟應(yīng)用時能夠從數(shù)據(jù)庫恢復(fù)緩存。
通過下面的方法可以實現(xiàn)一個多級緩存:
function multiCaching(caches) {
return {
get: async (key) => {
let value, i;
for (i = 0; i < caches.length; i++) {
try {
value = await caches[i].get(key);
if (value !== undefined) break;
} catch (e) {}
}
// 如果上層緩存沒查到,下層緩存查到了,需要同時更新上層緩存
if (value !== undefined && i > 0) {
Promise.all(
caches.slice(0, i).map((cache) => cache.set(key, value))
).then();
}
return value;
},
set: async (key, value) => {
await Promise.all(caches.map((cache) => cache.set(key, value)));
},
del: async (key) => {
await Promise.all(caches.map((cache) => cache.del(key)));
},
};
}
const multiCache = multiCaching([memoryCache, dbCache]);
multiCache.set(key, value)
dbCache對數(shù)據(jù)量大小不是那么敏感,我們可以在執(zhí)行g(shù)et/set操作時設(shè)置數(shù)據(jù)的最近使用時間,并在某個時刻清除最近未使用數(shù)據(jù),比如在每天的凌晨自動清除超過30天未使用的數(shù)據(jù)。
另外我們還需要在初始化時加載數(shù)據(jù)庫緩存到內(nèi)存中,比如按最近使用時間倒序返回3000條數(shù)據(jù),并存儲到內(nèi)存緩存中。
參考
以上就是在Nodejs中實現(xiàn)一個緩存系統(tǒng)的方法詳解的詳細內(nèi)容,更多關(guān)于Nodejs實現(xiàn)緩存系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
教你用Node.js與Express建立一個GraphQL服務(wù)器
GraphQL是一種通過強類型查詢語言構(gòu)建api的新方法,下面這篇文章主要給大家介紹了關(guān)于用Node.js與Express建立一個GraphQL服務(wù)器的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2022-12-12
node.js express捕獲全局異常的三種方法實例分析
這篇文章主要介紹了node.js express捕獲全局異常的三種方法,結(jié)合實例形式簡單分析了node.js express捕獲全局異常的常見操作方法與使用注意事項,需要的朋友可以參考下2019-12-12

