GoLang RabbitMQ實現(xiàn)六種工作模式示例
六種工作模式介紹
1.簡單(Simple)模式
P:生產(chǎn)者,也就是要發(fā)送消息的程序。
C:消費者:消息的接收者,會一直等待消息到來。
queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費者從其中取出消息。
簡單模式就是單發(fā)單收,消息的消費者監(jiān)聽消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除。
2.工作隊列(Work Queue)模式
這種模式就是多個消費者消費同一個隊列中的消息,既然消費者多了那么就出現(xiàn)了消息分配的問題,所以對應(yīng)著兩種分配策略:
- 公平分發(fā):每個消費者接收消息的概率是相等的,消息隊列會循環(huán)依次給每個消費者發(fā)送消息,這種是默認的策略。
- 公平派遣:保證消費者在消費完某個消息,并發(fā)送確認信息后,消息隊列才會向它推送新的消息,在此之間若是有新的消息,將會被推送到其它消費者,若所有的消費者都在消費消息,那么就會等待。
3.發(fā)布/訂閱(Pub/Sub)模式
在這種模型中,多了一個 Exchange 角色,而且過程略有變化:
P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X (交換機)。
C:消費者,消息的接收者,會一直等待消息到來。
Queue:消息隊列,接收消息、緩存消息。
Exchange:交換機(X) ,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,如何處理消息,遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。 Exchange有以下4種類型:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列。
- Direct:全值匹配,把消息交給符合指定
routing key
的隊列。 - Topic:通配符,與Direct類型類似,但Direct類型要求
routing key
完全相等,而Topic類型是對routing key
進行模糊匹配,比Direct靈活。 - Headers:根據(jù)Message的一些頭部信息來分發(fā)過濾Message,用的比較少。
注意:Exchange負責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失。
4.路由(Routing)模式
路由模式其實就是上述發(fā)布/訂閱模式的交換機轉(zhuǎn)發(fā)類型變成了Direct類型。在這種模式下:
Exchange 不再把消息交給每一個綁定的隊列,而是根據(jù)消息的routing key
進行判斷,只有隊列的
routing key
與消息的routing key
完全一致,才會接收到消息。
P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時,會指定一個routing key
。
X:Exchange(交換機),接收生產(chǎn)者的消息,然后把消息遞交給與routing key
完全匹配的隊列。
C1:消費者,其所在隊列指定了需要routing key
為error的消息。
C2:消費者,其所在隊列指定了需要routing key
為 info、error、warning 的消息。
5.通配符(Tpoic)模式
路由模式其實就是上述發(fā)布/訂閱模式的交換機轉(zhuǎn)發(fā)類型變成了Topic類型。在這種模式下:
隊列的routing key
與消息的routing key
符合匹配規(guī)則,就可以接收到消息,有兩種規(guī)則:
*
:可以(只能)匹配一個單詞。
#
:可以匹配多個單詞(或者零個)。
所以圖中,routing key
為a.orange.b
的消息就會被轉(zhuǎn)發(fā)到Q1,而routing key
為Lazy.a.b.c
的消息就會被發(fā)送到Q2。
Go語言的實現(xiàn)
安裝操作庫
安裝API庫
Go可以使用streadway/amqp
庫來操作rabbit,使用以下命令來安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來我們對streadway/amqp
庫的內(nèi)容進行一個二次封裝,封裝為一個rabbitmq.go
文件:
package rabbitmq import ( "encoding/json" "github.com/streadway/amqp" "log" ) // RabbitMQ RabbitMQ結(jié)構(gòu) type RabbitMQ struct { channel *amqp.Channel Name string exchange string } // Connect 連接服務(wù)器 func Connect(s string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失?。?) ch, e := conn.Channel() failOnError(e, "無法打開頻道!") mq := new(RabbitMQ) mq.channel = ch return mq } // New 初始化消息隊列 //第一個參數(shù):rabbitmq服務(wù)器的鏈接,第二個參數(shù):隊列名字 func New(s string, name string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失敗!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") q, e := ch.QueueDeclare( name, //隊列名 false, //是否開啟持久化 true, //不使用時刪除 false, //排他 false, //不等待 nil, //參數(shù) ) failOnError(e, "初始化消息隊列失敗!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name return mq } // QueueDeclare 聲明queue func (q *RabbitMQ) QueueDeclare(queue string) { _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil) failOnError(e, "聲明queue失敗!") } // QueueDelete 刪除queue func (q *RabbitMQ) QueueDelete(queue string) { _, e := q.channel.QueueDelete(queue, false, true, false) failOnError(e, "刪除queue失敗!") } // Qos 配置queue參數(shù) func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設(shè)置QoS") } // NewExchange 初始化交換機 //第一個參數(shù):rabbitmq服務(wù)器的鏈接,第二個參數(shù):交換機名字,第三個參數(shù):交換機類型 func NewExchange(s string, name string, typename string) { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失??!") ch, e := conn.Channel() failOnError(e, "無法打開頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e, "初始化交換機失?。?) } // ExchangeDelete 刪除交換機 func (q *RabbitMQ) ExchangeDelete(exchange string) { e := q.channel.ExchangeDelete(exchange, false, true) failOnError(e, "刪除交換機失??!") } // Bind 綁定消息隊列到exchange func (q *RabbitMQ) Bind(exchange string, key string) { e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e, "綁定隊列失??!") q.exchange = exchange } // Send 向消息隊列發(fā)送消息 //Send方法可以往某個消息隊列發(fā)送消息 func (q *RabbitMQ) Send(queue string, body interface{}) { str, e := json.Marshal(body) failOnError(e, "消息序列化失敗!") e = q.channel.Publish( "", //交換 queue, //路由鍵 false, //必填 false, //立即 amqp.Publishing{ ReplyTo: q.Name, Body: []byte(str), }) msg := "向隊列:" + q.Name + "發(fā)送消息失敗!" failOnError(e, msg) } // Publish 向exchange發(fā)送消息 //Publish方法可以往某個exchange發(fā)送消息 func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) { str, e := json.Marshal(body) failOnError(e, "消息序列化失??!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo: q.Name, Body: []byte(str)}, ) failOnError(e, "向交換機發(fā)送消息失敗!") } // Consume 接收某個消息隊列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個隊列中接收消息 "", true, false, false, false, nil, ) failOnError(e, "接收消息失??!") return c } // Close 關(guān)閉隊列連接 func (q *RabbitMQ) Close() { q.channel.Close() } //錯誤處理函數(shù) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
簡單(Simple)模式
生產(chǎn)者
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("simple", " simple message: "+strconv.Itoa(i)) i = i + 1 } }
消費者
func main() { consumer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") //接收消息時,指定 messages := consumer.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費消息要用3s time.Sleep(time.Second * 3) } }() select {} }
運行結(jié)果:
2022/11/05 18:54:47 Received a message: " simple message: 0"
2022/11/05 18:54:52 Received a message: " simple message: 1"
2022/11/05 18:54:57 Received a message: " simple message: 2"
工作隊列(Work Queue)模式
公平分發(fā)模式:
公平分發(fā)模式采用的是輪詢機制,它會將數(shù)個任務(wù)按順序平均分發(fā)給消費者。
生產(chǎn)者
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收消息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費消息要用3s time.Sleep(time.Second * 3) } }() select {} }
消費者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收消息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費消息要用3s time.Sleep(time.Second * 3) } }() select {} }
運行結(jié)果:
# 消費者1
2022/11/05 19:45:03 Received a message: " worker message: 0"
2022/11/05 19:45:07 Received a message: " worker message: 2"
2022/11/05 19:45:11 Received a message: " worker message: 4"# 消費者2
2022/11/05 19:45:05 Received a message: " worker message: 1"
2022/11/05 19:45:09 Received a message: " worker message: 3"
2022/11/05 19:45:13 Received a message: " worker message: 5"
可以發(fā)現(xiàn),公平模式下,偶數(shù)消息都被發(fā)送給了消費者1,而奇數(shù)消息都被發(fā)送給了消費者2。
公平派遣模式:
有時候,如果消息之間的復(fù)雜度不同,那么不同消費者消費消息所用的時間會不同。這個時候如果使用公平派發(fā)模式,可能會造成某一個消費者需要消費的消息積壓過多??梢圆捎霉脚汕材J剑?/p>
公平派遣模式下發(fā)送端與公平分發(fā)相同,消費者端只需要加一段配置代碼,我們可以將預(yù)取計數(shù)設(shè)置為1。這告訴RabbitMQ一次不要給消費者一個以上的消息。換句話說,在處理并確認上一條消息之前,不要將新消息發(fā)送給消費者。而是將其分派給不忙的下一個消費者。
關(guān)于消息的確認:
為了確保消息永不丟失,RabbitMQ支持 消息確認。消費者發(fā)送回一個確認(acknowledgement),以告知RabbitMQ已經(jīng)接收,處理了特定的消息,并且RabbitMQ可以自由刪除它。
我們之前的代碼中,RabbitMQ一旦向消費者傳遞了一條消息,便立即將其標記為刪除(調(diào)用Consumer的第三個參數(shù)是autoAck,表示是否自動回復(fù))。在這種情況下,如果你終止一個消費者那么你就可能會丟失這個任務(wù),我們還將丟失所有已經(jīng)交付給這個消費者的尚未消費的消息。如果一個消費者意外宕機了,那么我們希望將任務(wù)交付給其他消費者來消費者。
所以一旦向消費者傳遞了一條消息,就不能馬上將其標記為刪除,而是要手動確認。我們需要在創(chuàng)建消費者的時候?qū)?code>autoAck參數(shù)標記為false:
// Consume 接收某個消息隊列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個隊列中接收消息 "", false, // 不自動確認消息 false, false, false, nil, ) failOnError(e, "接收消息失??!") return c }
然后每消費完一條消息需要調(diào)用Ack(false)
函數(shù)手動回復(fù)。
生產(chǎn)者
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費端限流:
實現(xiàn)公平派遣模式我們需要設(shè)置消費者端一次只能消費一條消息,之前我們已經(jīng)進行了封裝,直接在消費者端調(diào)用即可:
// Qos 配置queue參數(shù) func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無法設(shè)置QoS") }
消費者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費一條消息,直到消費完才重新接收 consumer1.Qos() //接收消息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費消息要用10s time.Sleep(time.Second * 10) // 手動回復(fù) ch.Ack(false) } }() select {} }
消費者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費一條消息,直到消費完才重新接收 consumer2.Qos() //接收消息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費消息要用2s time.Sleep(time.Second * 2) // 手動回復(fù) ch.Ack(false) } }() select {} }
運行結(jié)果:
# 消費者1
2022/11/05 20:31:26 Received a message: " worker message: 0"
2022/11/05 20:31:36 Received a message: " worker message: 5"# 消費者2
2022/11/05 20:31:28 Received a message: " worker message: 1"
2022/11/05 20:31:30 Received a message: " worker message: 2"
2022/11/05 20:31:32 Received a message: " worker message: 3"
2022/11/05 20:31:34 Received a message: " worker message: 4"
2022/11/05 20:31:38 Received a message: " worker message: 6"
2022/11/05 20:31:40 Received a message: " worker message: 7"
發(fā)布/訂閱(Pub/Sub)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout") i := 0 for { time.Sleep(time.Second) // fanout模式下不用routing key producer.Publish("exchange1", "pubsub message: "+strconv.Itoa(i), "") i = i + 1 } }
消費者1
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊列綁定到exchange consumer1.Bind("exchange1", "") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊列綁定到exchange consumer2.Bind("exchange1", "") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運行結(jié)果:
# 消費者1
2022/11/05 22:32:19 Consumer1 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer1 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer1 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer1 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer1 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer1 received a message: "pubsub message: 5"# 消費者2
2022/11/05 22:32:19 Consumer2 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer2 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer2 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer2 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer2 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer2 received a message: "pubsub message: 5"
路由(Routing)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為direct類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct") i := 0 for { time.Sleep(time.Second) // 如果是奇數(shù),就發(fā)key1 // 如果是偶數(shù),就發(fā)key2 if i%2 != 0 { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1") } else { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key2") } i = i + 1 } }
消費者1
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊列綁定到exchange consumer1.Bind("exchange", "key1") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊列綁定到exchange consumer2.Bind("exchange", "key2") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運行結(jié)果:
# 消費者1
2022/11/05 22:51:10 Consumer1 received a message: "routing message: 1"
2022/11/05 22:51:12 Consumer1 received a message: "routing message: 3"
2022/11/05 22:51:14 Consumer1 received a message: "routing message: 5"
2022/11/05 22:51:16 Consumer1 received a message: "routing message: 7"# 消費者2
2022/11/05 22:51:11 Consumer2 received a message: "routing message: 0"
2022/11/05 22:51:13 Consumer2 received a message: "routing message: 2"
2022/11/05 22:51:15 Consumer2 received a message: "routing message: 4"
2022/11/05 22:51:17 Consumer2 received a message: "routing message: 6"
通配符(Tpoic)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為topic類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange2", "topic") var i int for { time.Sleep(time.Second) if i%2 != 0 { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b.c") } else { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b") } i++ } }
消費者1
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊列綁定到exchange consumer1.Bind("exchange2", "*.test.*") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費者2
func main() { //第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊列綁定到exchange consumer2.Bind("exchange2", "#.test.#") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運行結(jié)果:
# 消費者1
2022/11/05 23:09:53 Consumer1 received a message: "topic message: 0"
2022/11/05 23:09:55 Consumer1 received a message: "topic message: 2"
2022/11/05 23:09:57 Consumer1 received a message: "topic message: 4"
2022/11/05 23:09:59 Consumer1 received a message: "topic message: 6"# 消費者2
2022/11/05 23:09:53 Consumer2 received a message: "topic message: 0"
2022/11/05 23:09:54 Consumer2 received a message: "topic message: 1"
2022/11/05 23:09:55 Consumer2 received a message: "topic message: 2"
2022/11/05 23:09:56 Consumer2 received a message: "topic message: 3"
2022/11/05 23:09:57 Consumer2 received a message: "topic message: 4"
2022/11/05 23:09:58 Consumer2 received a message: "topic message: 5"
2022/11/05 23:09:59 Consumer2 received a message: "topic message: 6"
到此這篇關(guān)于GoLang RabbitMQ實現(xiàn)六種工作模式示例的文章就介紹到這了,更多相關(guān)GoLang RabbitMQ工作模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件
這篇文章主要介紹了Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12Go語言string,int,int64 ,float之間類型轉(zhuǎn)換方法
Go語言中int類型和string類型都是屬于基本數(shù)據(jù)類型,兩種類型的轉(zhuǎn)化都非常簡單。下面通過本文給大家分享Go語言string,int,int64 ,float之間類型轉(zhuǎn)換方法,感興趣的朋友一起看看吧2017-07-07Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體
這篇文章主要介紹了Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體,結(jié)構(gòu)體是包含多個字段的集合類型,用于將數(shù)據(jù)組合為記錄2022-07-07Golang?Redis連接池實現(xiàn)原理及示例探究
這篇文章主要為大家介紹了Golang?Redis連接池實現(xiàn)示例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01