Redis延遲隊(duì)列和分布式延遲隊(duì)列的簡(jiǎn)答實(shí)現(xiàn)
最近,又重新學(xué)習(xí)了下Redis,Redis不僅能快還能慢,簡(jiǎn)直利器,今天就為大家介紹一下Redis延遲隊(duì)列和分布式延遲隊(duì)列的簡(jiǎn)單實(shí)現(xiàn)。
在我們的工作中,很多地方使用延遲隊(duì)列,比如訂單到期沒(méi)有付款取消訂單,制訂一個(gè)提醒的任務(wù)等都需要延遲隊(duì)列,那么我們需要實(shí)現(xiàn)延遲隊(duì)列。我們本文的梗概如下,同學(xué)們可以選擇性閱讀。
1. 實(shí)現(xiàn)一個(gè)簡(jiǎn)單的延遲隊(duì)列。
我們知道目前JAVA可以有DelayedQueue,我們首先開(kāi)一個(gè)DelayQueue的結(jié)構(gòu)類(lèi)圖。DelayQueue實(shí)現(xiàn)了Delay、BlockingQueue接口。也就是DelayQueue是一種阻塞隊(duì)列。
我們?cè)诳匆幌翫elay的類(lèi)圖。Delayed接口也實(shí)現(xiàn)了Comparable接口,也就是我們使用Delayed的時(shí)候需要實(shí)現(xiàn)CompareTo方法。因?yàn)殛?duì)列中的數(shù)據(jù)需要排一下先后,根據(jù)我們自己的實(shí)現(xiàn)。Delayed接口里邊有一個(gè)方法就是getDelay方法,用于獲取延遲時(shí)間,判斷是否時(shí)間已經(jīng)到了延遲的時(shí)間,如果到了延遲的時(shí)間就可以從隊(duì)列里邊獲取了。
我們創(chuàng)建一個(gè)Message類(lèi),實(shí)現(xiàn)了Delayed接口,我們主要把getDelay和compareTo進(jìn)行實(shí)現(xiàn)。在Message的構(gòu)造方法的地方傳入延遲的時(shí)間,單位是毫秒,計(jì)算好觸發(fā)時(shí)間fireTime。同時(shí)按照延遲時(shí)間的升序進(jìn)行排序。我重寫(xiě)了里邊的toString方法,用于將Message按照我寫(xiě)的方法進(jìn)行輸出。
package com.hqs.delayQueue.bean; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2020-04-18 */ public class Message implements Delayed { private String body; private long fireTime; public String getBody() { return body; } public long getFireTime() { return fireTime; } public Message(String body, long delayTime) { this.body = body; this.fireTime = delayTime + System.currentTimeMillis(); } public long getDelay(TimeUnit unit) { return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return System.currentTimeMillis() + ":" + body; } public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + ":start"); BlockingQueue<Message> queue = new DelayQueue<>(); Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); queue.put(message1); queue.put(message2); while (queue.size() > 0) { System.out.println(queue.take()); } } }
里邊的main方法里邊聲明了兩個(gè)Message,一個(gè)延遲5秒,一個(gè)延遲7秒,時(shí)間到了之后會(huì)將接取出并且打印。輸出的結(jié)果如下,正是我們所期望的。
1587218430786:start
1587218435789:hello
1587218437793:world
這個(gè)方法實(shí)現(xiàn)起來(lái)真的非常簡(jiǎn)單。但是缺點(diǎn)也是很明顯的,就是數(shù)據(jù)在內(nèi)存里邊,數(shù)據(jù)比較容易丟失。那么我們需要采用Redis實(shí)現(xiàn)分布式的任務(wù)處理。
2. 使用Redis的list實(shí)現(xiàn)分布式延遲隊(duì)列。
本地需要安裝一個(gè)Redis,我自己是使用Docker構(gòu)建一個(gè)Redis,非??焖伲钜矝](méi)多少。我們直接啟動(dòng)Redis并且暴露6379端口。進(jìn)入之后直接使用客戶(hù)端命令即可查看和調(diào)試數(shù)據(jù)。
docker pull redis docker run -itd --name redisLocal -p 6379:6379 redis docker exec -it redisLocal /bin/bash redis-cli
我本地采用spring-boot的方式連接redis,pom文件列一下,供大家參考。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.hqs</groupId> <artifactId>delayQueue</artifactId> <version>0.0.1-SNAPSHOT</version> <name>delayQueue</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
加上Redis的配置放到application.properties里邊即可實(shí)現(xiàn)Redis連接,非常的方便。
# redis redis.host=127.0.0.1 redis.port=6379 redis.password= redis.maxIdle=100 redis.maxTotal=300 redis.maxWait=10000 redis.testOnBorrow=true redis.timeout=100000
接下來(lái)實(shí)現(xiàn)一個(gè)基于Redis的list數(shù)據(jù)類(lèi)型進(jìn)行實(shí)現(xiàn)的一個(gè)類(lèi)。我們使用RedisTemplate操作Redis,這個(gè)里邊封裝好我們所需要的Redis的一些方法,用起來(lái)非常方便。這個(gè)類(lèi)允許延遲任務(wù)做多有10W個(gè),也是避免數(shù)據(jù)量過(guò)大對(duì)Redis造成影響。如果在線上使用的時(shí)候也需要考慮延遲任務(wù)的多少。太多幾百萬(wàn)幾千萬(wàn)的時(shí)候可能數(shù)據(jù)量非常大,我們需要計(jì)算Redis的空間是否夠。這個(gè)代碼也是非常的簡(jiǎn)單,一個(gè)用于存放需要延遲的消息,采用offer的方法。另外一個(gè)是啟動(dòng)一個(gè)線程, 如果消息時(shí)間到了,那么就將數(shù)據(jù)lpush到Redis里邊。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j public class RedisListDelayedQueue{ private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate<String, String> redisTemplate; private String queueName; private BlockingQueue<Message> delayedQueue; public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; init(); } public void offerMessage(Message message) { if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超過(guò)隊(duì)列要求最大值,請(qǐng)檢查"); } try { log.info("offerMessage:" + message); delayedQueue.offer(message); } catch (Exception e) { log.error("offMessage異常", e); } } public void init() { new Thread(() -> { while(true) { try { Message message = delayedQueue.take(); redisTemplate.opsForList().leftPush(queueName, message.toString()); } catch (InterruptedException e) { log.error("取消息錯(cuò)誤", e); } } }).start(); } }
接下來(lái)我們看一下,我們寫(xiě)一個(gè)測(cè)試的controller。大家看一下這個(gè)請(qǐng)求/redis/listDelayedQueue的代碼位置。我們也是生成了兩個(gè)消息,然后把消息放到隊(duì)列里邊,另外我們?cè)趩?dòng)一個(gè)線程任務(wù),用于將數(shù)據(jù)從Redis的list中獲取。方法也非常簡(jiǎn)單。
package com.hqs.delayQueue.controller; import com.hqs.delayQueue.bean.Message; import com.hqs.delayQueue.cache.RedisListDelayedQueue; import com.hqs.delayQueue.cache.RedisZSetDelayedQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.Set; import java.util.concurrent.*; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j @Controller public class DelayQueueController { private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); //注意RedisTemplate用的String,String,后續(xù)所有用到的key和value都是String的 @Autowired RedisTemplate<String, String> redisTemplate; private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); @GetMapping("/redisTest") @ResponseBody public String redisTest() { redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS); System.out.println(redisTemplate.opsForValue().get("a")); return "s"; } @GetMapping("/redis/listDelayedQueue") @ResponseBody public String listDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "list_queue"; BlockingQueue<Message> delayedQueue = new DelayQueue<>(); RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue); redisListDelayedQueue.offerMessage(message1); redisListDelayedQueue.offerMessage(message2); asyncListTask(queueName); return "success"; } @GetMapping("/redis/zSetDelayedQueue") @ResponseBody public String zSetDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "zset_queue"; BlockingQueue<Message> delayedQueue = new DelayQueue<>(); RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue); redisZSetDelayedQueue.offerMessage(message1); redisZSetDelayedQueue.offerMessage(message2); asyncZSetTask(queueName); return "success"; } public void asyncListTask(String queueName) { taskExecPool.execute(() -> { for(;;) { String message = redisTemplate.opsForList().rightPop(queueName); if(message != null) { log.info(message); } } }); } public void asyncZSetTask(String queueName) { taskExecPool.execute(() -> { for(;;) { Long nowTimeInMs = System.currentTimeMillis(); System.out.println("nowTimeInMs:" + nowTimeInMs); Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs); if(messages != null && messages.size() != 0) { redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs); for (String message : messages) { log.info("asyncZSetTask:" + message + " " + nowTimeInMs); } log.info(redisTemplate.opsForZSet().zCard(queueName).toString()); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
我就不把運(yùn)行結(jié)果寫(xiě)出來(lái)了,感興趣的同學(xué)自己自行試驗(yàn)。當(dāng)然這個(gè)方法也是從內(nèi)存中拿出數(shù)據(jù),到時(shí)間之后放到Redis里邊,還是會(huì)存在程序啟動(dòng)的時(shí)候,任務(wù)進(jìn)行丟失。我們繼續(xù)看另外一種方法更好的進(jìn)行這個(gè)問(wèn)題的處理。
3.使用Redis的zSet實(shí)現(xiàn)分布式延遲隊(duì)列。
我們需要再寫(xiě)一個(gè)ZSet的隊(duì)列處理。下邊的offerMessage主要是把消息直接放入緩存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即將key=value的數(shù)據(jù)賦予一個(gè)score, 放入緩存中。score就是計(jì)算出來(lái)延遲的毫秒數(shù)。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j public class RedisZSetDelayedQueue { private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate<String, String> redisTemplate; private String queueName; private BlockingQueue<Message> delayedQueue; public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; } public void offerMessage(Message message) { if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超過(guò)隊(duì)列要求最大值,請(qǐng)檢查"); } long delayTime = message.getFireTime() - System.currentTimeMillis(); log.info("zset offerMessage" + message + delayTime); redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime()); } }
上邊的Controller方法已經(jīng)寫(xiě)好了測(cè)試的方法。/redis/zSetDelayedQueue,里邊主要使用ZSet的zRangeByScore(key, min, max)。主要是從score從0,當(dāng)前時(shí)間的毫秒數(shù)獲取。取出數(shù)據(jù)后再采用removeRangeByScore,將數(shù)據(jù)刪除。這樣數(shù)據(jù)可以直接寫(xiě)到Redis里邊,然后取出數(shù)據(jù)后直接處理。這種方法比前邊的方法稍微好一些,但是實(shí)際上還存在一些問(wèn)題,因?yàn)橐蕾?lài)Redis,如果Redis內(nèi)存不足或者連不上的時(shí)候,系統(tǒng)將變得不可用。
4. 總結(jié)一下,另外還有哪些可以延遲隊(duì)列。
上面的方法其實(shí)還是存在問(wèn)題的,比如系統(tǒng)重啟的時(shí)候還是會(huì)造成任務(wù)的丟失。所以我們?cè)谏a(chǎn)上使用的時(shí)候,我們還需要將任務(wù)保存起來(lái),比如放到數(shù)據(jù)庫(kù)和文件存儲(chǔ)系統(tǒng)將數(shù)據(jù)存儲(chǔ)起來(lái),這樣做到double-check,雙重檢查,最終達(dá)到任務(wù)的99.999%能夠處理。
其實(shí)還有很多東西可以實(shí)現(xiàn)延遲隊(duì)列。
1) RabbitMQ就可以實(shí)現(xiàn)此功能。這個(gè)消息隊(duì)列可以把數(shù)據(jù)保存起來(lái)并且進(jìn)行處理。
2)Kafka也可以實(shí)現(xiàn)這個(gè)功能。
3)Netty的HashedWheelTimer也可以實(shí)現(xiàn)這個(gè)功能。
最后放上我的代碼: https://github.com/stonehqs/delayQueue
到此這篇關(guān)于Redis延遲隊(duì)列和分布式延遲隊(duì)列的簡(jiǎn)答實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Redis延遲隊(duì)列和分布式延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis底層數(shù)據(jù)結(jié)構(gòu)之dict、ziplist、quicklist詳解
本文給大家詳細(xì)介紹了Redis的底層數(shù)據(jù)結(jié)構(gòu):dict、ziplist、quicklist的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-09-09Redis分布式鎖升級(jí)版RedLock及SpringBoot實(shí)現(xiàn)方法
這篇文章主要介紹了Redis分布式鎖升級(jí)版RedLock及SpringBoot實(shí)現(xiàn),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02Redis基本數(shù)據(jù)類(lèi)型Zset有序集合常用操作
這篇文章主要為大家介紹了redis基本數(shù)據(jù)類(lèi)型Zset有序集合常用操作,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Redis集群利用Redisson實(shí)現(xiàn)分布式鎖方式
這篇文章主要介紹了Redis集群利用Redisson實(shí)現(xiàn)分布式鎖方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05