欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

GoLang RabbitMQ實(shí)現(xiàn)六種工作模式示例

 更新時(shí)間:2022年12月17日 16:00:42   作者:Onemorelight95  
這篇文章主要介紹了GoLang RabbitMQ實(shí)現(xiàn)六種工作模式,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

六種工作模式介紹

1.簡(jiǎn)單(Simple)模式

P:生產(chǎn)者,也就是要發(fā)送消息的程序。

C:消費(fèi)者:消息的接收者,會(huì)一直等待消息到來(lái)。

queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。

簡(jiǎn)單模式就是單發(fā)單收,消息的消費(fèi)者監(jiān)聽(tīng)消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動(dòng)從隊(duì)列中刪除。

2.工作隊(duì)列(Work Queue)模式

這種模式就是多個(gè)消費(fèi)者消費(fèi)同一個(gè)隊(duì)列中的消息,既然消費(fèi)者多了那么就出現(xiàn)了消息分配的問(wèn)題,所以對(duì)應(yīng)著兩種分配策略:

  • 公平分發(fā):每個(gè)消費(fèi)者接收消息的概率是相等的,消息隊(duì)列會(huì)循環(huán)依次給每個(gè)消費(fèi)者發(fā)送消息,這種是默認(rèn)的策略。
  • 公平派遣:保證消費(fèi)者在消費(fèi)完某個(gè)消息,并發(fā)送確認(rèn)信息后,消息隊(duì)列才會(huì)向它推送新的消息,在此之間若是有新的消息,將會(huì)被推送到其它消費(fèi)者,若所有的消費(fèi)者都在消費(fèi)消息,那么就會(huì)等待。

3.發(fā)布/訂閱(Pub/Sub)模式

在這種模型中,多了一個(gè) Exchange 角色,而且過(guò)程略有變化:

P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X (交換機(jī))。

C:消費(fèi)者,消息的接收者,會(huì)一直等待消息到來(lái)。

Queue:消息隊(duì)列,接收消息、緩存消息。

Exchange:交換機(jī)(X) ,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,如何處理消息,遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。 Exchange有以下4種類型:

  • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。
  • Direct:全值匹配,把消息交給符合指定routing key的隊(duì)列。
  • Topic:通配符,與Direct類型類似,但Direct類型要求routing key完全相等,而Topic類型是對(duì)routing key進(jìn)行模糊匹配,比Direct靈活。
  • Headers:根據(jù)Message的一些頭部信息來(lái)分發(fā)過(guò)濾Message,用的比較少。

注意:Exchange負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失。

4.路由(Routing)模式

路由模式其實(shí)就是上述發(fā)布/訂閱模式的交換機(jī)轉(zhuǎn)發(fā)類型變成了Direct類型。在這種模式下:

Exchange 不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的routing key進(jìn)行判斷,只有隊(duì)列的

routing key與消息的routing key完全一致,才會(huì)接收到消息。

P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。

X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key完全匹配的隊(duì)列。

C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key為error的消息。

C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key為 info、error、warning 的消息。

5.通配符(Tpoic)模式

路由模式其實(shí)就是上述發(fā)布/訂閱模式的交換機(jī)轉(zhuǎn)發(fā)類型變成了Topic類型。在這種模式下:

隊(duì)列的routing key與消息的routing key符合匹配規(guī)則,就可以接收到消息,有兩種規(guī)則:

*:可以(只能)匹配一個(gè)單詞。

#:可以匹配多個(gè)單詞(或者零個(gè))。

所以圖中,routing keya.orange.b的消息就會(huì)被轉(zhuǎn)發(fā)到Q1,而routing keyLazy.a.b.c的消息就會(huì)被發(fā)送到Q2。

Go語(yǔ)言的實(shí)現(xiàn)

安裝操作庫(kù)

安裝API庫(kù)

Go可以使用streadway/amqp庫(kù)來(lái)操作rabbit,使用以下命令來(lái)安裝:

go get github.com/streadway/amqp

封裝rabbitmq

接下來(lái)我們對(duì)streadway/amqp庫(kù)的內(nèi)容進(jìn)行一個(gè)二次封裝,封裝為一個(gè)rabbitmq.go文件:

