欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang實現(xiàn)Redis事務(wù)深入探究

 更新時間:2024年01月24日 09:33:20   作者:紹納?nullbody筆記  
這篇文章主要介紹了Golang實現(xiàn)Redis事務(wù)深入探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

用11篇文章實現(xiàn)一個可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關(guān)注學(xué)習(xí)。

  • [x] easyredis之TCP服務(wù)
  • [x] easyredis之網(wǎng)絡(luò)請求序列化協(xié)議(RESP)
  • [x] easyredis之內(nèi)存數(shù)據(jù)庫
  • [x] easyredis之過期時間 (時間輪實現(xiàn))
  • [x] easyredis之持久化 (AOF實現(xiàn))
  • [x] easyredis之發(fā)布訂閱功能
  • [x] easyredis之有序集合(跳表實現(xiàn))
  • [x] easyredis之 pipeline 客戶端實現(xiàn)
  • [x] easyredis之事務(wù)(原子性/回滾)
  • [ ] easyredis之連接池
  • [ ] easyredis之分布式集群存儲

EasyRedis事務(wù)

本篇實現(xiàn)的Redis命令如下:

Multi : 開啟事務(wù)
Exec [rb]:執(zhí)行事務(wù)
Watch key [key...]:監(jiān)視key在事務(wù)執(zhí)行之前不能發(fā)生變更,如果事務(wù)執(zhí)行時發(fā)現(xiàn)有變更,事務(wù)終止執(zhí)行
Unwatch:取消監(jiān)視
Discard: 退出事務(wù)

如何實現(xiàn)事務(wù)的原子性和回滾?

  • 這里我們可以聯(lián)想MYSQL如何實現(xiàn)事務(wù)的原子性。當(dāng)mysql的存儲引擎采用Innodb并且事務(wù)的隔離級別為可重復(fù)讀(RR)的時候, MYSQL的讀可以分成【當(dāng)前讀】和【快照讀】;【當(dāng)前讀】采用的機制就是加鎖。【快照讀】的機制就是利用MVCC的原理,在開啟事務(wù)的時候,生成一個事務(wù)版本,并根據(jù)記錄中的undo log鏈表中的每個記錄的事務(wù)版本號比對,判斷自己是否可以讀寫該條記錄(這個相當(dāng)于無鎖)。如果對MYSQL的八股文不了解的可以看這篇文章 https://xiaolincoding.com/mysql/transaction/mvcc.html
  • 回滾如何實現(xiàn):很簡單,比如本次我們要插入一條記錄,回滾就刪除一條記錄。如果是修改一條記錄,那就把修改前的記錄記下來,回滾就將記錄恢復(fù)回去即可。如果是刪除記錄,那就把記錄先記錄下來,回滾就是再插入回去即可。

在本次的項目中,我們將使用【鎖】保證原子性,使用【記錄原始數(shù)據(jù)】的方式實現(xiàn)回滾。

我們依次看下各個命令的關(guān)鍵代碼:

本篇代碼的入口位于engine/database.go

func (db *DB) Exec(c abstract.Connection, redisCommand [][]byte) protocol.Reply {
	cmdName := strings.ToLower(string(redisCommand[0]))
	if cmdName == "multi" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return StartMulti(c) // 開啟事務(wù)
	} elseif cmdName == "discard" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return DiscardMulti(c) // 取消事務(wù)
	} elseif cmdName == "watch" {
		return Watch(db, c, redisCommand[1:]) // 監(jiān)視watch key [key...]
	} elseif cmdName == "unwatch" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return UnWatch(db, c) // 取消監(jiān)視
	} elseif cmdName == "exec" {
		return ExecMulti(db, c, redisCommand[1:]) // 執(zhí)行事務(wù)
	}
	// **事務(wù)模式** 將命令入隊到命令緩沖隊列中
	if c != nil && c.IsTransaction() {
		return EnqueueCmd(c, redisCommand)
	}
	// ** 普通模式 **
	return db.execNormalCommand(c, redisCommand)
}

multi 開啟事務(wù)

事務(wù)的執(zhí)行是基于當(dāng)前的socket連接,所以設(shè)定開啟事務(wù),其實就是設(shè)定socket連接為開始事務(wù)狀態(tài)(設(shè)定為true)

