Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)
1 RabbitMQ
1.1 概念
①基本名詞
當(dāng)前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻(xiàn)給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。

- Broker:表示消息隊(duì)列服務(wù)實(shí)體
- Virtual Host:虛擬主機(jī)。標(biāo)識(shí)一批交換機(jī)、消息隊(duì)列和相關(guān)對(duì)象。vhost是AMQP概念的基礎(chǔ),必須在鏈接時(shí)指定,RabbitMQ默認(rèn)的vhost是 /。
- AMQP(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議
- Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
- Queue:消息隊(duì)列,用來(lái)保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
②常見(jiàn)模式
1. simple簡(jiǎn)單模式

消息的消費(fèi)者(consumer) 監(jiān)聽(tīng)(while) 消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動(dòng)從隊(duì)列中刪除(隱患 消息可能沒(méi)有被消費(fèi)者正確處理,已經(jīng)從隊(duì)列中消失了,造成消息的丟失)
2. worker工作模式

多個(gè)消費(fèi)者從一個(gè)隊(duì)列中爭(zhēng)搶消息
- (隱患,高并發(fā)情況下,默認(rèn)會(huì)產(chǎn)生某一個(gè)消息被多個(gè)消費(fèi)者共同使用,可以設(shè)置一個(gè)開(kāi)關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個(gè)消費(fèi)者使用)
- 應(yīng)用場(chǎng)景:紅包;大項(xiàng)目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個(gè)任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊(duì)列中,空閑的系統(tǒng)自動(dòng)爭(zhēng)搶)
3. publish/subscribe發(fā)布訂閱(共享資源)

消費(fèi)者訂閱消息,然后從訂閱的隊(duì)列中獲取消息進(jìn)行消費(fèi)。
- X代表交換機(jī)rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊(duì)列中,對(duì)應(yīng)消息隊(duì)列的消費(fèi)者拿到消息進(jìn)行消費(fèi)
- 相關(guān)場(chǎng)景:郵件群發(fā),群聊天,廣播(廣告)
4. routing路由模式

- 交換機(jī)根據(jù)路由規(guī)則,將消息路由到不同的隊(duì)列中
- 消息生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,路由是字符串(info) 當(dāng)前產(chǎn)生的消息攜帶路由字符(對(duì)象的方法),交換機(jī)根據(jù)路由的key,只能匹配上路由key對(duì)應(yīng)的消息隊(duì)列,對(duì)應(yīng)的消費(fèi)者才能消費(fèi)消息;
5. topic主題模式(路由模式的一種)

- 星號(hào)井號(hào)代表通配符
- 星號(hào)代表多個(gè)單詞,井號(hào)代表一個(gè)單詞
- 路由功能添加模糊匹配
- 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
- 交換機(jī)根據(jù)key的規(guī)則模糊匹配到對(duì)應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽(tīng)消費(fèi)者接收消息消費(fèi)
1.2 搭建(docker方式)
①拉取鏡像
# 拉取鏡像 docker pull rabbitmq:3.7-management
②創(chuàng)建并啟動(dòng)容器
# 創(chuàng)建并運(yùn)行容器 docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management #5672是項(xiàng)目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672) # 輸入網(wǎng)址http://ip:15672即可進(jìn)入rabbitmq的web管理頁(yè)面,賬戶(hù)密碼:guest / guest
③web界面創(chuàng)建用戶(hù)和virtual host

下面為了我們后續(xù)的操作,首先我們新建一個(gè)Virtual Host并且給他分配一個(gè)用戶(hù)名,用來(lái)隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建
新增virtual host

新增用戶(hù)

點(diǎn)擊新建好的用戶(hù),設(shè)置其host


最終效果

