關于Java中RabbitMQ的高級特性
RabbitMQ高級特性
1.消息的可靠投遞
在使用RabbitMQ的時候,作為消息發(fā)送方希望杜絕任何消息丟失或投遞失敗場景。RabbitMQ為我們提供了兩種方式來控制消息的投遞可靠性模式。
- confirm 確認模式
- return 退回模式
RabbitMQ整個消息投遞的路徑為:producer
>rabbitMQ broker
> exchange
> queue
> consumer
- 消息從producer到exchange則會返回一個
confirmCallback
- 消息從exchange到queue投遞失敗則會返回一個
returnCallback
利用這兩個callback來控制消息的可靠性傳遞。
1.1 confirm 確認模式
(1)開啟確認模式
在創(chuàng)建連接工廠的時候要開啟確認模式,關鍵字:publisher-confirms
,默認為false
。
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" />
(2)RabbitTemplate設置回調(diào)
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { /** * 注入RabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * 測試默認的隊列發(fā)送消息 */ @Test public void testConfirmCallback() throws InterruptedException { // 設置回調(diào) rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 回調(diào)方法 * @param correlationData 回調(diào)的相關數(shù)據(jù)。 * @param ack true 表示發(fā)送成功, false 發(fā)送失敗 * @param cause 失敗原因,ack==true->null */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("發(fā)送成功"); } else { System.out.println("發(fā)送失敗,原因:" + cause); // 失敗后處理流程 } } }); rabbitTemplate.convertAndSend("spring_queue", "hello world"); // 防止發(fā)送完成后,未完成回調(diào)關閉通道 Thread.sleep(5000); } }
public void confirm(CorrelationData correlationData, boolean ack, String cause)
correlationData
參數(shù),發(fā)送數(shù)據(jù)的時候可以攜帶上ack
是否發(fā)送成功,成功為true,失敗為falsecause
失敗的原因,成功時為null
Thread.sleep(5000);
防止發(fā)送完成后,未完成回調(diào)關閉通道如果沒有加上會
clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
1.2 return 回退模式
(1)開啟回退模式
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-returns="true" />
(2)RabbitTemplate設置回調(diào)
@Test public void testReturnCallback() throws InterruptedException { // 設置交換機處理失敗消息的模式 rabbitTemplate.setMandatory(true); // 設置回調(diào) rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 返回消息 * @param message 消息對象 * @param replyCode 錯誤碼 * @param replyText 交換信息 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息對象:" + new String(message.getBody())); System.out.println("錯誤碼:" + replyCode); System.out.println("交換信息:" + replyText); System.out.println("交換機:" + exchange); System.out.println("路由鍵:" + routingKey); } }); rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3", "spring_direct_exchange_direct_key_1"); // 防止發(fā)送完成后,未完成回調(diào)關閉通道 Thread.sleep(5000); }
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
- message 消息對象
- replyCode 錯誤碼
- replyText 交換信息
- exchange 交換機
- routingKey 路由鍵
mandatory屬性的優(yōu)先級高于publisher-returns的優(yōu)先級
mandatory結(jié)果為true、false時會忽略掉publisher-returns屬性的值
mandatory結(jié)果為null(即不配置)時結(jié)果由publisher-returns確定
2.Consumer Ack(消費端)
Ack指Acknowledge,確認。表示消費端接收到消息后的確認方式。
有三種確認方式:
- 自動確認:
acknowledge="none"
- 手動確認:
acknowledge="manual"
- 根據(jù)異常情況確認:
acknowledge="auto"
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應message 從RabbitMQ的消息緩存中移除。
但是在實際業(yè)務處理中,很可能消息接收到,業(yè)務處理出現(xiàn)異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業(yè)務處理成功后,調(diào)用``channel.basicAck(),手動簽收,如果出現(xiàn)異常,則調(diào)用
channel.basicNack()`方法,讓其自動重新發(fā)送消息。
2.1 設置手動簽收
(1)創(chuàng)建一個監(jiān)聽器接收消息
設置手動接收時,讓監(jiān)聽器實現(xiàn)ChannelAwareMessageListener
接口
如果消息成功處理,則調(diào)用channel.basicAck()
如果消息處理失敗,則調(diào)用 channel.basicNack()
,broker重新發(fā)送consumer
/** * @author zhong * <p> * Consumer Ack機制 * 1.設置手動簽收,acknowledge="manual" * 2.讓監(jiān)聽器實現(xiàn)ChannelAwareMessageListener接口 * 3.如果消息成功處理,則調(diào)用channel.basicAck() * 4.如果消息處理失敗,則調(diào)用 channel.basicNack(),broker重新發(fā)送consumer */ @Component public class AckSpringQueueListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 接收消息 System.out.println("Message:" + new String(message.getBody())); // 手動簽收 /** * deliveryTag: 標識id * multiple: 確認所有消息 */ channel.basicAck(deliveryTag, true); // 手動拒絕 /** * requeue:如果被拒絕的消息應該被重新排隊而不是被丟棄/死信 */ //channel.basicNack(deliveryTag, true, true); } }
(2)設置手動,加入監(jiān)聽
設置手動簽收,acknowledge=“manual”
<context:component-scan base-package="org.example"/> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true"> <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/> </rabbit:listener-container>
3.消費端限流
MQ一個作用就是削峰填谷,通過消費端限流實現(xiàn)。
消費端限流包括一下操作:
- <rabbit:listener-container>配置
prefetch
???????屬性設置 - 消費端一次拉去多少消息消費端確認模式一定為手動確認。
acknowledge="nanual"
(1)關鍵配置文件:
<context:component-scan base-package="org.example"/> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" auto-declare="true"> <rabbit:listener ref="qosListener" queue-names="spring_queue"/> </rabbit:listener-container>
(1)手動確認
acknowledge="manual"
(2)設置閾值
prefetch="1"
(2)關鍵監(jiān)聽器代碼
/** * Consumer 限流機制 * 1.確保ack機制為手動確認 * 2.listener-container 配置屬性 * perfetch = 1 表示消費端每次從mq拉取一條消息來消費,直到手動確認消費完畢后,才會繼續(xù)拉去下一條消息。 */ @Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println("QosListener:" + new String(message.getBody())); long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 簽收消息 Thread.sleep(1000); channel.basicAck(deliveryTag, true); } }
4.TTL(存活時間/過期時間)
TTL全稱Time To Live (存活時間/過期時間)。
- 當消息到達存活時間后,還沒有被消費,會被自動清除。
- RabbitMQ可以對消息設置過期時間,也可以對整個隊列(Queue)設置過期時間。
4.1 控制臺設置
RabbitMQ控制臺可以設置隊列的過期時間。
4.2 消息單獨過期
@Test public void testTTL() { // 消息后處理隊列,設置一下消息參數(shù)信息 MessagePostProcessor messagePostProcessor = message -> { // 1.設置message的消息 message.getMessageProperties().setExpiration("50000");// 設置過期時間,字符串,毫秒 // 2.返回消息 return message; }; // 傳入 rabbitTemplate.convertAndSend("spring_fanout_exchange", "key", "RabbitMQ", messagePostProcessor); }
4.3 小結(jié)
如果設置了消息的過期時間,也設置了隊列的過期時間,它以時間短的為準。隊列過期后,會將隊列所有消息全部移除。消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除)。
5.死信隊列
死信隊列,英文縮寫:DLX。Dead Letter Exchange(死信交換機)
。
當消息成為Dead Message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。
到此這篇關于關于Java中RabbitMQ的高級特性 的文章就介紹到這了,更多相關RabbitMQ的高級特性 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python高階函數(shù)functools模塊的具體使用
本文主要介紹了python高階函數(shù)functools模塊的具體使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-03-03Python中enumerate()函數(shù)編寫更Pythonic的循環(huán)
本篇文章主要大家通過實例講述了Python中enumerate()函數(shù)編寫更Pythonic的循環(huán)的知識點,有興趣的朋友參考學習下。2018-03-03安裝pytorch報錯torch.cuda.is_available()=false問題的解決過程
最近想用pytorch,因此裝了pytorch,但是碰到了問題,下面這篇文章主要給大家介紹了關于安裝pytorch報錯torch.cuda.is_available()=false問題的解決過程,需要的朋友可以參考下2022-05-05