欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)

 更新時(shí)間:2024年02月11日 09:26:18   作者:NPE~  
消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu),本文主要介紹了Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka),需要的朋友可以了解一下

1 RabbitMQ

1.1 概念

①基本名詞

當(dāng)前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻(xiàn)給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。

在這里插入圖片描述

  • Broker:表示消息隊(duì)列服務(wù)實(shí)體
  • Virtual Host:虛擬主機(jī)。標(biāo)識一批交換機(jī)、消息隊(duì)列和相關(guān)對象。vhost是AMQP概念的基礎(chǔ),必須在鏈接時(shí)指定,RabbitMQ默認(rèn)的vhost是 /。
    • AMQP(Advanced Message Queuing Protocol)高級消息隊(duì)列協(xié)議
  • Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
  • Queue:消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。

②常見模式

1. simple簡單模式

在這里插入圖片描述

消息的消費(fèi)者(consumer) 監(jiān)聽(while) 消息隊(duì)列,如果隊(duì)列中有消息,就消費(fèi)掉,消息被拿走后,自動從隊(duì)列中刪除(隱患 消息可能沒有被消費(fèi)者正確處理,已經(jīng)從隊(duì)列中消失了,造成消息的丟失)

2. worker工作模式

在這里插入圖片描述

多個(gè)消費(fèi)者從一個(gè)隊(duì)列中爭搶消息

  • (隱患,高并發(fā)情況下,默認(rèn)會產(chǎn)生某一個(gè)消息被多個(gè)消費(fèi)者共同使用,可以設(shè)置一個(gè)開關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個(gè)消費(fèi)者使用)
  • 應(yīng)用場景:紅包;大項(xiàng)目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個(gè)任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊(duì)列中,空閑的系統(tǒng)自動爭搶)

3. publish/subscribe發(fā)布訂閱(共享資源)

在這里插入圖片描述

消費(fèi)者訂閱消息,然后從訂閱的隊(duì)列中獲取消息進(jìn)行消費(fèi)。

  • X代表交換機(jī)rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊(duì)列中,對應(yīng)消息隊(duì)列的消費(fèi)者拿到消息進(jìn)行消費(fèi)
  • 相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)

4. routing路由模式

在這里插入圖片描述

  • 交換機(jī)根據(jù)路由規(guī)則,將消息路由到不同的隊(duì)列中
  • 消息生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,路由是字符串(info) 當(dāng)前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機(jī)根據(jù)路由的key,只能匹配上路由key對應(yīng)的消息隊(duì)列,對應(yīng)的消費(fèi)者才能消費(fèi)消息;

5. topic主題模式(路由模式的一種)

在這里插入圖片描述

  • 星號井號代表通配符
  • 星號代表多個(gè)單詞,井號代表一個(gè)單詞
  • 路由功能添加模糊匹配
  • 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
  • 交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi)

1.2 搭建(docker方式)

①拉取鏡像

# 拉取鏡像
docker pull rabbitmq:3.7-management

②創(chuàng)建并啟動容器

# 創(chuàng)建并運(yùn)行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是項(xiàng)目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672)

# 輸入網(wǎng)址http://ip:15672即可進(jìn)入rabbitmq的web管理頁面,賬戶密碼:guest / guest

③web界面創(chuàng)建用戶和virtual host

在這里插入圖片描述

下面為了我們后續(xù)的操作,首先我們新建一個(gè)Virtual Host并且給他分配一個(gè)用戶名,用來隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建

新增virtual host

在這里插入圖片描述

新增用戶

在這里插入圖片描述

點(diǎn)擊新建好的用戶,設(shè)置其host

在這里插入圖片描述

在這里插入圖片描述

最終效果

在這里插入圖片描述

1.3 代碼操作

①RabbitMQ struct:包含創(chuàng)建、消費(fèi)、生產(chǎn)消息

package RabbitMQ

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

//amqp:// 賬號 密碼@地址:端口號/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"

type RabbitMQ struct {
	//連接
	conn *amqp.Connection
	//管道
	channel *amqp.Channel
	//隊(duì)列名稱
	QueueName string
	//交換機(jī)
	Exchange string
	//key Simple模式 幾乎用不到
	Key string
	//連接信息
	Mqurl string
}

//創(chuàng)建RabbitMQ結(jié)構(gòu)體實(shí)例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
	var err error
	//創(chuàng)建rabbitmq連接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "創(chuàng)建連接錯(cuò)誤!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "獲取channel失敗")
	return rabbitmq
}

//斷開channel和connection
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//錯(cuò)誤處理函數(shù)
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s:%s", message, err))
	}
}

//簡單模式step:1。創(chuàng)建簡單模式下RabbitMQ實(shí)例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