// 開啟事務(wù)
func StartMulti(c abstract.Connection) protocol.Reply {
	if c.IsTransaction() {
		return protocol.NewGenericErrReply("multi is already start,do not repeat it")
	}
	// 設(shè)定開啟
	c.SetTransaction(true)
	return protocol.NewOkReply()
}

// socket連接
func (k *KeepConnection) SetTransaction(val bool) {
	if !val { // 取消事務(wù)模式,清空隊列和watch key
		k.queue = nil
		k.watchKey = nil
		k.txErrors = nil
	}
	// 開啟事務(wù)狀態(tài)
	k.trx.Store(val)
}

一旦開啟事務(wù)以后,后續(xù)的redis命令不再執(zhí)行直接,而是會緩存起來。最后在執(zhí)行exec命令的時候一次性批量執(zhí)行。

//... 省略
// **事務(wù)模式** 將命令入隊到命令緩沖隊列中
	if c != nil && c.IsTransaction() {
		return EnqueueCmd(c, redisCommand)
	}
//... 省略

discard 取消事務(wù)

可以看到和上面的開啟是一個相反的操作,設(shè)置為false

// 取消事務(wù)
func DiscardMulti(c abstract.Connection) protocol.Reply {
	if !c.IsTransaction() {
		return protocol.NewGenericErrReply("DISCARD without MULTI")
	}
	// 取消開啟
	c.SetTransaction(false)
	return protocol.NewOkReply()
}

watch監(jiān)視

這個命令平時應(yīng)該用的不多。含義:監(jiān)視key,希望在執(zhí)行命令的時候沒有發(fā)生過變化。例如我出門之前,我在門上做了一個標(biāo)記,如果在我出門之后,門口的標(biāo)記沒有變動過,說明在我出門的這個期間沒有人來過。否則就認定有人來過我家。我就趕快報警,不進門了。那這個標(biāo)記的實現(xiàn)方案就是利用版本號的機制,針對key記錄一個值,執(zhí)行命令的時候,比對下該值(版本號)有沒有變化即可。

實際變更版本號的代碼,繼續(xù)往下看就可以看到。

// 監(jiān)視 key [key...]
func Watch(db *DB, conn abstract.Connection, args [][]byte) protocol.Reply {
	iflen(args) < 1 {
		return protocol.NewArgNumErrReply("WATCH")
	}
	if conn.IsTransaction() {
		return protocol.NewGenericErrReply("WATCH inside MULTI is not allowed")
	}
	watching := conn.GetWatchKey()
	for _, bkey := range args {
		key := string(bkey)
		watching[key] = db.GetVersion(key) // 保存當(dāng)前key的版本號(利用版本號機制判斷key是否有變化)
	}
	return protocol.NewOkReply()
}

Unwatch 取消監(jiān)視

// 清空watch key
func UnWatch(db *DB, conn abstract.Connection) protocol.Reply {
	conn.CleanWatchKey()
	return protocol.NewOkReply()
}

Exec 執(zhí)行事務(wù)

批量執(zhí)行命令,這里我自己設(shè)計了一個rb參數(shù)(標(biāo)準(zhǔn)里面沒有)表示如果執(zhí)行錯誤,自動回滾。默認情況下,命令會全部執(zhí)行,出現(xiàn)錯誤不會回滾

// 執(zhí)行事務(wù)  exec rb
func ExecMulti(db *DB, conn abstract.Connection, args [][]byte) protocol.Reply {
	// 說明當(dāng)前不是【事務(wù)模式】
	if !conn.IsTransaction() {
		return protocol.NewGenericErrReply("EXEC without MULTI")
	}
	// 執(zhí)行完,自動退出事務(wù)模式
	defer conn.SetTransaction(false)
	// 如果在入隊的時候,就有格式錯誤,直接返回
	iflen(conn.GetTxErrors()) > 0 {
		return protocol.NewGenericErrReply("EXECABORT Transaction discarded because of previous errors.")
	}
	// 是否自動回滾(這里是自定義的一個參數(shù),標(biāo)準(zhǔn)redis中沒有)
	isRollBack := false
	iflen(args) > 0 && strings.ToUpper(string(args[0])) == "RB" { // 有rb參數(shù),說明要自動回滾
		isRollBack = true
	}
	// 獲取所有的待執(zhí)行命令
	cmdLines := conn.GetQueuedCmdLine()
	return db.execMulti(conn, cmdLines, isRollBack)
}

