Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)
1 RabbitMQ
1.1 概念
①基本名詞
當(dāng)前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻(xiàn)給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。
- Broker:表示消息隊(duì)列服務(wù)實(shí)體
- Virtual Host:虛擬主機(jī)。標(biāo)識一批交換機(jī)、消息隊(duì)列和相關(guān)對象。vhost是AMQP概念的基礎(chǔ),必須在鏈接時(shí)指定,RabbitMQ默認(rèn)的vhost是 /。
- AMQP(Advanced Message Queuing Protocol)高級消息隊(duì)列協(xié)議
- Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
- Queue:消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
②常見模式
1. simple簡單模式
消息的消費(fèi)者(consumer) 監(jiān)聽(while) 消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動從隊(duì)列中刪除(隱患 消息可能沒有被消費(fèi)者正確處理,已經(jīng)從隊(duì)列中消失了,造成消息的丟失)
2. worker工作模式
多個(gè)消費(fèi)者從一個(gè)隊(duì)列中爭搶消息
- (隱患,高并發(fā)情況下,默認(rèn)會產(chǎn)生某一個(gè)消息被多個(gè)消費(fèi)者共同使用,可以設(shè)置一個(gè)開關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個(gè)消費(fèi)者使用)
- 應(yīng)用場景:紅包;大項(xiàng)目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個(gè)任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊(duì)列中,空閑的系統(tǒng)自動爭搶)
3. publish/subscribe發(fā)布訂閱(共享資源)
消費(fèi)者訂閱消息,然后從訂閱的隊(duì)列中獲取消息進(jìn)行消費(fèi)。
- X代表交換機(jī)rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊(duì)列中,對應(yīng)消息隊(duì)列的消費(fèi)者拿到消息進(jìn)行消費(fèi)
- 相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)
4. routing路由模式
- 交換機(jī)根據(jù)路由規(guī)則,將消息路由到不同的隊(duì)列中
- 消息生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,路由是字符串(info) 當(dāng)前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機(jī)根據(jù)路由的key,只能匹配上路由key對應(yīng)的消息隊(duì)列,對應(yīng)的消費(fèi)者才能消費(fèi)消息;
5. topic主題模式(路由模式的一種)
- 星號井號代表通配符
- 星號代表多個(gè)單詞,井號代表一個(gè)單詞
- 路由功能添加模糊匹配
- 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
- 交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi)
1.2 搭建(docker方式)
①拉取鏡像
# 拉取鏡像 docker pull rabbitmq:3.7-management
②創(chuàng)建并啟動容器
# 創(chuàng)建并運(yùn)行容器 docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management #5672是項(xiàng)目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672) # 輸入網(wǎng)址http://ip:15672即可進(jìn)入rabbitmq的web管理頁面,賬戶密碼:guest / guest
③web界面創(chuàng)建用戶和virtual host
下面為了我們后續(xù)的操作,首先我們新建一個(gè)Virtual Host并且給他分配一個(gè)用戶名,用來隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建
新增virtual host
新增用戶
點(diǎn)擊新建好的用戶,設(shè)置其host
最終效果
1.3 代碼操作
①RabbitMQ struct:包含創(chuàng)建、消費(fèi)、生產(chǎn)消息
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) //amqp:// 賬號 密碼@地址:端口號/vhost const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi" type RabbitMQ struct { //連接 conn *amqp.Connection //管道 channel *amqp.Channel //隊(duì)列名稱 QueueName string //交換機(jī) Exchange string //key Simple模式 幾乎用不到 Key string //連接信息 Mqurl string } //創(chuàng)建RabbitMQ結(jié)構(gòu)體實(shí)例 func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL} var err error //創(chuàng)建rabbitmq連接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "創(chuàng)建連接錯(cuò)誤!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "獲取channel失敗") return rabbitmq } //斷開channel和connection func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } //錯(cuò)誤處理函數(shù) func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } //簡單模式step:1。創(chuàng)建簡單模式下RabbitMQ實(shí)例 func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } //訂閱模式創(chuàng)建rabbitmq實(shí)例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ { //創(chuàng)建rabbitmq實(shí)例 rabbitmq := NewRabbitMQ("", exchangeName, "") var err error //獲取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connecct rabbitmq!") //獲取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel!") return rabbitmq } //訂閱模式生成 func (r *RabbitMQ) PublishPub(message string) { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2 發(fā)送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //訂閱模式消費(fèi)端代碼 func (r *RabbitMQ) RecieveSub() { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列 q, err := r.channel.QueueDeclare( "", //隨機(jī)生產(chǎn)隊(duì)列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊(duì)列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 "", r.Exchange, false, nil, ) //消費(fèi)消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //話題模式 創(chuàng)建RabbitMQ實(shí)例 func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ { //創(chuàng)建rabbitmq實(shí)例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbingmq!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //話題模式發(fā)送信息 func (r *RabbitMQ) PublishTopic(message string) { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "topic failed to declare an excha"+"nge") //2發(fā)送信息 err = r.channel.Publish( r.Exchange, //要設(shè)置 r.Key, false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //話題模式接收信息 //要注意key //其中* 用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè)) //匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到 func (r *RabbitMQ) RecieveTopic() { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "failed to declare an exchange") //2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列 q, err := r.channel.QueueDeclare( "", //隨機(jī)生產(chǎn)隊(duì)列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊(duì)列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 r.Key, r.Exchange, false, nil, ) //消費(fèi)消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //路由模式 創(chuàng)建RabbitMQ實(shí)例 func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ { //創(chuàng)建rabbitmq實(shí)例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbingmq!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //路由模式發(fā)送信息 func (r *RabbitMQ) PublishRouting(message string) { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //發(fā)送信息 err = r.channel.Publish( r.Exchange, //要設(shè)置 r.Key, false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //路由模式接收信息 func (r *RabbitMQ) RecieveRouting() { //嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機(jī)名稱 r.Exchange, //交換機(jī)類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列 q, err := r.channel.QueueDeclare( "", //隨機(jī)生產(chǎn)隊(duì)列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊(duì)列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 r.Key, r.Exchange, false, nil, ) //消費(fèi)消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //簡單模式Step:2、簡單模式下生產(chǎn)代碼 func (r *RabbitMQ) PublishSimple(message string) { //1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建 //優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中 _, err := r.channel.QueueDeclare( //隊(duì)列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除 false, //是否具有排他性 true表示自己可見 其他用戶不能訪問 false, //是否阻塞 true表示要等待服務(wù)器的響應(yīng) false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //2.發(fā)送消息到隊(duì)列中 r.channel.Publish( //默認(rèn)的Exchange交換機(jī)是default,類型是direct直接類型 r.Exchange, //要賦值的隊(duì)列名稱 r.QueueName, //如果為true,根據(jù)exchange類型和routkey規(guī)則,如果無法找到符合條件的隊(duì)列那么會把發(fā)送的消息返回給發(fā)送者 false, //如果為true,當(dāng)exchange發(fā)送消息到隊(duì)列后發(fā)現(xiàn)隊(duì)列上沒有綁定消費(fèi)者,則會把消息還給發(fā)送者 false, //消息 amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } func (r *RabbitMQ) ConsumeSimple() { //1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建 //優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中 _, err := r.channel.QueueDeclare( //隊(duì)列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除 false, //是否具有排他性 false, //是否阻塞 false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //接收消息 msgs, err := r.channel.Consume( r.QueueName, //用來區(qū)分多個(gè)消費(fèi)者 "", //是否自動應(yīng)答 true, //是否具有排他性 false, //如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者 false, //隊(duì)列是否阻塞 false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) //啟用協(xié)程處理 go func() { for d := range msgs { //實(shí)現(xiàn)我們要處理的邏輯函數(shù) log.Printf("Received a message:%s", d.Body) //fmt.Println(d.Body) } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <-forever } func (r *RabbitMQ) ConsumeWorker(consumerName string) { //1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建 //優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中 _, err := r.channel.QueueDeclare( //隊(duì)列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除 false, //是否具有排他性 false, //是否阻塞 false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //接收消息 msgs, err := r.channel.Consume( r.QueueName, //用來區(qū)分多個(gè)消費(fèi)者 consumerName, //是否自動應(yīng)答 true, //是否具有排他性 false, //如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者 false, //隊(duì)列是否阻塞 false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) //啟用協(xié)程處理 go func() { for d := range msgs { //實(shí)現(xiàn)我們要處理的邏輯函數(shù) log.Printf("%s Received a message:%s", consumerName, d.Body) //fmt.Println(d.Body) } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <-forever }
②測試代碼
1. simple簡單模式
consumer.go
func main() { //消費(fèi)者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple") rabbitmq.ConsumeSimple() }
producer.go
func main() { //Simple模式 生產(chǎn)者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple") for i := 0; i < 5; i++ { time.Sleep(time.Second * 2) rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i)) } }
2. worker模式
consumer.go
func main() { /* worker模式無非就是多個(gè)消費(fèi)者去同一個(gè)隊(duì)列中消費(fèi)消息 */ //消費(fèi)者1 rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker") go rabbitmq1.ConsumeWorker("consumer1") //消費(fèi)者2 rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker") rabbitmq2.ConsumeWorker("consumer2") }
producer.go
func main() { //Worker模式 生產(chǎn)者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker") for i := 0; i < 100; i++ { //time.Sleep(time.Second * 2) rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i)) } }
3. publish/subscribe模式
consumer.go:
func main() { //消費(fèi)者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") rabbitmq.RecieveSub() }
producer.go
func main() { //訂閱模式發(fā)送者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") for i := 0; i <= 20; i++ { rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)") fmt.Println(i) time.Sleep(1 * time.Second) } }
4. router模式
consumer.go
func main() { //消費(fèi)者 rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one") rabbitmq.RecieveRouting() }
producer.go
func main() { //路由模式生產(chǎn)者 imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one") imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two") for i := 0; i <= 10; i++ { imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i)) imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } }
5. topic模式
consumer.go
func main() { /* 星號井號代表通配符 星號代表多個(gè)單詞,井號代表一個(gè)單詞 路由功能添加模糊匹配 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī) 交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi) */ //Topic消費(fèi)者 //rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99 rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的 rabbitmq.RecieveTopic() }
producer.go
func main() { //Topic模式生產(chǎn)者 imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four") for i := 0; i <= 10; i++ { imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i)) imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } }
2 Kafka
2.1 基本概念
Kafka是分布式的,其所有的構(gòu)件borker(server服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費(fèi)者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會存到kafka server里,然后consumer再向kafka server發(fā)起請求去消費(fèi)這些數(shù)據(jù)。
kafka server在這個(gè)過程中像是一個(gè)幫你保管數(shù)據(jù)的中間商。所以kafka服務(wù)器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀(jì)人的意思)。
在消息的生產(chǎn)時(shí)可以使用一個(gè)標(biāo)識topic來區(qū)分,且可以進(jìn)行分區(qū);每一個(gè)分區(qū)都是一個(gè)順序的、不可變的消息隊(duì)列, 并且可以持續(xù)的添加。
同時(shí)為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護(hù),而不是由server端維護(hù)。當(dāng)失敗時(shí)能自動平衡
- 應(yīng)用場景
- 監(jiān)控
- 消息隊(duì)列
- 流處理
- 日志聚合
- 持久性日志
- 基礎(chǔ)概念
- topic:話題
- broker:kafka服務(wù)集群,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(broker)
- partition:分區(qū),topic物理上的分組
- message:消息,每個(gè)producer可以向一個(gè)topic主題發(fā)布一些消息
1.生產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.生產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫入本地磁盤后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK
2.2 常見模式
①點(diǎn)對點(diǎn)模式:火車站出租車搶客
發(fā)送者將消息發(fā)送到消息隊(duì)列中,消費(fèi)者去消費(fèi),如果消費(fèi)者有多個(gè),他們會競爭地消費(fèi),也就是說對于某一條消息,只有一個(gè)消費(fèi)者能“搶“到它。類似于火車站門口的出租車搶客的場景。
②發(fā)布訂閱模式:組間無競爭,組內(nèi)有競爭
消費(fèi)者訂閱對應(yīng)的topic(主題),只有訂閱了對應(yīng)topic消費(fèi)者的才會接收到消息。
例如:
- 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會把光明牛奶送到對應(yīng)位置,你也才會有機(jī)會消費(fèi)這個(gè)牛奶
注意
:為了提高消費(fèi)者的消費(fèi)能力,kafka中引入了消費(fèi)者組的概念。相當(dāng)于是:不同消費(fèi)者組之間因?yàn)橛嗛喌膖opic不同,不會有競爭關(guān)系。但是消費(fèi)者組內(nèi)是有競爭關(guān)系。
例如:
- 成都、廈門的出租車司機(jī)分別組成各自的消費(fèi)者組。
- 成都的出租車司機(jī)只拉成都的人,廈門的只拉廈門的人。(因此他們兩個(gè)消費(fèi)者組不是競爭關(guān)系)
- 成都市內(nèi)的出租車司機(jī)之間是競爭關(guān)系。(消費(fèi)者組內(nèi)是競爭關(guān)系)
2.3 docker-compose部署
vim docker-compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:6.2.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:6.2.0 ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 #KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機(jī)的ip,例如我本地mac的ip為192.168.0.101 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - zookeeper
# 進(jìn)入到docker-compose.yml所在目錄,執(zhí)行下面命令 docker-compose up -d # 查看部署結(jié)果,狀態(tài)為up表明部署成功 docker-compose ps
2.4 代碼操作
# 1. 創(chuàng)建對應(yīng)topic docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092 # 2. 查看topic列表 docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
①producer.go
package main import ( "fmt" "github.com/IBM/sarama" ) // 基于sarama第三方庫開發(fā)的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認(rèn) config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個(gè)partition config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 // 構(gòu)造一個(gè)消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 連接kafka client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 發(fā)送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
②consumer.go
package main import ( "fmt" "github.com/IBM/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return } partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū) if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍歷所有的分區(qū) // 針對每個(gè)分區(qū)創(chuàng)建一個(gè)對應(yīng)的分區(qū)消費(fèi)者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() // 異步從每個(gè)分區(qū)消費(fèi)信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) } }(pc) } //演示時(shí)使用 select {} }
③運(yùn)行效果
到此這篇關(guān)于Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)的文章就介紹到這了,更多相關(guān)Go消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go 如何基于IP限制HTTP訪問頻率的方法實(shí)現(xiàn)
這篇文章主要介紹了Go 如何基于IP限制HTTP訪問頻率的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11go語言實(shí)現(xiàn)銀行卡號Luhn校驗(yàn)
這篇文章主要為大家介紹了go語言Luhn校驗(yàn)測試銀行卡號碼的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05使用Go語言實(shí)現(xiàn)benchmark解析器
這篇文章主要為大家詳細(xì)介紹了如何使用Go語言實(shí)現(xiàn)benchmark解析器并實(shí)現(xiàn)及Web UI 數(shù)據(jù)可視化,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下2025-04-04