探索Golang實現(xiàn)Redis持久化AOF實例
引言
用11篇文章實現(xiàn)一個可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關(guān)注學習。
[x] easyredis之TCP服務(wù)
[x] easyredis之網(wǎng)絡(luò)請求序列化協(xié)議(RESP)
[x] easyredis之內(nèi)存數(shù)據(jù)庫
[x] easyredis之過期時間 (時間輪實現(xiàn))
[x] easyredis之持久化 (AOF實現(xiàn))
[ ] easyredis之發(fā)布訂閱功能
[ ] easyredis之有序集合(跳表實現(xiàn))
[ ] easyredis之 pipeline 客戶端實現(xiàn)
[ ] easyredis之事務(wù)(原子性/回滾)
[ ] easyredis之連接池
[ ] easyredis之分布式集群存儲
【第五篇】EasyRedis之持久化AOF
AOF全稱Append Only File,就是將寫相關(guān)的命令,追加保存到文件中,當服務(wù)器重啟以后,將文件中的命令在服務(wù)端重放(重新執(zhí)行恢復數(shù)據(jù)),實現(xiàn)的一種持久化方式。
本篇通過3個部分講解AOF的實現(xiàn):
AOF的寫入過程
AOF的加載過程
AOF的重寫過程
AOF的寫入過程
在核心的數(shù)據(jù)結(jié)構(gòu) Engine中新增一個 aof *AOF對象
// 存儲引擎,負責數(shù)據(jù)的CRUD
type Engine struct {
// *DB
dbSet []*atomic.Value
// 時間輪(延遲任務(wù))
delay *timewheel.Delay
// Append Only File
aof *aof.AOF
}
在初始化函數(shù)func NewEngine() *Engine中,會基于是否啟用AOF日志,決定 aof *aof.AOF的初始化
func NewEngine() *Engine {
//.....省略....
// 啟用AOF日志
if conf.GlobalConfig.AppendOnly {
// 創(chuàng)建*AOF對象
aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync)
if err != nil {
panic(err)
}
engine.aof = aof
// 設(shè)定每個db,使用aof寫入日志
engine.aofBindEveryDB()
}
return engine
}因為實際執(zhí)行redis命令的對象是 *DB,所以會對每個*DB對象設(shè)定db.writeAof函數(shù)指針
func (e *Engine) aofBindEveryDB() {
for _, dbSet := range e.dbSet {
db := dbSet.Load().(*DB)
db.writeAof = func(redisCommand [][]byte) {
if conf.GlobalConfig.AppendOnly {
// 調(diào)用e.aof對象方法,保存命令
e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand))
}
}
}
}例如,當我們執(zhí)行 set key value命令的時候,實際會執(zhí)行 func cmdSet(db *DB, args [][]byte) protocol.Reply
func cmdSet(db *DB, args [][]byte) protocol.Reply {
//.....省略....
if result > 0 { // 1 表示存儲成功
//TODO: 過期時間處理
if ttl != nolimitedTTL { // 設(shè)定key過期
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.ExpireAt(key, expireTime)
//寫入日志
db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...))
db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime))
} else { // 設(shè)定key不過期
db.Persist(key)
//寫入日志
db.writeAof(aof.SetCmd(args...))
}
return protocol.NewOkReply()
}
return protocol.NewNullBulkReply()
}可以看到,會調(diào)用上面剛才設(shè)定的db.writeAof函數(shù),將當前的命令保存到AOF中。所以我們實際看下 SaveRedisCommand函數(shù)中具體在做什么事情。代碼路徑位于aof/aof.go
func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) {
// 關(guān)閉
if aof.atomicClose.Load() {
return
}
// 寫入文件 & 刷盤
if aof.aofFsync == FsyncAlways {
record := aofRecord{
dbIndex: dbIndex,
command: command,
}
aof.writeAofRecord(record)
return
}
// 寫入緩沖
aof.aofChan <- aofRecord{
dbIndex: dbIndex,
command: command,
}
}因為AOF的刷盤(Sync)有三種模式:
寫入 & 立即刷盤
寫入 & 每秒刷盤
寫入 & 不主動刷盤(取決于操作系統(tǒng)自動刷盤)
如果配置的是always模式,會立即執(zhí)行aof.writeAofRecord(record);否則就將數(shù)據(jù)先保存在緩沖aof.aofChan中(這里其實又是生產(chǎn)者消費者模型)最后在消費協(xié)程中,執(zhí)行寫入
func (aof *AOF) watchChan() {
for record := range aof.aofChan {
aof.writeAofRecord(record)
}
aof.aofFinished <- struct{}{}
}所以我們只需要看下 writeAofRecord函數(shù)即可,其實就是把命令按照Redis 序列化協(xié)議的格式,寫入到文件中。給大家看下更直觀的演示圖:

