原生Java操作兔子隊列RabbitMQ
一.前言
RabbitMQ 是一種快速、靈活、可靠的消息傳遞方式,可用于構(gòu)建分布式應(yīng)用程序、異步處理任務(wù)、實現(xiàn)消息隊列等。下面是 Java 原生操作 RabbitMQ 的一些好處和用途:
- 簡單易用:RabbitMQ 提供了豐富的 Java 客戶端庫,開發(fā)者可以輕松地使用 Java 代碼進行消息發(fā)送和接收,無需學習復雜的消息傳遞協(xié)議和 API。
- 可擴展性強:RabbitMQ 支持集群和分布式部署,可以輕松地實現(xiàn)橫向和縱向擴展,以適應(yīng)不同規(guī)模和負載的應(yīng)用需求。
- 可靠性高:RabbitMQ 提供了多種消息傳遞模式,包括持久化消息、確認機制、事務(wù)機制等,確保消息傳遞的可靠性和一致性。
- 異步處理能力:RabbitMQ 可以異步處理任務(wù),提高應(yīng)用程序的響應(yīng)速度和吞吐量,實現(xiàn)任務(wù)削峰、應(yīng)對高并發(fā)等需求。
- 可用于多種場景:RabbitMQ 可以用于構(gòu)建分布式應(yīng)用程序、實現(xiàn)消息隊列、異步處理任務(wù)、實現(xiàn)實時數(shù)據(jù)同步等場景,具有廣泛的應(yīng)用場景和發(fā)展前景。
二.原生Java操作RabbitMQ
Ⅰ. 簡單模式
1. 添加依賴
<!--rabbitmq依賴--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.0</version> </dependency>
2. 編寫生產(chǎn)者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; //生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建隊列,若隊列已存在則使用該隊列 /** * 參數(shù)1:隊列名 * 參數(shù)2:是否持久化,true表示MQ重啟后隊列還存在 * 參數(shù)3:是否私有化,false表示所有消費者都可以訪問,true表示只有第一次訪問她的消費者才能訪問 * 參數(shù)4:是否自動刪除,true表示不再使用隊列時,自動刪除 * 參數(shù)5:其他額外參數(shù) */ channel.queueDeclare("simple_queue",false,false,false,null); // 5.發(fā)送消息 String message = "hello rabbitmq"; /** * 參數(shù)1:交換機名,""表示默認交換機 * 參數(shù)2:路由鍵,簡單模式就是隊列名 * 參數(shù)3:其他額外參數(shù) * 參數(shù)4:要傳遞的消息字節(jié)數(shù)組 */ channel.basicPublish("","simple_queue",null,message.getBytes()); // 6.關(guān)閉信道和連接 channel.close(); connection.close(); System.out.println("=====發(fā)送成功===="); } }
3. 編寫消費者
因為消費者不知道生產(chǎn)者什么時候發(fā)送消息過來,所以消費者需要一直監(jiān)聽生產(chǎn)者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消費者 */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息,消息為:"+message); } }); } }
Ⅱ. 工作隊列模式
與簡單模式相比,工作隊列模式(Work Queue)多了一些消費者,該模式也使用direct交換機,應(yīng)用于處理消息較多的情況。特點如下:
- 一個隊列對應(yīng)多個消費者。
- 一條消息只會被一個消費者消費。
- 消息隊列默認采用輪詢的方式將消息平均發(fā)送給消費者。
其實就是 簡單模式plus版本。
1. 編寫生產(chǎn)者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建隊列,持久化隊列 channel.queueDeclare("work_queue",true,false,false,null); // 5.發(fā)送大量消息,參數(shù)3表示該消息為持久化消息,即除了保存到內(nèi)存還保存到磁盤 for (int i = 0; i < 100; i++) { channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("您好,這是今天的第"+i+"條消息").getBytes(StandardCharsets.UTF_8)); } // 6.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫消費者
這里使用創(chuàng)建了三個消費者,來接收生產(chǎn)者的消息
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 監(jiān)聽隊列,處理消息 channel.basicConsume("work_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消費者1消費消息,消息為:"+message); } }); } }
3. 實現(xiàn)
先把三個消費者運行起來,再運行生產(chǎn)者,得到的消息就會輪詢均分
Ⅲ. 發(fā)布訂閱模式
在開發(fā)過程中,有一些消息需要不同消費者進行不同的處理,如電商網(wǎng)站的同一條促銷信息需要短信發(fā)送、郵件發(fā)送、站內(nèi)信發(fā)送等。此時可以使用發(fā)布訂閱模式(Publish/Subscribe)
特點:
- 生產(chǎn)者將消息發(fā)送給交換機,交換機將消息轉(zhuǎn)發(fā)到綁定此交換機的每個隊列中。
- 工作隊列模式的交換機只能將消息發(fā)送給一個隊列,發(fā)布訂閱模式的交換機能將消息發(fā)送給多個隊列。發(fā)布訂閱模式使用fanout交換機。
1. 編寫生產(chǎn)者
這里創(chuàng)建了三條隊列,一條是發(fā)送短信,一條是站內(nèi)信,一條是郵件隊列、
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import javax.swing.plaf.TreeUI; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機 /** * 參數(shù)1:交換機名 * 參數(shù)2:交換機類型 * 參數(shù)3:交換機是否持久化 */ channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true); // 5.創(chuàng)建隊列 channel.queueDeclare("SEND_MAIL",true,false,false,null); channel.queueDeclare("SEND_MESSAGE",true,false,false,null); channel.queueDeclare("SEND_STATION",true,false,false,null); // 6.交換機綁定隊列 /** * 參數(shù)1:隊列名 * 參數(shù)2:交換機名 * 參數(shù)3:路由關(guān)鍵字,發(fā)布訂閱模式只需要寫""即可 */ channel.queueBind("SEND_MAIL","exchange_fanout",""); channel.queueBind("SEND_MESSAGE","exchange_fanout",""); channel.queueBind("SEND_STATION","exchange_fanout",""); // 7.發(fā)送消息 for (int i = 0; i < 10; i++) { channel.basicPublish("exchange_fanout","",null, ("您好,尊敬的用戶,秒殺商品活動開始啦:"+i).getBytes(StandardCharsets.UTF_8)); } // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫消費者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //短信消費者 public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送短信,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //郵件消費者 public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送郵件,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //站內(nèi)信消費者 public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送站內(nèi)信,消息為:"+message); } }); } }
發(fā)布訂閱模式也允許多個消費者監(jiān)聽同一個隊列(工作模式),例如 兩個發(fā)送短信消費者監(jiān)聽同一個短信生產(chǎn)者,這樣短信生產(chǎn)者的消息將會被輪詢平分。
Ⅳ. 路由模式
使用發(fā)布訂閱模式時,所有消息都會發(fā)送到綁定的隊列中,但很多時候,不是所有消息都無差別的發(fā)布到所有隊列中。比如電商網(wǎng)站
的促銷活動,雙十一大促可能會發(fā)布到所有隊列;而一些小的促銷活動為了節(jié)約成本,只發(fā)布到站內(nèi)信隊列。此時需要使用路由模式
(Routing)完成這一需求。意思就是只發(fā)給與綁定相同路由關(guān)鍵字的隊列。
特點:
- 每個隊列綁定路由關(guān)鍵字RoutingKey。
- 生產(chǎn)者將帶有RoutingKey的消息發(fā)送給交換機,交換機根據(jù)RoutingKey轉(zhuǎn)發(fā)到指定隊列。路由模式使用direct交換機。
1. 編寫生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列 */ // 生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機 channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true); // 5.創(chuàng)建隊列 channel.queueDeclare("SEND_MAIL2",true,false,false,null); channel.queueDeclare("SEND_MESSAGE2",true,false,false,null); channel.queueDeclare("SEND_STATION2",true,false,false,null); // 6.交換機綁定隊列 channel.queueBind("SEND_MAIL2","exchange_routing","import"); channel.queueBind("SEND_MESSAGE2","exchange_routing","import"); channel.queueBind("SEND_STATION2","exchange_routing","import"); channel.queueBind("SEND_STATION2","exchange_routing","normal"); // 7.發(fā)送消息 channel.basicPublish("exchange_routing","import",null, "雙十一大促活動".getBytes()); channel.basicPublish("exchange_routing","normal",null, "小心促銷活動".getBytes()); // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫消費者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //發(fā)送郵件消費者 public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_MAIL2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送郵件,消息為:"+message); } }); } }
//發(fā)送信息消費者 public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_MESSAGE2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送短信,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //站內(nèi)信消費者 public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽隊列 /** * 參數(shù)1:監(jiān)聽的隊列名 * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。 * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息 */ channel.basicConsume("SEND_STATION2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送站內(nèi)信,消息為:"+message); } }); } }
Ⅴ. 通配符模式
通配符模式(Topic)是在路由模式的基礎(chǔ)上,給隊列綁定帶通配符的路由關(guān)鍵字,只要消息的RoutingKey能實現(xiàn)通配符匹配,就會將消
息轉(zhuǎn)發(fā)到該隊列。通配符模式比路由模式更靈活,使用topic交換機。
通配符規(guī)則:
1 消息設(shè)置RoutingKey時,RoutingKey由多個單詞構(gòu)成,中間以 . 分割。
2 隊列設(shè)置RoutingKey時, # 可以匹配任意多個單詞, * 可以匹配任意一個單詞。
1. 編寫生產(chǎn)者
package com.itbz.mq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列 */ // 生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機 channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true); // 5.創(chuàng)建隊列 channel.queueDeclare("SEND_MAIL3",true,false,false,null); channel.queueDeclare("SEND_MESSAGE3",true,false,false,null); channel.queueDeclare("SEND_STATION3",true,false,false,null); // 6.交換機綁定隊列 channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#"); channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#"); channel.queueBind("SEND_STATION3","exchange_topic","#.station.#"); // 7.發(fā)送消息 // 三個隊列都匹配上了 channel.basicPublish("exchange_topic","mail.message.station",null, "雙十一大促活動".getBytes()); // 只發(fā)給station channel.basicPublish("exchange_topic","station",null, "小心促銷活動".getBytes()); // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫消費者
跟前面差不多
三.總結(jié)
這篇萬字長文總結(jié)了原生Java操作RabbitMQ的各種過程,希望對您有幫助哦!
到此這篇關(guān)于原生Java操作兔子隊列RabbitMQ的文章就介紹到這了,更多相關(guān)原生Java操作RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Netty分布式ByteBuf中PooledByteBufAllocator剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf剖析PooledByteBufAllocator簡述,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03JsonFormat與@DateTimeFormat注解實例解析
這篇文章主要介紹了JsonFormat與@DateTimeFormat注解實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-12-12談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)
這篇文章主要給大家介紹java利用原始httpUrlConnection發(fā)送post數(shù)據(jù),設(shè)計到httpUrlConnection類的相關(guān)知識,感興趣的朋友跟著小編一起學習吧2015-10-10使用Feign調(diào)用時添加驗證信息token到請求頭方式
這篇文章主要介紹了使用Feign調(diào)用時添加驗證信息token到請求頭方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03