前面就是一個基本的狀態(tài)判斷,最后真正執(zhí)行的函數(shù)是db.execMulti(conn, cmdLines, isRollBack)代碼如下:

  • 在代碼執(zhí)行前,將所有命令的key全部都提取出來,分成【讀key】【寫key】。例如set key value這里的key就是寫key. get key這里的key就是讀key?!緦慿ey】就是對數(shù)據(jù)會進行變更,【讀key】只是讀,不會對值發(fā)生變更。
  • 如果有watch key看下版本號是否發(fā)生改變
  • 接下來最重要的來了,就是對所有的key上鎖(原子性),保證所有的命令一起執(zhí)行(讀key上讀鎖,寫key上寫鎖)
  • 鎖上完以后,就執(zhí)行所有的命令。如果需要回滾,那么在命令執(zhí)行之前,將原始的數(shù)據(jù)通過db.GetUndoLog(cmdLine)函數(shù)記錄原始數(shù)據(jù)的回滾命令。
  • 如果需要執(zhí)行回滾命令(就【倒序】執(zhí)行所有的回滾命令)。還記得上面有對key加鎖吧,所以這里是可以直接執(zhí)行回滾(因為鎖保證原子性)
  • 最后對【寫key】更新版本號(表示發(fā)生了狀態(tài)變更),這里就和前面的記錄版本號對應(yīng)起來了
// 執(zhí)行事務(wù):本質(zhì)就是一堆命令一起執(zhí)行, isRollback 表示出錯是否回滾
func (db *DB) execMulti(conn abstract.Connection, cmdLines []CmdLine, isRollback bool) protocol.Reply {
	// 命令的執(zhí)行結(jié)果
	results := make([]protocol.Reply, len(cmdLines))
	versionKeys := make([][]string, len(cmdLines))
	var writeKeys []string
	var readKeys []string
	for idx, cmdLine := range cmdLines {
		cmdName := strings.ToLower(string(cmdLine[0]))
		cmd, ok := commandCenter[cmdName]
		if !ok {
			// 這里正常不會執(zhí)行
			continue
		}
		keyFunc := cmd.keyFunc
		readKs, writeKs := keyFunc(cmdLine[1:])
		// 讀寫key
		readKeys = append(readKeys, readKs...)
		writeKeys = append(writeKeys, writeKs...)
		// 寫key需要 變更版本號
		versionKeys[idx] = append(versionKeys[idx], writeKs...)
	}
	watchingKey := conn.GetWatchKey()
	if isWatchingChanged(db, watchingKey) { // 判斷watch key是否發(fā)生了變更
		return protocol.NewEmptyMultiBulkReply()
	}
	// 所有key上鎖(原子性)
	db.RWLock(readKeys, writeKeys)
	defer db.RWUnLock(readKeys, writeKeys)
	undoCmdLines := [][]CmdLine{}
	aborted := false
	for idx, cmdLine := range cmdLines {
		// 生成回滾命令
		if isRollback {
			undoCmdLines = append(undoCmdLines, db.GetUndoLog(cmdLine))
		}
		// 執(zhí)行命令
		reply := db.execWithLock(cmdLine)
		if protocol.IsErrReply(reply) { // 執(zhí)行出錯
			if isRollback { // 需要回滾
				undoCmdLines = undoCmdLines[:len(undoCmdLines)-1] // 命令執(zhí)行失?。ú挥没貪L),剔除最后一個回滾命令
				aborted = true
				break
			}
		}
		// 執(zhí)行結(jié)果
		results[idx] = reply
	}
	// 中斷,執(zhí)行回滾
	if aborted {
		size := len(undoCmdLines)
		// 倒序執(zhí)行回滾指令(完成回滾)
		for i := size - 1; i >= 0; i-- {
			curCmdLines := undoCmdLines[i]
			iflen(curCmdLines) == 0 {
				continue
			}
			for _, cmdLine := range curCmdLines {
				db.execWithLock(cmdLine)
			}
		}
		return protocol.NewGenericErrReply("EXECABORT Transaction discarded because of previous errors.")
	}
	// 執(zhí)行到這里,說明命令執(zhí)行完成(可能全部成功,也可能部分成功)
	for idx, keys := range versionKeys {
		if !protocol.IsErrReply(results[idx]) { // 針對執(zhí)行成功的命令(寫命令),變更版本號
			db.addVersion(keys...)
		}
	}
	// 將多個命令執(zhí)行的結(jié)果,進行合并返回
	mixReply := protocol.NewMixReply()
	mixReply.Append(results...)
	return mixReply
}