再看下在 append.aof文件中具體的數(shù)據(jù)格式:

這里有個很重要點:因為AOF文件是所有的*DB對象復用的文件,寫入的redis命令歸屬于不同的數(shù)據(jù)庫的
舉個例子: 比如在0號數(shù)據(jù)庫,我們執(zhí)行set key value,在3號數(shù)據(jù)庫,我們執(zhí)行set key value,在日志文件中會記錄兩條命令,但是這兩個命令其實是不同數(shù)據(jù)庫的命令。在恢復命令到數(shù)據(jù)庫的時候,應(yīng)該在不同的數(shù)據(jù)庫中執(zhí)行該命令。所以在記錄命令的時候,我們還要記錄下他的數(shù)據(jù)庫是什么?這樣恢復的時候,才能知道命令的數(shù)據(jù)庫的歸屬問題。
func (aof *AOF) writeAofRecord(record aofRecord) {
aof.mu.Lock()
defer aof.mu.Unlock()
// 因為aof對象是所有數(shù)據(jù)庫對象【復用】寫入文件方法,每個數(shù)據(jù)庫的索引不同
// 所以,每個命令的執(zhí)行,有個前提就是操作的不同的數(shù)據(jù)庫
if record.dbIndex != aof.lastDBIndex {
// 構(gòu)建select index 命令 & 寫入文件
selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))}
data := protocol.NewMultiBulkReply(selectCommand).ToBytes()
_, err := aof.aofFile.Write(data)
if err != nil {
logger.Warn(err)
return
}
aof.lastDBIndex = record.dbIndex
}
// redis命令
data := protocol.NewMultiBulkReply(record.command).ToBytes()
_, err := aof.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
logger.Debugf("write aof command:%q", data)
// 每次寫入刷盤
if aof.aofFsync == FsyncAlways {
aof.aofFile.Sync()
}
}AOF的加載過程
在服務(wù)啟動的時候,將*.aof文件中的命令,在服務(wù)端進行重放,效果演示如下:

代碼路徑位于aof/aof.go
// 構(gòu)建AOF對象
func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) {
//...省略...
// 啟動加載aof文件
if load {
aof.LoadAof(0)
}
//...省略...
}aof.LoadAof(0)函數(shù)的本質(zhì)就是從文件中,按照行讀取數(shù)據(jù)。如果看過之前的文章,這里其實復用了parser.ParseStream(reader)函數(shù),負責從文件解析redis序列化協(xié)議格式的命令,最后利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內(nèi)存中(命令重放)
func (aof *AOF) LoadAof(maxBytes int) {
// 目的:當加載aof文件的時候,因為需要復用engine對象,內(nèi)部重放命令的時候會自動寫aof日志,加載aof 禁用 SaveRedisCommand的寫入
aof.atomicClose.Store(true)
deferfunc() {
aof.atomicClose.Store(false)
}()
// 只讀打開文件
file, err := os.Open(aof.aofFileName)
if err != nil {
logger.Error(err.Error())
return
}
defer file.Close()
file.Seek(0, io.SeekStart)
var reader io.Reader
if maxBytes > 0 { // 限定讀取的字節(jié)大小
reader = io.LimitReader(file, int64(maxBytes))
} else { // 不限定,直接讀取到文件結(jié)尾(為止)
reader = file
}
// 文件中保存的格式和網(wǎng)絡(luò)傳輸?shù)母袷揭恢?
ch := parser.ParseStream(reader)
virtualConn := connection.NewVirtualConn()
for payload := range ch {
if payload.Err != nil {
// 文件已經(jīng)讀取到“完成“
if payload.Err == io.EOF {
break
}
// 讀取到非法的格式
logger.Errorf("LoadAof parser error %+v:", payload.Err)
continue
}
if payload.Reply == nil {
logger.Error("empty payload data")
continue
}
// 從文件中讀取到命令
reply, ok := payload.Reply.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
// 利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內(nèi)存中(命令重放)
ret := aof.engine.Exec(virtualConn, reply.RedisCommand)
// 判斷是否執(zhí)行失敗
if protocol.IsErrReply(ret) {
logger.Error("exec err ", string(ret.ToBytes()))
}
// 判斷命令是否是"select"
if strings.ToLower(string(reply.RedisCommand[0])) == "select" {
dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1]))
if err == nil {
aof.lastDBIndex = dbIndex // 記錄下數(shù)據(jù)恢復過程中,選中的數(shù)據(jù)庫索引
}
}
}
}AOF的重寫過程
代碼路徑aof/rewrite.go重寫的過程就是下面的函數(shù)
func (aof *AOF) Rewrite(engine abstract.Engine) {
//1.對現(xiàn)有的aof文件做一次快照
snapShot, err := aof.startRewrite()
if err != nil {
logger.Errorf("StartRewrite err: %+v", err)
return
}
//2. 將現(xiàn)在的aof文件數(shù)據(jù),加在到新(內(nèi)存)對象中,并重寫入新aof文件中
err = aof.doRewrite(snapShot, engine)
if err != nil {
logger.Errorf("doRewrite err: %+v", err)
return
}
//3. 將重寫過程中的增量命令寫入到新文件中
err = aof.finishRewrite(snapShot)
if err != nil {
logger.Errorf("finishRewrite err: %+v", err)
}
}整個的處理思想很重要:如下圖

總結(jié)
代碼的思路應(yīng)該還是比較清晰,但是細節(jié)上的處理非常容易讓人大腦宕機。建議還是看下源碼,邊看邊自己敲一下,感受是不一樣
項目代碼地址: https://github.com/gofish2020/easyredis
以上就是探索Golang實現(xiàn)Redis持久化AOF實例的詳細內(nèi)容,更多關(guān)于Golang Redis持久化AOF的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證
本文主要介紹了Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證,將從ECDSA的基本原理出發(fā),詳細解析如何在Go語言中實現(xiàn)數(shù)字簽名和驗證,具有一定的參考價值,感興趣的可以了解一下2024-02-02
golang開發(fā)?gorilla?websocket的使用示例詳解
這篇文章主要介紹了golang開發(fā)?gorilla?websocket的使用示例詳解,介紹了websocket的簡單使用,我們使用的版本是1.3.0,具體操作方法跟隨小編一起學習吧2024-05-05
Go語言使用kafka-go實現(xiàn)Kafka消費消息
本篇文章主要介紹了使用kafka-go庫消費Kafka消息,包含F(xiàn)etchMessage和ReadMessage的區(qū)別和適用場景,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的可以了解一下2024-12-12
Golang中g(shù)oroutine和channel使用介紹深入分析
一次只做一件事情并不是完成任務(wù)最快的方法,一些大的任務(wù)可以拆解成若干個小任務(wù),goroutine可以讓程序同時處理幾個不同的任務(wù),goroutine使用channel來協(xié)調(diào)它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個goroutine就不會領(lǐng)先于另一個goroutine2023-01-01
Go并發(fā)原語之SingleFlight請求合并方法實例
本文我們來學習一下 Go 語言的擴展并發(fā)原語:SingleFlight,SingleFlight 的作用是將并發(fā)請求合并成一個請求,以減少重復的進程來優(yōu)化 Go 代碼2023-12-12
go實現(xiàn)thrift的網(wǎng)絡(luò)傳輸性能及需要注意問題示例解析
這篇文章主要為大家介紹了go實現(xiàn)thrift的網(wǎng)絡(luò)傳輸性能及需要注意問題示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09

