原生Java操作兔子隊(duì)列RabbitMQ
一.前言
RabbitMQ 是一種快速、靈活、可靠的消息傳遞方式,可用于構(gòu)建分布式應(yīng)用程序、異步處理任務(wù)、實(shí)現(xiàn)消息隊(duì)列等。下面是 Java 原生操作 RabbitMQ 的一些好處和用途:
- 簡單易用:RabbitMQ 提供了豐富的 Java 客戶端庫,開發(fā)者可以輕松地使用 Java 代碼進(jìn)行消息發(fā)送和接收,無需學(xué)習(xí)復(fù)雜的消息傳遞協(xié)議和 API。
- 可擴(kuò)展性強(qiáng):RabbitMQ 支持集群和分布式部署,可以輕松地實(shí)現(xiàn)橫向和縱向擴(kuò)展,以適應(yīng)不同規(guī)模和負(fù)載的應(yīng)用需求。
- 可靠性高:RabbitMQ 提供了多種消息傳遞模式,包括持久化消息、確認(rèn)機(jī)制、事務(wù)機(jī)制等,確保消息傳遞的可靠性和一致性。
- 異步處理能力:RabbitMQ 可以異步處理任務(wù),提高應(yīng)用程序的響應(yīng)速度和吞吐量,實(shí)現(xiàn)任務(wù)削峰、應(yīng)對(duì)高并發(fā)等需求。
- 可用于多種場(chǎng)景:RabbitMQ 可以用于構(gòu)建分布式應(yīng)用程序、實(shí)現(xiàn)消息隊(duì)列、異步處理任務(wù)、實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)同步等場(chǎng)景,具有廣泛的應(yīng)用場(chǎng)景和發(fā)展前景。
二.原生Java操作RabbitMQ
Ⅰ. 簡單模式
1. 添加依賴
<!--rabbitmq依賴-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>2. 編寫生產(chǎn)者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//生產(chǎn)者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.創(chuàng)建隊(duì)列,若隊(duì)列已存在則使用該隊(duì)列
/**
* 參數(shù)1:隊(duì)列名
* 參數(shù)2:是否持久化,true表示MQ重啟后隊(duì)列還存在
* 參數(shù)3:是否私有化,false表示所有消費(fèi)者都可以訪問,true表示只有第一次訪問她的消費(fèi)者才能訪問
* 參數(shù)4:是否自動(dòng)刪除,true表示不再使用隊(duì)列時(shí),自動(dòng)刪除
* 參數(shù)5:其他額外參數(shù)
*/
channel.queueDeclare("simple_queue",false,false,false,null);
// 5.發(fā)送消息
String message = "hello rabbitmq";
/**
* 參數(shù)1:交換機(jī)名,""表示默認(rèn)交換機(jī)
* 參數(shù)2:路由鍵,簡單模式就是隊(duì)列名
* 參數(shù)3:其他額外參數(shù)
* 參數(shù)4:要傳遞的消息字節(jié)數(shù)組
*/
channel.basicPublish("","simple_queue",null,message.getBytes());
// 6.關(guān)閉信道和連接
channel.close();
connection.close();
System.out.println("=====發(fā)送成功====");
}
}
3. 編寫消費(fèi)者
因?yàn)橄M(fèi)者不知道生產(chǎn)者什么時(shí)候發(fā)送消息過來,所以消費(fèi)者需要一直監(jiān)聽生產(chǎn)者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費(fèi)者
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息,消息為:"+message);
}
});
}
}
Ⅱ. 工作隊(duì)列模式

與簡單模式相比,工作隊(duì)列模式(Work Queue)多了一些消費(fèi)者,該模式也使用direct交換機(jī),應(yīng)用于處理消息較多的情況。特點(diǎn)如下:
- 一個(gè)隊(duì)列對(duì)應(yīng)多個(gè)消費(fèi)者。
- 一條消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)。
- 消息隊(duì)列默認(rèn)采用輪詢的方式將消息平均發(fā)送給消費(fèi)者。
其實(shí)就是 簡單模式plus版本。
1. 編寫生產(chǎn)者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.創(chuàng)建隊(duì)列,持久化隊(duì)列
channel.queueDeclare("work_queue",true,false,false,null);
// 5.發(fā)送大量消息,參數(shù)3表示該消息為持久化消息,即除了保存到內(nèi)存還保存到磁盤
for (int i = 0; i < 100; i++) {
channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
("您好,這是今天的第"+i+"條消息").getBytes(StandardCharsets.UTF_8));
}
// 6.關(guān)閉資源
channel.close();
connection.close();
}
}
2. 編寫消費(fèi)者
這里使用創(chuàng)建了三個(gè)消費(fèi)者,來接收生產(chǎn)者的消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 監(jiān)聽隊(duì)列,處理消息
channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消費(fèi)者1消費(fèi)消息,消息為:"+message);
}
});
}
}3. 實(shí)現(xiàn)
先把三個(gè)消費(fèi)者運(yùn)行起來,再運(yùn)行生產(chǎn)者,得到的消息就會(huì)輪詢均分

