使用go實(shí)現(xiàn)一個超級mini的消息隊列的示例代碼
前言
趁著有空余時間,就想著擼一個mini的生產(chǎn)-消費(fèi)消息隊列,說干就干了。自己是個javer,這次實(shí)現(xiàn),特意換用了go。沒錯,是零基礎(chǔ)上手go,順便可以學(xué)學(xué)go。
前置知識:
- go基本語法
- 消息隊列概念,也就三個:生產(chǎn)者、消費(fèi)者、隊列
目的
- 沒想著實(shí)現(xiàn)多復(fù)雜,因?yàn)闀r間有限,就mini就好,mini到什么程度呢
- 使用雙向鏈表數(shù)據(jù)結(jié)構(gòu)作為隊列
- 有多個topic可供生產(chǎn)者生成消息和消費(fèi)者消費(fèi)消息
- 支持生產(chǎn)者并發(fā)寫
- 支持消費(fèi)者讀,且ok后,從隊列刪除
- 消息不丟失(持久化)
- 高性能(先這樣想)
設(shè)計
整體架構(gòu)

協(xié)議
通訊協(xié)議底層使用tcp,mq是基于tcp自定義了一個協(xié)議,協(xié)議如下

type Msg struct {
Id int64
TopicLen int64
Topic string
// 1-consumer 2-producer 3-comsumer-ack 4-error
MsgType int64 // 消息類型
Len int64 // 消息長度
Payload []byte // 消息
}
Payload使用字節(jié)數(shù)組,是因?yàn)椴还軘?shù)據(jù)是什么,只當(dāng)做字節(jié)數(shù)組來處理即可。Msg承載著生產(chǎn)者生產(chǎn)的消息,消費(fèi)者消費(fèi)的消息,ACK、和錯誤消息,前兩者會有負(fù)載,而后兩者負(fù)載和長度都為空
協(xié)議的編解碼處理,就是對字節(jié)的處理,接下來有從字節(jié)轉(zhuǎn)為Msg,和從Msg轉(zhuǎn)為字節(jié)兩個函數(shù)
func BytesToMsg(reader io.Reader) Msg {
m := Msg{}
var buf [128]byte
n, err := reader.Read(buf[:])
if err != nil {
fmt.Println("read failed, err:", err)
}
fmt.Println("read bytes:", n)
// id
buff := bytes.NewBuffer(buf[0:8])
binary.Read(buff, binary.LittleEndian, &m.Id)
// topiclen
buff = bytes.NewBuffer(buf[8:16])
binary.Read(buff, binary.LittleEndian, &m.TopicLen)
// topic
msgLastIndex := 16 + m.TopicLen
m.Topic = string(buf[16: msgLastIndex])
// msgtype
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
binary.Read(buff, binary.LittleEndian, &m.MsgType)
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
binary.Read(buff, binary.LittleEndian, &m.Len)
if m.Len <= 0 {
return m
}
m.Payload = buf[msgLastIndex + 16:]
return m
}
func MsgToBytes(msg Msg) []byte {
msg.TopicLen = int64(len([]byte(msg.Topic)))
msg.Len = int64(len([]byte(msg.Payload)))
var data []byte
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Id)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.TopicLen)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Topic)...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.MsgType)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Len)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Payload)...)
return data
}
隊列
使用container/list,實(shí)現(xiàn)先入先出,生產(chǎn)者在隊尾寫,消費(fèi)者在隊頭讀取
package broker
import (
"container/list"
"sync"
)
type Queue struct {
len int
data list.List
}
var lock sync.Mutex
func (queue *Queue) offer(msg Msg) {
queue.data.PushBack(msg)
queue.len = queue.data.Len()
}
func (queue *Queue) poll() Msg{
if queue.len == 0 {
return Msg{}
}
msg := queue.data.Front()
return msg.Value.(Msg)
}
func (queue *Queue) delete(id int64) {
lock.Lock()
for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
if msg.Value.(Msg).Id == id {
queue.data.Remove(msg)
queue.len = queue.data.Len()
break
}
}
lock.Unlock()
}
方法offer往隊列里插入數(shù)據(jù),poll從隊列頭讀取數(shù)據(jù)素,delete根據(jù)消息ID從隊列刪除數(shù)據(jù)。這里使用Queue結(jié)構(gòu)體對List進(jìn)行封裝,其實(shí)是有必要的,List作為底層的數(shù)據(jù)結(jié)構(gòu),我們希望隱藏更多的底層操作,只給客戶提供基本的操作
delete操作是在消費(fèi)者消費(fèi)成功且發(fā)送ACK后,對消息從隊列里移除的,因?yàn)橄M(fèi)者可以多個同時消費(fèi),所以這里進(jìn)入臨界區(qū)時加鎖(em,加鎖是否就一定會影響對性能有較大的影響呢)
broker
broker作為服務(wù)器角色,負(fù)責(zé)接收連接,接收和響應(yīng)請求
package broker
import (
"bufio"
"net"
"os"
"sync"
"time"
)
var topics = sync.Map{}
func handleErr(conn net.Conn) {
defer func() {
if err := recover(); err != nil {
println(err.(string))
conn.Write(MsgToBytes(Msg{MsgType: 4}))
}
}()
}
func Process(conn net.Conn) {
handleErr(conn)
reader := bufio.NewReader(conn)
msg := BytesToMsg(reader)
queue, ok := topics.Load(msg.Topic)
var res Msg
if msg.MsgType == 1 {
// comsumer
if queue == nil || queue.(*Queue).len == 0{
return
}
msg = queue.(*Queue).poll()
msg.MsgType = 1
res = msg
} else if msg.MsgType == 2 {
// producer
if ! ok {
queue = &Queue{}
queue.(*Queue).data.Init()
topics.Store(msg.Topic, queue)
}
queue.(*Queue).offer(msg)
res = Msg{Id: msg.Id, MsgType: 2}
} else if msg.MsgType == 3 {
// consumer ack
if queue == nil {
return
}
queue.(*Queue).delete(msg.Id)
}
conn.Write(MsgToBytes(res))
}
MsgType等于1時,直接消費(fèi)消息;MsgType等于2時是生產(chǎn)者生產(chǎn)消息,如果隊列為空,那么還需創(chuàng)建一個新的隊列,放在對應(yīng)的topic下;MsgType等于3時,代表消費(fèi)者成功消費(fèi),可以
刪除消息
我們說消息不丟失,這里實(shí)現(xiàn)不完全,我就實(shí)現(xiàn)了持久化(持久化也沒全部實(shí)現(xiàn))。思路就是該topic對應(yīng)的隊列里的消息,按協(xié)議格式進(jìn)行序列化,當(dāng)broker啟動時,從文件恢復(fù)
持久化需要考慮的是增量還是全量,需要保存多久,這些都會影響實(shí)現(xiàn)的難度和性能(想想Kafka和Redis的持久化),這里表示簡單實(shí)現(xiàn)就好:定時器定時保存
func Save() {
ticker := time.NewTicker(60)
for {
select {
case <-ticker.C:
topics.Range(func(key, value interface{}) bool {
if value == nil {
return false
}
file, _ := os.Open(key.(string))
if file == nil {
file, _ = os.Create(key.(string))
}
for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
file.Write(MsgToBytes(msg.Value.(Msg)))
}
_ := file.Close()
return false
})
default:
time.Sleep(1)
}
}
}
有一個問題是,當(dāng)上面的delete操作時,這里的file文件需不需要跟著delete掉對應(yīng)的消息?答案是需要刪除的,如果不刪除,只能等下一次的全量持久化來覆蓋了,中間就有臟數(shù)據(jù)問題
下面是啟動邏輯
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("listen failed, err:", err)
return
}
go broker.Save()
for {
conn, err := listen.Accept()
if err != nil {
fmt.Print("accept failed, err:", err)
continue
}
go broker.Process(conn)
}
}
生產(chǎn)者
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func produce() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Print("write failed, err:", err)
}
fmt.Print(n)
}
消費(fèi)者
package main
import (
"awesomeProject/broker"
"bytes"
"fmt"
"net"
)
func comsume() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Topic: "topic-test", MsgType: 1}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("n", n)
var res [128]byte
conn.Read(res[:])
buf := bytes.NewBuffer(res[:])
receMsg := broker.BytesToMsg(buf)
fmt.Print(receMsg)
// ack
conn, _ = net.Dial("tcp", "127.0.0.1:12345")
l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
if e != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("l:", l)
}
消費(fèi)者這里ack時重新創(chuàng)建了連接,如果不創(chuàng)建連接的話,那服務(wù)端那里就需要一直從conn讀取數(shù)據(jù),直到結(jié)束。思考一下,像RabbitMQ的ack就有自動和手工的ack,如果是手工的ack,必然需要一個新的連接,因?yàn)椴恢揽蛻舳耸裁磿r候發(fā)送ack,自動的話,當(dāng)然可以使用同一個連接,but這里就簡單創(chuàng)建一條新連接吧
啟動
先啟動broker,再啟動producer,然后啟動comsumer,OK,能跑,能實(shí)現(xiàn)發(fā)送消息到隊列,從隊列消費(fèi)消息
總結(jié)
整體雖然簡單,但畢竟是使用go實(shí)現(xiàn)的,就是看似一頓操作猛如虎,實(shí)質(zhì)慌如狗。第一時間就被go的gopath和go mod困擾住,后面語法的使用,比如指針,傳值傳引用等,最頭疼的就是類型轉(zhuǎn)換,作為一個javer,使用go進(jìn)行類型轉(zhuǎn)換,著實(shí)被狠狠得虐了一番。
到此這篇關(guān)于使用go實(shí)現(xiàn)一個超級mini的消息隊列的示例代碼的文章就介紹到這了,更多相關(guān)go mini消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!?
相關(guān)文章
GOPROXY:解決go get golang.org/x包失敗問題
這篇文章主要介紹了GOPROXY:解決go get golang.org/x包失敗問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
一文帶你揭秘Go中new()和make()函數(shù)的區(qū)別和用途
Go(或 Golang)是一種現(xiàn)代、靜態(tài)類型、編譯型的編程語言,專為構(gòu)建可擴(kuò)展、并發(fā)和高效的軟件而設(shè)計,它提供了各種內(nèi)置的函數(shù)和特性,幫助開發(fā)人員編寫簡潔高效的代碼,在本博客文章中,我們將探討 new() 和 make() 函數(shù)之間的區(qū)別,了解何時以及如何有效地使用它們2023-10-10
golang微服務(wù)框架kratos實(shí)現(xiàn)Socket.IO服務(wù)的方法
本文主要介紹了golang微服務(wù)框架kratos實(shí)現(xiàn)Socket.IO服務(wù)的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
Golang控制通道實(shí)現(xiàn)協(xié)程等待詳解
這篇文章主要介紹了Golang控制通道實(shí)現(xiàn)協(xié)程等待,通道是Go語言程序的并發(fā)體goroutine是它們之間的通信機(jī)制。一個通道是一個通信機(jī)制,它可以讓一個goroutine通過它給另一個goroutine發(fā)送值信息。每個通道都有一個特殊的類型,也就是channels可發(fā)送數(shù)據(jù)的類型2022-11-11
Go?1.21.0?新增結(jié)構(gòu)化日志記錄標(biāo)準(zhǔn)庫log/slog使用詳解
這篇文章主要為大家介紹了Go?1.21.0?新增結(jié)構(gòu)化日志記錄標(biāo)準(zhǔn)庫log/slog使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11