package rabbitmq
import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)
// RabbitMQ RabbitMQ結(jié)構(gòu)
type RabbitMQ struct {
	channel  *amqp.Channel
	Name     string
	exchange string
}
// Connect 連接服務(wù)器
func Connect(s string) *RabbitMQ {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失敗!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開(kāi)頻道!")
	mq := new(RabbitMQ)
	mq.channel = ch
	return mq
}
// New 初始化消息隊(duì)列
//第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):隊(duì)列名字
func New(s string, name string) *RabbitMQ {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失??!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開(kāi)頻道!")
	q, e := ch.QueueDeclare(
		name,  //隊(duì)列名
		false, //是否開(kāi)啟持久化
		true,  //不使用時(shí)刪除
		false, //排他
		false, //不等待
		nil,   //參數(shù)
	)
	failOnError(e, "初始化消息隊(duì)列失??!")
	mq := new(RabbitMQ)
	mq.channel = ch
	mq.Name = q.Name
	return mq
}
// QueueDeclare 聲明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
	failOnError(e, "聲明queue失?。?)
}
// QueueDelete 刪除queue
func (q *RabbitMQ) QueueDelete(queue string) {
	_, e := q.channel.QueueDelete(queue, false, true, false)
	failOnError(e, "刪除queue失??!")
}
// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無(wú)法設(shè)置QoS")
}
// NewExchange 初始化交換機(jī)
//第一個(gè)參數(shù):rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù):交換機(jī)名字,第三個(gè)參數(shù):交換機(jī)類型
func NewExchange(s string, name string, typename string) {
	//連接rabbitmq
	conn, e := amqp.Dial(s)
	failOnError(e, "連接Rabbitmq服務(wù)器失??!")
	ch, e := conn.Channel()
	failOnError(e, "無(wú)法打開(kāi)頻道!")
	e = ch.ExchangeDeclare(
		name,     // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e, "初始化交換機(jī)失??!")
}
// ExchangeDelete 刪除交換機(jī)
func (q *RabbitMQ) ExchangeDelete(exchange string) {
	e := q.channel.ExchangeDelete(exchange, false, true)
	failOnError(e, "刪除交換機(jī)失敗!")
}
// Bind 綁定消息隊(duì)列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
	e := q.channel.QueueBind(
		q.Name,
		key,
		exchange,
		false,
		nil,
	)
	failOnError(e, "綁定隊(duì)列失??!")
	q.exchange = exchange
}
// Send 向消息隊(duì)列發(fā)送消息
//Send方法可以往某個(gè)消息隊(duì)列發(fā)送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失??!")
	e = q.channel.Publish(
		"",    //交換
		queue, //路由鍵
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo: q.Name,
			Body:    []byte(str),
		})
	msg := "向隊(duì)列:" + q.Name + "發(fā)送消息失??!"
	failOnError(e, msg)
}
// Publish 向exchange發(fā)送消息
//Publish方法可以往某個(gè)exchange發(fā)送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
	str, e := json.Marshal(body)
	failOnError(e, "消息序列化失??!")
	e = q.channel.Publish(
		exchange,
		key,
		false,
		false,
		amqp.Publishing{ReplyTo: q.Name,
			Body: []byte(str)},
	)
	failOnError(e, "向交換機(jī)發(fā)送消息失?。?)
}
// Consume 接收某個(gè)消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個(gè)隊(duì)列中接收消息
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失??!")
	return c
}
// Close 關(guān)閉隊(duì)列連接
func (q *RabbitMQ) Close() {
	q.channel.Close()
}
//錯(cuò)誤處理函數(shù)
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

簡(jiǎn)單(Simple)模式