Ⅲ. 發(fā)布訂閱模式
在開發(fā)過程中,有一些消息需要不同消費(fèi)者進(jìn)行不同的處理,如電商網(wǎng)站的同一條促銷信息需要短信發(fā)送、郵件發(fā)送、站內(nèi)信發(fā)送等。此時(shí)可以使用發(fā)布訂閱模式(Publish/Subscribe)
特點(diǎn):
- 生產(chǎn)者將消息發(fā)送給交換機(jī),交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列中。
- 工作隊(duì)列模式的交換機(jī)只能將消息發(fā)送給一個(gè)隊(duì)列,發(fā)布訂閱模式的交換機(jī)能將消息發(fā)送給多個(gè)隊(duì)列。發(fā)布訂閱模式使用fanout交換機(jī)。
1. 編寫生產(chǎn)者
這里創(chuàng)建了三條隊(duì)列,一條是發(fā)送短信,一條是站內(nèi)信,一條是郵件隊(duì)列、
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import javax.swing.plaf.TreeUI;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.創(chuàng)建交換機(jī)
/**
* 參數(shù)1:交換機(jī)名
* 參數(shù)2:交換機(jī)類型
* 參數(shù)3:交換機(jī)是否持久化
*/
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
// 5.創(chuàng)建隊(duì)列
channel.queueDeclare("SEND_MAIL",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
channel.queueDeclare("SEND_STATION",true,false,false,null);
// 6.交換機(jī)綁定隊(duì)列
/**
* 參數(shù)1:隊(duì)列名
* 參數(shù)2:交換機(jī)名
* 參數(shù)3:路由關(guān)鍵字,發(fā)布訂閱模式只需要寫""即可
*/
channel.queueBind("SEND_MAIL","exchange_fanout","");
channel.queueBind("SEND_MESSAGE","exchange_fanout","");
channel.queueBind("SEND_STATION","exchange_fanout","");
// 7.發(fā)送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange_fanout","",null,
("您好,尊敬的用戶,秒殺商品活動(dòng)開始啦:"+i).getBytes(StandardCharsets.UTF_8));
}
// 8.關(guān)閉資源
channel.close();
connection.close();
}
}2. 編寫消費(fèi)者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//短信消費(fèi)者
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送短信,消息為:"+message);
}
});
}
}import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//郵件消費(fèi)者
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送郵件,消息為:"+message);
}
});
}
}import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//站內(nèi)信消費(fèi)者
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送站內(nèi)信,消息為:"+message);
}
});
}
}發(fā)布訂閱模式也允許多個(gè)消費(fèi)者監(jiān)聽同一個(gè)隊(duì)列(工作模式),例如 兩個(gè)發(fā)送短信消費(fèi)者監(jiān)聽同一個(gè)短信生產(chǎn)者,這樣短信生產(chǎn)者的消息將會(huì)被輪詢平分。
Ⅳ. 路由模式

使用發(fā)布訂閱模式時(shí),所有消息都會(huì)發(fā)送到綁定的隊(duì)列中,但很多時(shí)候,不是所有消息都無差別的發(fā)布到所有隊(duì)列中。比如電商網(wǎng)站
的促銷活動(dòng),雙十一大促可能會(huì)發(fā)布到所有隊(duì)列;而一些小的促銷活動(dòng)為了節(jié)約成本,只發(fā)布到站內(nèi)信隊(duì)列。此時(shí)需要使用路由模式
(Routing)完成這一需求。意思就是只發(fā)給與綁定相同路由關(guān)鍵字的隊(duì)列。
特點(diǎn):
- 每個(gè)隊(duì)列綁定路由關(guān)鍵字RoutingKey。
- 生產(chǎn)者將帶有RoutingKey的消息發(fā)送給交換機(jī),交換機(jī)根據(jù)RoutingKey轉(zhuǎn)發(fā)到指定隊(duì)列。路由模式使用direct交換機(jī)。
1. 編寫生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列
*/
// 生產(chǎn)者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.創(chuàng)建交換機(jī)
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
// 5.創(chuàng)建隊(duì)列
channel.queueDeclare("SEND_MAIL2",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
channel.queueDeclare("SEND_STATION2",true,false,false,null);
// 6.交換機(jī)綁定隊(duì)列
channel.queueBind("SEND_MAIL2","exchange_routing","import");
channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","normal");
// 7.發(fā)送消息
channel.basicPublish("exchange_routing","import",null,
"雙十一大促活動(dòng)".getBytes());
channel.basicPublish("exchange_routing","normal",null,
"小心促銷活動(dòng)".getBytes());
// 8.關(guān)閉資源
channel.close();
connection.close();
}
}2. 編寫消費(fèi)者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//發(fā)送郵件消費(fèi)者
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_MAIL2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送郵件,消息為:"+message);
}
});
}
}//發(fā)送信息消費(fèi)者
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_MESSAGE2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送短信,消息為:"+message);
}
});
}
}import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//站內(nèi)信消費(fèi)者
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.創(chuàng)建信道
Channel channel = connection.createChannel();
// 4.監(jiān)聽隊(duì)列
/**
* 參數(shù)1:監(jiān)聽的隊(duì)列名
* 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。
* 參數(shù)3:Consumer的實(shí)現(xiàn)類,重寫該類方法表示接收到這個(gè)消息之后該如何消費(fèi)消息
*/
channel.basicConsume("SEND_STATION2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("發(fā)送站內(nèi)信,消息為:"+message);
}
});
}
}Ⅴ. 通配符模式

