一文詳解GO如何實(shí)現(xiàn)Redis的AOF持久化
GO實(shí)現(xiàn)Redis的AOF持久化
將用戶發(fā)來的指令以RESP協(xié)議的形式存儲在本地的AOF文件,重啟Redis后執(zhí)行此文件恢復(fù)數(shù)據(jù)
https://github.com/csgopher/go-redis
本文涉及以下文件:redis.conf:配置文件
aof:實(shí)現(xiàn)aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte const ( aofQueueSize = 1 << 16 ) type payload struct { cmdLine CmdLine dbIndex int } type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int } func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil } func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } } func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } } func (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer file.Close() ch := parser.ParseStream(file) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } } }
- AofHandler:1.從管道中接收數(shù)據(jù) 2.寫入AOF文件
- AddAof:用戶的指令包裝成payload放入管道
- handleAof:將管道中的payload寫入磁盤
- LoadAof:重啟Redis后加載aof文件
database/database.go
type Database struct { dbSet []*DB aofHandler *aof.AofHandler } func NewDatabase() *Database { mdb := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } mdb.dbSet = make([]*DB, config.Properties.Databases) for i := range mdb.dbSet { singleDB := makeDB() singleDB.index = i mdb.dbSet[i] = singleDB } if config.Properties.AppendOnly { aofHandler, err := aof.NewAOFHandler(mdb) if err != nil { panic(err) } mdb.aofHandler = aofHandler for _, db := range mdb.dbSet { singleDB := db singleDB.addAof = func(line CmdLine) { mdb.aofHandler.AddAof(singleDB.index, line) } } } return mdb }
將AOF加入到database里
使用singleDB的原因:因?yàn)樵谘h(huán)中獲取返回變量的地址都完全相同,因此當(dāng)我們想要訪問數(shù)組中元素所在的地址時(shí),不應(yīng)該直接獲取 range 返回的變量地址 db,而應(yīng)該使用 singleDB := db
database/db.go
type DB struct { index int data dict.Dict addAof func(CmdLine) } func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), addAof: func(line CmdLine) {}, } return db }
由于分?jǐn)?shù)據(jù)庫db引用不到aof,所以添加一個(gè)addAof匿名函數(shù),在NewDatabase中用這個(gè)匿名函數(shù)調(diào)用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply { ...... if deleted > 0 { db.addAof(utils.ToCmdLine2("del", args...)) } return reply.MakeIntReply(int64(deleted)) } func execFlushDB(db *DB, args [][]byte) resp.Reply { db.Flush() db.addAof(utils.ToCmdLine2("flushdb", args...)) return &reply.OkReply{} } func execRename(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("rename", args...)) return &reply.OkReply{} } func execRenameNx(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("renamenx", args...)) return reply.MakeIntReply(1) }
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("set", args...)) return &reply.OkReply{} } func execSetNX(db *DB, args [][]byte) resp.Reply { ...... db.addAof(utils.ToCmdLine2("setnx", args...)) return reply.MakeIntReply(int64(result)) } func execGetSet(db *DB, args [][]byte) resp.Reply { key := string(args[0]) value := args[1] entity, exists := db.GetEntity(key) db.PutEntity(key, &database.DataEntity{Data: value}) db.addAof(utils.ToCmdLine2("getset", args...)) ...... }
添加addAof方法
測試命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
到此這篇關(guān)于一文詳解GO如何實(shí)現(xiàn)Redis的AOF持久化的文章就介紹到這了,更多相關(guān)GO實(shí)現(xiàn)Redis AOF持久化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一文帶你掌握掌握 Golang結(jié)構(gòu)體與方法
在 Golang 中,結(jié)構(gòu)體和方法是實(shí)現(xiàn)面向?qū)ο缶幊痰闹匾M成部分,也是 Golang 的核心概念之一。在本篇文章中,我們將深入介紹 Golang 結(jié)構(gòu)體與方法的概念、使用方法以及相關(guān)的編程技巧和最佳實(shí)踐2023-04-04GoLang使goroutine停止的五種方法實(shí)例
goroutine是Go并行設(shè)計(jì)的核心,下面這篇文章主要給大家介紹了關(guān)于GoLang使goroutine停止的五種方法,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Go語言函數(shù)的延遲調(diào)用(Deferred Code)詳解
本文將介紹Go語言函數(shù)和方法中的延遲調(diào)用,正如名稱一樣,這部分定義不會立即執(zhí)行,一般會在函數(shù)返回前再被調(diào)用,我們通過一些示例來了解一下延遲調(diào)用的使用場景2022-07-07Go語言基本的語法和內(nèi)置數(shù)據(jù)類型初探
這篇文章主要介紹了Go語言基本的語法和內(nèi)置數(shù)據(jù)類型,是golang入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-10-10探索Golang實(shí)現(xiàn)Redis持久化AOF實(shí)例
這篇文章主要為大家介紹了Golang實(shí)現(xiàn)Redis持久化AOF實(shí)例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01windows下使用vscode搭建golang環(huán)境并調(diào)試的過程
這篇文章主要介紹了在windows下使用vscode搭建golang環(huán)境并進(jìn)行調(diào)試,主要包括安裝方法及環(huán)境變量配置技巧,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09