欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java RabbitMQ的TTL和DLX全面精解

 更新時間:2021年09月23日 08:59:31   作者:沒頭腦遇到不高興  
過期時間TTL表示可以對消息設(shè)置預(yù)期的時間,在這個時間內(nèi)都可以被消費者接收獲??;過了之后消息將自動被刪除。DLX, 可以稱之為死信交換機,當消息在一個隊列中變成死信之后,它能被重新發(fā)送到另一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之為死信隊列

本節(jié)繼續(xù)介紹RabbitMQ的高級特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交換機、死信隊列)

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全稱為Time-To-Live,表示的是消息的有效期。消息如果在隊列中一直沒有被消費并且存在時間超過了TTL,消息就會變成了"死信" (Dead Message),后續(xù)無法再被消費了。設(shè)置TTL有兩種方式:

  1. 第一種是聲明隊列的時候,在隊列的屬性中設(shè)置,這樣該隊列中的消息都會有相同的有效期;
  2. 第二種是發(fā)送消息時給消息設(shè)置屬性,可以為每條消息都設(shè)置不同的TTL。

如果兩種方式都設(shè)置了,則以設(shè)置的較小的為準。兩者的區(qū)別:如果聲明隊列時設(shè)置了有效期,則消息過期了就會被刪掉;如果是發(fā)消息時設(shè)置的有效期,消息過期了也不會被立馬刪掉,因為這時消息是否過期是在要投遞給消費者時判斷的。至于為啥要這樣處理很容易想清楚:第一種方式隊列的消息有效期都一樣,先入隊的在隊列頭部,頭部也是最早要過期的消息,RabbitMQ起一個定時任務(wù)從隊列的頭部開始掃描是否有過期消息即可;第二種方式每條消息的過期時間不同,所以只有遍歷整個隊列才可以篩選出來過期的消息,這樣效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投遞給消費者時再判斷刪除,雖然刪除的不夠及時但是不影響功能,其實就是用空間換時間。

如果不設(shè)置TTL,則表示此消息永久有效(默認消息是不會失效的)。如果將TTL設(shè)為0,則表示如果消息不能被立馬消費則會被立即丟掉,這個特性可以部分替代RabbitMQ3.0以前支持的immediate參數(shù),之所以所部分代替,是應(yīng)為immediate參數(shù)在投遞失敗會有basic.return方法將消息體返回(這個功能可以利用死信隊列來實現(xiàn))。

2、設(shè)置消息有效期

2.1、通過隊列設(shè)置有效期

還記得我們之前聲明隊列的方法嗎,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),該方法的最后一個參數(shù)可以設(shè)置隊列的屬性,屬性名為x-message-ttl,單位為毫秒。如果不清楚隊列屬性有哪些,可以查看web控制臺的添加隊列的地方。

具體代碼如下:

//設(shè)置隊列上所有的消息的有效期,單位為毫秒
Map<String, Object> argss = new HashMap<String , Object>();
arguments.put("x-message-ttl " , 5000);//5秒鐘
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制臺的隊列列表如下:D表示持久化,TTL表示設(shè)置了消息的有效期。

過了幾秒鐘后發(fā)現(xiàn)消息已經(jīng)不存在了。

也可以用RabbitMQ的命令行模式來設(shè)置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

還可以通過HTTP接口調(diào)用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' 
http://ip:15672/api/queues/{vhost}/{queuename}

2.2、通過發(fā)送消息時設(shè)置有效期

發(fā)送消息時basicPublish方法可以設(shè)置屬性參數(shù),里面通過expiration屬性設(shè)置消息有效期,單位為毫秒,代碼如下所示

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2);//持久化
bd.expiration("100000");//設(shè)置消息有效期100秒鐘
BasicProperties pros = bd.build();
String message = "測試ttl消息";
channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通過HTTPAPI 接口設(shè)置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通過隊列設(shè)置消息有效期、發(fā)布消息時通過屬性設(shè)置有效期的代碼如下:可以運行后,觀察下控制臺,可以發(fā)現(xiàn)同時設(shè)置時,消息的有效期是以較小的為準的。項目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。