1.3 代碼操作
①RabbitMQ struct:包含創(chuàng)建、消費(fèi)、生產(chǎn)消息
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//amqp:// 賬號(hào) 密碼@地址:端口號(hào)/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"
type RabbitMQ struct {
//連接
conn *amqp.Connection
//管道
channel *amqp.Channel
//隊(duì)列名稱(chēng)
QueueName string
//交換機(jī)
Exchange string
//key Simple模式 幾乎用不到
Key string
//連接信息
Mqurl string
}
//創(chuàng)建RabbitMQ結(jié)構(gòu)體實(shí)例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
var err error
//創(chuàng)建rabbitmq連接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "創(chuàng)建連接錯(cuò)誤!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "獲取channel失敗")
return rabbitmq
}
//斷開(kāi)channel和connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//錯(cuò)誤處理函數(shù)
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
//簡(jiǎn)單模式step:1。創(chuàng)建簡(jiǎn)單模式下RabbitMQ實(shí)例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//訂閱模式創(chuàng)建rabbitmq實(shí)例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
//創(chuàng)建rabbitmq實(shí)例
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//獲取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
//獲取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel!")
return rabbitmq
}
//訂閱模式生成
func (r *RabbitMQ) PublishPub(message string) {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 廣播類(lèi)型
"fanout",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2 發(fā)送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
//類(lèi)型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//訂閱模式消費(fèi)端代碼
func (r *RabbitMQ) RecieveSub() {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 廣播類(lèi)型
"fanout",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
q, err := r.channel.QueueDeclare(
"", //隨機(jī)生產(chǎn)隊(duì)列名稱(chēng)
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊(duì)列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
"",
r.Exchange,
false,
nil,
)
//消費(fèi)消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請(qǐng)按 Ctrl+C")
<-forever
}
//話(huà)題模式 創(chuàng)建RabbitMQ實(shí)例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
//創(chuàng)建rabbitmq實(shí)例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//話(huà)題模式發(fā)送信息
func (r *RabbitMQ) PublishTopic(message string) {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 話(huà)題模式
"topic",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "topic failed to declare an excha"+"nge")
//2發(fā)送信息
err = r.channel.Publish(
r.Exchange,
//要設(shè)置
r.Key,
false,
false,
amqp.Publishing{
//類(lèi)型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//話(huà)題模式接收信息
//要注意key
//其中* 用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè))
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 話(huà)題模式
"topic",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an exchange")
//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
q, err := r.channel.QueueDeclare(
"", //隨機(jī)生產(chǎn)隊(duì)列名稱(chēng)
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊(duì)列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
r.Key,
r.Exchange,
false,
nil,
)
//消費(fèi)消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請(qǐng)按 Ctrl+C")
<-forever
}
//路由模式 創(chuàng)建RabbitMQ實(shí)例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
//創(chuàng)建rabbitmq實(shí)例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//路由模式發(fā)送信息
func (r *RabbitMQ) PublishRouting(message string) {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 廣播類(lèi)型
"direct",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//發(fā)送信息
err = r.channel.Publish(
r.Exchange,
//要設(shè)置
r.Key,
false,
false,
amqp.Publishing{
//類(lèi)型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機(jī)名稱(chēng)
r.Exchange,
//交換機(jī)類(lèi)型 廣播類(lèi)型
"direct",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個(gè)exchange不可以被client用來(lái)推送消息,僅用來(lái)進(jìn)行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
q, err := r.channel.QueueDeclare(
"", //隨機(jī)生產(chǎn)隊(duì)列名稱(chēng)
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊(duì)列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
r.Key,
r.Exchange,
false,
nil,
)
//消費(fèi)消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請(qǐng)按 Ctrl+C")
<-forever
}
//簡(jiǎn)單模式Step:2、簡(jiǎn)單模式下生產(chǎn)代碼
func (r *RabbitMQ) PublishSimple(message string) {
//1、申請(qǐng)隊(duì)列,如果隊(duì)列存在就跳過(guò),不存在創(chuàng)建
//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
_, err := r.channel.QueueDeclare(
//隊(duì)列名稱(chēng)
r.QueueName,
//是否持久化
false,
//是否為自動(dòng)刪除 當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后,是否把消息從隊(duì)列中刪除
false,
//是否具有排他性 true表示自己可見(jiàn) 其他用戶(hù)不能訪(fǎng)問(wèn)
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//2.發(fā)送消息到隊(duì)列中
r.channel.Publish(
//默認(rèn)的Exchange交換機(jī)是default,類(lèi)型是direct直接類(lèi)型
r.Exchange,
//要賦值的隊(duì)列名稱(chēng)
r.QueueName,
//如果為true,根據(jù)exchange類(lèi)型和routkey規(guī)則,如果無(wú)法找到符合條件的隊(duì)列那么會(huì)把發(fā)送的消息返回給發(fā)送者
false,
//如果為true,當(dāng)exchange發(fā)送消息到隊(duì)列后發(fā)現(xiàn)隊(duì)列上沒(méi)有綁定消費(fèi)者,則會(huì)把消息還給發(fā)送者
false,
//消息
amqp.Publishing{
//類(lèi)型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
func (r *RabbitMQ) ConsumeSimple() {
//1、申請(qǐng)隊(duì)列,如果隊(duì)列存在就跳過(guò),不存在創(chuàng)建
//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
_, err := r.channel.QueueDeclare(
//隊(duì)列名稱(chēng)
r.QueueName,
//是否持久化
false,
//是否為自動(dòng)刪除 當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后,是否把消息從隊(duì)列中刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用來(lái)區(qū)分多個(gè)消費(fèi)者
"",
//是否自動(dòng)應(yīng)答
true,
//是否具有排他性
false,
//如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者
false,
//隊(duì)列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//啟用協(xié)程處理
go func() {
for d := range msgs {
//實(shí)現(xiàn)我們要處理的邏輯函數(shù)
log.Printf("Received a message:%s", d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
func (r *RabbitMQ) ConsumeWorker(consumerName string) {
//1、申請(qǐng)隊(duì)列,如果隊(duì)列存在就跳過(guò),不存在創(chuàng)建
//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
_, err := r.channel.QueueDeclare(
//隊(duì)列名稱(chēng)
r.QueueName,
//是否持久化
false,
//是否為自動(dòng)刪除 當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接之后,是否把消息從隊(duì)列中刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用來(lái)區(qū)分多個(gè)消費(fèi)者
consumerName,
//是否自動(dòng)應(yīng)答
true,
//是否具有排他性
false,
//如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者
false,
//隊(duì)列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//啟用協(xié)程處理
go func() {
for d := range msgs {
//實(shí)現(xiàn)我們要處理的邏輯函數(shù)
log.Printf("%s Received a message:%s", consumerName, d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
②測(cè)試代碼
1. simple簡(jiǎn)單模式
consumer.go
func main() {
//消費(fèi)者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
rabbitmq.ConsumeSimple()
}
producer.go
func main() {
//Simple模式 生產(chǎn)者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
2. worker模式
consumer.go
func main() {
/*
worker模式無(wú)非就是多個(gè)消費(fèi)者去同一個(gè)隊(duì)列中消費(fèi)消息
*/
//消費(fèi)者1
rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
go rabbitmq1.ConsumeWorker("consumer1")
//消費(fèi)者2
rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
rabbitmq2.ConsumeWorker("consumer2")
}
producer.go
func main() {
//Worker模式 生產(chǎn)者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
for i := 0; i < 100; i++ {
//time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
3. publish/subscribe模式
consumer.go:
func main() {
//消費(fèi)者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
rabbitmq.RecieveSub()
}
producer.go
func main() {
//訂閱模式發(fā)送者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
for i := 0; i <= 20; i++ {
rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)")
fmt.Println(i)
time.Sleep(1 * time.Second)
}
}
4. router模式
consumer.go
func main() {
//消費(fèi)者
rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
rabbitmq.RecieveRouting()
}
producer.go
func main() {
//路由模式生產(chǎn)者
imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")
for i := 0; i <= 10; i++ {
imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
5. topic模式
consumer.go
func main() {
/*
星號(hào)井號(hào)代表通配符
星號(hào)代表多個(gè)單詞,井號(hào)代表一個(gè)單詞
路由功能添加模糊匹配
消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
交換機(jī)根據(jù)key的規(guī)則模糊匹配到對(duì)應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽(tīng)消費(fèi)者接收消息消費(fèi)
*/
//Topic消費(fèi)者
//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
rabbitmq.RecieveTopic()
}
producer.go
func main() {
//Topic模式生產(chǎn)者
imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")
for i := 0; i <= 10; i++ {
imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
2 Kafka
2.1 基本概念

Kafka是分布式的,其所有的構(gòu)件borker(server服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費(fèi)者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會(huì)存到kafka server里,然后consumer再向kafka server發(fā)起請(qǐng)求去消費(fèi)這些數(shù)據(jù)。
kafka server在這個(gè)過(guò)程中像是一個(gè)幫你保管數(shù)據(jù)的中間商。所以kafka服務(wù)器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀(jì)人的意思)。
在消息的生產(chǎn)時(shí)可以使用一個(gè)標(biāo)識(shí)topic來(lái)區(qū)分,且可以進(jìn)行分區(qū);每一個(gè)分區(qū)都是一個(gè)順序的、不可變的消息隊(duì)列, 并且可以持續(xù)的添加。
同時(shí)為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬(wàn)消息(50 MB),每秒處理55萬(wàn)消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護(hù),而不是由server端維護(hù)。當(dāng)失敗時(shí)能自動(dòng)平衡
- 應(yīng)用場(chǎng)景
- 監(jiān)控
- 消息隊(duì)列
- 流處理
- 日志聚合
- 持久性日志
- 基礎(chǔ)概念
- topic:話(huà)題
- broker:kafka服務(wù)集群,已發(fā)布的消息保存在一組服務(wù)器中,稱(chēng)之為kafka集群。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(broker)
- partition:分區(qū),topic物理上的分組
- message:消息,每個(gè)producer可以向一個(gè)topic主題發(fā)布一些消息

1.生產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.生產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫(xiě)入本地磁盤(pán)
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫(xiě)入本地磁盤(pán)后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK
2.2 常見(jiàn)模式
①點(diǎn)對(duì)點(diǎn)模式:火車(chē)站出租車(chē)搶客
發(fā)送者將消息發(fā)送到消息隊(duì)列中,消費(fèi)者去消費(fèi),如果消費(fèi)者有多個(gè),他們會(huì)競(jìng)爭(zhēng)地消費(fèi),也就是說(shuō)對(duì)于某一條消息,只有一個(gè)消費(fèi)者能“搶“到它。類(lèi)似于火車(chē)站門(mén)口的出租車(chē)搶客的場(chǎng)景。

②發(fā)布訂閱模式:組間無(wú)競(jìng)爭(zhēng),組內(nèi)有競(jìng)爭(zhēng)
消費(fèi)者訂閱對(duì)應(yīng)的topic(主題),只有訂閱了對(duì)應(yīng)topic消費(fèi)者的才會(huì)接收到消息。
例如:
- 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會(huì)把光明牛奶送到對(duì)應(yīng)位置,你也才會(huì)有機(jī)會(huì)消費(fèi)這個(gè)牛奶
注意:為了提高消費(fèi)者的消費(fèi)能力,kafka中引入了消費(fèi)者組的概念。相當(dāng)于是:不同消費(fèi)者組之間因?yàn)橛嗛喌膖opic不同,不會(huì)有競(jìng)爭(zhēng)關(guān)系。但是消費(fèi)者組內(nèi)是有競(jìng)爭(zhēng)關(guān)系。
例如:
- 成都、廈門(mén)的出租車(chē)司機(jī)分別組成各自的消費(fèi)者組。
- 成都的出租車(chē)司機(jī)只拉成都的人,廈門(mén)的只拉廈門(mén)的人。(因此他們兩個(gè)消費(fèi)者組不是競(jìng)爭(zhēng)關(guān)系)
- 成都市內(nèi)的出租車(chē)司機(jī)之間是競(jìng)爭(zhēng)關(guān)系。(消費(fèi)者組內(nèi)是競(jìng)爭(zhēng)關(guān)系)
2.3 docker-compose部署
vim docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機(jī)的ip,例如我本地mac的ip為192.168.0.101
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
# 進(jìn)入到docker-compose.yml所在目錄,執(zhí)行下面命令 docker-compose up -d # 查看部署結(jié)果,狀態(tài)為up表明部署成功 docker-compose ps

2.4 代碼操作
# 1. 創(chuàng)建對(duì)應(yīng)topic docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092 # 2. 查看topic列表 docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181

①producer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// 基于sarama第三方庫(kù)開(kāi)發(fā)的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認(rèn)
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個(gè)partition
config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
// 構(gòu)造一個(gè)消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 連接kafka
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 發(fā)送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
②consumer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍歷所有的分區(qū)
// 針對(duì)每個(gè)分區(qū)創(chuàng)建一個(gè)對(duì)應(yīng)的分區(qū)消費(fèi)者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 異步從每個(gè)分區(qū)消費(fèi)信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
}
}(pc)
}
//演示時(shí)使用
select {}
}
③運(yùn)行效果

到此這篇關(guān)于Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)的文章就介紹到這了,更多相關(guān)Go消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go zero微服務(wù)高在請(qǐng)求量下如何優(yōu)化
這篇文章主要為大家介紹了go zero微服務(wù)高在請(qǐng)求量下的優(yōu)化處理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
詳解如何在Go中循環(huán)中使用Defer關(guān)鍵字示例詳解
這篇文章主要為大家介紹了詳解如何在Go中循環(huán)中使用Defer關(guān)鍵字示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
Go語(yǔ)言開(kāi)發(fā)區(qū)塊鏈只需180行代碼(推薦)
這篇文章主要介紹了Go語(yǔ)言開(kāi)發(fā)區(qū)塊鏈只需180行代碼,文章中將不會(huì)涉及工作量證明算法(PoW)以及權(quán)益證明算法(PoS)這類(lèi)的共識(shí)算法。需要的朋友可以參考下2018-05-05
Go語(yǔ)言中實(shí)現(xiàn)完美錯(cuò)誤處理實(shí)踐分享
Go?語(yǔ)言是一門(mén)非常流行的編程語(yǔ)言,由于其高效的并發(fā)編程和出色的網(wǎng)絡(luò)編程能力,越來(lái)越受到廣大開(kāi)發(fā)者的青睞。本文我們就來(lái)深入探討一下Go?語(yǔ)言中的錯(cuò)誤處理機(jī)制吧2023-04-04
Windows下Goland的環(huán)境搭建過(guò)程詳解
這篇文章主要介紹了Windows下Goland的環(huán)境搭建過(guò)程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Goland使用delve進(jìn)行遠(yuǎn)程調(diào)試的詳細(xì)教程
網(wǎng)上給出的使用delve進(jìn)行遠(yuǎn)程調(diào)試,都需要先在本地交叉編譯或者在遠(yuǎn)程主機(jī)上編譯出可運(yùn)行的程序,然后再用delve在遠(yuǎn)程啟動(dòng)程序,本教程會(huì)將上面的步驟簡(jiǎn)化為只需要兩步,1,在遠(yuǎn)程運(yùn)行程序2,在本地啟動(dòng)調(diào)試,需要的朋友可以參考下2024-08-08
golang?channel多協(xié)程通信常用方法底層原理全面解析
channel?是?goroutine?與?goroutine?之間通信的重要橋梁,借助?channel,我們能很輕易的寫(xiě)出一個(gè)多協(xié)程通信程序,今天,我們就來(lái)看看這個(gè)?channel?的常用用法以及底層原理2023-09-09
Golang中 import cycle not allowed 問(wèn)題
這篇文章主要介紹了Golang中 import cycle not allowed 問(wèn)題的解決方法,問(wèn)題從描述到解決都非常詳細(xì),需要的小伙伴可以參考一下2022-03-03
golang連接kafka消費(fèi)進(jìn)ES操作
這篇文章主要介紹了golang連接kafka消費(fèi)進(jìn)ES操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
go語(yǔ)言標(biāo)準(zhǔn)庫(kù)fmt包的一鍵入門(mén)
這篇文章主要為大家介紹了go語(yǔ)言標(biāo)準(zhǔn)庫(kù)fmt包的一鍵入門(mén)使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08

