深入理解go緩存庫freecache的使用
go開發(fā)緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。
緩存場景中如果數(shù)據(jù)量大于百萬級別,需要特別考慮數(shù)據(jù)類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了。但實際應用場景中,key和value是(包含)指針類型數(shù)據(jù)是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:
- 零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個數(shù);
- 有GC開銷:直接基于Map來實現(xiàn)的緩存框架。
對于map而言,gc時會掃描所有key/value鍵值對,如果其都是基本類型,那么gc便不會再掃描。下面以freecache為例分析下其實現(xiàn)原理,代碼示例如下:
func main() { ? ?cacheSize := 100 * 1024 * 1024 ? ?cache := freecache.NewCache(cacheSize) ? ?for i := 0; i < N; i++ { ? ? ? str := strconv.Itoa(i) ? ? ? _ = cache.Set([]byte(str), []byte(str), 1) ? ?} ? ?now := time.Now() ? ?runtime.GC() ? ?fmt.Printf("freecache, GC took: %s\n", time.Since(now)) ? ?_, _ = cache.Get([]byte("aa")) ? ?now = time.Now() ? ?for i := 0; i < N; i++ { ? ? ? str := strconv.Itoa(i) ? ? ? _, _ = cache.Get([]byte(str)) ? ?} ? ?fmt.Printf("freecache, Get took: %s\n\n", time.Since(now)) }
1 初始化
freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個segment,每個segment是獨立的存儲單元,freecache加鎖維度也是基于segment的,每個segment有一個ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個,每個segment有2個,分別是rb和slotData(注意切片為指針類型)。
type segment struct { ? ?rb ? ? ? ? ? ?RingBuf // ring buffer that stores data ? ?segId ? ? ? ? int ? ?_ ? ? ? ? ? ? uint32 ?// 占位 ? ?missCount ? ? int64 ? ?hitCount ? ? ?int64 ? ?entryCount ? ?int64 ? ?totalCount ? ?int64 ? ? ?// number of entries in ring buffer, including deleted entries. ? ?totalTime ? ? int64 ? ? ?// used to calculate least recent used entry. ? ?timer ? ? ? ? Timer ? ? ?// Timer giving current time ? ?totalEvacuate int64 ? ? ?// used for debug ? ?totalExpired ?int64 ? ? ?// used for debug ? ?overwrites ? ?int64 ? ? ?// used for debug ? ?touched ? ? ? int64 ? ? ?// used for debug ? ?vacuumLen ? ? int64 ? ? ?// up to vacuumLen, new data can be written without overwriting old data. ? ?slotLens ? ? ?[256]int32 // The actual length for every slot. ? ?slotCap ? ? ? int32 ? ? ?// max number of entry pointers a slot can hold. ? ?slotsData ? ? []entryPtr // 索引指針 } func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) { ? ? cache = new(Cache) ? ? for i := 0; i < segmentCount; i++ { ? ? ? ? cache.segments[i] = newSegment(size/segmentCount, i, timer) ? ? } } func newSegment(bufSize int, segId int, timer Timer) (seg segment) { ? ? seg.rb = NewRingBuf(bufSize, 0) ? ? seg.segId = segId ? ? seg.timer = timer ? ? seg.vacuumLen = int64(bufSize) ? ? seg.slotCap = 1 ? ? seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個slotData初始化256個單位大小 }
2 讀寫流程
freecache的key和value都是[]byte數(shù)組,使用時需要自行序列化和反序列化,如果緩存復雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:
_ = cache.Set([]byte(str), []byte(str), 1)
Set流程首先對key進行hash,hashVal類型uint64,其低8位segID對應segment數(shù)組,低8-15位表示slotId對應slotsData下標,高16位表示slotsData下標對應的[]entryPtr某個數(shù)據(jù),這里需要查找操作。注意[]entryPtr數(shù)組大小為slotCap(初始為1),當擴容時會slotCap倍增。
每個segment對應一個lock(sync.Mutex),因此其能夠支持較大并發(fā)量,而不像sync.Map只有一個鎖。
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { ? ?hashVal := hashFunc(key) ? ?segID := hashVal & segmentAndOpVal // 低8位 ? ?cache.locks[segID].Lock() // 加鎖 ? ?err = cache.segments[segID].set(key, value, hashVal, expireSeconds) ? ?cache.locks[segID].Unlock() } func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { ? ?slotId := uint8(hashVal >> 8) ? ?hash16 := uint16(hashVal >> 16) ? ?slot := seg.getSlot(slotId) ? ?idx, match := seg.lookup(slot, hash16, key) ? ?var hdrBuf [ENTRY_HDR_SIZE]byte ? ?hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) ? ?if match { // 有數(shù)據(jù)更新操作 ? ? ? matchedPtr := &slot[idx] ? ? ? seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) ? ? ? hdr.slotId = slotId ? ? ? hdr.hash16 = hash16 ? ? ? hdr.keyLen = uint16(len(key)) ? ? ? originAccessTime := hdr.accessTime ? ? ? hdr.accessTime = now ? ? ? hdr.expireAt = expireAt ? ? ? hdr.valLen = uint32(len(value)) ? ? ? if hdr.valCap >= hdr.valLen { ? ? ? ? ?// 已存在數(shù)據(jù)value空間能存下此次value大小 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) ? ? ? ? ?seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) ? ? ? ? ?seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) ? ? ? ? ?atomic.AddInt64(&seg.overwrites, 1) ? ? ? ? ?return ? ? ? } ? ? ? // 刪除對應entryPtr,涉及到slotsData內(nèi)存copy,ringbug中只是標記刪除 ? ? ? seg.delEntryPtr(slotId, slot, idx) ? ? ? match = false ? ? ? // increase capacity and limit entry len. ? ? ? for hdr.valCap < hdr.valLen { ? ? ? ? ?hdr.valCap *= 2 ? ? ? } ? ? ? if hdr.valCap > uint32(maxKeyValLen-len(key)) { ? ? ? ? ?hdr.valCap = uint32(maxKeyValLen - len(key)) ? ? ? } ? ?} else { // 無數(shù)據(jù) ? ? ? hdr.slotId = slotId ? ? ? hdr.hash16 = hash16 ? ? ? hdr.keyLen = uint16(len(key)) ? ? ? hdr.accessTime = now ? ? ? hdr.expireAt = expireAt ? ? ? hdr.valLen = uint32(len(value)) ? ? ? hdr.valCap = uint32(len(value)) ? ? ? if hdr.valCap == 0 { // avoid infinite loop when increasing capacity. ? ? ? ? ?hdr.valCap = 1 ? ? ? } ? ?} ? ? ? ?// 數(shù)據(jù)實際長度為 ENTRY_HDR_SIZE=24 + key和value的長度 ? ? ? ?entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap) ? ?slotModified := seg.evacuate(entryLen, slotId, now) ? ?if slotModified { ? ? ? // the slot has been modified during evacuation, we need to looked up for the 'idx' again. ? ? ? // otherwise there would be index out of bound error. ? ? ? slot = seg.getSlot(slotId) ? ? ? idx, match = seg.lookup(slot, hash16, key) ? ? ? // assert(match == false) ? ?} ? ?newOff := seg.rb.End() ? ?seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen) ? ?seg.rb.Write(hdrBuf[:]) ? ?seg.rb.Write(key) ? ?seg.rb.Write(value) ? ?seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) ? ?atomic.AddInt64(&seg.totalTime, int64(now)) ? ?atomic.AddInt64(&seg.totalCount, 1) ? ?seg.vacuumLen -= entryLen ? ?return }
seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數(shù)據(jù)的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應數(shù)據(jù)已被邏輯deleted或者已過期,那么該塊內(nèi)存可以直接回收,如果不滿足回收條件,則將entry從環(huán)頭調(diào)換到環(huán)尾,再更新entry的索引,如果這樣循環(huán)5次還是不行,那么需要將當前oldHdrBuf回收以滿足內(nèi)存需要。
執(zhí)行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數(shù)據(jù)了,insertEntryPtr就是寫入索引操作,當[]entryPtr中元素個數(shù)大于seg.slotCap(初始1)時,需要擴容操作,對應方法見seg.expand,這里不再贅述。
寫入ringbuf就是執(zhí)行rb.Write即可。
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) { ? ?var oldHdrBuf [ENTRY_HDR_SIZE]byte ? ?consecutiveEvacuate := 0 ? ?for seg.vacuumLen < entryLen { ? ? ? oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size() ? ? ? seg.rb.ReadAt(oldHdrBuf[:], oldOff) ? ? ? oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0])) ? ? ? oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) ? ? ? if oldHdr.deleted { // 已刪除 ? ? ? ? ?consecutiveEvacuate = 0 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) ? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1) ? ? ? ? ?seg.vacuumLen += oldEntryLen ? ? ? ? ?continue ? ? ? } ? ? ? expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now ? ? ? leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) ? ? ? if expired || leastRecentUsed || consecutiveEvacuate > 5 { ? ? ? // 可以回收 ? ? ? ? ?seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash16, oldOff) ? ? ? ? ?if oldHdr.slotId == slotId { ? ? ? ? ? ? slotModified = true ? ? ? ? ?} ? ? ? ? ?consecutiveEvacuate = 0 ? ? ? ? ?atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) ? ? ? ? ?atomic.AddInt64(&seg.totalCount, -1) ? ? ? ? ?seg.vacuumLen += oldEntryLen ? ? ? ? ?if expired { ? ? ? ? ? ? atomic.AddInt64(&seg.totalExpired, 1) ? ? ? ? ?} else { ? ? ? ? ? ? atomic.AddInt64(&seg.totalEvacuate, 1) ? ? ? ? ?} ? ? ? } else { ? ? ? ? ?// evacuate an old entry that has been accessed recently for better cache hit rate. ? ? ? ? ?newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) ? ? ? ? ?seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) ? ? ? ? ?consecutiveEvacuate++ ? ? ? ? ?atomic.AddInt64(&seg.totalEvacuate, 1) ? ? ? } ? ?} }
freecache的Get流程相對來說簡單點,通過hash找到對應segment,通過slotId找到對應索引slot,然后通過二分+遍歷尋找數(shù)據(jù),如果找不到直接返回ErrNotFound,否則更新一些time指標。Get流程還會更新緩存命中率相關(guān)指標。
func (cache *Cache) Get(key []byte) (value []byte, err error) { ? ?hashVal := hashFunc(key) ? ?segID := hashVal & segmentAndOpVal ? ?cache.locks[segID].Lock() ? ?value, _, err = cache.segments[segID].get(key, nil, hashVal, false) ? ?cache.locks[segID].Unlock() ? ?return } func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) { ? ?hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找 ? ?if err != nil { ? ? ? return ? ?} ? ?expireAt = hdr.expireAt ? ?if cap(buf) >= int(hdr.valLen) { ? ? ? value = buf[:hdr.valLen] ? ?} else { ? ? ? value = make([]byte, hdr.valLen) ? ?} ? ?seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) }
定位到數(shù)據(jù)之后,讀取ringbuf即可,注意一般來說讀取到的value是新創(chuàng)建的內(nèi)存空間,因此涉及到[]byte數(shù)據(jù)的復制操作。
3 總結(jié)
從常見的幾個緩存框架壓測性能來看,Set性能差異較大但還不是數(shù)量級別的差距,Get性能差異不大,因此對于絕大多數(shù)場景來說不用太關(guān)注Set/Get性能,重點應該看功能是否滿足業(yè)務需求和gc影響,性能壓測比較見:https://golang2.eddycjy.com/posts/ch5/04-performance/
緩存有一個特殊的場景,那就是將數(shù)據(jù)全部緩存在內(nèi)存,涉及到更新時都是全量更新(替換),該場景下使用freecache,如果size未設置好可能導致部分數(shù)據(jù)被淘汰,是不符合預期的,這個一定要注意。為了使用freecache避免該問題,需要將size設置"足夠大",但也要注意其內(nèi)存空間占用。
到此這篇關(guān)于深入理解go緩存庫freecache的使用的文章就介紹到這了,更多相關(guān)go freecache內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用golang如何優(yōu)雅的關(guān)機或重啟操作示例
這篇文章主要為大家介紹了使用golang如何優(yōu)雅的關(guān)機或重啟操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析
這篇文章主要為大家介紹了go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09Go語言對JSON數(shù)據(jù)進行序列化和反序列化
這篇文章介紹了Go語言對JSON數(shù)據(jù)進行序列化和反序列化的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07