package cn.wkp.rabbitmq.newest.ttl;
 
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
/**
 * 
 * @ClassName: Send
 * @Description: 消息有效期 
 * @author wkg
 * @date: 2019年4月1日 下午11:28:22
 */
public class Send {
 
	private final static String EXCHANGE_NAME = "ttl_exchange";
	private final static String QUEUE_NAME = "ttl_queue";
 
	public static void main(String[] argv) throws Exception {
		// 獲取到連接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		// 從連接中創(chuàng)建通道
		Channel channel = connection.createChannel();
 
		// 聲明交換機
		channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
		
		//*****1:通過隊列設(shè)置有效期 2:通過消息屬性設(shè)置有效期,如果都設(shè)置了以較小的為準*****
		//聲明隊列
		Map<String, Object> arguments=new HashMap<String,Object>();
		//設(shè)置隊列上所有的消息的有效期,單位為毫秒
		arguments.put("x-message-ttl", 5000);//5秒鐘
		channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
		//綁定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		
		Builder bd = new AMQP.BasicProperties().builder();
		bd.deliveryMode(2);//持久化
		bd.expiration("100000");//設(shè)置消息有效期100秒鐘
		BasicProperties pros = bd.build();
		String message = "測試ttl消息";
		channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
		System.out.println("Sent message:" + message);
//		 關(guān)閉通道和連接
		channel.close();
		connection.close();
	}
}

3、設(shè)置隊列有效期(不常用,僅作了解)

上面在web管控臺添加隊列的時候,我們看到有一個x-expires參數(shù),可以讓隊列在指定時間內(nèi) "未被使用" 的話會自動過期刪除,未使用的意思是 queue 上沒有任何 consumer,queue 沒有被重新聲明,并且在過期時間段內(nèi)未調(diào)用過 basic.get 命令。該方式可用于,例如,RPC-style 的回復 queue, 其中許多queue 會被創(chuàng)建出來,但是卻從未被使用。

服務(wù)器會確保在過期時間到達后 queue 被刪除,但是不保證刪除的動作有多么的及時。在服務(wù)器重啟后,持久化的queue 的超時時間將重新計算。 x-expires 參數(shù)值以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,且不能設(shè)置為 0 。所以,如果該參數(shù)設(shè)置為 1000 ,則表示該 queue 如果在 1s之內(nèi)未被使用則會被刪除。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 18000);  //隊列有效期18秒
channel.queueDeclare("myqueue", false, false, false, args);  

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的簡寫,意思是死信交換機。

它的作用其實是用來接收死信消息(dead message)的。那什么是死信消息呢?一般消息變成死信消息有如下幾種情況:

  • 消息被拒絕(Basic.Reject/Basic.Nack) ,井且設(shè)置requeue 參數(shù)為false
  • 消息過期
  • 隊列達到最大長度

當消息在一個隊列中變成了死信消息后,可以被發(fā)送到另一個交換機,這個交換機就是DLX,綁定DLX的隊列成為死信隊列。當這個隊列中存在死信時, RabbitMQ 就會立即自動地將這個消息重新發(fā)布到設(shè)置的DLX 上去,進而被路由到綁定該DLX的死信隊列上。可以監(jiān)聽這個隊列中的消息、以進行相應(yīng)的處理,這個特性與將消息的TTL 設(shè)置為0 配合使用可以彌補imrnediate 參數(shù)的功能。

2、DLX有什么用

因為消息如果未被正常消費并設(shè)置了requeue為false時會進入死信隊列,我們可以監(jiān)控消費死信隊列中消息,來觀察和分析系統(tǒng)的問題。DLX還有一個非常重要的作用,就是結(jié)合TTL實現(xiàn)延遲隊列(延遲隊列的使用范圍還是挺廣的:比如下單超過多長時間自動關(guān)閉;比如我們接入過第三方支付系統(tǒng)的同學一定知道,我們的訂單中會傳一個notify_url用于接收支付結(jié)果知,如果我們給第三方支付響應(yīng)的不是成功的消息,其會隔一段時間繼續(xù)調(diào)用通知我們的notify_url,超過幾次后不再進行通知,一般通知頻率都是 0秒-5秒-30秒-5分鐘-30分鐘-1小時-6小時-12小時;比如我們的家用電器定時關(guān)機。。。。。。這些場景都是可以用延遲隊列實現(xiàn)的)。