生產(chǎn)者

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple")
	i := 0
	for {
		// 每隔2s發(fā)送一次消息
		time.Sleep(time.Second * 2)
		producer.Send("simple", " simple message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費(fèi)者

func main() {
	consumer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "simple")
	//接收消息時(shí),指定
	messages := consumer.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費(fèi)消息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

2022/11/05 18:54:47 Received a message: " simple message: 0"
2022/11/05 18:54:52 Received a message: " simple message: 1"
2022/11/05 18:54:57 Received a message: " simple message: 2"

工作隊(duì)列(Work Queue)模式

公平分發(fā)模式:

公平分發(fā)模式采用的是輪詢機(jī)制,它會(huì)將數(shù)個(gè)任務(wù)按順序平均分發(fā)給消費(fèi)者。

生產(chǎn)者

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	i := 0
	for {
		// 每隔2s發(fā)送一次消息
		time.Sleep(time.Second * 2)
		producer.Send("worker", " worker message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費(fèi)者1

func main() {
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	//接收消息
	messages := consumer1.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費(fèi)消息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	//接收消息
	messages := consumer2.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費(fèi)消息要用3s
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/05 19:45:03 Received a message: " worker message: 0"
2022/11/05 19:45:07 Received a message: " worker message: 2"
2022/11/05 19:45:11 Received a message: " worker message: 4"

# 消費(fèi)者2
2022/11/05 19:45:05 Received a message: " worker message: 1"
2022/11/05 19:45:09 Received a message: " worker message: 3"
2022/11/05 19:45:13 Received a message: " worker message: 5"

可以發(fā)現(xiàn),公平模式下,偶數(shù)消息都被發(fā)送給了消費(fèi)者1,而奇數(shù)消息都被發(fā)送給了消費(fèi)者2。

公平派遣模式:

有時(shí)候,如果消息之間的復(fù)雜度不同,那么不同消費(fèi)者消費(fèi)消息所用的時(shí)間會(huì)不同。這個(gè)時(shí)候如果使用公平派發(fā)模式,可能會(huì)造成某一個(gè)消費(fèi)者需要消費(fèi)的消息積壓過(guò)多??梢圆捎霉脚汕材J剑?/p>

公平派遣模式下發(fā)送端與公平分發(fā)相同,消費(fèi)者端只需要加一段配置代碼,我們可以將預(yù)取計(jì)數(shù)設(shè)置為1。這告訴RabbitMQ一次不要給消費(fèi)者一個(gè)以上的消息。換句話說(shuō),在處理并確認(rèn)上一條消息之前,不要將新消息發(fā)送給消費(fèi)者。而是將其分派給不忙的下一個(gè)消費(fèi)者。

關(guān)于消息的確認(rèn):

為了確保消息永不丟失,RabbitMQ支持 消息確認(rèn)。消費(fèi)者發(fā)送回一個(gè)確認(rèn)(acknowledgement),以告知RabbitMQ已經(jīng)接收,處理了特定的消息,并且RabbitMQ可以自由刪除它。

我們之前的代碼中,RabbitMQ一旦向消費(fèi)者傳遞了一條消息,便立即將其標(biāo)記為刪除(調(diào)用Consumer的第三個(gè)參數(shù)是autoAck,表示是否自動(dòng)回復(fù))。在這種情況下,如果你終止一個(gè)消費(fèi)者那么你就可能會(huì)丟失這個(gè)任務(wù),我們還將丟失所有已經(jīng)交付給這個(gè)消費(fèi)者的尚未消費(fèi)的消息。如果一個(gè)消費(fèi)者意外宕機(jī)了,那么我們希望將任務(wù)交付給其他消費(fèi)者來(lái)消費(fèi)者。

所以一旦向消費(fèi)者傳遞了一條消息,就不能馬上將其標(biāo)記為刪除,而是要手動(dòng)確認(rèn)。我們需要在創(chuàng)建消費(fèi)者的時(shí)候?qū)?code>autoAck參數(shù)標(biāo)記為false:

// Consume 接收某個(gè)消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
	c, e := q.channel.Consume(
		q.Name, //指定從哪個(gè)隊(duì)列中接收消息
		"",
		false, // 不自動(dòng)確認(rèn)消息
		false,
		false,
		false,
		nil,
	)
	failOnError(e, "接收消息失?。?)
	return c
}

然后每消費(fèi)完一條消息需要調(diào)用Ack(false)函數(shù)手動(dòng)回復(fù)。

生產(chǎn)者

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	i := 0
	for {
		// 每隔2s發(fā)送一次消息
		time.Sleep(time.Second * 2)
		producer.Send("worker", " worker message: "+strconv.Itoa(i))
		i = i + 1
	}
}

消費(fèi)端限流:

實(shí)現(xiàn)公平派遣模式我們需要設(shè)置消費(fèi)者端一次只能消費(fèi)一條消息,之前我們已經(jīng)進(jìn)行了封裝,直接在消費(fèi)者端調(diào)用即可:

// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
	e := q.channel.Qos(1, 0, false)
	failOnError(e, "無(wú)法設(shè)置QoS")
}

消費(fèi)者1

func main() {
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
	consumer1.Qos()
	//接收消息
	messages := consumer1.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費(fèi)消息要用10s
			time.Sleep(time.Second * 10)
			// 手動(dòng)回復(fù)
			ch.Ack(false)
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "worker")
	// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
	consumer2.Qos()
	//接收消息
	messages := consumer2.Consume()
	go func() {
		for ch := range messages {
			log.Printf("Received a message: %s", ch.Body)
			// 消費(fèi)消息要用2s
			time.Sleep(time.Second * 2)
			// 手動(dòng)回復(fù)
			ch.Ack(false)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/05 20:31:26 Received a message: " worker message: 0"
2022/11/05 20:31:36 Received a message: " worker message: 5"

# 消費(fèi)者2
2022/11/05 20:31:28 Received a message: " worker message: 1"
2022/11/05 20:31:30 Received a message: " worker message: 2"
2022/11/05 20:31:32 Received a message: " worker message: 3"
2022/11/05 20:31:34 Received a message: " worker message: 4"
2022/11/05 20:31:38 Received a message: " worker message: 6"
2022/11/05 20:31:40 Received a message: " worker message: 7"

發(fā)布/訂閱(Pub/Sub)模式

生產(chǎn)者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
	i := 0
	for {
		time.Sleep(time.Second)
		// fanout模式下不用routing key
		producer.Publish("exchange1", "pubsub message: "+strconv.Itoa(i), "")
		i = i + 1
	}
}

消費(fèi)者1

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 隊(duì)列綁定到exchange
	consumer1.Bind("exchange1", "")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 隊(duì)列綁定到exchange
	consumer2.Bind("exchange1", "")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/05 22:32:19 Consumer1 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer1 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer1 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer1 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer1 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer1 received a message: "pubsub message: 5"

# 消費(fèi)者2
2022/11/05 22:32:19 Consumer2 received a message: "pubsub message: 0"
2022/11/05 22:32:20 Consumer2 received a message: "pubsub message: 1"
2022/11/05 22:32:21 Consumer2 received a message: "pubsub message: 2"
2022/11/05 22:32:22 Consumer2 received a message: "pubsub message: 3"
2022/11/05 22:32:23 Consumer2 received a message: "pubsub message: 4"
2022/11/05 22:32:24 Consumer2 received a message: "pubsub message: 5"

路由(Routing)模式

生產(chǎn)者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為direct類型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
	i := 0
	for {
		time.Sleep(time.Second)
		// 如果是奇數(shù),就發(fā)key1
		// 如果是偶數(shù),就發(fā)key2
		if i%2 != 0 {
			producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
		} else {
			producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key2")
		}
		i = i + 1
	}
}

消費(fèi)者1

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 隊(duì)列綁定到exchange
	consumer1.Bind("exchange", "key1")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 隊(duì)列綁定到exchange
	consumer2.Bind("exchange", "key2")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/05 22:51:10 Consumer1 received a message: "routing message: 1"
2022/11/05 22:51:12 Consumer1 received a message: "routing message: 3"
2022/11/05 22:51:14 Consumer1 received a message: "routing message: 5"
2022/11/05 22:51:16 Consumer1 received a message: "routing message: 7"

# 消費(fèi)者2
2022/11/05 22:51:11 Consumer2 received a message: "routing message: 0"
2022/11/05 22:51:13 Consumer2 received a message: "routing message: 2"
2022/11/05 22:51:15 Consumer2 received a message: "routing message: 4"
2022/11/05 22:51:17 Consumer2 received a message: "routing message: 6"

通配符(Tpoic)模式

生產(chǎn)者

func main() {
	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
	// 指定為topic類型
	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange2", "topic")
	var i int
	for {
		time.Sleep(time.Second)
		if i%2 != 0 {
			producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b.c")
		} else {
			producer.Publish("exchange2", "topic message: "+strconv.Itoa(i), "a.test.b")
		}
		i++
	}
}

消費(fèi)者1

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
	// 隊(duì)列綁定到exchange
	consumer1.Bind("exchange2", "*.test.*")
	//接收消息
	msgs := consumer1.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer1 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

消費(fèi)者2

func main() {
	//第一個(gè)參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個(gè)參數(shù)指定創(chuàng)建隊(duì)列的名字
	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue2")
	// 隊(duì)列綁定到exchange
	consumer2.Bind("exchange2", "#.test.#")
	//接收消息
	msgs := consumer2.Consume()
	go func() {
		for d := range msgs {
			log.Printf("Consumer2 received a message: %s", d.Body)
			d.Ack(false)
		}
	}()
	select {}
}

運(yùn)行結(jié)果:

# 消費(fèi)者1
2022/11/05 23:09:53 Consumer1 received a message: "topic message: 0"
2022/11/05 23:09:55 Consumer1 received a message: "topic message: 2"
2022/11/05 23:09:57 Consumer1 received a message: "topic message: 4"
2022/11/05 23:09:59 Consumer1 received a message: "topic message: 6"

# 消費(fèi)者2
2022/11/05 23:09:53 Consumer2 received a message: "topic message: 0"
2022/11/05 23:09:54 Consumer2 received a message: "topic message: 1"
2022/11/05 23:09:55 Consumer2 received a message: "topic message: 2"
2022/11/05 23:09:56 Consumer2 received a message: "topic message: 3"
2022/11/05 23:09:57 Consumer2 received a message: "topic message: 4"
2022/11/05 23:09:58 Consumer2 received a message: "topic message: 5"
2022/11/05 23:09:59 Consumer2 received a message: "topic message: 6"

到此這篇關(guān)于GoLang RabbitMQ實(shí)現(xiàn)六種工作模式示例的文章就介紹到這了,更多相關(guān)GoLang RabbitMQ工作模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • go中的protobuf和grpc使用教程

    go中的protobuf和grpc使用教程

    gRPC 是 Google 公司基于 Protobuf 開(kāi)發(fā)的跨語(yǔ)言的開(kāi)源 RPC 框架,這篇文章主要介紹了go中的protobuf和grpc使用教程,需要的朋友可以參考下
    2024-08-08
  • go-cache的基本使用場(chǎng)景示例解析

    go-cache的基本使用場(chǎng)景示例解析

    這篇文章主要為大家介紹了go-cache的基本使用場(chǎng)景示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Golang 實(shí)現(xiàn)復(fù)制文件夾同時(shí)復(fù)制文件

    Golang 實(shí)現(xiàn)復(fù)制文件夾同時(shí)復(fù)制文件

    這篇文章主要介紹了Golang 實(shí)現(xiàn)復(fù)制文件夾同時(shí)復(fù)制文件,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-12-12
  • Go語(yǔ)言學(xué)習(xí)筆記之反射用法詳解

    Go語(yǔ)言學(xué)習(xí)筆記之反射用法詳解

    這篇文章主要介紹了Go語(yǔ)言學(xué)習(xí)筆記之反射用法,詳細(xì)分析了Go語(yǔ)言中反射的概念、使用方法與相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2017-05-05
  • VSCode必裝Go語(yǔ)言以下插件的思路詳解

    VSCode必裝Go語(yǔ)言以下插件的思路詳解

    這篇文章主要介紹了VSCode必裝Go語(yǔ)言以下插件的思路詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-04-04
  • golang中map增刪改查的示例代碼

    golang中map增刪改查的示例代碼

    在Go語(yǔ)言中,map是一種內(nèi)置的數(shù)據(jù)結(jié)構(gòu),用于存儲(chǔ)鍵值對(duì),本文主要介紹了golang中map增刪改查的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-11-11
  • Go語(yǔ)言string,int,int64 ,float之間類型轉(zhuǎn)換方法

    Go語(yǔ)言string,int,int64 ,float之間類型轉(zhuǎn)換方法

    Go語(yǔ)言中int類型和string類型都是屬于基本數(shù)據(jù)類型,兩種類型的轉(zhuǎn)化都非常簡(jiǎn)單。下面通過(guò)本文給大家分享Go語(yǔ)言string,int,int64 ,float之間類型轉(zhuǎn)換方法,感興趣的朋友一起看看吧
    2017-07-07
  • Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體

    Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體

    這篇文章主要介紹了Golang如何編寫內(nèi)存高效及CPU調(diào)優(yōu)的Go結(jié)構(gòu)體,結(jié)構(gòu)體是包含多個(gè)字段的集合類型,用于將數(shù)據(jù)組合為記錄
    2022-07-07
  • 深入理解go緩存庫(kù)freecache的使用

    深入理解go緩存庫(kù)freecache的使用

    go開(kāi)發(fā)緩存場(chǎng)景一般使用map或者緩存框架,為了線程安全會(huì)使用sync.Map或線程安全的緩存框架,本文就詳細(xì)的介紹了go緩存庫(kù)freecache,感興趣的可以了解一下
    2022-02-02
  • Golang?Redis連接池實(shí)現(xiàn)原理及示例探究

    Golang?Redis連接池實(shí)現(xiàn)原理及示例探究

    這篇文章主要為大家介紹了Golang?Redis連接池實(shí)現(xiàn)示例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2024-01-01

最新評(píng)論