在RabbitMQ中實(shí)現(xiàn)Work queues工作隊(duì)列模式
一、模式說明
Work Queues 與入門程序的簡單模式相比,多了一個或一些消費(fèi)端,多個消費(fèi)端共同消費(fèi)同一個隊(duì)列中的消息。
應(yīng)用場景 :對于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
二、代碼
Work Queues 與入門程序的 簡單模式 的代碼是幾乎一樣的:可以完全復(fù)制,并復(fù)制多一個消費(fèi)者進(jìn)行多個消費(fèi)者同時(shí)消費(fèi)消息的測試。
①生產(chǎn)者
package com.itheima.rabbitmq.work; import com.itheima.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { //創(chuàng)建連接 Connection connection = ConnectionUtil.getConnection(); // 創(chuàng)建頻道 Channel channel = connection.createChannel(); // 聲明(創(chuàng)建)隊(duì)列 /** * 參數(shù)1:隊(duì)列名稱 * 參數(shù)2:是否定義持久化隊(duì)列 * 參數(shù)3:是否獨(dú)占本次連接 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 * 參數(shù)5:隊(duì)列其它參數(shù) */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 1; i <= 30; i++) { // 發(fā)送信息 String message = "你好;小兔子!work模式--" + i; /** * 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage * 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱 * 參數(shù)3:消息其它屬性 * 參數(shù)4:消息內(nèi)容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已發(fā)送消息:" + message); } // 關(guān)閉資源 channel.close(); connection.close(); } }
②消費(fèi)者1
package com.itheima.rabbitmq.work; import com.itheima.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 創(chuàng)建頻道 Channel channel = connection.createChannel(); // 聲明(創(chuàng)建)隊(duì)列 /** * 參數(shù)1:隊(duì)列名稱 * 參數(shù)2:是否定義持久化隊(duì)列 * 參數(shù)3:是否獨(dú)占本次連接 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 * 參數(shù)5:隊(duì)列其它參數(shù) */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //一次只能接收并處理一個消息 channel.basicQos(1); //創(chuàng)建消費(fèi)者;并設(shè)置消息處理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定 * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送) * properties 屬性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key為:" + envelope.getRoutingKey()); //交換機(jī) System.out.println("交換機(jī)為:" + envelope.getExchange()); //消息id System.out.println("消息id為:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8")); Thread.sleep(1000); //確認(rèn)消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //監(jiān)聽消息 /** * 參數(shù)1:隊(duì)列名稱 * 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn) * 參數(shù)3:消息接收到后回調(diào) */ channel.basicConsume(Producer.QUEUE_NAME, false, consumer); } }
③消費(fèi)者2
package com.itheima.rabbitmq.work; import com.itheima.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 創(chuàng)建頻道 Channel channel = connection.createChannel(); // 聲明(創(chuàng)建)隊(duì)列 /** * 參數(shù)1:隊(duì)列名稱 * 參數(shù)2:是否定義持久化隊(duì)列 * 參數(shù)3:是否獨(dú)占本次連接 * 參數(shù)4:是否在不使用的時(shí)候自動刪除隊(duì)列 * 參數(shù)5:隊(duì)列其它參數(shù) */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //一次只能接收并處理一個消息 channel.basicQos(1); //創(chuàng)建消費(fèi)者;并設(shè)置消息處理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定 * envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送) * properties 屬性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key為:" + envelope.getRoutingKey()); //交換機(jī) System.out.println("交換機(jī)為:" + envelope.getExchange()); //消息id System.out.println("消息id為:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8")); Thread.sleep(1000); //確認(rèn)消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //監(jiān)聽消息 /** * 參數(shù)1:隊(duì)列名稱 * 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn) * 參數(shù)3:消息接收到后回調(diào) */ channel.basicConsume(Producer.QUEUE_NAME, false, consumer); } }
三、測試
啟動兩個消費(fèi)者,然后再啟動生產(chǎn)者發(fā)送消息;到IDEA的兩個消費(fèi)者對應(yīng)的控制臺查看是否競爭性的接收到消息。
總結(jié)
在一個隊(duì)列中如果有多個消費(fèi)者,那么消費(fèi)者之間對于同一個消息的關(guān)系是競爭的關(guān)系。
到此這篇關(guān)于如何在RabbitMQ中實(shí)現(xiàn)Work queues模式的文章就介紹到這了,希望對你有所幫助,更多相關(guān)RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Security使用Lambda DSL配置流程詳解
Spring Security 5.2 對 Lambda DSL 語法的增強(qiáng),允許使用lambda配置HttpSecurity、ServerHttpSecurity,重要提醒,之前的配置方法仍然有效。lambda的添加旨在提供更大的靈活性,但是用法是可選的。讓我們看一下HttpSecurity的lambda配置與以前的配置樣式相比2023-02-02Elasticsearch8.1中的Script使用實(shí)例深入解讀
這篇文章主要為大家介紹了Elasticsearch8.1中的Script使用實(shí)例深入解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Java并發(fā)編程之ConcurrentLinkedQueue解讀
這篇文章主要介紹了Java并發(fā)編程之ConcurrentLinkedQueue解讀,非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來實(shí)現(xiàn),而ConcurrentLinkedQueue就是juc包中自帶的經(jīng)典非堵塞方式實(shí)現(xiàn)的工具類,需要的朋友可以參考下2023-12-12SpringBoot項(xiàng)目中訪問HTML頁面的三種方法
這篇文章主要介紹了SpringBoot項(xiàng)目中訪問HTML頁面的三種方法,文中通過代碼示例和圖文結(jié)合的方式講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-07-07