Golang實現自己的Redis(TCP篇)實例探究
引言
用11篇文章實現一個可用的Redis服務,姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關注學習。
- [x] easyredis之TCP服務
- [ ] easyredis之網絡請求序列化協(xié)議(RESP)
- [ ] easyredis之內存數據庫
- [ ] easyredis之過期時間 (時間輪實現)
- [ ] easyredis之持久化 (AOF實現)
- [ ] easyredis之發(fā)布訂閱功能
- [ ] easyredis之有序集合(跳表實現)
- [ ] easyredis之 pipeline 客戶端實現
- [ ] easyredis之事務(原子性/回滾)
- [ ] easyredis之連接池
- [ ] easyredis之分布式集群存儲
EasyRedis之TCP服務
通過本篇文章可以學到什么?
- 如何構建一個日志庫(包括:生產者/消費者模型)
- 如何解析一個redis的conf配置文件(包括:文件按行讀取/reflect的使用)
- 如何實現一個TCP服務(包括:tcp服務的編寫/服務優(yōu)雅退出)
日志庫實現
代碼路徑: tool/logger
代碼設計的思路:生產者消費者模型
writeLog負責將數據保存到logMsgChan chan *logMessage通道中(生產者)- 啟動單獨的goroutine從
logMsgChan chan *logMessage中讀取數據(消費者),同時將日志輸出到文件or命令行中 - 好處在于:解耦、通過寫入緩沖而非直接輸出到文件,提升寫入并發(fā)能力

日志打印效果:不同的日志級別用不同的顏色區(qū)分

對外提供通用的日志函數
func Debug(msg string) {
if defaultLogger.logLevel >= DEBUG {
defaultLogger.writeLog(DEBUG, callerDepth, msg)
}
}
func Debugf(format string, v ...any) {
if defaultLogger.logLevel >= DEBUG {
msg := fmt.Sprintf(format, v...)
defaultLogger.writeLog(DEBUG, callerDepth, msg)
}
}
func Info(msg string) {
if defaultLogger.logLevel >= INFO {
defaultLogger.writeLog(INFO, callerDepth, msg)
}
}
func Infof(format string, v ...any) {
if defaultLogger.logLevel >= INFO {
msg := fmt.Sprintf(format, v...)
defaultLogger.writeLog(INFO, callerDepth, msg)
}
}
func Warn(msg string) {
if defaultLogger.logLevel >= WARN {
defaultLogger.writeLog(WARN, callerDepth, msg)
}
}
func Warnf(format string, v ...any) {
if defaultLogger.logLevel >= WARN {
msg := fmt.Sprintf(format, v...)
defaultLogger.writeLog(WARN, callerDepth, msg)
}
}
func Error(msg string) {
if defaultLogger.logLevel >= ERROR {
defaultLogger.writeLog(ERROR, callerDepth, msg)
}
}
func Errorf(format string, v ...any) {
if defaultLogger.logLevel >= ERROR {
msg := fmt.Sprintf(format, v...)
defaultLogger.writeLog(ERROR, callerDepth, msg)
}
}
func Fatal(msg string) {
if defaultLogger.logLevel >= FATAL {
defaultLogger.writeLog(FATAL, callerDepth, msg)
}
}
func Fatalf(format string, v ...any) {
if defaultLogger.logLevel >= FATAL {
msg := fmt.Sprintf(format, v...)
defaultLogger.writeLog(FATAL, callerDepth, msg)
}
}writelog函數
func (l *logger) writeLog(level LogLevel, callerDepth int, msg string) {
var formattedMsg string
_, file, line, ok := runtime.Caller(callerDepth)
if ok {
formattedMsg = fmt.Sprintf("[%s][%s:%d] %s", levelFlags[level], file, line, msg)
} else {
formattedMsg = fmt.Sprintf("[%s] %s", levelFlags[level], msg)
}
// 對象池,復用*logMessage對象
logMsg := l.logMsgPool.Get().(*logMessage)
logMsg.level = level
logMsg.msg = formattedMsg
// 保存到chan緩沖中
l.logMsgChan <- logMsg
}goroutine協(xié)程
gofunc() {
for {
select {
case <-fileLogger.close:
return
case logMsg := <-fileLogger.logMsgChan:
//檢查是否跨天,重新生成日志文件
logFilename := fmt.Sprintf("%s-%s.%s", settings.Name, time.Now().Format(settings.DateFormat), settings.Ext)
if path.Join(settings.Path, logFilename) != fileLogger.logFile.Name() {
fd, err := utils.OpenFile(logFilename, settings.Path)
if err != nil {
panic("open log " + logFilename + " failed: " + err.Error())
}
fileLogger.logFile.Close()
fileLogger.logFile = fd
}
msg := logMsg.msg
// 根據日志級別,增加不同的顏色
switch logMsg.level {
case DEBUG:
msg = Blue + msg + Reset
case INFO:
msg = Green + msg + Reset
case WARN:
msg = Yellow + msg + Reset
case ERROR, FATAL:
msg = Red + msg + Reset
}
// 標準輸出
fileLogger.logStd.Output(0, msg)
// 輸出到文件
fileLogger.logFile.WriteString(time.Now().Format(utils.DateTimeFormat) + " " + logMsg.msg + utils.CRLF)
}
}
}()conf配置文件解析
代碼路徑: tool/conf
核心思想:
按照行讀取
.conf配置文件,將解析的結果保存到lineMap中;利用
reflect將lineMap中保存的結果,存儲到*RedisConfig對象中
conf文件內容格式為(看代碼請參考):

