關(guān)于Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)的7種通訊方式
環(huán)境說(shuō)明
- RabbitMQ環(huán)境
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 開發(fā)工具:IntelliJ IDEA
工程搭建
- 創(chuàng)建maven項(xiàng)目
- pom.xml文件引入RabbitMQ依賴
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>compile</scope> </dependency> </dependencies>
連接RabbitMQ
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MQConnections { public static final String RABBITMQ_HOST = "127.0.0.1"; public static final int RABBITMQ_PORT = 5672; public static final String RABBITMQ_USERNAME = "guest"; public static final String RABBITMQ_PASSWORD = "guest"; public static final String RABBITMQ_VIRTUAL_HOST = "/"; /** * 構(gòu)建RabbitMQ連接對(duì)象 * * @return */ public static Connection getConnection() throws IOException, TimeoutException { //1.創(chuàng)建Connection工廠 ConnectionFactory factory = new ConnectionFactory(); //2.設(shè)置Rabbitmq連接信息 factory.setHost(RABBITMQ_HOST); factory.setPort(RABBITMQ_PORT); factory.setUsername(RABBITMQ_USERNAME); factory.setPassword(RABBITMQ_PASSWORD); factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST); //3.返回連接對(duì)象 return factory.newConnection(); } }
通訊模式
1.簡(jiǎn)單通訊
即一個(gè)生產(chǎn)者可以向一個(gè)隊(duì)列發(fā)送消息,一個(gè)消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。如下圖:
public final static String HELLO_QUEUE_NAME = "hello"; @Test public void publish_hello() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null); //4.發(fā)布消息 String msg = "hello,world"; channel.basicPublish("", HELLO_QUEUE_NAME, null, msg.getBytes()); } @Test public void consume_hello() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Received '" + message + "'"); }; channel.basicConsume(HELLO_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
2.工作隊(duì)列通訊
與簡(jiǎn)單通訊一樣,當(dāng)消費(fèi)能力不足或想要提高吞吐時(shí)可添加多個(gè)消費(fèi)者進(jìn)行處理業(yè)務(wù)。如下圖,隊(duì)列中的消息會(huì)逐條被C1和C2消費(fèi)。
public final static String WORK_QUEUE_NAME = "work"; @Test public void publish_work_queue() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); //4.發(fā)布消息 String msg = "hello,work queue"; channel.basicPublish("", WORK_QUEUE_NAME, null, msg.getBytes()); } @Test public void consume_work_queue1() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume1 Received '" + message + "'"); }; channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); } @Test public void consume_work_queue2() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume2 Received '" + message + "'"); }; channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
3.發(fā)布/訂閱通訊
工作隊(duì)列背后的假設(shè)是,每個(gè)任務(wù)只交付給一個(gè)消費(fèi)者做同一件事。如果要交付給多個(gè)消費(fèi)者做不同的事,需要引入交換機(jī)實(shí)現(xiàn)一個(gè)完整的消息傳遞模型,這種模式被稱為“發(fā)布/訂閱”。如下圖,消息會(huì)發(fā)布到交換機(jī)中,交換機(jī)向綁定的隊(duì)列同時(shí)發(fā)送消息,最終C1和C2會(huì)同時(shí)消費(fèi)此條消息。
public final static String PUB_EXCHANGE_NAME = "pub-ex"; public final static String PUB1_QUEUE_NAME = "pub-que1"; public final static String PUB2_QUEUE_NAME = "pub-que2"; @Test public void publish_pub_sub() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建交換機(jī) channel.exchangeDeclare(PUB_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null); channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null); //5.綁定隊(duì)列 channel.queueBind(PUB1_QUEUE_NAME, PUB_EXCHANGE_NAME, ""); channel.queueBind(PUB2_QUEUE_NAME, PUB_EXCHANGE_NAME, ""); //6.發(fā)布消息 String msg = "hello,pub/sub"; channel.basicPublish(PUB_EXCHANGE_NAME, "", null, msg.getBytes()); } @Test public void consume_pub_sub1() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("pub_sub1 Received '" + message + "'"); }; channel.basicConsume(PUB1_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); } @Test public void consume_pub_sub2() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("pub_sub2 Received '" + message + "'"); }; channel.basicConsume(PUB2_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
4.路由通訊
發(fā)布/訂閱模式是交換機(jī)將一條消息同時(shí)路由給多個(gè)隊(duì)列,“路由”模式可以將消息通過(guò)交換機(jī)指定到某個(gè)隊(duì)列中從而被消費(fèi)。如下圖,交換機(jī)將所有類型的日志路由到一個(gè)隊(duì)列中,將error類型的日志路由到另一個(gè)隊(duì)列中。
public final static String ROUT_EXCHANGE_NAME = "rout-ex"; public final static String ROUTALL_QUEUE_NAME = "rout-queall"; public final static String ROUTONE_QUEUE_NAME = "rout-queone"; @Test public void publish_routing() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建交換機(jī) channel.exchangeDeclare(ROUT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null); channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null); //5.綁定隊(duì)列 channel.queueBind(ROUTALL_QUEUE_NAME, ROUT_EXCHANGE_NAME, "all"); channel.queueBind(ROUTONE_QUEUE_NAME, ROUT_EXCHANGE_NAME, "one"); //6.發(fā)布消息 String msg1 = "hello,1-all"; String msg2 = "hello,2-all"; String msg3 = "hello,1-one"; channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg1.getBytes()); channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg2.getBytes()); channel.basicPublish(ROUT_EXCHANGE_NAME, "one", null, msg3.getBytes()); } @Test public void consume_routing_all() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_routing_all Received '" + message + "'"); }; channel.basicConsume(ROUTALL_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); } @Test public void consume_routing_one() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_routing_one Received '" + message + "'"); }; channel.basicConsume(ROUTONE_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
5.主題通訊
“路由”模式仍然有局限性——它不能基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。主題可以帶來(lái)很大的靈活性,發(fā)送到主題交換的消息不能有任意的routing_key,它必須是一個(gè)用點(diǎn)分隔的單詞列表,routing_key有兩種重要的特殊情況:
- *只能代替一個(gè)詞。
- #可以替換零個(gè)或多個(gè)單詞。
public final static String TOPIC_EXCHANGE_NAME = "topic-ex"; public final static String TOPICALL_QUEUE_NAME = "topic-queall"; public final static String TOPICONE_QUEUE_NAME = "topic-queone"; @Test public void publish_topic() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建交換機(jī) channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null); channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null); //5.綁定隊(duì)列 channel.queueBind(TOPICALL_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "*.all.*"); channel.queueBind(TOPICONE_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "#.one"); //6.發(fā)布消息 String msg1 = "hello.all.world"; String msg2 = "hello.world.one"; channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.all.world", null, msg1.getBytes()); channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.world.one", null, msg2.getBytes()); } @Test public void consume_topic_all() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_topic_all Received '" + message + "'"); }; channel.basicConsume(TOPICALL_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); } @Test public void consume_topic_one() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_topic_one Received '" + message + "'"); }; channel.basicConsume(TOPICONE_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
6.RPC通訊
RabbitMQ作為消息中間件可以達(dá)到應(yīng)用解耦效果,如果想達(dá)到RPC遠(yuǎn)程調(diào)用同步返回結(jié)果,RabbitMQ同樣支持,其原理如下:
- 發(fā)布者發(fā)送消息時(shí)指定一個(gè)回調(diào)隊(duì)列和唯一id
- 消費(fèi)者處理完成后將結(jié)果發(fā)送到回調(diào)隊(duì)列中
- 發(fā)布者按照唯一id接收消息并處理
如下圖
public final static String RPC_QUEUE_NAME = "rpc-que"; public final static String RPCCALLBACK_QUEUE_NAME = "rpc-callback-que"; @Test public void publish_rpc() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // replyTo回調(diào)隊(duì)列 channel.queueDeclare(RPCCALLBACK_QUEUE_NAME, false, false, false, null); //4.發(fā)布消息 String msg = "hello rpc"; String correlationId = UUID.randomUUID().toString(); /*AMQP 協(xié)議預(yù)先定義了一組與消息一起使用的14個(gè)屬性。除了以下屬性外,大多數(shù)屬性很少使用: deliveryMode:將消息標(biāo)記為持久(值為2)或瞬時(shí)(任何其他值)。 contentType:用于描述編碼的mime類型。例如,對(duì)于常用的JSON編碼,最好將此屬性設(shè)置為:application/JSON。 replyTo:通常用于命名回調(diào)隊(duì)列。 correlationId:用于將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)。*/ AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(RPCCALLBACK_QUEUE_NAME).correlationId(correlationId).build(); //5.回調(diào)響應(yīng)結(jié)果 channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, msg.getBytes()); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); String recid = delivery.getProperties().getCorrelationId(); if (correlationId.equalsIgnoreCase(recid)) System.out.println("rpc-callback-que '" + message + "'"); }; channel.basicConsume(RPCCALLBACK_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); } @Test public void consume_rpc() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_rpc Received '" + message + "'"); String correlationId = delivery.getProperties().getCorrelationId(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build(); String replyTo = delivery.getProperties().getReplyTo(); String callbackmsg = "rpc callback"; channel.basicPublish("", replyTo, basicProperties, callbackmsg.getBytes()); }; channel.basicConsume(RPC_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); System.in.read(); }
7.Publisher確認(rèn)通訊
Publisher確認(rèn)是RabbitMQ擴(kuò)展以實(shí)現(xiàn)可靠發(fā)布。當(dāng)在通道上啟用發(fā)布者確認(rèn)時(shí),代理將異步確認(rèn)客戶端發(fā)布的消息,這意味著它們已在服務(wù)器端得到處理。
public final static String CONFIRM_EXCHANGE_NAME = "confirm-ex"; public final static String CONFIRM_QUEUE_NAME = "confirm-que"; @Test public void publish_confirm() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.開啟確認(rèn)選項(xiàng) channel.confirmSelect(); //4.構(gòu)建交換機(jī) channel.exchangeDeclare(CONFIRM_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null); //6.綁定隊(duì)列 String right_routing_key = "confirm"; String error_routing_key = "confirm_err"; channel.queueBind(CONFIRM_QUEUE_NAME, CONFIRM_EXCHANGE_NAME, right_routing_key); //7.消息到達(dá)交換機(jī)確認(rèn)監(jiān)聽 channel.addConfirmListener((sequenceNumber, multiple) -> { System.out.println("消息成功發(fā)送到交換機(jī)"); }, (sequenceNumber, multiple) -> { System.err.println("消息未發(fā)送到交換機(jī),補(bǔ)償操作。"); }); //8.消息到達(dá)隊(duì)列確認(rèn)監(jiān)聽 channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> { System.err.format("消息 %s 未路由到指定隊(duì)列: %s, replyText: %s,replyCode: %d%n", body, routingKey, replyText, replyCode); }); //設(shè)置消息持久化 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); //7.發(fā)布消息 String msg = "hello confirm"; channel.basicPublish(CONFIRM_EXCHANGE_NAME, error_routing_key,true, basicProperties, msg.getBytes()); System.in.read(); } @Test public void consume_ack() throws IOException, TimeoutException { //1.獲取連接對(duì)象 Connection connection = MQConnections.getConnection(); //2.構(gòu)建Channl Channel channel = connection.createChannel(); //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)") channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null); //4.監(jiān)聽消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("consume_routing_one Received '" + message + "'"); //消息處理后手動(dòng)ACK channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; // ack為false channel.basicConsume(CONFIRM_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); System.in.read(); }
代碼倉(cāng)庫(kù)
https://gitee.com/codeWBG/learn_rabbitmq
到此這篇關(guān)于關(guān)于Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)的7種通訊方式的文章就介紹到這了,更多相關(guān)Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mybatis中association標(biāo)簽的使用解讀
這篇文章主要介紹了mybatis中association標(biāo)簽的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Java中的WeakHashMap概念原理以及簡(jiǎn)單案例
這篇文章主要介紹了Java中的WeakHashMap概念原理以及簡(jiǎn)單案例,WeakHashMap使用了軟引用結(jié)構(gòu),它的對(duì)象在垃圾回收時(shí)會(huì)被刪除,垃圾回收是優(yōu)先級(jí)非常低的線程,不能被顯示調(diào)用,當(dāng)內(nèi)存不足的時(shí)候會(huì)啟用,需要的朋友可以參考下2023-09-09Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類
這篇文章主要介紹了Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類,可實(shí)現(xiàn)針對(duì)properties配置文件的相關(guān)修改與保存功能,需要的朋友可以參考下2017-11-11spring mvc4的日期/數(shù)字格式化、枚舉轉(zhuǎn)換示例
本篇文章主要介紹了spring mvc4的日期/數(shù)字格式化、枚舉轉(zhuǎn)換示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-01-01