Golang實現(xiàn)自己的Redis(TCP篇)實例探究
引言
用11篇文章實現(xiàn)一個可用的Redis服務,姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關注學習。
- [x] easyredis之TCP服務
- [ ] easyredis之網(wǎng)絡請求序列化協(xié)議(RESP)
- [ ] easyredis之內(nèi)存數(shù)據(jù)庫
- [ ] easyredis之過期時間 (時間輪實現(xiàn))
- [ ] easyredis之持久化 (AOF實現(xiàn))
- [ ] easyredis之發(fā)布訂閱功能
- [ ] easyredis之有序集合(跳表實現(xiàn))
- [ ] easyredis之 pipeline 客戶端實現(xiàn)
- [ ] easyredis之事務(原子性/回滾)
- [ ] easyredis之連接池
- [ ] easyredis之分布式集群存儲
EasyRedis之TCP服務
通過本篇文章可以學到什么?
- 如何構建一個日志庫(包括:生產(chǎn)者/消費者模型)
- 如何解析一個redis的conf配置文件(包括:文件按行讀取/reflect的使用)
- 如何實現(xiàn)一個TCP服務(包括:tcp服務的編寫/服務優(yōu)雅退出)
日志庫實現(xiàn)
代碼路徑: tool/logger
代碼設計的思路:生產(chǎn)者消費者模型
writeLog
負責將數(shù)據(jù)保存到logMsgChan chan *logMessage
通道中(生產(chǎn)者)- 啟動單獨的goroutine從
logMsgChan chan *logMessage
中讀取數(shù)據(jù)(消費者),同時將日志輸出到文件or命令行中 - 好處在于:解耦、通過寫入緩沖而非直接輸出到文件,提升寫入并發(fā)能力
日志打印效果:不同的日志級別用不同的顏色區(qū)分
對外提供通用的日志函數(shù)
func Debug(msg string) { if defaultLogger.logLevel >= DEBUG { defaultLogger.writeLog(DEBUG, callerDepth, msg) } } func Debugf(format string, v ...any) { if defaultLogger.logLevel >= DEBUG { msg := fmt.Sprintf(format, v...) defaultLogger.writeLog(DEBUG, callerDepth, msg) } } func Info(msg string) { if defaultLogger.logLevel >= INFO { defaultLogger.writeLog(INFO, callerDepth, msg) } } func Infof(format string, v ...any) { if defaultLogger.logLevel >= INFO { msg := fmt.Sprintf(format, v...) defaultLogger.writeLog(INFO, callerDepth, msg) } } func Warn(msg string) { if defaultLogger.logLevel >= WARN { defaultLogger.writeLog(WARN, callerDepth, msg) } } func Warnf(format string, v ...any) { if defaultLogger.logLevel >= WARN { msg := fmt.Sprintf(format, v...) defaultLogger.writeLog(WARN, callerDepth, msg) } } func Error(msg string) { if defaultLogger.logLevel >= ERROR { defaultLogger.writeLog(ERROR, callerDepth, msg) } } func Errorf(format string, v ...any) { if defaultLogger.logLevel >= ERROR { msg := fmt.Sprintf(format, v...) defaultLogger.writeLog(ERROR, callerDepth, msg) } } func Fatal(msg string) { if defaultLogger.logLevel >= FATAL { defaultLogger.writeLog(FATAL, callerDepth, msg) } } func Fatalf(format string, v ...any) { if defaultLogger.logLevel >= FATAL { msg := fmt.Sprintf(format, v...) defaultLogger.writeLog(FATAL, callerDepth, msg) } }
writelog
函數(shù)
func (l *logger) writeLog(level LogLevel, callerDepth int, msg string) { var formattedMsg string _, file, line, ok := runtime.Caller(callerDepth) if ok { formattedMsg = fmt.Sprintf("[%s][%s:%d] %s", levelFlags[level], file, line, msg) } else { formattedMsg = fmt.Sprintf("[%s] %s", levelFlags[level], msg) } // 對象池,復用*logMessage對象 logMsg := l.logMsgPool.Get().(*logMessage) logMsg.level = level logMsg.msg = formattedMsg // 保存到chan緩沖中 l.logMsgChan <- logMsg }
goroutine協(xié)程
gofunc() { for { select { case <-fileLogger.close: return case logMsg := <-fileLogger.logMsgChan: //檢查是否跨天,重新生成日志文件 logFilename := fmt.Sprintf("%s-%s.%s", settings.Name, time.Now().Format(settings.DateFormat), settings.Ext) if path.Join(settings.Path, logFilename) != fileLogger.logFile.Name() { fd, err := utils.OpenFile(logFilename, settings.Path) if err != nil { panic("open log " + logFilename + " failed: " + err.Error()) } fileLogger.logFile.Close() fileLogger.logFile = fd } msg := logMsg.msg // 根據(jù)日志級別,增加不同的顏色 switch logMsg.level { case DEBUG: msg = Blue + msg + Reset case INFO: msg = Green + msg + Reset case WARN: msg = Yellow + msg + Reset case ERROR, FATAL: msg = Red + msg + Reset } // 標準輸出 fileLogger.logStd.Output(0, msg) // 輸出到文件 fileLogger.logFile.WriteString(time.Now().Format(utils.DateTimeFormat) + " " + logMsg.msg + utils.CRLF) } } }()
conf配置文件解析
代碼路徑: tool/conf
核心思想:
按照行讀取
.conf
配置文件,將解析的結果保存到lineMap
中;利用
reflect
將lineMap
中保存的結果,存儲到*RedisConfig
對象中
conf文件內(nèi)容格式為(看代碼請參考):
func parse(r io.Reader) *RedisConfig { newRedisConfig := &RedisConfig{} //1.按行掃描文件 lineMap := make(map[string]string) scanner := bufio.NewScanner(r) for scanner.Scan() { line := scanner.Text() line = strings.TrimLeft(line, " ") // 空行 or 注釋行 iflen(line) == 0 || (len(line) > 0 && line[0] == '#') { continue } // 解析行 例如: Bind 127.0.0.1 idx := strings.IndexAny(line, " ") if idx > 0 && idx < len(line)-1 { key := line[:idx] value := strings.Trim(line[idx+1:], " ") // 將每行的結果,保存到lineMap中 lineMap[strings.ToLower(key)] = value } } if err := scanner.Err(); err != nil { logger.Error(err.Error()) } //2.將掃描結果保存到newRedisConfig 對象中 configValue := reflect.ValueOf(newRedisConfig).Elem() configType := reflect.TypeOf(newRedisConfig).Elem() // 遍歷結構體字段(類型) for i := 0; i < configType.NumField(); i++ { fieldType := configType.Field(i) // 讀取字段名 fieldName := strings.Trim(fieldType.Tag.Get("conf"), " ") if fieldName == "" { fieldName = fieldType.Name } else { fieldName = strings.Split(fieldName, ",")[0] } fieldName = strings.ToLower(fieldName) // 判斷該字段是否在config中有配置 fieldValue, ok := lineMap[fieldName] if ok { // 將結果保存到字段中 switch fieldType.Type.Kind() { case reflect.String: configValue.Field(i).SetString(fieldValue) case reflect.Bool: configValue.Field(i).SetBool("yes" == fieldValue) case reflect.Int: intValue, err := strconv.ParseInt(fieldValue, 10, 64) if err == nil { configValue.Field(i).SetInt(intValue) } case reflect.Slice: // 切片的元素是字符串 if fieldType.Type.Elem().Kind() == reflect.String { tmpSlice := strings.Split(fieldValue, ",") configValue.Field(i).Set(reflect.ValueOf(tmpSlice)) } } } } return newRedisConfig }
TCP服務實現(xiàn)
代碼路徑: tcpserver
創(chuàng)建tcp服務對象
func NewTCPServer(conf TCPConfig, handler redis.Handler) *TCPServer { server := &TCPServer{ conf: conf, closeTcp: 0, clientCounter: 0, quit: make(chan os.Signal, 1), redisHander: handler, } return server }
啟動tcp服務
func (t *TCPServer) Start() error { // 開啟監(jiān)聽 listen, err := net.Listen("tcp", t.conf.Addr) if err != nil { return err } t.listener = listen logger.Infof("bind %s listening...", t.conf.Addr) // 接收連接 go t.accept() // 阻塞于信號 signal.Notify(t.quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT) <-t.quit returnnil } // accept 死循環(huán)接收新連接的到來 func (t *TCPServer) accept() error { for { conn, err := t.listener.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { logger.Infof("accept occurs temporary error: %v, retry in 5ms", err) time.Sleep(5 * time.Millisecond) continue } // 說明監(jiān)聽listener出錯,無法接收新連接 logger.Warn(err.Error()) atomic.CompareAndSwapInt32(&t.closeTcp, 0, 1) // 整個進程退出 t.quit <- syscall.SIGTERM // 結束 for循環(huán) break } // 啟動一個協(xié)程處理conn go t.handleConn(conn) } returnnil }
處理連接請求
- waitDone 用于優(yōu)雅關閉
- clientCounter記錄當前客戶端連接數(shù)量
- redisHander.Handle 就是下一篇文章要實現(xiàn)的功能,解析RESP請求數(shù)據(jù)
func (t *TCPServer) handleConn(conn net.Conn) { // 如果已關閉,新連接不再處理 if atomic.LoadInt32(&t.closeTcp) == 1 { // 直接關閉 conn.Close() return } logger.Debugf("accept new conn %s", conn.RemoteAddr().String()) t.waitDone.Add(1) atomic.AddInt64(&t.clientCounter, 1) deferfunc() { t.waitDone.Done() atomic.AddInt64(&t.clientCounter, -1) }() // TODO :處理連接 t.redisHander.Handle(context.Background(), conn) }
關閉服務
// 退出前,清理 func (t *TCPServer) Close() { logger.Info("graceful shutdown easyredis server") atomic.CompareAndSwapInt32(&t.closeTcp, 0, 1) // 關閉監(jiān)聽 t.listener.Close() // 關閉處理對象 t.redisHander.Close() // 阻塞中... t.waitDone.Wait() }
最終效果展示: 利用telnet連接服務端,可以看到服務端可以正常的accept
到連接,并打印日志
項目代碼地址: https://github.com/gofish2020/easyredis
以上就是Golang實現(xiàn)自己的Redis(TCP篇)實例探究的詳細內(nèi)容,更多關于Golang Redis TCP的資料請關注腳本之家其它相關文章!
相關文章
golang數(shù)組和切片作為參數(shù)和返回值的實現(xiàn)
本文主要介紹了golang數(shù)組和切片作為參數(shù)和返回值的實現(xiàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02golang方法中receiver為指針與不為指針的區(qū)別詳析
這篇文章主要給大家介紹了關于golang方法中receiver為指針與不為指針區(qū)別的相關資料,其實最大的區(qū)別應該是指針傳遞的是對像的引用,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10