func parse(r io.Reader) *RedisConfig {
newRedisConfig := &RedisConfig{}
//1.按行掃描文件
lineMap := make(map[string]string)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
line = strings.TrimLeft(line, " ")
// 空行 or 注釋行
iflen(line) == 0 || (len(line) > 0 && line[0] == '#') {
continue
}
// 解析行 例如: Bind 127.0.0.1
idx := strings.IndexAny(line, " ")
if idx > 0 && idx < len(line)-1 {
key := line[:idx]
value := strings.Trim(line[idx+1:], " ")
// 將每行的結果,保存到lineMap中
lineMap[strings.ToLower(key)] = value
}
}
if err := scanner.Err(); err != nil {
logger.Error(err.Error())
}
//2.將掃描結果保存到newRedisConfig 對象中
configValue := reflect.ValueOf(newRedisConfig).Elem()
configType := reflect.TypeOf(newRedisConfig).Elem()
// 遍歷結構體字段(類型)
for i := 0; i < configType.NumField(); i++ {
fieldType := configType.Field(i)
// 讀取字段名
fieldName := strings.Trim(fieldType.Tag.Get("conf"), " ")
if fieldName == "" {
fieldName = fieldType.Name
} else {
fieldName = strings.Split(fieldName, ",")[0]
}
fieldName = strings.ToLower(fieldName)
// 判斷該字段是否在config中有配置
fieldValue, ok := lineMap[fieldName]
if ok {
// 將結果保存到字段中
switch fieldType.Type.Kind() {
case reflect.String:
configValue.Field(i).SetString(fieldValue)
case reflect.Bool:
configValue.Field(i).SetBool("yes" == fieldValue)
case reflect.Int:
intValue, err := strconv.ParseInt(fieldValue, 10, 64)
if err == nil {
configValue.Field(i).SetInt(intValue)
}
case reflect.Slice:
// 切片的元素是字符串
if fieldType.Type.Elem().Kind() == reflect.String {
tmpSlice := strings.Split(fieldValue, ",")
configValue.Field(i).Set(reflect.ValueOf(tmpSlice))
}
}
}
}
return newRedisConfig
}TCP服務實現
代碼路徑: tcpserver
創(chuàng)建tcp服務對象
func NewTCPServer(conf TCPConfig, handler redis.Handler) *TCPServer {
server := &TCPServer{
conf: conf,
closeTcp: 0,
clientCounter: 0,
quit: make(chan os.Signal, 1),
redisHander: handler,
}
return server
}啟動tcp服務
func (t *TCPServer) Start() error {
// 開啟監(jiān)聽
listen, err := net.Listen("tcp", t.conf.Addr)
if err != nil {
return err
}
t.listener = listen
logger.Infof("bind %s listening...", t.conf.Addr)
// 接收連接
go t.accept()
// 阻塞于信號
signal.Notify(t.quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-t.quit
returnnil
}
// accept 死循環(huán)接收新連接的到來
func (t *TCPServer) accept() error {
for {
conn, err := t.listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
logger.Infof("accept occurs temporary error: %v, retry in 5ms", err)
time.Sleep(5 * time.Millisecond)
continue
}
// 說明監(jiān)聽listener出錯,無法接收新連接
logger.Warn(err.Error())
atomic.CompareAndSwapInt32(&t.closeTcp, 0, 1)
// 整個進程退出
t.quit <- syscall.SIGTERM
// 結束 for循環(huán)
break
}
// 啟動一個協(xié)程處理conn
go t.handleConn(conn)
}
returnnil
}處理連接請求
- waitDone 用于優(yōu)雅關閉
- clientCounter記錄當前客戶端連接數量
- redisHander.Handle 就是下一篇文章要實現的功能,解析RESP請求數據
func (t *TCPServer) handleConn(conn net.Conn) {
// 如果已關閉,新連接不再處理
if atomic.LoadInt32(&t.closeTcp) == 1 {
// 直接關閉
conn.Close()
return
}
logger.Debugf("accept new conn %s", conn.RemoteAddr().String())
t.waitDone.Add(1)
atomic.AddInt64(&t.clientCounter, 1)
deferfunc() {
t.waitDone.Done()
atomic.AddInt64(&t.clientCounter, -1)
}()
// TODO :處理連接
t.redisHander.Handle(context.Background(), conn)
}關閉服務
// 退出前,清理
func (t *TCPServer) Close() {
logger.Info("graceful shutdown easyredis server")
atomic.CompareAndSwapInt32(&t.closeTcp, 0, 1)
// 關閉監(jiān)聽
t.listener.Close()
// 關閉處理對象
t.redisHander.Close()
// 阻塞中...
t.waitDone.Wait()
}最終效果展示: 利用telnet連接服務端,可以看到服務端可以正常的accept到連接,并打印日志


項目代碼地址: https://github.com/gofish2020/easyredis
以上就是Golang實現自己的Redis(TCP篇)實例探究的詳細內容,更多關于Golang Redis TCP的資料請關注腳本之家其它相關文章!
相關文章
golang方法中receiver為指針與不為指針的區(qū)別詳析
這篇文章主要給大家介紹了關于golang方法中receiver為指針與不為指針區(qū)別的相關資料,其實最大的區(qū)別應該是指針傳遞的是對像的引用,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10

