使用go語言實現(xiàn)Redis持久化的示例代碼
redis
是一個內(nèi)存數(shù)據(jù)庫,如果你把進程殺掉,那么里面存儲的數(shù)據(jù)都會消失,那么這篇文章就是來解決 redis
持久化的問題
我們在 redis.conf
文件中增加兩個配置
appendonly yes appendfilename appendonly.aof
appenonly
表示只追加appendfilename
表示追加到那什么文件中
指令: *3\r\n$3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nvalue\r\n
落在 appendonly.aof
文件中
*3 $3 SET $3 KEY $5 value
這里要實現(xiàn)的功能就是把用戶發(fā)過來的指令,用 RESP
的形式記錄在 appendonly.aof
文件中
這個文件是在機器的硬盤上,當 redis
停了之后,內(nèi)存中的數(shù)據(jù)都沒了,但這個文件會保存下
redis
重啟后,會讀取這個文件,把之前內(nèi)存中的數(shù)據(jù)再次加載回來
定義 AofHandler
在項目下新建文件 aof/aof.go
在里面定義一個 AofHandler
結(jié)構(gòu)體,它的作用就是用來處理 appendonly.aof
文件
type AofHandler struct { database databaseface.Database // 持有 db,db 有業(yè)務(wù)核心 aofFile *os.File // 持有 aof 文件 aofFilename string // aof 文件名 currentDB int // 當前 db aofChan chan *payload // 寫文件的緩沖區(qū) }
這里有注意的是 aofChan
,它是寫文件的緩沖區(qū)
因為從文件中讀取指令,指令是非常密集的,但是將指令寫入硬盤時非常慢的,我們又不可能每次都等待指令寫完成后再去操作 redis
這時我們就把所有想寫入 aof
文件的指令放到 aofChan
中,然后在另一個 goroutine
中去寫入硬盤
所以這個 aofChan
的類型是 payload
結(jié)構(gòu)體
type CmdLine = [][]byte type payload struct { cmdLine CmdLine // 指令 dbIndex int // db 索引 }
AofHandler
結(jié)構(gòu)體定義好之后,我們需要定義一個 NewAofHandler
函數(shù)來初始化 AofHandler
結(jié)構(gòu)體
還需要定義一個 AddAof
方法,用來往 aofChan
中添加指令
放到緩沖區(qū)之后,還需要一個方法 HandleAof
將指令寫入硬盤
最后還要實現(xiàn)一個從硬盤加載 aof
文件到內(nèi)存的的函數(shù) LoadAof
實現(xiàn) NewAofHandler
NewAofHandler
函數(shù)用來初始化 AofHandler
結(jié)構(gòu)體
func NewAofHandler(database databaseface.Database) (*AofHandler, error) { // 初始化 AofHandler 結(jié)構(gòu)體 handler := &AofHandler{} // 從配置文件中讀取 aof 文件名 handler.aofFilename = config.Properties.AppendFilename // 持有 db handler.database = database // 從硬盤加載 aof 文件 handler.LoadAof() // 打開 aof 文件, 如果不存在則創(chuàng)建 aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } // 持有 aof 文件 handler.aofFile = aofFile // 初始化 aofChan handler.aofChan = make(chan *payload, aofBufferSize) // 啟動一個 goroutine 處理 aofChan go func() { handler.HandleAof() }() // 返回 AofHandler 結(jié)構(gòu)體 return handler, nil }
實現(xiàn) AddAof
AddAof
方法用來往 aofChan
中添加指令,它不做落盤的操作
因為在執(zhí)行指令的時候,等待它落盤的話,效率太低了,所以我們把指令放到 aofChan
中,然后在另一個 goroutine
中去處理
func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { // 如果配置文件中的 appendonly 為 true 并且 aofChan 不為 nil if config.Properties.AppendOnly && handler.aofChan != nil { // 往 aofChan 中添加指令 handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } }
實現(xiàn) HandleAof
HandleAof
方法用來處理 aofChan
中的指令,將指令寫入硬盤
currentDB
記錄的是當前工作的 DB
,如果切換了 DB
,會在 aof
文件中插入 select 0
這樣切換 DB
的語句
func (handler *AofHandler) HandleAof() { // 初始化 currentDB handler.currentDB = 0 // 遍歷 chan for p := range handler.aofChan { // 如果當前 db 不等于上一次工作的 db,就要插入一條 select 語句 if p.dbIndex != handler.currentDB { // 我們要把 select 0 編碼成 RESP 格式 // 也就是 *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() // 寫入 aof 文件 _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue } // 更新 currentDB handler.currentDB = p.dbIndex } // 這里是插入正常的指令 data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() // 寫入 aof 文件 _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } }
實現(xiàn) Aof 落盤功能
我們之前在實現(xiàn)指令的部分,都是直接執(zhí)行指令,現(xiàn)在我們要把指令寫入 aof
文件
我們在 StandaloneDatabase
結(jié)構(gòu)體中增加一個 aofHandler
字段
type StandaloneDatabase struct { dbSet []*DB aofHandler *aof.AofHandler // 增加落盤功能 }
然后新建 database
時需要對 aofHandler
進行初始化
func NewStandaloneDatabase() *StandaloneDatabase { // ... // 先看下配置文件中的 appendonly 是否為 true if config.Properties.AppendOnly { // 初始化 aofHandler aofHandler, err := aof.NewAofHandler(database) if err != nil { panic(err) } // 持有 aofHandler database.aofHandler = aofHandler // 遍歷 dbSet for _, db := range database.dbSet { // 解決閉包問題 sdb := db // 為每個 db 添加 AddAof 方法 // 這個 addAof 方法是在執(zhí)行指令的時候調(diào)用的 sdb.addAof = func(line CmdLine) { database.aofHandler.AddAof(sdb.index, line) } } } return database }
這里要注意的是 addAof
方法,它是在執(zhí)行指令的時候調(diào)用的
因為我們需要在指令中調(diào)用 Addaof
函數(shù),實現(xiàn)指令寫入 aof
文件
但是在指令中,???們只能拿到 db
,db
上又沒有操作 aof
相關(guān)的方法,所以我們需要在 db
中增加一個 addAof
方法
type DB struct { index int // 數(shù)據(jù)的編號 data dict.Dict // 數(shù)據(jù)類型 addAof func(CmdLine) // 每個 db 都有一個 addAof 方法 }
然后就在需要落盤的指令中調(diào)用 addAof
方法
DEL
方法需要記錄下來,因為 DEL
方法是刪除數(shù)據(jù)的,如果不記錄下來,那么 aof
文件中的數(shù)據(jù)就會和內(nèi)存中的數(shù)據(jù)不一致
// DEL K1 K2 K3 func DEL(db *DB, args [][]byte) resp.Reply { deleted := db.Removes(keys...) // delete 大于 0 說明有數(shù)據(jù)被刪除 if deleted > 0 { db.addAof(utils.ToCmdLine2("DEL", args...)) } }
FLUSHDB
方法也需要記錄下來,因為 FLUSHDB
方法是刪除當前 DB
中的所有數(shù)據(jù)
// FLUSHDB func FLUSHDB(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("FLUSEHDB", args...)) }
RENAME
和 RENAMENX
方法也需要記錄下來,因為這兩個方法是修改 key
的名字
// RENAME K1 K2 func RENAME(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("RENAME", args...)) } // RENAMENX K1 K2 func RENAMENX(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("RENAMENX", args...)) }
SET
和 SETNX
方法也需要記錄下來,因為這兩個方法是設(shè)置數(shù)據(jù)的
// SET K1 v func SET(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("SET", args...)) } // SETNX K1 v func SETNX(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("SETNX", args...)) }
GETSET
方法也需要記錄下來,因為這個方法是設(shè)置數(shù)據(jù)的同時返回舊數(shù)據(jù)
// GETSET K1 v1 func GETSET(db *DB, args [][]byte) resp.Reply { db.addAof(utils.ToCmdLine2("GETSET", args...)) }
實現(xiàn) LoadAof
LoadAof
方法用來從硬盤加載 aof
文件到內(nèi)存
aof
中的指令是符合 RESP
協(xié)議的,我們就可以把這些指令當成用戶發(fā)過來的指令,執(zhí)行就可以了
func (handler *AofHandler) LoadAof() { // 打開 aof 文件 file, err := os.Open(handler.aofFilename) if err != nil { logger.Error(err) return } // 關(guān)閉文件 defer func() { _ = file.Close() }() // 創(chuàng)建一個 RESP 解析器,將 file 傳入,解析后的指令會放到 chan 中 ch := parser.ParseStream(file) fackConn := &connection.Connection{} // 遍歷 chan,執(zhí)行指令 for p := range ch { if p.Err != nil { // 如果是 EOF,說明文件讀取完畢 if p.Err == io.EOF { break } logger.Error(err) continue } if p.Data == nil { logger.Error("empty payload") continue } // 將指令轉(zhuǎn)換成 MultiBulkReply 類型 r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("exec multi mulk") continue } // 執(zhí)行指令 rep := handler.database.Exec(fackConn, r.Args) if reply.IsErrReply(rep) { logger.Error(rep) } } }
到此這篇關(guān)于使用go語言實現(xiàn)Redis持久化的示例代碼的文章就介紹到這了,更多相關(guān)go實現(xiàn)Redis持久化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go錯誤處理之panic函數(shù)和recover函數(shù)使用及捕獲異常方法
這篇文章主要介紹了Go錯誤處理之panic函數(shù)使用及捕獲,本篇探討了如何使用 panic 和 recover 來處理 Go 語言中的異常,需要的朋友可以參考下2023-03-03Golang編程并發(fā)工具庫MapReduce使用實踐
這篇文章主要為大家介紹了Golang并發(fā)工具庫MapReduce的使用實踐,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04Go語言CSP并發(fā)模型goroutine及channel底層實現(xiàn)原理
這篇文章主要為大家介紹了Go語言CSP并發(fā)模型goroutine?channel底層實現(xiàn)原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05Go語言k8s?kubernetes使用leader?election實現(xiàn)選舉
這篇文章主要為大家介紹了Go語言?k8s?kubernetes?使用leader?election選舉,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10Go語言七篇入門教程二程序結(jié)構(gòu)與數(shù)據(jù)類型
這篇文章主要為大家介紹了Go語言的程序結(jié)構(gòu)與數(shù)據(jù)類型,本篇文章是Go語言七篇入門系列文,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11