GoLang RabbitMQ實(shí)現(xiàn)六種工作模式示例
六種工作模式介紹
1.簡(jiǎn)單(Simple)模式
P:生產(chǎn)者,也就是要發(fā)送消息的程序。
C:消費(fèi)者:消息的接收者,會(huì)一直等待消息到來(lái)。
queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
簡(jiǎn)單模式就是單發(fā)單收,消息的消費(fèi)者監(jiān)聽(tīng)消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動(dòng)從隊(duì)列中刪除。
2.工作隊(duì)列(Work Queue)模式
這種模式就是多個(gè)消費(fèi)者消費(fèi)同一個(gè)隊(duì)列中的消息,既然消費(fèi)者多了那么就出現(xiàn)了消息分配的問(wèn)題,所以對(duì)應(yīng)著兩種分配策略:
- 公平分發(fā):每個(gè)消費(fèi)者接收消息的概率是相等的,消息隊(duì)列會(huì)循環(huán)依次給每個(gè)消費(fèi)者發(fā)送消息,這種是默認(rèn)的策略。
- 公平派遣:保證消費(fèi)者在消費(fèi)完某個(gè)消息,并發(fā)送確認(rèn)信息后,消息隊(duì)列才會(huì)向它推送新的消息,在此之間若是有新的消息,將會(huì)被推送到其它消費(fèi)者,若所有的消費(fèi)者都在消費(fèi)消息,那么就會(huì)等待。
3.發(fā)布/訂閱(Pub/Sub)模式
在這種模型中,多了一個(gè) Exchange 角色,而且過(guò)程略有變化:
P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X (交換機(jī))。
C:消費(fèi)者,消息的接收者,會(huì)一直等待消息到來(lái)。
Queue:消息隊(duì)列,接收消息、緩存消息。
Exchange:交換機(jī)(X) ,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,如何處理消息,遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。 Exchange有以下4種類型:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。
- Direct:全值匹配,把消息交給符合指定
routing key
的隊(duì)列。 - Topic:通配符,與Direct類型類似,但Direct類型要求
routing key
完全相等,而Topic類型是對(duì)routing key
進(jìn)行模糊匹配,比Direct靈活。 - Headers:根據(jù)Message的一些頭部信息來(lái)分發(fā)過(guò)濾Message,用的比較少。
注意:Exchange負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失。
4.路由(Routing)模式
路由模式其實(shí)就是上述發(fā)布/訂閱模式的交換機(jī)轉(zhuǎn)發(fā)類型變成了Direct類型。在這種模式下:
Exchange 不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的routing key
進(jìn)行判斷,只有隊(duì)列的
routing key
與消息的routing key
完全一致,才會(huì)接收到消息。
P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key
。
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key
完全匹配的隊(duì)列。
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key
為error的消息。
C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key
為 info、error、warning 的消息。
5.通配符(Tpoic)模式
路由模式其實(shí)就是上述發(fā)布/訂閱模式的交換機(jī)轉(zhuǎn)發(fā)類型變成了Topic類型。在這種模式下:
隊(duì)列的routing key
與消息的routing key
符合匹配規(guī)則,就可以接收到消息,有兩種規(guī)則:
*
:可以(只能)匹配一個(gè)單詞。
#
:可以匹配多個(gè)單詞(或者零個(gè))。
所以圖中,routing key
為a.orange.b
的消息就會(huì)被轉(zhuǎn)發(fā)到Q1,而routing key
為Lazy.a.b.c
的消息就會(huì)被發(fā)送到Q2。
Go語(yǔ)言的實(shí)現(xiàn)
安裝操作庫(kù)
安裝API庫(kù)
Go可以使用streadway/amqp
庫(kù)來(lái)操作rabbit,使用以下命令來(lái)安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來(lái)我們對(duì)streadway/amqp
庫(kù)的內(nèi)容進(jìn)行一個(gè)二次封裝,封裝為一個(gè)rabbitmq.go
文件:
package rabbitmq import ( "encoding/json" "github.com/streadway/amqp" "log" ) // RabbitMQ RabbitMQ結(jié)構(gòu) type RabbitMQ struct { channel *amqp.Channel Name string exchange string } // Connect 連接服務(wù)器 func Connect(s string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失敗!") ch, e := conn.Channel() failOnError(e, "無(wú)法打開(kāi)頻道!") mq := new(RabbitMQ) mq.channel = ch return mq } // New 初始化消息隊(duì)列 //第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):隊(duì)列名字 func New(s string, name string) *RabbitMQ { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失??!") ch, e := conn.Channel() failOnError(e, "無(wú)法打開(kāi)頻道!") q, e := ch.QueueDeclare( name, //隊(duì)列名 false, //是否開(kāi)啟持久化 true, //不使用時(shí)刪除 false, //排他 false, //不等待 nil, //參數(shù) ) failOnError(e, "初始化消息隊(duì)列失??!") mq := new(RabbitMQ) mq.channel = ch mq.Name = q.Name return mq } // QueueDeclare 聲明queue func (q *RabbitMQ) QueueDeclare(queue string) { _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil) failOnError(e, "聲明queue失?。?) } // QueueDelete 刪除queue func (q *RabbitMQ) QueueDelete(queue string) { _, e := q.channel.QueueDelete(queue, false, true, false) failOnError(e, "刪除queue失??!") } // Qos 配置queue參數(shù) func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無(wú)法設(shè)置QoS") } // NewExchange 初始化交換機(jī) //第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):交換機(jī)名字,第三個(gè)參數(shù):交換機(jī)類型 func NewExchange(s string, name string, typename string) { //連接rabbitmq conn, e := amqp.Dial(s) failOnError(e, "連接Rabbitmq服務(wù)器失??!") ch, e := conn.Channel() failOnError(e, "無(wú)法打開(kāi)頻道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e, "初始化交換機(jī)失??!") } // ExchangeDelete 刪除交換機(jī) func (q *RabbitMQ) ExchangeDelete(exchange string) { e := q.channel.ExchangeDelete(exchange, false, true) failOnError(e, "刪除交換機(jī)失敗!") } // Bind 綁定消息隊(duì)列到exchange func (q *RabbitMQ) Bind(exchange string, key string) { e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e, "綁定隊(duì)列失??!") q.exchange = exchange } // Send 向消息隊(duì)列發(fā)送消息 //Send方法可以往某個(gè)消息隊(duì)列發(fā)送消息 func (q *RabbitMQ) Send(queue string, body interface{}) { str, e := json.Marshal(body) failOnError(e, "消息序列化失??!") e = q.channel.Publish( "", //交換 queue, //路由鍵 false, //必填 false, //立即 amqp.Publishing{ ReplyTo: q.Name, Body: []byte(str), }) msg := "向隊(duì)列:" + q.Name + "發(fā)送消息失??!" failOnError(e, msg) } // Publish 向exchange發(fā)送消息 //Publish方法可以往某個(gè)exchange發(fā)送消息 func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) { str, e := json.Marshal(body) failOnError(e, "消息序列化失??!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo: q.Name, Body: []byte(str)}, ) failOnError(e, "向交換機(jī)發(fā)送消息失?。?) } // Consume 接收某個(gè)消息隊(duì)列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個(gè)隊(duì)列中接收消息 "", true, false, false, false, nil, ) failOnError(e, "接收消息失??!") return c } // Close 關(guān)閉隊(duì)列連接 func (q *RabbitMQ) Close() { q.channel.Close() } //錯(cuò)誤處理函數(shù) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
簡(jiǎn)單(Simple)模式
生產(chǎn)者
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("simple", " simple message: "+strconv.Itoa(i)) i = i + 1 } }
消費(fèi)者
func main() { consumer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple") //接收消息時(shí),指定 messages := consumer.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費(fèi)消息要用3s time.Sleep(time.Second * 3) } }() select {} }
運(yùn)行結(jié)果:
2022/11/05 18:54:47 Received a message: " simple message: 0"
2022/11/05 18:54:52 Received a message: " simple message: 1"
2022/11/05 18:54:57 Received a message: " simple message: 2"
工作隊(duì)列(Work Queue)模式
公平分發(fā)模式:
公平分發(fā)模式采用的是輪詢機(jī)制,它會(huì)將數(shù)個(gè)任務(wù)按順序平均分發(fā)給消費(fèi)者。
生產(chǎn)者
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費(fèi)者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收消息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費(fèi)消息要用3s time.Sleep(time.Second * 3) } }() select {} }
消費(fèi)者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") //接收消息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費(fèi)消息要用3s time.Sleep(time.Second * 3) } }() select {} }
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/05 19:45:03 Received a message: " worker message: 0"
2022/11/05 19:45:07 Received a message: " worker message: 2"
2022/11/05 19:45:11 Received a message: " worker message: 4"# 消費(fèi)者2
2022/11/05 19:45:05 Received a message: " worker message: 1"
2022/11/05 19:45:09 Received a message: " worker message: 3"
2022/11/05 19:45:13 Received a message: " worker message: 5"
可以發(fā)現(xiàn),公平模式下,偶數(shù)消息都被發(fā)送給了消費(fèi)者1,而奇數(shù)消息都被發(fā)送給了消費(fèi)者2。
公平派遣模式:
有時(shí)候,如果消息之間的復(fù)雜度不同,那么不同消費(fèi)者消費(fèi)消息所用的時(shí)間會(huì)不同。這個(gè)時(shí)候如果使用公平派發(fā)模式,可能會(huì)造成某一個(gè)消費(fèi)者需要消費(fèi)的消息積壓過(guò)多??梢圆捎霉脚汕材J剑?/p>
公平派遣模式下發(fā)送端與公平分發(fā)相同,消費(fèi)者端只需要加一段配置代碼,我們可以將預(yù)取計(jì)數(shù)設(shè)置為1。這告訴RabbitMQ一次不要給消費(fèi)者一個(gè)以上的消息。換句話說(shuō),在處理并確認(rèn)上一條消息之前,不要將新消息發(fā)送給消費(fèi)者。而是將其分派給不忙的下一個(gè)消費(fèi)者。
關(guān)于消息的確認(rèn):
為了確保消息永不丟失,RabbitMQ支持 消息確認(rèn)。消費(fèi)者發(fā)送回一個(gè)確認(rèn)(acknowledgement),以告知RabbitMQ已經(jīng)接收,處理了特定的消息,并且RabbitMQ可以自由刪除它。
我們之前的代碼中,RabbitMQ一旦向消費(fèi)者傳遞了一條消息,便立即將其標(biāo)記為刪除(調(diào)用Consumer的第三個(gè)參數(shù)是autoAck,表示是否自動(dòng)回復(fù))。在這種情況下,如果你終止一個(gè)消費(fèi)者那么你就可能會(huì)丟失這個(gè)任務(wù),我們還將丟失所有已經(jīng)交付給這個(gè)消費(fèi)者的尚未消費(fèi)的消息。如果一個(gè)消費(fèi)者意外宕機(jī)了,那么我們希望將任務(wù)交付給其他消費(fèi)者來(lái)消費(fèi)者。
所以一旦向消費(fèi)者傳遞了一條消息,就不能馬上將其標(biāo)記為刪除,而是要手動(dòng)確認(rèn)。我們需要在創(chuàng)建消費(fèi)者的時(shí)候?qū)?code>autoAck參數(shù)標(biāo)記為false:
// Consume 接收某個(gè)消息隊(duì)列的消息 func (q *RabbitMQ) Consume() <-chan amqp.Delivery { c, e := q.channel.Consume( q.Name, //指定從哪個(gè)隊(duì)列中接收消息 "", false, // 不自動(dòng)確認(rèn)消息 false, false, false, nil, ) failOnError(e, "接收消息失?。?) return c }
然后每消費(fèi)完一條消息需要調(diào)用Ack(false)
函數(shù)手動(dòng)回復(fù)。
生產(chǎn)者
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") i := 0 for { // 每隔2s發(fā)送一次消息 time.Sleep(time.Second * 2) producer.Send("worker", " worker message: "+strconv.Itoa(i)) i = i + 1 } }
消費(fèi)端限流:
實(shí)現(xiàn)公平派遣模式我們需要設(shè)置消費(fèi)者端一次只能消費(fèi)一條消息,之前我們已經(jīng)進(jìn)行了封裝,直接在消費(fèi)者端調(diào)用即可:
// Qos 配置queue參數(shù) func (q *RabbitMQ) Qos() { e := q.channel.Qos(1, 0, false) failOnError(e, "無(wú)法設(shè)置QoS") }
消費(fèi)者1
func main() { consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收 consumer1.Qos() //接收消息 messages := consumer1.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費(fèi)消息要用10s time.Sleep(time.Second * 10) // 手動(dòng)回復(fù) ch.Ack(false) } }() select {} }
消費(fèi)者2
func main() { consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker") // 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收 consumer2.Qos() //接收消息 messages := consumer2.Consume() go func() { for ch := range messages { log.Printf("Received a message: %s", ch.Body) // 消費(fèi)消息要用2s time.Sleep(time.Second * 2) // 手動(dòng)回復(fù) ch.Ack(false) } }() select {} }
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/05 20:31:26 Received a message: " worker message: 0"
2022/11/05 20:31:36 Received a message: " worker message: 5"# 消費(fèi)者2
2022/11/05 20:31:28 Received a message: " worker message: 1"
2022/11/05 20:31:30 Received a message: " worker message: 2"
2022/11/05 20:31:32 Received a message: " worker message: 3"
2022/11/05 20:31:34 Received a message: " worker message: 4"
2022/11/05 20:31:38 Received a message: " worker message: 6"
2022/11/05 20:31:40 Received a message: " worker message: 7"
發(fā)布/訂閱(Pub/Sub)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout") i := 0 for { time.Sleep(time.Second) // fanout模式下不用routing key producer.Publish("exchange1", "pubsub message: "+strconv.Itoa(i), "") i = i + 1 } }
消費(fèi)者1
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊(duì)列綁定到exchange consumer1.Bind("exchange1", "") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費(fèi)者2
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊(duì)列綁定到exchange consumer2.Bind("exchange1", "") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/05 22:32:19 Consumer1 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer1 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer1 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer1 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer1 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer1 received a message: "pubsub message: 5"# 消費(fèi)者2
2022/11/05 22:32:19 Consumer2 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer2 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer2 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer2 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer2 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer2 received a message: "pubsub message: 5"
路由(Routing)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為direct類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct") i := 0 for { time.Sleep(time.Second) // 如果是奇數(shù),就發(fā)key1 // 如果是偶數(shù),就發(fā)key2 if i%2 != 0 { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1") } else { producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key2") } i = i + 1 } }
消費(fèi)者1
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊(duì)列綁定到exchange consumer1.Bind("exchange", "key1") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費(fèi)者2
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊(duì)列綁定到exchange consumer2.Bind("exchange", "key2") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/05 22:51:10 Consumer1 received a message: "routing message: 1"
2022/11/05 22:51:12 Consumer1 received a message: "routing message: 3"
2022/11/05 22:51:14 Consumer1 received a message: "routing message: 5"
2022/11/05 22:51:16 Consumer1 received a message: "routing message: 7"# 消費(fèi)者2
2022/11/05 22:51:11 Consumer2 received a message: "routing message: 0"
2022/11/05 22:51:13 Consumer2 received a message: "routing message: 2"
2022/11/05 22:51:15 Consumer2 received a message: "routing message: 4"
2022/11/05 22:51:17 Consumer2 received a message: "routing message: 6"
通配符(Tpoic)模式
生產(chǎn)者
func main() { producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue") // 指定為topic類型 rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange2", "topic") var i int for { time.Sleep(time.Second) if i%2 != 0 { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b.c") } else { producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b") } i++ } }
消費(fèi)者1
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1") // 隊(duì)列綁定到exchange consumer1.Bind("exchange2", "*.test.*") //接收消息 msgs := consumer1.Consume() go func() { for d := range msgs { log.Printf("Consumer1 received a message: %s", d.Body) d.Ack(false) } }() select {} }
消費(fèi)者2
func main() { //第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字 consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2") // 隊(duì)列綁定到exchange consumer2.Bind("exchange2", "#.test.#") //接收消息 msgs := consumer2.Consume() go func() { for d := range msgs { log.Printf("Consumer2 received a message: %s", d.Body) d.Ack(false) } }() select {} }
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/05 23:09:53 Consumer1 received a message: "topic message: 0"
2022/11/05 23:09:55 Consumer1 received a message: "topic message: 2"
2022/11/05 23:09:57 Consumer1 received a message: "topic message: 4"
2022/11/05 23:09:59 Consumer1 received a message: "topic message: 6"# 消費(fèi)者2
2022/11/05 23:09:53 Consumer2 received a message: "topic message: 0"
2022/11/05 23:09:54 Consumer2 received a message: "topic message: 1"
2022/11/05 23:09:55 Consumer2 received a message: "topic message: 2"
2022/11/05 23:09:56 Consumer2 received a message: "topic message: 3"
2022/11/05 23:09:57 Consumer2 received a message: "topic message: 4"
2022/11/05 23:09:58 Consumer2 received a message: "topic message: 5"
2022/11/05 23:09:59 Consumer2 received a message: "topic message: 6"
到此這篇關(guān)于GoLang RabbitMQ實(shí)現(xiàn)六種工作模式示例的文章就介紹到這了,更多相關(guān)GoLang RabbitMQ工作模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang 實(shí)現(xiàn)復(fù)制文件夾同時(shí)復(fù)制文件
這篇文章主要介紹了Golang 實(shí)現(xiàn)復(fù)制文件夾同時(shí)復(fù)制文件,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12Go語(yǔ)言string,int,int64 ,float之間類型轉(zhuǎn)換方法
Go語(yǔ)言中int類型和string類型都是屬于基本數(shù)據(jù)類型,兩種類型的轉(zhuǎn)化都非常簡(jiǎn)單。下面通過(guò)本文給大家分享Go語(yǔ)言string,int,int64 ,float之間類型轉(zhuǎn)換方法,感興趣的朋友一起看看吧2017-07-07Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體
這篇文章主要介紹了Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體,結(jié)構(gòu)體是包含多個(gè)字段的集合類型,用于將數(shù)據(jù)組合為記錄2022-07-07Golang?Redis連接池實(shí)現(xiàn)原理及示例探究
這篇文章主要為大家介紹了Golang?Redis連接池實(shí)現(xiàn)示例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01