3、DLX使用方式

下面在web管控臺添加隊列的時候,我們看到有兩個DLX相關(guān)的參數(shù):x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是設(shè)置隊列的DLX的;x-dead-letter-routing-key是設(shè)置死信消息進入DLX時的routing key的,這個是可以不設(shè)置的,如果不設(shè)置,則默認使用原隊列的routing key。

客戶端可以通過channel.queueDeclare方法聲明隊列時設(shè)置x-dead-letter-exchange參數(shù),具體代碼如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //創(chuàng)建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//設(shè)置DLX
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//設(shè)置DLX的路由鍵(可以不設(shè)置)
//為隊列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

上面說的可能比較抽象,下面我們通過一個具體的例子,來演示一下DLX的具體使用:

package cn.wkp.rabbitmq.newest.dlx;
 
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class SendDLX {
 
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//聲明一個交換機,做死信交換機用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//聲明一個隊列,做死信隊列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//隊列綁定到交換機上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
		
		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 5000);//設(shè)置消息有效期1秒,過期后變成私信消息,然后進入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//設(shè)置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//設(shè)置DLX的路由鍵(可以不設(shè)置)
		//為隊列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("測試死信消息").getBytes());
		System.out.println("發(fā)送消息時間:"+ConnectionUtil.formatDate(new Date()));
		
		channel.close();
		connection.close();
	}
}

上面是發(fā)送者的代碼,運行后觀察控制臺可以看到如下所示:

死信隊列dlx_queue的綁定如下,其已與死信交換機dlx_exchange(topic類型)進行了綁定,routing key為"dlx.*"

隊列normal_queue的綁定如下,其已與交換機normal_exchange(fanout類型)進行了綁定

queues視圖如下:DLX和DLK表示設(shè)置給normal_queue設(shè)置了死信交換機和死信消息的routing key,我們看到消息已經(jīng)被路由到了死信隊列上面。整個流程為:

  • 消息發(fā)送到交換機normal_exchange,然后路由到隊列normal_queue上
  • 因為隊列normal_queue沒有消費者,消息過期后成為死信消息
  • 死信消息攜帶設(shè)置的x-dead-letter-routing-key=dlx.test進入到死信交換機dlx_exechage
  • dlx_exechage與dlx_queue綁定的routing key為"dlx.*",死信消息的路由鍵dlx.test符合該規(guī)則被路由到dlx.queue上面。

然后我們給死信隊列添加消費者如下:我們測試一下死信消息進入DLX的時間,先將之前的那個死信消息刪除

package cn.wkp.rabbitmq.newest.dlx;
 
import java.io.IOException;
import java.util.Date;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class RecvDLX {
 
	public static void main(String[] argv) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		channel.queueDeclare("dlx_queue", true, false, false, null);
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
 
		// 指該消費者在接收到隊列里的消息但沒有返回確認結(jié)果之前,它不會將新的消息分發(fā)給它。
		channel.basicQos(1);
 
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消費者收到消息:" + new String(body)+",當前時間:"+ConnectionUtil.formatDate(new Date()));
				// 消費者手動發(fā)送ack應(yīng)答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		System.out.println("消費死信隊列中的消息======================");
		// 監(jiān)聽隊列
		channel.basicConsume("dlx_queue", false, consumer);
	}
}

運行結(jié)果如下(先運行的死信隊列消費者,然后運行生產(chǎn)者):我們看到消息過期后10毫秒就被死信隊列的消費者消費到了,顯然,消息成為死信后是立即被發(fā)送到了DLX中。

消費死信隊列中的消息======================
消費者收到消息:測試死信消息,當前時間:2019-04-13 16:30:05:740

