Docker啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的詳細(xì)過(guò)程
一、Docker拉取鏡像并啟動(dòng)RabbitMQ
拉取鏡像
docker pull rabbitmq:3.8.8-management
查看鏡像
docker images rabbitmq
啟動(dòng)鏡像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虛擬機(jī)記得開(kāi)放5672端口或者關(guān)閉防火墻,在window通過(guò) 主機(jī)ip:15672 訪問(wèn)rabbitmq控制臺(tái)
用戶名密碼默認(rèn)為guest
二、Hello World
(一)依賴導(dǎo)入
<!--指定 jdk 編譯版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依賴客戶端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一個(gè)依賴--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
(二)消息生產(chǎn)者
工作原理
- Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
- Connection:publisher/consumer 和 broker 之間的 TCP 連接
- Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷
- Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最終被送到這里等待 consumer 取走
我們需要先獲取連接(Connection),然后通過(guò)連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過(guò)交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)
public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置主機(jī)ip factory.setHost("182.92.234.71"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉 Connection connection = factory.newConnection(); // 獲取信道 Channel channel = connection.createChannel(); /* * 生成一個(gè)隊(duì)列 * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 1.隊(duì)列名稱 * 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中 * 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi) * 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除 * 5.其他參數(shù) **/ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello rabbitmq"; /* * 發(fā)送一個(gè)消息 * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 1.發(fā)送到哪個(gè)交換機(jī) * 2.路由的key是哪個(gè) * 3.其他的參數(shù)信息 * 4.發(fā)送消息的消息體 * **/ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("發(fā)送成功"); } }
(三)消息消費(fèi)者
public class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置主機(jī)ip factory.setHost("182.92.234.71"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉 Connection connection = factory.newConnection(); // 獲取信道 Channel channel = connection.createChannel(); // 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; // 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消費(fèi)被中斷"); }; /* * 消費(fèi)者消費(fèi)消息 * basicConsume(String queue, boolean autoAck, * DeliverCallback deliverCallback, CancelCallback cancelCallback) * 1.消費(fèi)哪個(gè)隊(duì)列 * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答 * 3.消費(fèi)者未成功消費(fèi)的回調(diào) **/ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
三、實(shí)現(xiàn)輪訓(xùn)分發(fā)消息
(一)抽取工具類
可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化
public class RabbitMqUtils { private static Channel channel; static { ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置ip地址 factory.setHost("192.168.23.100"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); try { // 創(chuàng)建連接 Connection connection = factory.newConnection(); // 獲取信道 channel = connection.createChannel(); } catch (Exception e) { System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage()); } } public static Channel getChannel() { return channel; } }
(二)啟動(dòng)兩個(gè)工作線程
相當(dāng)于前面的消費(fèi)者,我們只需要寫一個(gè)類,通過(guò)ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程
public class Worker01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { System.out.println("接受到消息:" + new String(message.getBody())); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; // 啟動(dòng)兩次,第一次為C1, 第二次為C2 System.out.println("C2消費(fèi)者等待消費(fèi)消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback); } }
(三)啟動(dòng)發(fā)送線程
public class Test01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 通過(guò)控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息發(fā)送完成:" + message); } } }
結(jié)果
四、實(shí)現(xiàn)手動(dòng)應(yīng)答
(一)消息應(yīng)答概念
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。
自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。
當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。
手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除
(二)消息應(yīng)答的方法
- Channel.basicAck(用于肯定確認(rèn))
- RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
- Channel.basicNack(用于否定確認(rèn))
- Channel.basicReject(用于否定確認(rèn))
- 與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple
- multiple 的 true 和 false 代表不同意思
true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
false 同上面相比
只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
- 不處理該消息了直接拒絕,可以將其丟棄了
(三)消息自動(dòng)重新入隊(duì)
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。
(四)消息手動(dòng)應(yīng)答代碼
1、生產(chǎn)者
public class Test01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息發(fā)送完成:" + message); } } }
2、睡眠工具類模擬業(yè)務(wù)執(zhí)行
public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
3、消費(fèi)者
public class Worker01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C1,業(yè)務(wù)時(shí)間短"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(1); // 模擬業(yè)務(wù)執(zhí)行1秒 System.out.println("接受到消息:" + new String(message.getBody())); /* * 1、消息標(biāo)識(shí) * 2、是否啟動(dòng)批量確認(rèn),false:否。 * 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息 * 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); } } ============================================================================== public class Worker02 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(15); // 模擬業(yè)務(wù)執(zhí)行15秒 System.out.println("接受到消息:" + new String(message.getBody())); /* * 1、消息標(biāo)識(shí) * 2、是否啟動(dòng)批量確認(rèn),false:否。 * 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息 * 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); } }
worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。
注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)
最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。
到此這篇關(guān)于Docker啟動(dòng)RabbitMQ,實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的文章就介紹到這了,更多相關(guān)Docker啟動(dòng)RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何使用Docker和cpolar在Linux服務(wù)器上搭建DashDot監(jiān)控面板
本文主要介紹如何在Linux服務(wù)器上使用Docker和cpolar技術(shù)搭建DashDot監(jiān)控面板,實(shí)現(xiàn)實(shí)時(shí)服務(wù)器監(jiān)控,DashDot提供直觀的監(jiān)控界面和豐富的指標(biāo),通過(guò)cpolar可以實(shí)現(xiàn)公網(wǎng)訪問(wèn),方便用戶隨時(shí)了解服務(wù)器狀態(tài),文章詳細(xì)說(shuō)明了環(huán)境準(zhǔn)備、安裝Docker、配置DashDot和cpolar的步驟2024-09-09阿里云鏡像安裝docker報(bào)錯(cuò)的問(wèn)題及解決方案
這篇文章主要介紹了阿里云鏡像安裝docker報(bào)錯(cuò)的問(wèn)題及解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08docker-compose部署zk+kafka+storm集群的實(shí)現(xiàn)
這篇文章主要介紹了docker-compose部署zk+kafka+storm集群,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法
這篇文章主要介紹了docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解
今天小編就為大家分享一篇關(guān)于第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03MySQL容器中docker-entrypoint-initdb.d目錄的使用
這篇文章主要介紹了MySQL容器中docker-entrypoint-initdb.d目錄的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05