效果演示(無回滾)

效果演示(有回滾)

因為set key 1將key的內(nèi)存對象設(shè)定為字符串; 而zadd key 1 member操作的內(nèi)存對象是有序集合; 內(nèi)存對象不對,所以不能執(zhí)行。這就是出錯的原因。

項目代碼地址: https://github.com/gofish2020/easyredis 

以上就是Golang實現(xiàn)Redis事務(wù)深入探究的詳細內(nèi)容,更多關(guān)于Golang Redis事務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang import 導(dǎo)入包語法及一些特殊用法詳解

    Golang import 導(dǎo)入包語法及一些特殊用法詳解

    這篇文章主要介紹了Golang import 導(dǎo)入包語法及一些特殊用法,需要的朋友可以參考下
    2020-02-02
  • Golang開發(fā)gRPC服務(wù)入門介紹

    Golang開發(fā)gRPC服務(wù)入門介紹

    這篇文章主要介紹了Golang開發(fā)gRPC服務(wù),Golang開發(fā)gRPC應(yīng)用程序的套路也已經(jīng)很清晰,這篇文章就來做一個簡單的介紹,算是入門,需要的朋友可以參考下
    2022-04-04
  • Go依賴注入DI工具wire使用詳解(golang常用庫包)

    Go依賴注入DI工具wire使用詳解(golang常用庫包)

    依賴注入是指程序運行過程中,如果需要調(diào)用另一個對象協(xié)助時,無須在代碼中創(chuàng)建被調(diào)用者,而是依賴于外部的注入,本文結(jié)合示例代碼給大家介紹Go依賴注入DI工具wire使用,感興趣的朋友一起看看吧
    2022-04-04
  • 關(guān)于Go語言中特有的設(shè)計模式與實現(xiàn)方式講解

    關(guān)于Go語言中特有的設(shè)計模式與實現(xiàn)方式講解

    雖然Go語言沒有像其他語言那樣明確的設(shè)計模式,但在實踐中,開發(fā)者們?nèi)匀话l(fā)現(xiàn)了一些在Go語言中特別適用的設(shè)計模式和實現(xiàn)方式,本文就來和大家一一進行講解
    2023-05-05
  • Go語言中println和fmt.Println區(qū)別

    Go語言中println和fmt.Println區(qū)別

    本文主要介紹了Go語言中println和fmt.Println區(qū)別,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • Golang利用Recover進行錯誤處理

    Golang利用Recover進行錯誤處理

    Golang?中的?recover?是一個鮮為人知但非常有趣和強大的功能,這篇文章小編就來帶大家深入了解一下在Golang中是如何利用Recover進行錯誤處理吧
    2023-12-12
  • Go語言如何實現(xiàn)線程安全的Map

    Go語言如何實現(xiàn)線程安全的Map

    Go語言內(nèi)置的map雖然高效,但并不是線程安全的,若在多線程環(huán)境中直接操作map,可能會引發(fā)并發(fā)寫入的錯誤,下面我們就來看看如何實現(xiàn)線程安全的Map吧
    2024-11-11
  • 一文詳解Go語言io.LimitedReader類型

    一文詳解Go語言io.LimitedReader類型

    這篇文章主要為大家介紹了Go語言io.LimitedReader類型示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-07-07
  • golang連接kafka消費進ES操作

    golang連接kafka消費進ES操作

    這篇文章主要介紹了golang連接kafka消費進ES操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Golang實現(xiàn)自己的Redis(有序集合跳表)實例探究

    Golang實現(xiàn)自己的Redis(有序集合跳表)實例探究

    這篇文章主要為大家介紹了Golang實現(xiàn)自己的Redis(有序集合跳表)實例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2024-01-01

最新評論