發(fā)送消息時間:2019-04-13 16:30:00:730

關(guān)于RabbitMQ的TTL和DLX就先介紹到這里,下一節(jié)會繼續(xù)介紹RabbitMQ的高級特性:RabbitMQ的延遲隊列。

參考 朱忠華《RabbitMQ實戰(zhàn)指南》

到此這篇關(guān)于Java RabbitMQ的TTL和DLX全面精解的文章就介紹到這了,更多相關(guān)Java RabbitMQ TTL DLX內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • idea 實現(xiàn)git rebase操作應(yīng)用場景

    idea 實現(xiàn)git rebase操作應(yīng)用場景

    本文結(jié)合idea工具進行rebase的各種場景的操作,借助工具更能直觀地觀察到分支之間地操作差異,方便我們理解rebase的各種操作以及場景的使用,對idea  git rebase操作知識感興趣的朋友一起看看吧
    2024-01-01
  • Java設(shè)計模式之責任鏈模式

    Java設(shè)計模式之責任鏈模式

    這篇文章介紹了Java設(shè)計模式之責任鏈模式,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-10-10
  • Java硬幣翻轉(zhuǎn)倍數(shù)遞增試算實例

    Java硬幣翻轉(zhuǎn)倍數(shù)遞增試算實例

    這篇文章主要介紹了Java硬幣翻轉(zhuǎn)倍數(shù)遞增試算實例,有需要的朋友可以參考一下
    2013-12-12
  • 分析講解Java?Random類里的種子問題

    分析講解Java?Random類里的種子問題

    Random類中實現(xiàn)的隨機算法是偽隨機,也就是有規(guī)則的隨機。在進行隨機時,隨機算法的起源數(shù)字稱為種子數(shù)(seed),在種子數(shù)的基礎(chǔ)上進行一定的變換,從而產(chǎn)生需要的隨機數(shù)字
    2022-05-05
  • java控制臺輸出版多人聊天室

    java控制臺輸出版多人聊天室

    這篇文章主要為大家詳細介紹了java控制臺輸出版多人聊天室,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • 使用Java第三方實現(xiàn)發(fā)送短信功能

    使用Java第三方實現(xiàn)發(fā)送短信功能

    這篇文章主要介紹了使用Java第三方實現(xiàn)發(fā)送短信功能,在一些開發(fā)中,經(jīng)常需要有給用戶發(fā)送短信接收驗證碼的功能,那么在Java中該如何實現(xiàn)呢,今天我們就一起來看一看
    2023-03-03
  • Springboot使用jsp具體案例解析

    Springboot使用jsp具體案例解析

    這篇文章主要介紹了Springboot使用jsp具體案例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-04-04
  • java中@SuppressWarnings注解用法詳解

    java中@SuppressWarnings注解用法詳解

    這篇文章主要介紹了java中@SuppressWarnings注解用法詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-02-02
  • Java的JSON格式轉(zhuǎn)換庫GSON的初步使用筆記

    Java的JSON格式轉(zhuǎn)換庫GSON的初步使用筆記

    GSON是Google開發(fā)并在在GitHub上開源的Java對象與JSON互轉(zhuǎn)功能類庫,在Android開發(fā)者中也大受歡迎,這里我們就來看一下Java的JSON格式轉(zhuǎn)換庫GSON的初步使用筆記:
    2016-06-06
  • Java基礎(chǔ)知識精通循環(huán)結(jié)構(gòu)與break及continue

    Java基礎(chǔ)知識精通循環(huán)結(jié)構(gòu)與break及continue

    循環(huán)結(jié)構(gòu)是指在程序中需要反復執(zhí)行某個功能而設(shè)置的一種程序結(jié)構(gòu)。它由循環(huán)體中的條件,判斷繼續(xù)執(zhí)行某個功能還是退出循環(huán),選擇結(jié)構(gòu)用于判斷給定的條件,根據(jù)判斷的結(jié)果判斷某些條件,根據(jù)判斷的結(jié)果來控制程序的流程
    2022-04-04

最新評論