通配符模式(Topic)是在路由模式的基礎(chǔ)上,給隊(duì)列綁定帶通配符的路由關(guān)鍵字,只要消息的RoutingKey能實(shí)現(xiàn)通配符匹配,就會(huì)將消
息轉(zhuǎn)發(fā)到該隊(duì)列。通配符模式比路由模式更靈活,使用topic交換機(jī)。
通配符規(guī)則:
1 消息設(shè)置RoutingKey時(shí),RoutingKey由多個(gè)單詞構(gòu)成,中間以 . 分割。
2 隊(duì)列設(shè)置RoutingKey時(shí), # 可以匹配任意多個(gè)單詞, * 可以匹配任意一個(gè)單詞。
1. 編寫生產(chǎn)者

package com.itbz.mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列
*/
// 生產(chǎn)者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.79.50.65");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lion");
connectionFactory.setPassword("lion");
connectionFactory.setVirtualHost("/");
// 2.創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.創(chuàng)建交換機(jī)
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
// 5.創(chuàng)建隊(duì)列
channel.queueDeclare("SEND_MAIL3",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
channel.queueDeclare("SEND_STATION3",true,false,false,null);
// 6.交換機(jī)綁定隊(duì)列
channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
// 7.發(fā)送消息
// 三個(gè)隊(duì)列都匹配上了
channel.basicPublish("exchange_topic","mail.message.station",null,
"雙十一大促活動(dòng)".getBytes());
// 只發(fā)給station
channel.basicPublish("exchange_topic","station",null,
"小心促銷活動(dòng)".getBytes());
// 8.關(guān)閉資源
channel.close();
connection.close();
}
}2. 編寫消費(fèi)者
跟前面差不多
三.總結(jié)
這篇萬字長文總結(jié)了原生Java操作RabbitMQ的各種過程,希望對(duì)您有幫助哦!
到此這篇關(guān)于原生Java操作兔子隊(duì)列RabbitMQ的文章就介紹到這了,更多相關(guān)原生Java操作RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Netty分布式ByteBuf中PooledByteBufAllocator剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf剖析PooledByteBufAllocator簡述,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
JsonFormat與@DateTimeFormat注解實(shí)例解析
這篇文章主要介紹了JsonFormat與@DateTimeFormat注解實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
SpringCloud修改Feign日志記錄級(jí)別過程淺析
OpenFeign源于Netflix的Feign,是http通信的客戶端。屏蔽了網(wǎng)絡(luò)通信的細(xì)節(jié),直接面向接口的方式開發(fā),讓開發(fā)者感知不到網(wǎng)絡(luò)通信細(xì)節(jié)。所有遠(yuǎn)程調(diào)用,都像調(diào)用本地方法一樣完成2023-02-02
解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法
這篇文章主要介紹了解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)
這篇文章主要給大家介紹java利用原始httpUrlConnection發(fā)送post數(shù)據(jù),設(shè)計(jì)到httpUrlConnection類的相關(guān)知識(shí),感興趣的朋友跟著小編一起學(xué)習(xí)吧2015-10-10
全網(wǎng)最新Log4j?漏洞修復(fù)和臨時(shí)補(bǔ)救方法
Apache?Log4j?遠(yuǎn)程代碼執(zhí)行漏洞,如何快速修復(fù)log4j2漏洞,本文給大家介紹下Log4j?漏洞修復(fù)和臨時(shí)補(bǔ)救方法,感興趣的朋友跟隨小編一起看看吧2021-12-12
使用Feign調(diào)用時(shí)添加驗(yàn)證信息token到請(qǐng)求頭方式
這篇文章主要介紹了使用Feign調(diào)用時(shí)添加驗(yàn)證信息token到請(qǐng)求頭方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