//訂閱模式創(chuàng)建rabbitmq實(shí)例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
	//創(chuàng)建rabbitmq實(shí)例
	rabbitmq := NewRabbitMQ("", exchangeName, "")
	var err error
	//獲取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
	//獲取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel!")
	return rabbitmq
}

//訂閱模式生成
func (r *RabbitMQ) PublishPub(message string) {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 廣播類型
		"fanout",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")

	//2 發(fā)送消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//訂閱模式消費(fèi)端代碼
func (r *RabbitMQ) RecieveSub() {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 廣播類型
		"fanout",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
	q, err := r.channel.QueueDeclare(
		"", //隨機(jī)生產(chǎn)隊(duì)列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊(duì)列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		"",
		r.Exchange,
		false,
		nil,
	)
	//消費(fèi)消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//話題模式 創(chuàng)建RabbitMQ實(shí)例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
	//創(chuàng)建rabbitmq實(shí)例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//話題模式發(fā)送信息
func (r *RabbitMQ) PublishTopic(message string) {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 話題模式
		"topic",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "topic failed to declare an excha"+"nge")
	//2發(fā)送信息
	err = r.channel.Publish(
		r.Exchange,
		//要設(shè)置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//話題模式接收信息
//要注意key
//其中* 用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè))
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 話題模式
		"topic",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an exchange")
	//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
	q, err := r.channel.QueueDeclare(
		"", //隨機(jī)生產(chǎn)隊(duì)列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊(duì)列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消費(fèi)消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//路由模式 創(chuàng)建RabbitMQ實(shí)例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
	//創(chuàng)建rabbitmq實(shí)例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//路由模式發(fā)送信息
func (r *RabbitMQ) PublishRouting(message string) {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 廣播類型
		"direct",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//發(fā)送信息
	err = r.channel.Publish(
		r.Exchange,
		//要設(shè)置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
	//嘗試創(chuàng)建交換機(jī),不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機(jī)名稱
		r.Exchange,
		//交換機(jī)類型 廣播類型
		"direct",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2試探性創(chuàng)建隊(duì)列,創(chuàng)建隊(duì)列
	q, err := r.channel.QueueDeclare(
		"", //隨機(jī)生產(chǎn)隊(duì)列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊(duì)列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消費(fèi)消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//簡單模式Step:2、簡單模式下生產(chǎn)代碼
func (r *RabbitMQ) PublishSimple(message string) {
	//1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建
	//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
	_, err := r.channel.QueueDeclare(
		//隊(duì)列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除
		false,
		//是否具有排他性 true表示自己可見 其他用戶不能訪問
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}

	//2.發(fā)送消息到隊(duì)列中
	r.channel.Publish(
		//默認(rèn)的Exchange交換機(jī)是default,類型是direct直接類型
		r.Exchange,
		//要賦值的隊(duì)列名稱
		r.QueueName,
		//如果為true,根據(jù)exchange類型和routkey規(guī)則,如果無法找到符合條件的隊(duì)列那么會把發(fā)送的消息返回給發(fā)送者
		false,
		//如果為true,當(dāng)exchange發(fā)送消息到隊(duì)列后發(fā)現(xiàn)隊(duì)列上沒有綁定消費(fèi)者,則會把消息還給發(fā)送者
		false,
		//消息
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

func (r *RabbitMQ) ConsumeSimple() {
	//1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建
	//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
	_, err := r.channel.QueueDeclare(
		//隊(duì)列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用來區(qū)分多個(gè)消費(fèi)者
		"",
		//是否自動應(yīng)答
		true,
		//是否具有排他性
		false,
		//如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者
		false,
		//隊(duì)列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//啟用協(xié)程處理
	go func() {
		for d := range msgs {
			//實(shí)現(xiàn)我們要處理的邏輯函數(shù)
			log.Printf("Received a message:%s", d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}

func (r *RabbitMQ) ConsumeWorker(consumerName string) {
	//1、申請隊(duì)列,如果隊(duì)列存在就跳過,不存在創(chuàng)建
	//優(yōu)點(diǎn):保證隊(duì)列存在,消息能發(fā)送到隊(duì)列中
	_, err := r.channel.QueueDeclare(
		//隊(duì)列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個(gè)消費(fèi)者斷開連接之后,是否把消息從隊(duì)列中刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用來區(qū)分多個(gè)消費(fèi)者
		consumerName,
		//是否自動應(yīng)答
		true,
		//是否具有排他性
		false,
		//如果設(shè)置為true,表示不能同一個(gè)connection中發(fā)送的消息傳遞給這個(gè)connection中的消費(fèi)者
		false,
		//隊(duì)列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//啟用協(xié)程處理
	go func() {
		for d := range msgs {
			//實(shí)現(xiàn)我們要處理的邏輯函數(shù)
			log.Printf("%s Received a message:%s", consumerName, d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}

②測試代碼

1. simple簡單模式

consumer.go

func main() {
	//消費(fèi)者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	rabbitmq.ConsumeSimple()
}

producer.go

func main() {
	//Simple模式 生產(chǎn)者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	for i := 0; i < 5; i++ {
		time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}

2. worker模式

consumer.go

func main() {
	/*
		worker模式無非就是多個(gè)消費(fèi)者去同一個(gè)隊(duì)列中消費(fèi)消息
	*/
	//消費(fèi)者1
	rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	go rabbitmq1.ConsumeWorker("consumer1")
	//消費(fèi)者2
	rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	rabbitmq2.ConsumeWorker("consumer2")
}

producer.go

func main() {
	//Worker模式 生產(chǎn)者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	for i := 0; i < 100; i++ {
		//time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}

3. publish/subscribe模式

consumer.go:

func main() {
	//消費(fèi)者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	rabbitmq.RecieveSub()
}

producer.go

func main() {
	//訂閱模式發(fā)送者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	for i := 0; i &lt;= 20; i++ {
		rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)")
		fmt.Println(i)
		time.Sleep(1 * time.Second)
	}
}

4. router模式

consumer.go

func main() {
	//消費(fèi)者
	rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	rabbitmq.RecieveRouting()
}

producer.go

func main() {
	//路由模式生產(chǎn)者
	imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
		imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}

5. topic模式

consumer.go

func main() {
	/*
		星號井號代表通配符
		星號代表多個(gè)單詞,井號代表一個(gè)單詞
		路由功能添加模糊匹配
		消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)
		交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊(duì)列,由隊(duì)列的監(jiān)聽消費(fèi)者接收消息消費(fèi)
	*/
	//Topic消費(fèi)者
	//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
	rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
	rabbitmq.RecieveTopic()
}

producer.go

func main() {
	//Topic模式生產(chǎn)者
	imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
	imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
		imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}

2 Kafka

2.1 基本概念

在這里插入圖片描述

Kafka是分布式的,其所有的構(gòu)件borker(server服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費(fèi)者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會存到kafka server里,然后consumer再向kafka server發(fā)起請求去消費(fèi)這些數(shù)據(jù)。
kafka server在這個(gè)過程中像是一個(gè)幫你保管數(shù)據(jù)的中間商。所以kafka服務(wù)器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀(jì)人的意思)。

在消息的生產(chǎn)時(shí)可以使用一個(gè)標(biāo)識topic來區(qū)分,且可以進(jìn)行分區(qū);每一個(gè)分區(qū)都是一個(gè)順序的、不可變的消息隊(duì)列, 并且可以持續(xù)的添加。
同時(shí)為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護(hù),而不是由server端維護(hù)。當(dāng)失敗時(shí)能自動平衡

  • 應(yīng)用場景
    • 監(jiān)控
    • 消息隊(duì)列
    • 流處理
    • 日志聚合
    • 持久性日志
  • 基礎(chǔ)概念
    • topic:話題
    • broker:kafka服務(wù)集群,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(broker)
    • partition:分區(qū),topic物理上的分組
    • message:消息,每個(gè)producer可以向一個(gè)topic主題發(fā)布一些消息

在這里插入圖片描述

1.生產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.生產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫入本地磁盤后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK

2.2 常見模式

①點(diǎn)對點(diǎn)模式:火車站出租車搶客

發(fā)送者將消息發(fā)送到消息隊(duì)列中,消費(fèi)者去消費(fèi),如果消費(fèi)者有多個(gè),他們會競爭地消費(fèi),也就是說對于某一條消息,只有一個(gè)消費(fèi)者能“搶“到它。類似于火車站門口的出租車搶客的場景。

在這里插入圖片描述

②發(fā)布訂閱模式:組間無競爭,組內(nèi)有競爭

消費(fèi)者訂閱對應(yīng)的topic(主題),只有訂閱了對應(yīng)topic消費(fèi)者的才會接收到消息。

例如:

  • 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會把光明牛奶送到對應(yīng)位置,你也才會有機(jī)會消費(fèi)這個(gè)牛奶

注意:為了提高消費(fèi)者的消費(fèi)能力,kafka中引入了消費(fèi)者組的概念。相當(dāng)于是:不同消費(fèi)者組之間因?yàn)橛嗛喌膖opic不同,不會有競爭關(guān)系。但是消費(fèi)者組內(nèi)是有競爭關(guān)系。

例如:

  • 成都、廈門的出租車司機(jī)分別組成各自的消費(fèi)者組。
  • 成都的出租車司機(jī)只拉成都的人,廈門的只拉廈門的人。(因此他們兩個(gè)消費(fèi)者組不是競爭關(guān)系)
  • 成都市內(nèi)的出租車司機(jī)之間是競爭關(guān)系。(消費(fèi)者組內(nèi)是競爭關(guān)系)

2.3 docker-compose部署

 vim docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      #KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機(jī)的ip,例如我本地mac的ip為192.168.0.101
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
# 進(jìn)入到docker-compose.yml所在目錄,執(zhí)行下面命令
docker-compose up -d
# 查看部署結(jié)果,狀態(tài)為up表明部署成功
docker-compose ps 

在這里插入圖片描述

2.4 代碼操作

# 1. 創(chuàng)建對應(yīng)topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092

# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181

在這里插入圖片描述

①producer.go

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// 基于sarama第三方庫開發(fā)的kafka client

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 發(fā)送完數(shù)據(jù)需要leader和follow都確認(rèn)
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個(gè)partition
	config.Producer.Return.Successes = true                   // 成功交付的消息將在success channel返回

	// 構(gòu)造一個(gè)消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 連接kafka
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 發(fā)送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

②consumer.go

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// kafka consumer

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍歷所有的分區(qū)
		// 針對每個(gè)分區(qū)創(chuàng)建一個(gè)對應(yīng)的分區(qū)消費(fèi)者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 異步從每個(gè)分區(qū)消費(fèi)信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	//演示時(shí)使用
	select {}
}

③運(yùn)行效果

在這里插入圖片描述

 到此這篇關(guān)于Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)的文章就介紹到這了,更多相關(guān)Go消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go 如何基于IP限制HTTP訪問頻率的方法實(shí)現(xiàn)

    Go 如何基于IP限制HTTP訪問頻率的方法實(shí)現(xiàn)

    這篇文章主要介紹了Go 如何基于IP限制HTTP訪問頻率的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • 一文帶大家搞懂Go語言中的迭代器

    一文帶大家搞懂Go語言中的迭代器

    迭代器是使用戶可在容器對象上遍訪的對象,設(shè)計(jì)人員使用此接口無需關(guān)心容器對象的內(nèi)存分配的實(shí)現(xiàn)細(xì)節(jié),本文主要為大家詳細(xì)介紹一下Go語言中的迭代器的實(shí)現(xiàn),需要的可以了解下
    2025-02-02
  • 深入探索Go語言中unsafe包的使用

    深入探索Go語言中unsafe包的使用

    Go語言的unsafe包被譽(yù)為黑科技,它為Go語言提供了底層訪問和操控內(nèi)存的能力,本文將深入探討Go語言中unsafe包的使用方法和注意事項(xiàng),需要的可以參考一下
    2023-04-04
  • go語言實(shí)現(xiàn)銀行卡號Luhn校驗(yàn)

    go語言實(shí)現(xiàn)銀行卡號Luhn校驗(yàn)

    這篇文章主要為大家介紹了go語言Luhn校驗(yàn)測試銀行卡號碼的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • 使用Go語言實(shí)現(xiàn)benchmark解析器

    使用Go語言實(shí)現(xiàn)benchmark解析器

    這篇文章主要為大家詳細(xì)介紹了如何使用Go語言實(shí)現(xiàn)benchmark解析器并實(shí)現(xiàn)及Web UI 數(shù)據(jù)可視化,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下
    2025-04-04
  • 一文讓你理解go語言的Context

    一文讓你理解go語言的Context

    在Go語言中,Context(上下文)是一個(gè)類型,用于在程序中傳遞請求范圍的值、截止時(shí)間、取消信號和其他與請求相關(guān)的上下文信息,它在多個(gè)goroutine之間傳遞這些值,使得并發(fā)編程更加可靠和簡單,本文詳細(xì)介紹go語言的Context,需要的朋友可以參考下
    2023-05-05
  • Go語言中的指針運(yùn)算實(shí)例分析

    Go語言中的指針運(yùn)算實(shí)例分析

    這篇文章主要介紹了Go語言中的指針運(yùn)算技巧,實(shí)例分析了Go語言指針運(yùn)算的實(shí)現(xiàn)方法,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-02-02
  • Go語言中的IO操作及Flag包的用法

    Go語言中的IO操作及Flag包的用法

    這篇文章介紹了Go語言中的IO操作及Flag包的用法,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-07-07
  • Go語言HTTP請求流式寫入body的示例代碼

    Go語言HTTP請求流式寫入body的示例代碼

    這篇文章主要介紹了Go語言HTTP請求流式寫入body,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-06-06
  • 8個(gè)Elasticsearch高頻面試題和答案整理

    8個(gè)Elasticsearch高頻面試題和答案整理

    這篇文章為大家精選了8道Elasticsearch高頻面試題和答案,并且給出了這些知識點(diǎn)的應(yīng)用場景、也給出了解決這些問題的思路,希望對大家有所幫助
    2023-06-06

最新評論