在RabbitMQ中實(shí)現(xiàn)Work queues工作隊(duì)列模式
一、模式說明

Work Queues 與入門程序的簡單模式相比,多了一個或一些消費(fèi)端,多個消費(fèi)端共同消費(fèi)同一個隊(duì)列中的消息。
應(yīng)用場景 :對于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
二、代碼
Work Queues 與入門程序的 簡單模式 的代碼是幾乎一樣的:可以完全復(fù)制,并復(fù)制多一個消費(fèi)者進(jìn)行多個消費(fèi)者同時消費(fèi)消息的測試。
①生產(chǎn)者
package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//創(chuàng)建連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時候自動刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 30; i++) {
// 發(fā)送信息
String message = "你好;小兔子!work模式--" + i;
/**
* 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
}
// 關(guān)閉資源
channel.close(); connection.close();
}
}
②消費(fèi)者1
package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時候自動刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//一次只能接收并處理一個消息
channel.basicQos(1);
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
//確認(rèn)消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
}
}
③消費(fèi)者2
package com.itheima.rabbitmq.work;
import com.itheima.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時候自動刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//一次只能接收并處理一個消息
channel.basicQos(1);
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
//確認(rèn)消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動確認(rèn),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置為false則需要手動確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
}
}
三、測試
啟動兩個消費(fèi)者,然后再啟動生產(chǎn)者發(fā)送消息;到IDEA的兩個消費(fèi)者對應(yīng)的控制臺查看是否競爭性的接收到消息。


總結(jié)
在一個隊(duì)列中如果有多個消費(fèi)者,那么消費(fèi)者之間對于同一個消息的關(guān)系是競爭的關(guān)系。
到此這篇關(guān)于如何在RabbitMQ中實(shí)現(xiàn)Work queues模式的文章就介紹到這了,希望對你有所幫助,更多相關(guān)RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Security使用Lambda DSL配置流程詳解
Spring Security 5.2 對 Lambda DSL 語法的增強(qiáng),允許使用lambda配置HttpSecurity、ServerHttpSecurity,重要提醒,之前的配置方法仍然有效。lambda的添加旨在提供更大的靈活性,但是用法是可選的。讓我們看一下HttpSecurity的lambda配置與以前的配置樣式相比2023-02-02
Elasticsearch8.1中的Script使用實(shí)例深入解讀
這篇文章主要為大家介紹了Elasticsearch8.1中的Script使用實(shí)例深入解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
Java并發(fā)編程之ConcurrentLinkedQueue解讀
這篇文章主要介紹了Java并發(fā)編程之ConcurrentLinkedQueue解讀,非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來實(shí)現(xiàn),而ConcurrentLinkedQueue就是juc包中自帶的經(jīng)典非堵塞方式實(shí)現(xiàn)的工具類,需要的朋友可以參考下2023-12-12
SpringBoot項(xiàng)目中訪問HTML頁面的三種方法
這篇文章主要介紹了SpringBoot項(xiàng)目中訪問HTML頁面的三種方法,文中通過代碼示例和圖文結(jié)合的方式講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-07-07

