探索Golang實(shí)現(xiàn)Redis持久化AOF實(shí)例
引言
用11篇文章實(shí)現(xiàn)一個(gè)可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過(guò)文章將Redis掰開(kāi)撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺(jué),歡迎持續(xù)關(guān)注學(xué)習(xí)。
[x] easyredis之TCP服務(wù)
[x] easyredis之網(wǎng)絡(luò)請(qǐng)求序列化協(xié)議(RESP)
[x] easyredis之內(nèi)存數(shù)據(jù)庫(kù)
[x] easyredis之過(guò)期時(shí)間 (時(shí)間輪實(shí)現(xiàn))
[x] easyredis之持久化 (AOF實(shí)現(xiàn))
[ ] easyredis之發(fā)布訂閱功能
[ ] easyredis之有序集合(跳表實(shí)現(xiàn))
[ ] easyredis之 pipeline 客戶端實(shí)現(xiàn)
[ ] easyredis之事務(wù)(原子性/回滾)
[ ] easyredis之連接池
[ ] easyredis之分布式集群存儲(chǔ)
【第五篇】EasyRedis之持久化AOF
AOF
全稱Append Only File
,就是將寫(xiě)相關(guān)的命令,追加保存到文件中,當(dāng)服務(wù)器重啟以后,將文件中的命令在服務(wù)端重放(重新執(zhí)行恢復(fù)數(shù)據(jù)),實(shí)現(xiàn)的一種持久化方式。
本篇通過(guò)3個(gè)部分講解AOF的實(shí)現(xiàn):
AOF的寫(xiě)入過(guò)程
AOF的加載過(guò)程
AOF的重寫(xiě)過(guò)程
AOF的寫(xiě)入過(guò)程
在核心的數(shù)據(jù)結(jié)構(gòu) Engine
中新增一個(gè) aof *AOF
對(duì)象
// 存儲(chǔ)引擎,負(fù)責(zé)數(shù)據(jù)的CRUD type Engine struct { // *DB dbSet []*atomic.Value // 時(shí)間輪(延遲任務(wù)) delay *timewheel.Delay // Append Only File aof *aof.AOF }
在初始化函數(shù)func NewEngine() *Engine
中,會(huì)基于是否啟用AOF日志,決定 aof *aof.AOF
的初始化
func NewEngine() *Engine { //.....省略.... // 啟用AOF日志 if conf.GlobalConfig.AppendOnly { // 創(chuàng)建*AOF對(duì)象 aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync) if err != nil { panic(err) } engine.aof = aof // 設(shè)定每個(gè)db,使用aof寫(xiě)入日志 engine.aofBindEveryDB() } return engine }
因?yàn)閷?shí)際執(zhí)行redis
命令的對(duì)象是 *DB
,所以會(huì)對(duì)每個(gè)*DB
對(duì)象設(shè)定db.writeAof
函數(shù)指針
func (e *Engine) aofBindEveryDB() { for _, dbSet := range e.dbSet { db := dbSet.Load().(*DB) db.writeAof = func(redisCommand [][]byte) { if conf.GlobalConfig.AppendOnly { // 調(diào)用e.aof對(duì)象方法,保存命令 e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand)) } } } }
例如,當(dāng)我們執(zhí)行 set key value
命令的時(shí)候,實(shí)際會(huì)執(zhí)行 func cmdSet(db *DB, args [][]byte) protocol.Reply
func cmdSet(db *DB, args [][]byte) protocol.Reply { //.....省略.... if result > 0 { // 1 表示存儲(chǔ)成功 //TODO: 過(guò)期時(shí)間處理 if ttl != nolimitedTTL { // 設(shè)定key過(guò)期 expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond) db.ExpireAt(key, expireTime) //寫(xiě)入日志 db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...)) db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime)) } else { // 設(shè)定key不過(guò)期 db.Persist(key) //寫(xiě)入日志 db.writeAof(aof.SetCmd(args...)) } return protocol.NewOkReply() } return protocol.NewNullBulkReply() }
可以看到,會(huì)調(diào)用上面剛才設(shè)定的db.writeAof
函數(shù),將當(dāng)前的命令保存到AOF中。所以我們實(shí)際看下 SaveRedisCommand
函數(shù)中具體在做什么事情。代碼路徑位于aof/aof.go
func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) { // 關(guān)閉 if aof.atomicClose.Load() { return } // 寫(xiě)入文件 & 刷盤(pán) if aof.aofFsync == FsyncAlways { record := aofRecord{ dbIndex: dbIndex, command: command, } aof.writeAofRecord(record) return } // 寫(xiě)入緩沖 aof.aofChan <- aofRecord{ dbIndex: dbIndex, command: command, } }
因?yàn)锳OF的刷盤(pán)(Sync)有三種模式:
寫(xiě)入 & 立即刷盤(pán)
寫(xiě)入 & 每秒刷盤(pán)
寫(xiě)入 & 不主動(dòng)刷盤(pán)(取決于操作系統(tǒng)自動(dòng)刷盤(pán))
如果配置的是always
模式,會(huì)立即執(zhí)行aof.writeAofRecord(record)
;否則就將數(shù)據(jù)先保存在緩沖aof.aofChan
中(這里其實(shí)又是生產(chǎn)者消費(fèi)者模型)最后在消費(fèi)協(xié)程中,執(zhí)行寫(xiě)入
func (aof *AOF) watchChan() { for record := range aof.aofChan { aof.writeAofRecord(record) } aof.aofFinished <- struct{}{} }
所以我們只需要看下 writeAofRecord
函數(shù)即可,其實(shí)就是把命令按照Redis 序列化協(xié)議
的格式,寫(xiě)入到文件中。給大家看下更直觀的演示圖:
再看下在 append.aof
文件中具體的數(shù)據(jù)格式:
這里有個(gè)很重要點(diǎn):因?yàn)锳OF文件是所有的*DB
對(duì)象復(fù)用的文件,寫(xiě)入的redis命令歸屬于不同的數(shù)據(jù)庫(kù)的
舉個(gè)例子: 比如在0號(hào)數(shù)據(jù)庫(kù),我們執(zhí)行set key value
,在3號(hào)數(shù)據(jù)庫(kù),我們執(zhí)行set key value
,在日志文件中會(huì)記錄兩條命令,但是這兩個(gè)命令其實(shí)是不同數(shù)據(jù)庫(kù)的命令。在恢復(fù)命令到數(shù)據(jù)庫(kù)的時(shí)候,應(yīng)該在不同的數(shù)據(jù)庫(kù)中執(zhí)行該命令。所以在記錄命令的時(shí)候,我們還要記錄下他的數(shù)據(jù)庫(kù)是什么?這樣恢復(fù)的時(shí)候,才能知道命令的數(shù)據(jù)庫(kù)的歸屬問(wèn)題。
func (aof *AOF) writeAofRecord(record aofRecord) { aof.mu.Lock() defer aof.mu.Unlock() // 因?yàn)閍of對(duì)象是所有數(shù)據(jù)庫(kù)對(duì)象【復(fù)用】寫(xiě)入文件方法,每個(gè)數(shù)據(jù)庫(kù)的索引不同 // 所以,每個(gè)命令的執(zhí)行,有個(gè)前提就是操作的不同的數(shù)據(jù)庫(kù) if record.dbIndex != aof.lastDBIndex { // 構(gòu)建select index 命令 & 寫(xiě)入文件 selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))} data := protocol.NewMultiBulkReply(selectCommand).ToBytes() _, err := aof.aofFile.Write(data) if err != nil { logger.Warn(err) return } aof.lastDBIndex = record.dbIndex } // redis命令 data := protocol.NewMultiBulkReply(record.command).ToBytes() _, err := aof.aofFile.Write(data) if err != nil { logger.Warn(err) } logger.Debugf("write aof command:%q", data) // 每次寫(xiě)入刷盤(pán) if aof.aofFsync == FsyncAlways { aof.aofFile.Sync() } }
AOF的加載過(guò)程
在服務(wù)啟動(dòng)的時(shí)候,將*.aof
文件中的命令,在服務(wù)端進(jìn)行重放,效果演示如下:
代碼路徑位于aof/aof.go
// 構(gòu)建AOF對(duì)象 func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) { //...省略... // 啟動(dòng)加載aof文件 if load { aof.LoadAof(0) } //...省略... }
aof.LoadAof(0)
函數(shù)的本質(zhì)就是從文件中,按照行讀取數(shù)據(jù)。如果看過(guò)之前的文章,這里其實(shí)復(fù)用了parser.ParseStream(reader)
函數(shù),負(fù)責(zé)從文件解析redis序列化協(xié)議
格式的命令,最后利用數(shù)據(jù)庫(kù)引擎,將命令數(shù)據(jù)保存到內(nèi)存中(命令重放)
func (aof *AOF) LoadAof(maxBytes int) { // 目的:當(dāng)加載aof文件的時(shí)候,因?yàn)樾枰獜?fù)用engine對(duì)象,內(nèi)部重放命令的時(shí)候會(huì)自動(dòng)寫(xiě)aof日志,加載aof 禁用 SaveRedisCommand的寫(xiě)入 aof.atomicClose.Store(true) deferfunc() { aof.atomicClose.Store(false) }() // 只讀打開(kāi)文件 file, err := os.Open(aof.aofFileName) if err != nil { logger.Error(err.Error()) return } defer file.Close() file.Seek(0, io.SeekStart) var reader io.Reader if maxBytes > 0 { // 限定讀取的字節(jié)大小 reader = io.LimitReader(file, int64(maxBytes)) } else { // 不限定,直接讀取到文件結(jié)尾(為止) reader = file } // 文件中保存的格式和網(wǎng)絡(luò)傳輸?shù)母袷揭恢? ch := parser.ParseStream(reader) virtualConn := connection.NewVirtualConn() for payload := range ch { if payload.Err != nil { // 文件已經(jīng)讀取到“完成“ if payload.Err == io.EOF { break } // 讀取到非法的格式 logger.Errorf("LoadAof parser error %+v:", payload.Err) continue } if payload.Reply == nil { logger.Error("empty payload data") continue } // 從文件中讀取到命令 reply, ok := payload.Reply.(*protocol.MultiBulkReply) if !ok { logger.Error("require multi bulk protocol") continue } // 利用數(shù)據(jù)庫(kù)引擎,將命令數(shù)據(jù)保存到內(nèi)存中(命令重放) ret := aof.engine.Exec(virtualConn, reply.RedisCommand) // 判斷是否執(zhí)行失敗 if protocol.IsErrReply(ret) { logger.Error("exec err ", string(ret.ToBytes())) } // 判斷命令是否是"select" if strings.ToLower(string(reply.RedisCommand[0])) == "select" { dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1])) if err == nil { aof.lastDBIndex = dbIndex // 記錄下數(shù)據(jù)恢復(fù)過(guò)程中,選中的數(shù)據(jù)庫(kù)索引 } } } }
AOF的重寫(xiě)過(guò)程
代碼路徑aof/rewrite.go
重寫(xiě)的過(guò)程就是下面的函數(shù)
func (aof *AOF) Rewrite(engine abstract.Engine) { //1.對(duì)現(xiàn)有的aof文件做一次快照 snapShot, err := aof.startRewrite() if err != nil { logger.Errorf("StartRewrite err: %+v", err) return } //2. 將現(xiàn)在的aof文件數(shù)據(jù),加在到新(內(nèi)存)對(duì)象中,并重寫(xiě)入新aof文件中 err = aof.doRewrite(snapShot, engine) if err != nil { logger.Errorf("doRewrite err: %+v", err) return } //3. 將重寫(xiě)過(guò)程中的增量命令寫(xiě)入到新文件中 err = aof.finishRewrite(snapShot) if err != nil { logger.Errorf("finishRewrite err: %+v", err) } }
整個(gè)的處理思想很重要:如下圖
總結(jié)
代碼的思路應(yīng)該還是比較清晰,但是細(xì)節(jié)上的處理非常容易讓人大腦宕機(jī)。建議還是看下源碼,邊看邊自己敲一下,感受是不一樣
項(xiàng)目代碼地址: https://github.com/gofish2020/easyredis
以上就是探索Golang實(shí)現(xiàn)Redis持久化AOF實(shí)例的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis持久化AOF的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Golang實(shí)現(xiàn)自己的Redis數(shù)據(jù)庫(kù)內(nèi)存實(shí)例探究
- Golang實(shí)現(xiàn)Redis過(guò)期時(shí)間實(shí)例探究
- 探索Golang?Redis實(shí)現(xiàn)發(fā)布訂閱功能實(shí)例
- Golang實(shí)現(xiàn)自己的Redis(有序集合跳表)實(shí)例探究
- Golang實(shí)現(xiàn)自己的Redis(TCP篇)實(shí)例探究
- Golang實(shí)現(xiàn)自己的Redis(pipeline客戶端)實(shí)例探索
- Golang實(shí)現(xiàn)Redis事務(wù)深入探究
- Golang實(shí)現(xiàn)Redis網(wǎng)絡(luò)協(xié)議實(shí)例探究
相關(guān)文章
Golang中crypto/ecdsa庫(kù)實(shí)現(xiàn)數(shù)字簽名和驗(yàn)證
本文主要介紹了Golang中crypto/ecdsa庫(kù)實(shí)現(xiàn)數(shù)字簽名和驗(yàn)證,將從ECDSA的基本原理出發(fā),詳細(xì)解析如何在Go語(yǔ)言中實(shí)現(xiàn)數(shù)字簽名和驗(yàn)證,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02GoFrame實(shí)現(xiàn)順序性校驗(yàn)示例詳解
這篇文章主要為大家介紹了GoFrame實(shí)現(xiàn)順序性校驗(yàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06golang開(kāi)發(fā)?gorilla?websocket的使用示例詳解
這篇文章主要介紹了golang開(kāi)發(fā)?gorilla?websocket的使用示例詳解,介紹了websocket的簡(jiǎn)單使用,我們使用的版本是1.3.0,具體操作方法跟隨小編一起學(xué)習(xí)吧2024-05-05Go語(yǔ)言使用kafka-go實(shí)現(xiàn)Kafka消費(fèi)消息
本篇文章主要介紹了使用kafka-go庫(kù)消費(fèi)Kafka消息,包含F(xiàn)etchMessage和ReadMessage的區(qū)別和適用場(chǎng)景,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的可以了解一下2024-12-12Golang中g(shù)oroutine和channel使用介紹深入分析
一次只做一件事情并不是完成任務(wù)最快的方法,一些大的任務(wù)可以拆解成若干個(gè)小任務(wù),goroutine可以讓程序同時(shí)處理幾個(gè)不同的任務(wù),goroutine使用channel來(lái)協(xié)調(diào)它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個(gè)goroutine就不會(huì)領(lǐng)先于另一個(gè)goroutine2023-01-01Go并發(fā)原語(yǔ)之SingleFlight請(qǐng)求合并方法實(shí)例
本文我們來(lái)學(xué)習(xí)一下 Go 語(yǔ)言的擴(kuò)展并發(fā)原語(yǔ):SingleFlight,SingleFlight 的作用是將并發(fā)請(qǐng)求合并成一個(gè)請(qǐng)求,以減少重復(fù)的進(jìn)程來(lái)優(yōu)化 Go 代碼2023-12-12Golang函數(shù)重試機(jī)制實(shí)現(xiàn)代碼
在編寫(xiě)應(yīng)用程序時(shí),有時(shí)候會(huì)遇到一些短暫的錯(cuò)誤,例如網(wǎng)絡(luò)請(qǐng)求、服務(wù)鏈接終端失敗等,這些錯(cuò)誤可能導(dǎo)致函數(shù)執(zhí)行失敗,這篇文章主要介紹了Golang函數(shù)重試機(jī)制實(shí)現(xiàn)代碼,需要的朋友可以參考下2024-04-04go實(shí)現(xiàn)thrift的網(wǎng)絡(luò)傳輸性能及需要注意問(wèn)題示例解析
這篇文章主要為大家介紹了go實(shí)現(xiàn)thrift的網(wǎng)絡(luò)傳輸性能及需要注意問(wèn)題示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09