一篇文章帶你從入門到精通:RabbitMQ
1. 淺淺道來
1.1 什么是中間件?
IDC(互聯(lián)網(wǎng)數(shù)據(jù)中心)的定義:中間件是一種獨(dú)立的系統(tǒng)軟件服務(wù)程序,分布式應(yīng)用軟件借助這種軟件在不同的技術(shù)之間共享資源,中間件位于客戶機(jī)服務(wù)器的操作系統(tǒng)之上,管理計(jì)算資源和網(wǎng)絡(luò)通信。
首先,中間件是某一類軟件的總稱,而不是某一種具體的軟件。它是一種位于平臺(操作系統(tǒng)硬件) 和 應(yīng)用程序之間的通用服務(wù),它屏蔽了底層操作系統(tǒng)的各種復(fù)雜性,減輕了開發(fā)人員的技術(shù)負(fù)擔(dān),同時(shí)它的設(shè)計(jì)不針對某一具體目標(biāo),而是提供具有普遍通用特點(diǎn)的功能模塊服務(wù),這些服務(wù)具有標(biāo)準(zhǔn)的程序接口和協(xié)議,根據(jù)平臺的不同,也可以有不同的實(shí)現(xiàn)。
通俗的例子(僅供參考,并不算完全一致):
我開了一家咖啡店,我身邊有 A B C 等 n 家咖啡豆的供應(yīng)商,但是我肯定要挑選價(jià)格又實(shí)惠,質(zhì)量還不錯(cuò)的豆子,但是市場是受到多方面因素波動(dòng)的,可能我現(xiàn)在的選擇,在一段時(shí)間后已經(jīng)不是最佳選項(xiàng)了。所以我專門找到一家市場中介,讓他幫我操心這一攤子事情,我只和你說清價(jià)格和質(zhì)量要求,你去找就是了,過程我一點(diǎn)也不操心。這個(gè)中介的概念,就類似中間件的
1.1.1 分布式的概念(補(bǔ)充)
這一段,來自我之前寫的 Dubbo 入門的那篇文章哈
在百度以及維基中的定義都相對專業(yè)且晦澀,大部分博客或者教程經(jīng)常會使用《分布式系統(tǒng)原理和范型》中的定義,即:“分布式系統(tǒng)是若干獨(dú)立計(jì)算機(jī)的集合,這些計(jì)算機(jī)對于用戶來說就像是單個(gè)相關(guān)系統(tǒng)”
下面我們用一些篇幅來通俗的解釋一下什么叫做分布式
1.1.1.1 什么是集中式系統(tǒng)
提到分布式,不得不提的就是 “集中式系統(tǒng)”,這個(gè)概念最好理解了,它就是將功能,程序等安裝在同一臺設(shè)備上,就由這一臺主機(jī)設(shè)備向外提供服務(wù)
舉個(gè)最簡單的例子:你拿一臺PC主機(jī),將其改裝成了一臺簡單的服務(wù)器,配置好各種內(nèi)容后,你將MySQL,Web服務(wù)器,F(xiàn)TP,Nginx 等等,全部安裝在其中,打包部署項(xiàng)目后,就可以對外提供服務(wù)了,但是一旦這臺機(jī)器無論是軟件還是硬件出現(xiàn)了問題,整個(gè)系統(tǒng)都會受到嚴(yán)重的牽連錯(cuò)誤,雞蛋放在一個(gè)籃子里,要打就全打了
1.1.12 什么是分布式系統(tǒng)
既然集中式系統(tǒng)有這樣一種牽一發(fā)而動(dòng)全身的問題,那么分布式的其中一個(gè)作用,自然是來解決這樣的問題了,正如定義中所知,分布式系統(tǒng)在用戶的體驗(yàn)感官里,就像傳統(tǒng)的單系統(tǒng)一樣,一些變化都是這個(gè)系統(tǒng)本身內(nèi)部進(jìn)行的,對于用戶并沒有什么太大的感覺
例如:淘寶,京東這種大型電商平臺,它們的主機(jī)都是數(shù)以萬計(jì)的,否則根本沒法處理大量的數(shù)據(jù)和請求,具體其中有什么劃分,以及操作,我們下面會說到,但是對于用戶的我們,我們不需要也不想關(guān)心這些,我們?nèi)钥梢詥渭兊恼J(rèn)為,我們面對的就是 “淘寶” 這一臺 “主機(jī)”
所以分布式的一個(gè)相對專業(yè)一些的說法是這樣的(進(jìn)程粒度)兩個(gè)或者多個(gè)程序,分別運(yùn)行在不同的主機(jī)進(jìn)程上,它們互相配合協(xié)調(diào),完成共同的功能,那么這幾個(gè)程序之間構(gòu)成的系統(tǒng)就可以叫做分布式系統(tǒng)
這幾者都是相同的程序 —— 分布式這幾者都是不同的程序 —— 集群
1.2 什么是消息中間件/消息隊(duì)列(MQ)
消息中間件,顧名思義就是用來處理消息相關(guān)服務(wù)的中間件,它提供了一種系統(tǒng)之間通信交互的通道,例如發(fā)送方只需要把想傳輸?shù)男畔⒔唤o消息中間件,而發(fā)送的協(xié)議,方式,發(fā)送過程中出現(xiàn)的網(wǎng)絡(luò),故障等等問題,都由中間件進(jìn)行處理,因此它負(fù)責(zé)保證信息的可靠傳輸。
所以消息中間件,就是一種用來接受數(shù)據(jù),存儲數(shù)據(jù),發(fā)送數(shù)據(jù)的技術(shù),它提供了各種功能,可以實(shí)現(xiàn)消息的高可用,高可靠,也提供了很好的容錯(cuò)機(jī)制等。可以程序?qū)ο到y(tǒng)資源的占用,以及傳輸效率的提升有很大幫助。
常說的 MQ 就是指消息隊(duì)列,即 Message Quene,常見的消息隊(duì)列有,經(jīng)典的 ActivieMQ,熱門的 Kafka,阿里的 RocketMQ 等等,以及這里講解的 RabbitMQ。
不同的 MQ 有著不同的特點(diǎn),以及其更加擅長的方向,倒也說不上誰好誰壞,只有誰更合適。
1.2.1 消息隊(duì)列應(yīng)用場景
根據(jù)業(yè)務(wù)的需要,其實(shí)它可以有多種應(yīng)用場景,例如解耦,削峰填谷,廣播等,我們舉兩個(gè)場景來梳理一下簡單的過程
1.2.1.1 業(yè)務(wù)解耦
最近在考慮買幾本書看,就以買書下訂單舉例,當(dāng)我點(diǎn)擊購買之后,可能會有這么一串業(yè)務(wù)邏輯執(zhí)行,① 減去庫存容量 ② 生成訂單 ③ 支付 ④ 更新訂單狀態(tài) ⑤ 發(fā)送購買成功短信 ⑥ 更新商品快遞攬收狀態(tài)。在初期階段,我們完全可以讓這些業(yè)務(wù)同步執(zhí)行,但是后期為了提升效率,就可以將需要立即執(zhí)行的任務(wù)和可稍緩執(zhí)行的任務(wù)進(jìn)行分離,例如 ⑤ 發(fā)送購買成功短信 ⑥ 更新商品快遞攬收狀態(tài),都可以考慮異執(zhí)行。在主流程執(zhí)行結(jié)束后,這些可稍緩的業(yè)務(wù)可以通過給 MQ 發(fā)送消息,就判定已經(jīng)執(zhí)行,保證流程先結(jié)束。然后再通過拉取 MQ 消息,或者 MQ 主動(dòng)推送去異步執(zhí)行其他的業(yè)務(wù)。
1.2.1.2 削峰填谷
例如發(fā)送一條帶有已讀未讀標(biāo)識的公告信息,所以需要對每一個(gè)用戶都寫一條這樣的公告消息,例如存到 MongoDB 中,即便 MongoDB 也支撐不下來瞬時(shí)寫入百萬、千萬記錄的情況,所以可以考慮使用消息隊(duì)列。比如說我們可以在Java后端系統(tǒng)上面,用異步多線程的方法,向消息隊(duì)列MQ中發(fā)送消息,這樣Web系統(tǒng)發(fā)布公告消息的時(shí)候就不占用數(shù)據(jù)庫正常的 CRUD 操作。系統(tǒng)消息保存在消息隊(duì)列中,我們只是用它來做削峰填谷,系統(tǒng)消息最終還是要存儲在數(shù)據(jù)庫上面。于是我們可以這樣設(shè)計(jì),在用戶登陸系統(tǒng)的時(shí)候,用異步線程從消息隊(duì)列MQ中,接收該用戶的系統(tǒng)消息,然后把系統(tǒng)消息存儲在數(shù)據(jù)庫中,最后消息隊(duì)列MQ中的該條消息自動(dòng)刪除。因?yàn)橛脩舻腻e(cuò)峰登錄,所以往數(shù)據(jù)庫中寫入消息的任務(wù)也變成了錯(cuò)峰寫入。
1.3 什么是 RabbitMQ
RabbitMQ 是一個(gè)使用 Erlang 語言編寫,且遵循 AMQP協(xié)議的開源消息隊(duì)列系統(tǒng),支持多種客戶端(語言),用于在分布式系統(tǒng)中存儲消息,轉(zhuǎn)發(fā)消息,具有高可用,高可擴(kuò)性,易用性等特征。
更詳細(xì)的介紹可以直接看一下官網(wǎng):https://www.rabbitmq.com/
總之這就是一種常見的消息隊(duì)列,它的這些特點(diǎn),都會在后面逐條講解到,我們首先從入門下載安裝部分先說起,然后再到使用。
2. 下載與安裝
一般來說,安裝的方式有手動(dòng)安裝和 Docker 安裝,大部分場景下,都會使用 Docker 安裝,但是作為學(xué)習(xí)階段,如果不是特別著急,學(xué)習(xí)一下手動(dòng)安裝,也不是什么壞事。
注:云服務(wù)器和虛擬機(jī)都可以,演示的 Linux 版本為 CentOS 7.9
2.1 手動(dòng)安裝
2.1.1 下載安裝過程
注:可以在 Linux 中通過 yum 直接下載安裝,這里選擇了在自己的 Windows 主機(jī)先下載文件,然后再通過 FTP 傳到 Linux 上,直接安裝??梢员苊馓摂M機(jī)上因?yàn)榫W(wǎng)絡(luò)而造成的一些下載問題。
首先打開官網(wǎng)的下載目錄,然后根據(jù)自己 Linux 的版本,選擇版本。
1.地址:https://www.rabbitmq.com/download.html
2.因?yàn)?RabbitMQ 是 Erlang 語言編寫的,所以還需要提供 Erlang 環(huán)境,接著去下載 Erlang。
- 地址:https://www.erlang-solutions.com/downloads
- A:此網(wǎng)站訪問速度極慢,請耐心等待,或者需要掛上梯子
- B:Erlang 版本需要與 RabbitMQ 匹配(如圖,有最大和最小版本的限制)
- 版本查看地址:https://www.rabbitmq.com/which-erlang.html
- 這里選擇了 RabbitMQ 3.8.14 和 Erlang 23.2.3
3.將文件上傳到 Linux 中(我這里指定位置是 /usr/local/bin/rabbitmq ,可以自己更改選擇)
- 現(xiàn)在很多 Shell 軟件都自帶內(nèi)置的 FTP 上傳,例如 FinalShell,MobaXterm 等等
- 上傳后的文件和目錄位置如下
[root@centos7 rabbitmq]# ls esl-erlang_23.2.3-1_centos_7_amd64.rpm rabbitmq-server-3.8.14-1.el7.noarch.rpm [root@centos7 rabbitmq]# pwd /usr/local/bin/rabbitmq
4.安裝 Erlang 、Socat 和 RabbitMQ
- Erlang 、Socat 都是 RabbitMQ 所依賴的
# 安裝 Erlang,安裝后執(zhí)行 erl -v 顯示版本號則代表成功 rpm -ivh esl-erlang_23.2.3-1_centos_7_amd64.rpm # 安裝 Socat 這里沒有下載源文件,而是直接通過 yum 在線安裝,因?yàn)樗⒉淮? yum install -y socat # 安裝 RabbitMQ rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm
5.安裝結(jié)束,啟動(dòng)服務(wù)查看 RabbitMQ 是否可以啟動(dòng)成功
# 啟動(dòng)服務(wù) systemctl start rabbitmq-server # 開機(jī)自啟 systemctl enable rabbitmq-server # 停止服務(wù) systemctl stop rabbitmq-server # 查看服務(wù)狀態(tài) systemctl status rabbitmq-server.service
如圖所示,即安裝啟動(dòng)成功
2.1.2 配置 Web 界面管理
上面的安裝其實(shí)已經(jīng)結(jié)束了,但是 RabbitMQ 提供給了我們一個(gè) Web 形式的管理界面,默認(rèn)是沒有的,需要進(jìn)行安裝。
1.安裝 Web 管理插件,然后重啟服務(wù)
# 安裝命令 rabbitmq-plugins enable rabbitmq_management # 重啟服務(wù) systemctl restart rabbitmq-server
2.一定要開放 Linux 防火墻 的 15672 端口,否則就會無法訪問,在學(xué)習(xí)階段,你甚至可以去查詢命令把防火墻關(guān)掉
對應(yīng)服務(wù)器(阿里云,騰訊云等)就是在安全組中開放 15672 端口
訪問 Linux IP:15672 ,例如 http://192.168.122.1:15672
# 查詢 15672 是否開放,一般默認(rèn)都是 no firewall-cmd --query-port=15672/tcp # 開放指定端口 15672 firewall-cmd --add-port=15672/tcp --permanent # 重新載入 firewall-cmd --reload # 再次查詢,結(jié)果就是 yes 了 firewall-cmd --query-port=15672/tcp
3.添加遠(yuǎn)程登錄的賬戶
- RabbitMQ 有一個(gè)默認(rèn)賬號和密碼都是 guest 但是只能在 localhost 下訪問
# 新增用戶 用戶名和密碼都是 admin rabbitmqctl add_user admin admin
4.為遠(yuǎn)程登錄的賬戶添加權(quán)限
- administrator(超級管理員):登錄控制臺、查看所有信息、操作用戶、操作策略
- monitoring(監(jiān)控者): 登錄控制臺、查看所有信息
- policymaker(策略制定者): 登錄控制臺、指定策略
- managment(普通管理員): 登錄控制臺復(fù)制代碼 代碼如下:
# 設(shè)置用戶分配操作權(quán)限,admin 用戶的權(quán)限為 administrator rabbitmqctl set_user_tags admin administrator
5.為用戶添加資源權(quán)限
- 因?yàn)?admin 已經(jīng)是超級管理員權(quán)限了,所以其實(shí)不分配資源權(quán)限也可以,會默認(rèn)去做。
# 命令格式為: set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 這里即為 admin 用戶開啟 配置文件和讀寫的權(quán)限 rabbitmqctl set_permissions -p / admin ".*"".*"".*"
6.訪問 Linux IP:15672 ,例如
http://192.168.122.1:15672
,輸入剛才設(shè)置好的用戶名密碼 admin- 如圖:訪問成功
2.1.2.1 命令小結(jié)
1.添加用戶:
rabbitmqctl add_user <username> <password>
2.修改密碼:
rabbitmqctl change_password <username> <newpass>
3.刪除用戶:
rabbitmqctl delete_user <username>
4.用戶列表:
rabbitmqctl list_users
5.設(shè)置用戶角色:
rabbitmqctl set_user_tags <username> <tag1,tag2>
6.刪除用戶所有角色:
rabbitmqctl set_user_tags <username>
7.為用戶添加資源權(quán)限:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
使用:輸入 rabbitmqctl ,則會提示可能使用的命令,然后 使用 rabbitmqctl hepl <命令> 可以查看具體命令的使用方法和參數(shù)。
2.1.3 簡單介紹 Web 界面管理
- Connections(連接):此處用來管理與
- RabbitMQ 建立連接后的生產(chǎn)者和消費(fèi)者
- Channels(通道):連接建立后,會形成通道,消息的投遞獲取依賴通道。
- Exchanges(交換機(jī)):用來實(shí)現(xiàn)消息的路由
- Queues(隊(duì)列):存放消息的隊(duì)列,消息等待被消費(fèi),消費(fèi)后被移除隊(duì)列。
- Admin(管理):用于對管理用戶,以及對應(yīng)權(quán)限進(jìn)行設(shè)置,如下圖所示
Tags 就是用來指定用戶的角色
- administrator(超級管理員):登錄控制臺、查看所有信息、操作用戶、操作策略
- monitoring(監(jiān)控者): 登錄控制臺、查看所有信息
- policymaker(策略制定者): 登錄控制臺、指定策略
- managment(普通管理員): 登錄控制臺
2.2 Docker 安裝
在 Docker 中安裝 RabbitMQ 不需要自己去考慮版本,環(huán)境等的各種沖突不兼容問題,是非常便捷的,我演示的這臺虛擬機(jī)是一個(gè) CentOS 7.9 裸機(jī),所以我們從更新 yum,到安裝 Docker 和 安裝 RabbitMQ 按步驟都講一下
2.2.1 配置 yum
1.更新 yum 到最新版
# 更新 yum yum update # 檢查yum依賴的幾個(gè)包 yum-utils 提供 yum-config-manager 功能, 后面兩個(gè)是 devicemapper 用到的 yum install -y yum-utils device-mapper-persistent-data lvm2
2.設(shè)置 yum 源為阿里云
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
2.2.2 安裝 docker
2.2.2.1 步驟
1.使用 yum 安裝 docker
- docker-ce 是社區(qū)版的意思,ee為企業(yè)版
yum install docker-ce -y
2.通過查看版本,檢查安裝是否成功
docker -v
sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://<你的ID>.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker
- 國內(nèi)從 DockerHub 拉取鏡像有時(shí)會遇到困難,此時(shí)可以配置鏡像加速器。Docker 官方和國內(nèi)很多云服務(wù)商都提供了國內(nèi)加速器服務(wù),例如:
- 科大鏡像:https://docker.mirrors.ustc.edu.cn/
- 網(wǎng)易:https://hub-mirror.c.163.com/
- 阿里云:https://<你的ID>.mirror.aliyuncs.com
- 七牛云加速器:https://reg-mirror.qiniu.com
當(dāng)配置某一個(gè)加速器地址之后,若發(fā)現(xiàn)拉取不到鏡像,請切換到另一個(gè)加速器地址。國內(nèi)各大云服務(wù)商均提供了 Docker 鏡像加速服務(wù),建議根據(jù)運(yùn)行 Docker 的云平臺選擇對應(yīng)的鏡像加速服務(wù)。
阿里云鏡像獲取地址:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors,登陸后,左側(cè)菜單選中鏡像加速器就可以看到你的專屬地址了
2.2.2.2 Docker 常見命令
2.2.2.2.1 管理命令
- 就啟動(dòng),停止,重啟這些簡單的命令使用 service 也是可以的,systemctl 功能稍微強(qiáng)大一些
# 啟動(dòng) docker systemctl docker start # 停止 docker systemctl docker stop # 重啟 docker systemctl docker restart # 查看 docker 狀態(tài) systemctl status docker # 開機(jī)自啟 systemctl enable docker systemctl unenable docker
2.2.2.2.2 鏡像命令
# 導(dǎo)入鏡像文件 docker load < xxx.tar.gz # 查看安裝的鏡像 docker images # 刪除鏡像 docker rmi 鏡像名
2.2.3 安裝 RabbitMQ (任選其一)
注:直接用 2.2.3.2 一句話安裝 會更好一些
2.2.3.1 一步一步安裝獲取
1.RabbitMQ 的鏡像
docker pull rabbitmq:management
2.創(chuàng)建并運(yùn)行容器(具體參數(shù)在 3 中介紹)
docker run -id --name 容器名 -p 15672:15672 -p 5672:5672 rabbitmq:management
2.2.3.2 一句話安裝
上面的安裝方式,就是先獲取到 RabbitMQ 鏡像后再開始安裝,這里是沒有問題的,創(chuàng)建時(shí)會有一個(gè)問題,因?yàn)槲覀円惭b management 也就是它的 web 管理,如果不做一些處理,默認(rèn)裝好的是沒有用戶的,所以還需要像前面一樣自己進(jìn)去配置,而 Docker Hub 已經(jīng)給出了我們配置的示例,即使用
-e
代表配置,使用 RABBITMQ_DEFAULT_USER
和RABBITMQ_DEFAULT_PASS
配置用戶名和密碼更多請查看 Docker Hub 官方給予例子中的 Setting default user and password 章節(jié)https://registry.hub.docker.com/_/rabbitmq/
1.執(zhí)行安裝
docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
2.通過容器狀態(tài),查看是否運(yùn)行成功
# 查看容器運(yùn)行狀態(tài)docker ps -a# 啟動(dòng)docker start 容器名# 停止docker stop 容器名# 退出命令行,不停止exit# 進(jìn)入到node容器(如果開啟了 -t 的情況)docker exec -it 容器名 bash
2.2.3.2.1 參數(shù)介紹
下面分別講解一下這些參數(shù)的說明:
-i
:表示運(yùn)行容器。-t
:表示為容器保留交互的方式(命令行),即分配一個(gè)偽終端。所以常常會見到-it
這樣的搭配。--name
:為容器起個(gè)名字。-v
:表示目錄映射關(guān)系(前者是宿主機(jī)目錄,后者是映射到宿主機(jī)上的目錄),可以使用多個(gè)-v
做多個(gè)目錄或文件映射。注意:推薦做目錄映射,在宿主機(jī)上做修改,然后共享到容器上。-d
:表示創(chuàng)建一個(gè)守護(hù)式容器在后臺運(yùn)行(這樣創(chuàng)建容器后不會自動(dòng)登錄容器,如果只加-i -t
兩個(gè)參數(shù),創(chuàng)建后就會自動(dòng)進(jìn)去容器),即后端掛起運(yùn)行。-p
:表示端口映射,前者是宿主機(jī)端口,后者是容器內(nèi)的映射端口。可以使用多個(gè)-p
做多個(gè)端口映射,只有做了端口映射,才能被外界訪問。
給大家舉個(gè)例子:
# 查看容器運(yùn)行狀態(tài) docker ps -a # 啟動(dòng) docker start 容器名 # 停止 docker stop 容器名 # 退出命令行,不停止 exit # 進(jìn)入到node容器(如果開啟了 -t 的情況) docker exec -it 容器名 bash
因?yàn)槭褂昧?-t 這個(gè)參數(shù),所以可以分配到一個(gè)偽終端,通過 docker exec -it 容器名 bash 進(jìn)入命令行
-v 目錄映射后,進(jìn)入容器后,也會有一個(gè)一模一樣的 demo 文件夾,例如在其中可以執(zhí)行 python 程序2.2.3.2.1 端口介紹
4369 :erlang發(fā)現(xiàn)端口
5672:client端通信端口
15672:管理界面ui端口
25672:server間內(nèi)部通信端口
61613:不帶TLS和帶TLS的STOMP客戶端
1883:不啟用和啟用TLS的MQTT客戶端
比較關(guān)鍵的就是 5672 和 15672
更多端口詳情可以訪問官網(wǎng)文檔https://www.rabbitmq.com/networking.html
注:如果要通過遠(yuǎn)程連接,例如訪問 web 管理頁面的 15672 端口,Java 客戶端連接的 5672 端口, 一定要進(jìn)行一個(gè)開放操作,否則都連接不到。
- 以下為基于 CentOS 7.9 開放 15672 端口的例子
# 查詢 15672 是否開放,一般默認(rèn)都是 no firewall-cmd --query-port=15672/tcp # 開放指定端口 15672 firewall-cmd --add-port=15672/tcp --permanent # 重新載入 firewall-cmd --reload # 再次查詢,結(jié)果就是 yes 了 firewall-cmd --query-port=15672/tcp
]以下是關(guān)閉防火墻的命令
systemctl disable firewalld systemctl stop firewalld
3. RabbitMQ 協(xié)議和模型
安裝結(jié)束后,就要進(jìn)入主題,即用 Java 或者 Springboot 代碼來實(shí)現(xiàn) RabbitMQ的幾種方式,但是想要很好的理解這幾種路由交換方式,就需要對它的協(xié)議和架構(gòu)模型有所了解。
3.1 協(xié)議
3.1.1 什么是協(xié)議?
協(xié)議,網(wǎng)絡(luò)協(xié)議的簡稱,網(wǎng)絡(luò)協(xié)議是通信計(jì)算機(jī)雙方必須共同遵從的一組約定。如怎么樣建立連接、怎么樣互相識別等。只有遵守這個(gè)約定,計(jì)算機(jī)之間才能相互通信交流。它的三要素是:語法、語義、時(shí)序。
為了使數(shù)據(jù)在網(wǎng)絡(luò)上從源到達(dá)目的,網(wǎng)絡(luò)通信的參與方必須遵循相同的規(guī)則,這套規(guī)則稱為協(xié)議(protocol),它最終體現(xiàn)為在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)包的格式。
3.1.1.1 網(wǎng)絡(luò)協(xié)議的三要素
1.語法:數(shù)據(jù)與控制信息的結(jié)構(gòu)和格式,以及數(shù)據(jù)出現(xiàn)的順序。
2.語義:解釋控制信息每個(gè)部分的意義,以及規(guī)定了需要發(fā)出何種控制信息以及完成的動(dòng)作做出何種響應(yīng)。
3.時(shí)序:對事件發(fā)生順序的詳細(xì)說明。
人們形象地把這三個(gè)要素描述為:做什么,怎么做,做的順序。
舉個(gè)例子 HTTP 協(xié)議
語法:HTTP 規(guī)定了請求報(bào)文和響應(yīng)報(bào)文的格式
語義:客戶端主動(dòng)發(fā)起請求稱為請求,服務(wù)端隨之返回?cái)?shù)據(jù),稱為響應(yīng)
時(shí)序: 一個(gè)請求對應(yīng)一個(gè)響應(yīng),而且先有請求后有響應(yīng)3.1.1.1.1 面試題:為什么消息中間件不直接使用 HTTP 協(xié)議
對于一個(gè)消息中間件來說,其主要責(zé)任就是負(fù)責(zé)數(shù)據(jù)傳遞,存儲,分發(fā),高性能和簡潔才是我們所追求的,而 HTTP 請求報(bào)文頭和響應(yīng)報(bào)文頭是比較復(fù)雜的,包含了Cookie,數(shù)據(jù)的加密解密,窗臺嗎,響應(yīng)碼等附加的功能,我們并不需要這么復(fù)雜的功能。
同時(shí)大部分情況下 HTTP 大部分都是短鏈接,在實(shí)際的交互過程中,一個(gè)請求到響應(yīng)都很有可能會中斷,中斷以后就不會執(zhí)行持久化,就會造成請求的丟失。這樣就不利于消息中間件的業(yè)務(wù)場景,因?yàn)橄⒅虚g件可能是一個(gè)長期的獲取信息的過程,出現(xiàn)問題和故障要對數(shù)據(jù)或消息執(zhí)行持久化等,目的是為了保證消息和數(shù)據(jù)的高可靠和穩(wěn)健的運(yùn)行
3.1.2 RabbitMQ 的 AMQP 協(xié)議
RabbitMQ 的使用的協(xié)議是 AMQP(advanced message queuing protocol),它在2003年時(shí)被提出,最早用于解決金融領(lǐng)不同平臺之間的消息傳遞交互問題。
AMQP 更準(zhǔn)確的說是一種 binary wire-level protocol(鏈接協(xié)議)。這是其和 JMS 的本質(zhì)差別,AMQP 不從 API 層進(jìn)行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實(shí)現(xiàn)了AMQP的 Provider(Producer) 天然性就是跨平臺的。
相比較其它消息協(xié)議,其特性為:
1.分布式事務(wù)支持
2.消息的持久化支持
3.高性能和高可靠的消息處理優(yōu)勢
3.1.3 架構(gòu)模型
想要學(xué)習(xí)后面的幾種消息具體的發(fā)送模式,這個(gè)模型圖就必須理解清楚,因?yàn)檫@幾種方式就是對這個(gè)模型不同程度的選擇和縮減
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Connection
:應(yīng)用程序與Broker之間的網(wǎng)絡(luò)連接。Channel
:信道,即信息傳輸?shù)耐ǖ?,可以建立多個(gè) Channel,每個(gè) Channel 代表一個(gè)會話任務(wù)。- 信道是建立在 TCP 連接內(nèi)的虛擬連接,信息的讀寫都通過信道傳輸,因?yàn)閷τ诓倏v系統(tǒng)而言,建立和銷毀 TCP 是非常昂貴的,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
Broker(Server)
:標(biāo)識消息隊(duì)列服務(wù)器實(shí)體,例如這里就是 RabbitMQ Server。Virtual Host
:虛擬主機(jī),一個(gè) Broker 中可以設(shè)置多個(gè) Virtual Host,用作不同用戶的權(quán)限隔離。- Broker 可以理解為整個(gè)數(shù)據(jù)庫服務(wù),而 Virtual Host 就是其中每個(gè)數(shù)據(jù)庫的感覺,不同項(xiàng)目可以對應(yīng)不同的數(shù)據(jù)庫,其中有著項(xiàng)目所屬的業(yè)務(wù)表等等。
- 每個(gè) Virtual Host 中,可以有若干個(gè) Exchange 和 Queue。
Exchange
:交換機(jī),用來接收生產(chǎn)者發(fā)送的消息,然后將這些消息根據(jù)路由鍵發(fā)送到隊(duì)列。Binding
:Exchange 和 Queue 之間的虛擬連接,Binding 中可以包括多個(gè) Routing key。Routing key
:路由規(guī)則,虛擬機(jī)用它來確認(rèn)如何路由一個(gè)特定消息。Queue
:消息隊(duì)列,它是消息的容器,用來保存消息,每一條消息都能傳入一個(gè)或者多個(gè)隊(duì)列中,等待消費(fèi)者消費(fèi),即取出這個(gè)消息。Consumer
:消息的消費(fèi)者(接收消息的程序)。
4. Java 實(shí)現(xiàn) RabbitMQ
4.1 環(huán)境搭建
官網(wǎng)介紹幾種模型:https://www.rabbitmq.com/getstarted.html
截止目前為止,官網(wǎng)一共提供了 7 中模型的介紹,我們主要介紹前五種基本的模式,也有人將 Direct 和 Topic模式都?xì)w入 Routing 模式,也可以看做四大種。
4.1.1 創(chuàng)建 Java 項(xiàng)目
首先創(chuàng)建好一個(gè)不使用骨架的 Maven 項(xiàng)目,然后引入 RabbitMQ 依賴,還有單元測試依賴即可
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency>
4.1.2 創(chuàng)建虛擬主機(jī)(可選)
在這里,我們創(chuàng)建了一個(gè)新的 Virtual Hosts,用來為這個(gè)Java項(xiàng)目服務(wù),大家還可以創(chuàng)建一個(gè)新的用戶,然后對其開啟這個(gè) Virtual Hosts 的訪問權(quán)限(即將虛擬主機(jī)與用戶綁定)。我們這里還是用 admin(我之前創(chuàng)建的一個(gè)管理員權(quán)限用戶) 來演示。
注:這部分不去做也可以,直接用 / 和 admin 用戶也行
4.1.3 創(chuàng)建連接工具類
由于我們后面要演示多種例子,而每一次獲取連接和釋放連接、關(guān)閉資源等操作代碼都是一致的,為了防止代碼冗余,優(yōu)化代碼,更易理解,提取出一個(gè)工具類,這樣大家將重心放在不同實(shí)現(xiàn)方式的對比上就行了。
- RabbitMqUtil 工具類
- properties
public class RabbitMqUtil { /** * 主機(jī)名 即 Linux IP地址 */ private static String host = ""; /** * 端口號 客戶端訪問默認(rèn)都是 5672 */ private static int port = 0; /** * 虛擬主機(jī) 可以設(shè)置為默認(rèn)的 / 或者自己創(chuàng)建出指定的虛擬主機(jī) */ private static String virtualHost = ""; /** * 用戶名 */ private static String username = ""; /** * 密碼 */ private static String password = ""; // 使用靜態(tài)代碼塊為Properties對象賦值 static { try { //實(shí)例化對象 Properties properties = new Properties(); //獲取properties文件的流對象 InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties"); properties.load(in); // 分別獲取 value host = properties.getProperty("host"); port = Integer.parseInt(properties.getProperty("port")); virtualHost = properties.getProperty("virtualHost"); username = properties.getProperty("username"); password = properties.getProperty("password"); } catch (Exception e) { e.printStackTrace(); } } /** * 獲取連接 * * @return 連接 */ public static Connection getConnection() { try { // 創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); // 設(shè)置連接 rabbitmq 主機(jī) connectionFactory.setHost(host); // 設(shè)置端口號 connectionFactory.setPort(port); // 設(shè)置連接的虛擬主機(jī)(數(shù)據(jù)庫的感覺) connectionFactory.setVirtualHost(virtualHost); // 設(shè)置訪問虛擬主機(jī)的用戶名和密碼 connectionFactory.setUsername(username); connectionFactory.setPassword(password); // 返回一個(gè)新連接 return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 關(guān)閉通道和釋放連接 * * @param channel channel * @param connection connection */ public static void close(Channel channel, Connection connection) { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } }
host=192.168.122.1 port=5672 virtualHost=/rabbitmq_maven_01 username=admin password=adminv
4.2 五種實(shí)現(xiàn)方式
說明:
- 隊(duì)列名,消息等等字符串內(nèi)容,更推薦定義成變量傳入,我文中都是直接寫在參數(shù)中的,這種魔法值的寫法,并不是很優(yōu)美。
- 生產(chǎn)者中使用了 Junit 單元測試,但是消費(fèi)者中卻在 main 函數(shù)中編寫,這是因?yàn)椋覀兿MM(fèi)者處于一個(gè)持續(xù)運(yùn)行等待的狀態(tài),如果使用 Junit 會導(dǎo)致,程序在執(zhí)行一次后結(jié)束掉.
- 除了在 main 函數(shù)中編寫,還可以考慮使用 sleep 等待或者 while(true) 讓程序不要直接終止掉。
4.2.1 簡單隊(duì)列模式(Hello Word)
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Queue
:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費(fèi)者消費(fèi)。Consumer
:消息的消費(fèi)者(接收消息的程序)。
4.2.1.1 如何理解
由圖所示,簡單隊(duì)列模式,一個(gè)生產(chǎn)者,經(jīng)過一個(gè)隊(duì)列,對應(yīng)一個(gè)消費(fèi)者??梢钥醋鍪屈c(diǎn)對點(diǎn)的一種傳輸方式,相較與 3.1.3 中的模型圖,最主要的特點(diǎn)就是看不到 Exchange(交換機(jī)) 和 routekey(路由鍵) ,正是因?yàn)檫@種模式簡單,所以并不會涉及到復(fù)雜的條件分發(fā)等等,因此也不需要用戶去顯式的考慮交換機(jī)和路由鍵的問題。
- 但是要注意,這種模式并不是生產(chǎn)者直接對接隊(duì)列,而是用了默認(rèn)的交換機(jī),默認(rèn)的交換機(jī)會把消息發(fā)送到和 routekey 名稱相同的隊(duì)列中去,這也是我們在后面代碼中在 routekey 位置填寫了隊(duì)列名稱的原因
4.2.1.2 代碼實(shí)現(xiàn)
4.2.1.2.1 生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("queue1",false,false,false,null); // 發(fā)布消息 channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes()); // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel,connection); } }public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("queue1",false,false,false,null); // 發(fā)布消息 channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes()); // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel,connection); } }
1.通過工具類獲取連接
2.獲取連接通道:根據(jù) 3.1.3 的模型圖可知,生產(chǎn)者需要在獲取到連接后,再獲取信道,才能去訪問后面的交換機(jī)隊(duì)列等。
3.通道綁定消息隊(duì)列:綁定隊(duì)列前,應(yīng)該綁定交換機(jī),但是此模式中隱蔽了交換機(jī)的概念,背后使用了默認(rèn)的交換機(jī),所以直接綁定隊(duì)列。
- queueDeclare 方法解釋
- 參數(shù)1:queue(隊(duì)列名稱),如果隊(duì)列不存在,則自動(dòng)創(chuàng)建。
- 參數(shù)2:durable(隊(duì)列是否持久化),持久化可以保證服務(wù)器重啟后此隊(duì)列仍然存在。
- 參數(shù)3:exclusive(排他隊(duì)列)即是否獨(dú)占隊(duì)列,如果此項(xiàng)為 true,該隊(duì)列僅對首次申明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。
- 參數(shù)4:autoDelete(自動(dòng)刪除),最后一個(gè)消費(fèi)者將消息消費(fèi)完畢后,自動(dòng)刪除隊(duì)列。
- 參數(shù)5:arguments(攜帶附加屬性)。
4.發(fā)布消息:此處可以指定消息隊(duì)列的發(fā)送方法,以及內(nèi)容等,因?yàn)榇四J奖容^簡單,所以沒有涉及到全部參數(shù),后面的模式會有詳細(xì)的講解
- basicPublish 方法解釋
- 參數(shù)1:exchange(交換機(jī)名稱)。
- 參數(shù)2:routingKey(路由key),此處填寫隊(duì)列名,可理解為把消息發(fā)送到和 routekey 名稱相同的隊(duì)列中去。
- 參數(shù)3:props(消息的控制狀態(tài)),可以在此處控制消息的持久化。參數(shù)為:MessageProperties.PERSISTENT_TEXT_PLAIN參數(shù)4:body(消息主體),類型是一個(gè)字節(jié)數(shù)組,要轉(zhuǎn)一下類型。
5.通過工具關(guān)閉channel和釋放連接:先關(guān)閉通道,再釋放連接。
4.2.1.2.2 消費(fèi)者代碼
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException{ // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("queue1", false, false, false, null); // 消費(fèi)消息 channel.basicConsume("queue1", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("new String(body): " + new String(body)); } }); } }
1.通過工具類獲取連接
2.獲取連接通道
3.通道綁定消息隊(duì)列
4.消費(fèi)消息:此處用來指定消費(fèi)哪個(gè)隊(duì)列的消息,以及一些機(jī)制和回調(diào)
- basicConsume 方法解釋
- 參數(shù)1:queue(隊(duì)列名稱),即消費(fèi)哪個(gè)隊(duì)列的消息 。
- 參數(shù)2:autoAck(自動(dòng)應(yīng)答)開始消息的自動(dòng)確認(rèn)機(jī)制,只要消費(fèi)了就從隊(duì)列刪除消息。
- 參數(shù)3:callback(消費(fèi)時(shí)的回調(diào)接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個(gè)實(shí)現(xiàn)類。其中重寫 handleDelivery 方法,就可以獲取到消費(fèi)的數(shù)據(jù)內(nèi)容了,這里主要使用了其中的 body,即查看消息主體,其他三個(gè)參數(shù)暫時(shí)還沒用到,有興趣可以先打印輸出一下,能先有個(gè)大概的了解。4.2.2 工作隊(duì)列模式(Work Queue)
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Queue
:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費(fèi)者消費(fèi)。- Consumer:消息的消費(fèi)者(接收消息的程序)。
- 此處我們假設(shè) Consumer1、Consumer2、Consumer3 分別為完成任務(wù)速度不一樣快的消費(fèi)者,這會引出此模式的一個(gè)重點(diǎn)問題。
4.2.2.1 如何理解
工作模式由圖可以看出,就是在簡單隊(duì)列模式的基礎(chǔ)上,增加了多個(gè)消費(fèi)者,也就是讓多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,共同去消費(fèi),這樣能解決簡單隊(duì)列模式中,如果生產(chǎn)速速遠(yuǎn)大于消費(fèi)速度,而導(dǎo)致的消息堆積現(xiàn)象。
- 因?yàn)橄⒈幌M(fèi)后就會消失,所以不必?fù)?dān)心任務(wù)會重復(fù)執(zhí)行。
4.2.2.2 代碼實(shí)現(xiàn)
注:工作隊(duì)列模式有兩種
輪詢模式:每個(gè)消費(fèi)者均分消息公平分發(fā)模式(能者多勞):按能力分發(fā),處理速度快的分發(fā)的多,處理速度慢的分發(fā)的少
我們首先演示的是輪詢模式,根據(jù)它的缺點(diǎn),又能引出公平分發(fā)模式
下面只描述與上面有差異的部分,在簡單模式中,這些基本的方法都有介紹過
4.2.2.2.1 輪詢模式-生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); for (int i = 1; i <= 20; i++) { // 發(fā)布消息 channel.basicPublish("", "work", null, (i + "號消息").getBytes()); } // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel, connection); } }
流程和簡單隊(duì)列模式基本一致,有一些小小的改動(dòng),生產(chǎn)者中主要就是加了層循環(huán),因?yàn)橛卸鄠€(gè)消費(fèi)者,所以多發(fā)送一些消息,可以看出一些特點(diǎn)和問題。
4.2.2.2.2 輪詢模式-消費(fèi)者代碼
消費(fèi)者 1
public class Consumer1 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); // 消費(fèi)消息 channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費(fèi)者1號:消費(fèi)-" + new String(body)); } }); } }
消費(fèi)者 2
public class Consumer2 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); // 消費(fèi)消息 channel.basicConsume("work", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2號:消費(fèi)-" + new String(body)); } }); }
上述兩個(gè)消費(fèi)者都在 basicConsume中開啟了自動(dòng) Ack 應(yīng)答,這一點(diǎn)下面會詳述,同時(shí)在消費(fèi)者 1 中,增加了 sleep 2s 的語句,模擬消費(fèi)者1處理消息速度慢,而消費(fèi)者2處理消息速度快的場景。
運(yùn)行結(jié)果:
Consumer1
消費(fèi)者1號:消費(fèi)-1號消息
消費(fèi)者1號:消費(fèi)-3號消息
消費(fèi)者1號:消費(fèi)-5號消息
消費(fèi)者1號:消費(fèi)-7號消息
消費(fèi)者1號:消費(fèi)-9號消息
消費(fèi)者1號:消費(fèi)-11號消息
消費(fèi)者1號:消費(fèi)-13號消息
消費(fèi)者1號:消費(fèi)-15號消息
消費(fèi)者1號:消費(fèi)-17號消息
消費(fèi)者1號:消費(fèi)-19號消息
Consumer2
消費(fèi)者2號:消費(fèi)-2號消息
消費(fèi)者2號:消費(fèi)-4號消息
消費(fèi)者2號:消費(fèi)-6號消息
消費(fèi)者2號:消費(fèi)-8號消息
消費(fèi)者2號:消費(fèi)-10號消息
消費(fèi)者2號:消費(fèi)-12號消息
消費(fèi)者2號:消費(fèi)-14號消息
消費(fèi)者2號:消費(fèi)-16號消息
消費(fèi)者2號:消費(fèi)-18號消息
消費(fèi)者2號:消費(fèi)-20號消息
觀察執(zhí)行過程:發(fā)現(xiàn)兩個(gè)消費(fèi)者雖然每個(gè)人最后都各自處理了一半的消息,而且是按照一人一條分配的,但是消費(fèi)者2號處理速度快,一下子就全部處理完了,但是消費(fèi)者1號,每一次處理都需要 2s 所以,只能緩慢的處理,而消費(fèi)者2號就處于一個(gè)空閑浪費(fèi)的情況了。
如何切換為公平分發(fā)模式呢?
這就和 basicConsume 中的第二個(gè)參數(shù),開啟自動(dòng)確認(rèn)消費(fèi)有關(guān)了,它默認(rèn)是 true,也就代表只要一旦拿到隊(duì)列中分發(fā)給這個(gè)消費(fèi)者的消息,我就會自動(dòng)返回一個(gè)確認(rèn)消費(fèi)的標(biāo)識,隊(duì)列收到后就會自動(dòng)刪除掉隊(duì)列中的消息。
- 但是這其中有一個(gè)很重要的問題,這種方式就是將風(fēng)險(xiǎn)交給了消費(fèi)者,例如消費(fèi)者收到了自己需要處理的 10 條消息,剛消費(fèi)了 4 個(gè),消費(fèi)者宕機(jī),掛掉了,后面的 6 個(gè)消息就丟失了。
如果想要修改為按能力分配的方式,有兩個(gè)要點(diǎn)
1.設(shè)置通道一次只能消費(fèi)一個(gè)消息
2.關(guān)閉消息的自動(dòng)確認(rèn),手動(dòng)確認(rèn)消息
4.2.2.2.3 公平分發(fā)模式-生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 一次只發(fā)送一條消息 channel.basicQos(1); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); for (int i = 1; i <= 20; i++) { // 發(fā)布消息 channel.basicPublish("", "work", null, (i + "號消息").getBytes()); } // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel, connection); }
4.2.2.2.4 公平分發(fā)模式-消費(fèi)者代碼
消費(fèi)者1
public class Consumer1 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); // 一次只接受一條未確認(rèn)的消息 channel.basicQos(1); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); // 消費(fèi)消息 channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費(fèi)者1號:消費(fèi)-" + new String(body)); // 返回 deliveryTag 代表隊(duì)列可以刪除此消息了 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
消費(fèi)者2
public class Consumer2 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); //步驟一:一次只接受一條未確認(rèn)的消息 channel.basicQos(1); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); // 消費(fèi)消息 channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2號:消費(fèi)-" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); } public class Consumer2 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); //步驟一:一次只接受一條未確認(rèn)的消息 channel.basicQos(1); // 通道綁定消息隊(duì)列 channel.queueDeclare("work", true, false, false, null); // 消費(fèi)消息 channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2號:消費(fèi)-" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); }
運(yùn)行結(jié)果:
Consumer1
消費(fèi)者1號:消費(fèi)-1號消息
Consumer2
消費(fèi)者2號:消費(fèi)-2號消息
消費(fèi)者2號:消費(fèi)-3號消息
消費(fèi)者2號:消費(fèi)-4號消息
消費(fèi)者2號:消費(fèi)-5號消息
消費(fèi)者2號:消費(fèi)-6號消息
消費(fèi)者2號:消費(fèi)-7號消息
消費(fèi)者2號:消費(fèi)-8號消息
消費(fèi)者2號:消費(fèi)-9號消息
消費(fèi)者2號:消費(fèi)-10號消息
消費(fèi)者2號:消費(fèi)-11號消息
消費(fèi)者2號:消費(fèi)-12號消息
消費(fèi)者2號:消費(fèi)-13號消息
消費(fèi)者2號:消費(fèi)-14號消息
消費(fèi)者2號:消費(fèi)-15號消息
消費(fèi)者2號:消費(fèi)-16號消息
消費(fèi)者2號:消費(fèi)-17號消息
消費(fèi)者2號:消費(fèi)-18號消息
消費(fèi)者2號:消費(fèi)-19號消息
消費(fèi)者2號:消費(fèi)-20號消息
4.2.3 發(fā)布與訂閱模式(Fanout 廣播)
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Exchange
:交換機(jī),負(fù)責(zé)發(fā)送消息給指定隊(duì)列。Queue
:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費(fèi)者消費(fèi)。Consumer
:消息的消費(fèi)者(接收消息的程序)。
4.2.3.1 如何理解
Fanout 直譯為 “扇出” 但是大家更多的會把它叫做廣播或者發(fā)布與訂閱,它是一種沒有路由key的模式,生產(chǎn)者將消息發(fā)送給交換機(jī),交換機(jī)會把所有消息復(fù)制同步到所有與它綁定過的隊(duì)列上,而每個(gè)隊(duì)列只能有一個(gè)消費(fèi)者拿到這條消息,如果在一個(gè)消費(fèi)者連接中,創(chuàng)建多個(gè)通道,則會出現(xiàn)爭搶消息的結(jié)果。
4.2.3.2 代碼實(shí)現(xiàn)
注:下面只描述與上面有差異的部分,在簡單模式中,這些基本的方法都有介紹過
4.2.3.2.1 生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 final Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order", "fanout"); for (int i = 1; i <= 20; i++) { // 發(fā)布消息 channel.basicPublish("order", "", null, "fanout!".getBytes()); } // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel, connection); } }
1.聲明交換機(jī)
- exchangeDeclare 方法解釋
- 參數(shù)1:exchange(交換機(jī)名稱),如果交換機(jī)不存在,則自動(dòng)創(chuàng)建
- 參數(shù)2:type(類型),此處選擇 fanout 模式
2.發(fā)布消息:在 basicPublish 方法的第一個(gè)參數(shù)中輸入上述定義好的交換機(jī)的名字,第二個(gè)參數(shù),路由鍵為空
- 循環(huán) 20 條是為了演示消費(fèi)者
4.2.3.2.2 消費(fèi)者代碼
public class Consumer1 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order", "fanout"); // 創(chuàng)建臨時(shí)隊(duì)列 String queue = channel.queueDeclare().getQueue(); // 綁定臨時(shí)隊(duì)列和交換機(jī) channel.queueBind(queue, "order", ""); // 消費(fèi)消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者1號:消費(fèi)-" + new String(body)); } }); } }
1.聲明交換機(jī)
2.創(chuàng)建臨時(shí)隊(duì)列
3..綁定臨時(shí)隊(duì)列和交換機(jī)
- queueBind 方法解釋
- 參數(shù)1:queue(臨時(shí)隊(duì)列)
- 參數(shù)2:exchange(交換機(jī))
- 參數(shù)3:routingKey(路由key)
- 消費(fèi)者2:演示了一個(gè)連接中,多個(gè)通道的情況
public class Consumer2 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); Channel channel2 = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order", "fanout"); channel2.exchangeDeclare("order", "fanout"); // 創(chuàng)建臨時(shí)隊(duì)列 String queue = channel.queueDeclare().getQueue(); System.out.println(queue); // 綁定臨時(shí)隊(duì)列和交換機(jī) channel.queueBind(queue, "order", ""); channel2.queueBind(queue, "order", ""); // 消費(fèi)消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2號:消費(fèi)-" + new String(body)); } }); // 消費(fèi)消息 channel2.basicConsume(queue, true, new DefaultConsumer(channel2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2-2號:消費(fèi)-" + new String(body)); } }); } }
運(yùn)行結(jié)果:
消費(fèi)者2號:消費(fèi)-2號消息
消費(fèi)者2號:消費(fèi)-3號消息
消費(fèi)者2號:消費(fèi)-4號消息
消費(fèi)者2號:消費(fèi)-5號消息
消費(fèi)者2號:消費(fèi)-6號消息
消費(fèi)者2號:消費(fèi)-7號消息
消費(fèi)者2號:消費(fèi)-8號消息
消費(fèi)者2號:消費(fèi)-9號消息
消費(fèi)者2號:消費(fèi)-10號消息
消費(fèi)者2號:消費(fèi)-11號消息
消費(fèi)者2號:消費(fèi)-12號消息
消費(fèi)者2號:消費(fèi)-13號消息
消費(fèi)者2號:消費(fèi)-14號消息
消費(fèi)者2號:消費(fèi)-15號消息
消費(fèi)者2號:消費(fèi)-16號消息
消費(fèi)者2號:消費(fèi)-17號消息
消費(fèi)者2號:消費(fèi)-18號消息
消費(fèi)者2號:消費(fèi)-19號消息
消費(fèi)者2號:消費(fèi)-20號消息
4.2.3.2.3 為什么消費(fèi)者中也聲明交換機(jī)?
從上面的代碼中可以看出,在 Producer 和 Conusmer 中我們都分別聲明了交換機(jī),但是消費(fèi)者由圖可知,并不會與交換機(jī)有直接的接觸,為什么消費(fèi)者中也聲明交換機(jī)呢?
這是為了保證 Producer 或者 Producer 執(zhí)行的時(shí)候,永遠(yuǎn)不會因?yàn)榻粨Q機(jī)還沒被聲明而出錯(cuò),例如你只在 Producer 聲明了交換機(jī),那么你就必須先啟動(dòng) Producer ,如果直接執(zhí)行 Conusmer 此時(shí)交換機(jī)就還不存在,就會報(bào)錯(cuò)。而全部寫入聲明,則可以保證不論先啟動(dòng)誰,都會聲明到交換機(jī)。
4.2.4 路由模式( Routing / Direct)
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Exchange
:交換機(jī),負(fù)責(zé)發(fā)送消息給指定隊(duì)列。routingKey
:路由key,即上圖的 key1,key2 等,相當(dāng)于在交換機(jī)和隊(duì)列之間又加了一層限制Queue
:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費(fèi)者消費(fèi)。Consumer
:消息的消費(fèi)者(接收消息的程序)。
4.2.4.1 如何理解
路由模式的交換機(jī)類型是 direct,與 fanout 模式相比,多了路由 key 這個(gè)概念。生產(chǎn)者發(fā)送攜帶指定 routingKey(路由key) 的消息到交換機(jī),交換機(jī)拿著此 routingKey 去找到綁定了這個(gè) routingKey 的隊(duì)列,然后發(fā)送到此隊(duì)列,一個(gè)隊(duì)列可以綁定多個(gè) routingKey 。
4.2.4.2 代碼實(shí)現(xiàn)
4.2.4.2.1 生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order_direct", "direct"); // 指定 routingKey String key = "info"; // 發(fā)布消息 channel.basicPublish("order_direct", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes()); // 通過工具關(guān)閉channel和釋放連接 RabbitMqUtil.close(channel, connection); } }
1.指定 routingKey ,即在 basicPublish 方法 的第二個(gè)參數(shù)中,指定 key 的值
4.2.4.2.2 消費(fèi)者代碼
- 消費(fèi)者 1
public class Consumer1 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order_direct", "direct"); // 獲取臨時(shí)隊(duì)列 String queue = channel.queueDeclare().getQueue(); // 綁定臨時(shí)隊(duì)列和交換機(jī) channel.queueBind(queue, "order_direct", "info"); channel.queueBind(queue, "order_direct", "error"); channel.queueBind(queue, "order_direct", "warn"); // 消費(fèi)消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者1:消費(fèi)-" + new String(body)); } }); } }
1.只是在綁定隊(duì)列和交換機(jī)的時(shí)候,增加了 key 這個(gè)值
- 消費(fèi)者2
public class Consumer2 { public static void main(String[] args) throws IOException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare("order_direct", "direct"); // 獲取臨時(shí)隊(duì)列 String queue = channel.queueDeclare().getQueue(); // 綁定臨時(shí)隊(duì)列和交換機(jī) channel.queueBind(queue, "order_direct", "error"); // 消費(fèi)消息 channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2:消費(fèi)-" + new String(body)); } }); } }
運(yùn)行結(jié)果:只有消費(fèi)者 1 收到了消息
[code]消費(fèi)者1:消費(fèi)-發(fā)送給指定路由info的消息
4.2.5 通配符匹配模式(Topic)
Producer
:消息的生產(chǎn)者(發(fā)送消息的程序)。Exchange
:交換機(jī),負(fù)責(zé)發(fā)送消息給指定隊(duì)列。routingKey
:路由key,即上圖的 key1,key2 等,相當(dāng)于在交換機(jī)和隊(duì)列之間又加了一層限制但是 Topic 中的 key 為通配符的形式,這樣可以大大的提高效率Queue
:消息隊(duì)列,理解為一個(gè)容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費(fèi)者消費(fèi)。Consumer
:消息的消費(fèi)者(接收消息的程序)。
4.2.5.1 如何理解
通配符匹配模式的交換機(jī)類型為 topic,因?yàn)樗c Direct 模式很相似,所以大家有時(shí)候也會把 Direct 模式和 Topic 共同歸入路由模式下,它們的區(qū)別就是,Direct 模式的 routingKey 是一個(gè)指定的值,而 Topic 模式的 routingKey 可以使用通配符, 而且一般都是由一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: ideal.insert。
*
:匹配正好一個(gè)詞,例如:order.*
可以匹配到 order.insert#
:匹配一個(gè)或者多個(gè)詞,例如:order.#
可以匹配到 order.insert.common#
就像一個(gè)多層的概念,而*
只是一個(gè)單層的概念
4.2.5.2 代碼實(shí)現(xiàn)
4.2.5.2.1 生產(chǎn)者代碼
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); channel.exchangeDeclare("order_topic", "topic"); // 聲明交換機(jī) String key = "user.query.all"; // 發(fā)布消息 channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes()); RabbitMqUtil.close(channel, connection); } }
4.2.5.2.2 消費(fèi)者代碼
- 消費(fèi)者1
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); channel.exchangeDeclare("order_topic", "topic"); // 聲明交換機(jī) String key = "user.query.all"; // 發(fā)布消息 channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes()); RabbitMqUtil.close(channel, connection); } }
消費(fèi)者2
public class Producer { @Test public void sendMessage() throws IOException, TimeoutException { // 通過工具類獲取連接 Connection connection = RabbitMqUtil.getConnection(); // 獲取連接通道 Channel channel = connection.createChannel(); channel.exchangeDeclare("order_topic", "topic"); // 聲明交換機(jī) String key = "user.query.all"; // 發(fā)布消息 channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes()); RabbitMqUtil.close(channel, connection); } }
運(yùn)行結(jié)果:只有消費(fèi)者 2 收到了消息,因?yàn)橄⑹且粋€(gè)多層的結(jié)構(gòu),只有
user.#
能匹配到消費(fèi)者2:消費(fèi)-發(fā)送給指定路由user.query.all的消息
5. Springboot 實(shí)現(xiàn) RabbitMQ
SpringBoot 提供 Spring For RabbitMQ 的啟動(dòng)器,同時(shí)提供了一系列注解以及 RabbitTemplate 供我們使用,能夠極大的簡化開發(fā) RabbitMQ 的步驟,下面分別演示了【5.1 基于純注解】 以及【 5.2 基于注解 + 配置類】 的寫法,其使用方式大同小異,只是聲明和綁定隊(duì)列交換機(jī)等的位置不同。一般認(rèn)為后者更好維護(hù)管理,任選其一即可。
環(huán)境準(zhǔn)備:
1.首先創(chuàng)建 SprinBoot 項(xiàng)目,然后選擇 RabbitMQ 的啟動(dòng)器,以及單元測試等基本啟動(dòng)器
2.編寫 yml 配置文件,編寫連接 RabbitMQ 需要的數(shù)據(jù)
RabbitMQ 依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml 配置文件
spring: rabbitmq: host: 192.168.122.1 # 服務(wù)器地址 port: 5672 # tcp端口 username: admin # 用戶名 password: admin # 用戶密碼 virtual-host: /rabbitmq_springboot_01 # 虛擬主機(jī)
5.1 基于純注解
注:此方式?jīng)]有創(chuàng)建配置類來管理隊(duì)列以及交換機(jī)的聲明和綁定等,而是全部通過注解的方式直接在消費(fèi)者中寫入
5.1.1 簡單隊(duì)列模式
所有生產(chǎn)消息的代碼,我們都放到 Test 中去做
- 生產(chǎn)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleSendMessage() { rabbitTemplate.convertAndSend("simple_queue", "This is a message !"); } }
第一步就是注入 SpringBoot 提供給我們的 RabbitTemplate
通過 RabbitTemplate 的 convertAndSend 方法用來發(fā)送消息,他有多種重載方式,今天分別會用到 2 個(gè) 和 3 個(gè)參數(shù)的
- convertAndSend 方法詳解(兩個(gè)參數(shù))
- 參數(shù)1:routingKey(路由key)
- 參數(shù)2:object(發(fā)送的消息正文)
- convertAndSend 方法詳解(三個(gè)參數(shù))
- 參數(shù)1:exchange(交換機(jī))
- 參數(shù)2:routingKey(路由key)
- 參數(shù)3:object(發(fā)送的消息正文)
- 消費(fèi)者
// 注入容器 @Component // 監(jiān)聽 RabbitMQ @RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false")) public class SimpleConsumer { // 自動(dòng)回調(diào) @RabbitHandler public void receiveMessage(String message) { System.out.println("消費(fèi)者:" + message); } }
1.注入容器
2.監(jiān)聽 RabbitMQ,在 @RabbitListener 注解中,可以實(shí)現(xiàn),隊(duì)列的聲明,以及后面交換機(jī)與隊(duì)列的綁定等
- @Queue 可以有四個(gè)參數(shù),因?yàn)槠涓饔心J(rèn)值,所以只給定 value 值,就會按照 持久化,非獨(dú)占,非自動(dòng)刪除的方式默認(rèn)創(chuàng)建
- 參數(shù)1:value(隊(duì)列名)
- 參數(shù)2:durable ( 持久化消息隊(duì)列)RabbitMQ 重啟后,隊(duì)列仍存在,默認(rèn) true
- 參數(shù)3:exclusive(是否獨(dú)占) 表示該消息隊(duì)列是否只在當(dāng)前 Connection 生效,默認(rèn)是 false
- 參數(shù)4:auto-delete(自動(dòng)刪除)表示消息隊(duì)列沒有在使用時(shí)將被自動(dòng)刪除,默認(rèn)是 false
3.在方法上添加 @RabbitHandler 注解,就能夠?qū)崿F(xiàn)自動(dòng)回調(diào),這樣我們就能拿到生產(chǎn)者中的消息了
- 注:receiveMessage 這個(gè)方法的參數(shù)類型,取決于你在生產(chǎn)者有發(fā)送了什么類型的數(shù)據(jù)
5.1.2 工作隊(duì)列模式
5.1.2.1 輪詢模式
生產(chǎn)者:沒什么好說的,因?yàn)楣ぷ髂J接卸鄠€(gè)消費(fèi)者,所以多發(fā)送幾條消息
- 消費(fèi)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testWorkSendMessage() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序號:" + i); } } }@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testWorkSendMessage() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序號:" + i); } } }
- 1.@RabbitListener 注解,既可以放在類上,也可以放在方法上,例如上述代碼,我們就分別放在了兩個(gè)方法上,用來指代不同的消費(fèi)者。
- 但是如果在類上加入 @RabbitListener 注解,而在下面兩個(gè)方法中,添加 @RabbitHandler 注解則會報(bào)錯(cuò),需要分別為每個(gè)消費(fèi)者都創(chuàng)建一個(gè)類
@Component public class WorkConsumer { // 監(jiān)聽 RabbitMQ @RabbitListener(queuesToDeclare = @Queue("work_queue")) // 消費(fèi)者1 public void receiveMessage1(String message) { System.out.println("消費(fèi)者1:" + message); // 監(jiān)聽 RabbitMQ @RabbitListener(queuesToDeclare = @Queue("work_queue") // 消費(fèi)者2 public void receiveMessage2(String message) { System.out.println("消費(fèi)者2:" + message); } }
5.1.2.2 公平模式(按能力分配)
5.1.2.2.1 修改配置文件的方式
- 生產(chǎn)者不變
- 修改配置文件 yml / properties
spring: rabbitmq: host: 192.168.122.1 # 服務(wù)器地址 port: 5672 # tcp端口 username: admin # 用戶名 password: admin # 用戶密碼 virtual-host: /rabbitmq_springboot_01 # 虛擬主機(jī) # 新增部分 listener: simple: acknowledge-mode: manual # 開啟 ack 手動(dòng)應(yīng)答 prefetch: 1 # 每次只能消費(fèi) 1 條消息
acknowledge-mode 選項(xiàng)介紹
auto:自動(dòng)確認(rèn),為默認(rèn)選項(xiàng)
manual:手動(dòng)確認(rèn)(按能力分配就需要設(shè)置為手動(dòng)確認(rèn))
none:不確認(rèn),發(fā)送后自動(dòng)丟棄
- 消費(fèi)者
@Component public class WorkConsumer { // 監(jiān)聽 RabbitMQ @RabbitListener(queuesToDeclare = @Queue("work_queue")) // 消費(fèi)者 1 public void receiveMessage(String body, Message message, Channel channel) throws IOException { try { // 打印輸出消息主題 System.out.println("消費(fèi)者1:" + body); // 返回 deliveryTag 代表隊(duì)列可以刪除此消息了 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); // 消費(fèi)者告訴隊(duì)列信息消費(fèi)失敗 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } // 監(jiān)聽 RabbitMQ @RabbitListener(queuesToDeclare = @Queue("work_queue")) // 消費(fèi)者 2 public void receiveMessage2(String body, Message message, Channel channel) throws IOException{ try { // 延遲 2s 代表處理業(yè)務(wù)慢 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } try { // 打印輸出消息主題 System.out.println("消費(fèi)者2:" + body); // 返回 deliveryTag 代表隊(duì)列可以刪除此消息了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); // 消費(fèi)者告訴隊(duì)列信息消費(fèi)失敗 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
1.因?yàn)樵?yml 配置中開啟了手動(dòng)確認(rèn),所以,需要在成功和失敗后分別返回確認(rèn)消息
2.basicAck 方法解釋
- 參數(shù)1:deliveryTag(交付標(biāo)志,即該消息的index),返回即代表確認(rèn)收到消息,隊(duì)列可以刪除此消息了
- 參數(shù)2:mutiple(是否批量)選擇 true 將一次性拒絕所有小于 deliveryTag 的消息
3.basicNack 方法解釋
- 參數(shù) 1 |參數(shù) 2 同上
- 參數(shù)3:requeue(被拒絕的是否重新進(jìn)入隊(duì)列)
運(yùn)行結(jié)果:
消費(fèi)者1:This is a message !, 序號:2
消費(fèi)者1:This is a message !, 序號:3
消費(fèi)者1:This is a message !, 序號:4
消費(fèi)者1:This is a message !, 序號:5
消費(fèi)者1:This is a message !, 序號:6
消費(fèi)者1:This is a message !, 序號:7
消費(fèi)者1:This is a message !, 序號:8
消費(fèi)者1:This is a message !, 序號:9
消費(fèi)者1:This is a message !, 序號:10
消費(fèi)者1:This is a message !, 序號:11
消費(fèi)者1:This is a message !, 序號:12
消費(fèi)者1:This is a message !, 序號:13
消費(fèi)者1:This is a message !, 序號:14
消費(fèi)者1:This is a message !, 序號:15
消費(fèi)者1:This is a message !, 序號:16
消費(fèi)者1:This is a message !, 序號:17
消費(fèi)者1:This is a message !, 序號:18
消費(fèi)者1:This is a message !, 序號:19
消費(fèi)者1:This is a message !, 序號:20
消費(fèi)者2:This is a message !, 序號:1
到現(xiàn)在已經(jīng)實(shí)現(xiàn)了修改配置文件的方式實(shí)現(xiàn)按能力分配,補(bǔ)充幾個(gè)配置的內(nèi)容,我們上面只用了一部分,其他的方便大家參考,yml 和 properties 大家自己選擇即可
# 發(fā)送確認(rèn) spring.rabbitmq.publisher-confirm-type=correlated # spring.rabbitmq.publisher-confirms=true(舊版) # 發(fā)送回調(diào) spring.rabbitmq.publisher-returns=true # 消費(fèi)手動(dòng)確認(rèn) spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual # 并發(fā)消費(fèi)者初始化值 spring.rabbitmq.listener.simple.concurrency=1 # 并發(fā)消費(fèi)者的最大值 spring.rabbitmq.listener.simple.max-concurrency=10 # 每個(gè)消費(fèi)者每次監(jiān)聽時(shí)可拉取處理的消息數(shù)量 # 在單個(gè)請求中處理的消息個(gè)數(shù),他應(yīng)該大于等于事務(wù)數(shù)量(unack的最大數(shù)量) spring.rabbitmq.listener.simple.prefetch=1 # 是否支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
5.1.2.2.1 配置工廠的方式
- 消費(fèi)者修改
/** * 設(shè)置消費(fèi)者的確認(rèn)機(jī)制,并達(dá)到能者多勞的效果 * * @param connectionFactory 連接工廠 * @return */ @Bean("workListenerFactory") public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(connectionFactory); // 修改為手動(dòng)確認(rèn) containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 拒絕策略,true 回到隊(duì)列 false丟棄,默認(rèn)是true containerFactory.setDefaultRequeueRejected(true); // 默認(rèn)的PrefetchCount是250 修改為 1 containerFactory.setPrefetchCount(1); return containerFactory; }
@RabbitListener(queuesToDeclare = @Queue("work_queue")) // 將上面的監(jiān)聽,增加 containerFactory 屬性,然后將配置好的工廠傳入 @RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "workListenerFactory")
5.1.3 發(fā)布與訂閱模式
- 生產(chǎn)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testFanoutSendMessage() { rabbitTemplate.convertAndSend("order_exchange", "", "This is a message !"); } }
1.因?yàn)閺倪@個(gè)模式開始,就涉及到交換機(jī)了,所以用的是三個(gè)參數(shù)的方法
- 消費(fèi)者
@Component public class FanoutConsumer { // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "order_exchange", type = "fanout") // 交換機(jī)與類型 ) }) public void receiveMessage1(String message) { System.out.println("消費(fèi)者1:" + message); } // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "order_exchange", type = "fanout") // 交換機(jī)與類型 ) }) public void receiveMessage2(String message) { System.out.println("消費(fèi)者2:" + message); } }
5.1.4 路由模式(Direct)
- 生產(chǎn)者
- 消費(fèi)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testDirectSendMessage() { rabbitTemplate.convertAndSend("direct_exchange", "info", "This is a message !"); } }
@Component public class DirectConsumer { // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交換機(jī)和類型 key = {"info", "warn", "error"} // 路由key ) }) public void receiveMessage1(String message) { System.out.println("消費(fèi)者1:" + message); } // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交換機(jī)和類型 key = {"info", "warn", "error"} // 路由key ) }) public void receiveMessage2(String message) { System.out.println("消費(fèi)者2:" + message); } }
5.1.5 主題模式
- 生產(chǎn)者
- 消費(fèi)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testTopicSendMessage() { rabbitTemplate.convertAndSend("topic_exchange", "order.insert.common", "This is a message !"); } }
@Component public class TopicConsumer { // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交換機(jī)和類型 key = {"order.*"} // 通配符路由key ) }) public void receiveMessage1(String message) { System.out.println("消費(fèi)者1:" + message); } // 綁定臨時(shí)隊(duì)列和交換機(jī) @RabbitListener(bindings = { @QueueBinding( value = @Queue(), // 臨時(shí)隊(duì)列 exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交換機(jī)和類型 key = {"order.*"} // 通配符路由key ) }) public void receiveMessage2(String message) { System.out.println("消費(fèi)者2:" + message); } }
5.2 基于注解 + 配置類
其實(shí)這種方式,就是將交換機(jī),隊(duì)列的聲明和綁定都在配置類中進(jìn)行,一個(gè)是消費(fèi)者中的注解變的簡潔了,再有就是統(tǒng)一管理,更加條理,而且生產(chǎn)者和消費(fèi)者引用的時(shí)候也更加方便,日后修改的時(shí)候,也不需要對每一處都修改。
由于篇幅過長了,這里演示最復(fù)雜的 Topic 方式,其他的也是信手拈來。
配置類
@Configuration public class RabbitMqConfiguration { public static final String TOPIC_EXCHANGE = "topic_order_exchange"; public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1"; public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2"; public static final String TOPIC_ROUTINGKEY_1 = "test.*"; public static final String TOPIC_ROUTINGKEY_2 = "test.#"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE_NAME_1); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE_NAME_2); } @Bean public Binding bindingTopic1(){ return BindingBuilder.bind(topicQueue1()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_1); } @Bean public Binding bindingTopic2(){ return BindingBuilder.bind(topicQueue2()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_2); } }
1.添加 @Configuration 注解:表明這是一個(gè)配置類
2.定義常量:將交換機(jī)名,隊(duì)列名,路由key 等都可以創(chuàng)建為常量,調(diào)用,管理和修改都非常方便,還可以創(chuàng)建出一個(gè)專門的 RabbitMQ 的常量類。
3.定義交換機(jī):因?yàn)檫@個(gè)例子是 Topic 所以選擇 TopicExchange 類型
4.定義隊(duì)列:傳入隊(duì)列名常量即可,因?yàn)槌志没却嬖谀J(rèn)值,也可以自己自定持久化,是否獨(dú)占等參數(shù)
5.綁定交換機(jī)和隊(duì)列:利用 BindingBuilder 的 bind 方法綁定隊(duì)列,to 綁定到指定交換機(jī),with 傳入路由key
- 生產(chǎn)者
- 消費(fèi)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testTopicSendMessage() { rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !"); } }
@Component public class TopicConsumer { // 綁定隊(duì)列即可 @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1}) public void receiveMessage1(String message) { System.out.println("消費(fèi)者1:" + message); } // 綁定隊(duì)列即可 @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2}) public void receiveMessage2(String message) { System.out.println("消費(fèi)者2:" + message); } }
5、總結(jié)
這篇文章就到這里了,希望大家可以多多關(guān)注腳本之家的其他文章!
您可能感興趣的文章:
相關(guān)文章
erlang?on_load_function_failed排查過程解析
這篇文章主要為大家介紹了erlang?on_load_function_failed的排查過程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Erlang實(shí)現(xiàn)的一個(gè)Web服務(wù)器代碼實(shí)例
這篇文章主要介紹了Erlang實(shí)現(xiàn)的一個(gè)Web服務(wù)器代碼實(shí)例,本文直接給出實(shí)現(xiàn)代碼,需要的朋友可以參考下2015-04-04Erlang實(shí)現(xiàn)的百度云推送Android服務(wù)端實(shí)例
這篇文章主要介紹了Erlang實(shí)現(xiàn)的百度云推送Android服務(wù)端實(shí)例,本文先是講解了實(shí)現(xiàn)步驟,然后給出實(shí)現(xiàn)代碼,需要的朋友可以參考下2015-01-01