redis實(shí)現(xiàn)隊(duì)列的阻塞、延時(shí)、發(fā)布和訂閱
Redis不僅可作為緩存服務(wù)器,還可以用作消息隊(duì)列。它的列表類(lèi)型天生支持用作消息隊(duì)列。如下圖所示:
由于Redis的列表是使用雙向鏈表實(shí)現(xiàn)的,保存了頭節(jié)點(diǎn)和尾節(jié)點(diǎn),所以在列表的頭部和尾部?jī)蛇叢迦牖颢@取元素都是非??斓?,時(shí)間復(fù)雜度為O(1)。
普通隊(duì)列
可以直接使用Redis的list數(shù)據(jù)類(lèi)型實(shí)現(xiàn)消息隊(duì)列,只需簡(jiǎn)單的兩個(gè)指令lpush和rpop或者rpush和lpop。
- lpush+rpop:左進(jìn)右出的隊(duì)列
- rpush+lpop:左出右進(jìn)的隊(duì)列
下面使用redis的命令來(lái)模擬普通隊(duì)列。
使用lpush命令生產(chǎn)消息:
>lpush queue:single 1 "1" >lpush queue:single 2 "2" >lpush queue:single 3 "3"
使用rpop命令消費(fèi)消息:
>rpop queue:single "1" >rpop queue:single "2" >rpop queue:single "3"
下面使用Java代碼來(lái)實(shí)現(xiàn)普通隊(duì)列。
生產(chǎn)者SingleProducer
package com.morris.redis.demo.queue.single; import redis.clients.jedis.Jedis; /** ?* 生產(chǎn)者 ?*/ public class SingleProducer { ? ? public static final String SINGLE_QUEUE_NAME = "queue:single"; ? ? public static void main(String[] args) { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? for (int i = 0; i < 100; i++) { ? ? ? ? ? ? jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i); ? ? ? ? } ? ? ? ? jedis.close(); ? ? } }
消費(fèi)者SingleConsumer:
package com.morris.redis.demo.queue.single; import redis.clients.jedis.Jedis; import java.util.Objects; import java.util.concurrent.TimeUnit; /** ?* 消費(fèi)者 ?*/ public class SingleConsumer { ? ? public static void main(String[] args) throws InterruptedException { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? while (true) { ? ? ? ? ? ? String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME); ? ? ? ? ? ? if(Objects.nonNull(message)) { ? ? ? ? ? ? ? ? System.out.println(message); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500); ? ? ? ? ? ? } ? ? ? ? } ? ? } }
上面的代碼已經(jīng)基本實(shí)現(xiàn)了普通隊(duì)列的生產(chǎn)與消費(fèi),但是上述的例子中消息的消費(fèi)者存在兩個(gè)問(wèn)題:
- 消費(fèi)者需要不停的調(diào)用rpop方法查看redis的list中是否有待處理的數(shù)據(jù)(消息)。每調(diào)用一次都會(huì)發(fā)起一次連接,有可能list中沒(méi)有數(shù)據(jù),造成大量的空輪詢(xún),導(dǎo)致造成不必要的浪費(fèi)。也許你可以使用Thread.sleep()等方法讓消費(fèi)者線(xiàn)程隔一段時(shí)間再消費(fèi),如果睡眠時(shí)間過(guò)長(zhǎng),這樣不能處理一些時(shí)效性要求高的消息,睡眠時(shí)間過(guò)短,也會(huì)在連接上造成比較大的開(kāi)銷(xiāo)。
- 如果生產(chǎn)者速度大于消費(fèi)者消費(fèi)速度,消息隊(duì)列長(zhǎng)度會(huì)一直增大,時(shí)間久了會(huì)占用大量?jī)?nèi)存空間。
阻塞隊(duì)列
消費(fèi)者可以使用brpop指令從redis的list中獲取數(shù)據(jù),這個(gè)指令只有在有元素時(shí)才返回,沒(méi)有則會(huì)阻塞直到超時(shí)返回null,于是消費(fèi)端就不需要休眠后獲取數(shù)據(jù)了,這樣就相當(dāng)于實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列,
使用redis的brpop命令來(lái)模擬阻塞隊(duì)列。
>brpop queue:single 30
可以看到命令行阻塞在了brpop這里了,30s后沒(méi)數(shù)據(jù)就返回。
Java代碼實(shí)現(xiàn)如下:
生產(chǎn)者與普通隊(duì)列的生產(chǎn)者一致。
消費(fèi)者BlockConsumer:
package com.morris.redis.demo.queue.block; import redis.clients.jedis.Jedis; import java.util.List; /** ?* 消費(fèi)者 ?*/ public class BlockConsumer { ? ? public static void main(String[] args) { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? while (true) { ? ? ? ? ? ? // 超時(shí)時(shí)間為1s ? ? ? ? ? ? List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME); ? ? ? ? ? ? if (null != messageList && !messageList.isEmpty()) { ? ? ? ? ? ? ? ? System.out.println(messageList); ? ? ? ? ? ? } ? ? ? ? } ? ? } }
缺點(diǎn):無(wú)法實(shí)現(xiàn)一次生產(chǎn)多次消費(fèi)。
發(fā)布訂閱模式
Redis除了對(duì)消息隊(duì)列提供支持外,還提供了一組命令用于支持發(fā)布/訂閱模式。利用Redis的pub/sub模式可以實(shí)現(xiàn)一次生產(chǎn)多次消費(fèi)的隊(duì)列。
發(fā)布:PUBLISH指令可用于發(fā)布一條消息,格式:
PUBLISH channel message
返回值表示訂閱了該消息的數(shù)量。
訂閱:SUBSCRIBE指令用于接收一條消息,格式:
SUBSCRIBE channel
使用SUBSCRIBE指令后進(jìn)入了訂閱模式,但是不會(huì)接收到訂閱之前publish發(fā)送的消息,這是因?yàn)橹挥性谙l(fā)出去前訂閱才會(huì)接收到。在這個(gè)模式下其他指令,只能看到回復(fù)。
回復(fù)分為三種類(lèi)型:
- 如果為subscribe,第二個(gè)值表示訂閱的頻道,第三個(gè)值表示是已訂閱的頻道的數(shù)量
- 如果為message(消息),第二個(gè)值為產(chǎn)生該消息的頻道,第三個(gè)值為消息
- 如果為unsubscribe,第二個(gè)值表示取消訂閱的頻道,第三個(gè)值表示當(dāng)前客戶(hù)端的訂閱數(shù)量。
下面使用redis的命令來(lái)模擬發(fā)布訂閱模式。
生產(chǎn)者:
127.0.0.1:6379> publish queue hello (integer) 1 127.0.0.1:6379> publish queue hi (integer) 1
消費(fèi)者:
127.0.0.1:6379> subscribe queue Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "queue" 3) (integer) 1 1) "message" 2) "queue" 3) "hello" 1) "message" 2) "queue" 3) "hi"
Java代碼實(shí)現(xiàn)如下:
生產(chǎn)者PubsubProducer:
? package com.morris.redis.demo.queue.pubsub; import redis.clients.jedis.Jedis; /** ?* 生產(chǎn)者 ?*/ public class PubsubProducer { ? ? public static final String PUBSUB_QUEUE_NAME = "queue:pubsub"; ? ? public static void main(String[] args) { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? for (int i = 0; i < 100; i++) { ? ? ? ? ? ? jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i); ? ? ? ? } ? ? ? ? jedis.close(); ? ? } } ?
消費(fèi)者PubsubConsumer:
package com.morris.redis.demo.queue.pubsub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; /** ?* 消費(fèi)者 ?*/ public class PubsubConsumer { ? ? public static void main(String[] args) throws InterruptedException { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? JedisPubSub jedisPubSub = new JedisPubSub() { ? ? ? ? ? ? @Override ? ? ? ? ? ? public void onMessage(String channel, String message) { ? ? ? ? ? ? ? ? System.out.println("receive message: " + message); ? ? ? ? ? ? ? ? if(message.indexOf("99") > -1) { ? ? ? ? ? ? ? ? ? ? this.unsubscribe(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? @Override ? ? ? ? ? ? public void onSubscribe(String channel, int subscribedChannels) { ? ? ? ? ? ? ? ? System.out.println("subscribe channel: " + channel); ? ? ? ? ? ? } ? ? ? ? ? ? @Override ? ? ? ? ? ? public void onUnsubscribe(String channel, int subscribedChannels) { ? ? ? ? ? ? ? ? System.out.println("unsubscribe channel " + channel); ? ? ? ? ? ? } ? ? ? ? }; ? ? ? ? jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME); ? ? } }
消費(fèi)者可以啟動(dòng)多個(gè),每個(gè)消費(fèi)者都能收到所有的消息。
可以使用指令UNSUBSCRIBE退訂,如果不加參數(shù),則會(huì)退訂所有由SUBSCRIBE指令訂閱的頻道。
Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:
psubscribe channel.*
用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無(wú)法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。
同時(shí)PUNSUBSCRIBE指令通配符不會(huì)展開(kāi)。例如:PUNSUBSCRIBE \*不會(huì)匹配到channel.\*,所以要取消訂閱channel.\*就要這樣寫(xiě)PUBSUBSCRIBE channel.\*。
Redis的pub/sub也有其缺點(diǎn),那就是如果消費(fèi)者下線(xiàn),生產(chǎn)者的消息會(huì)丟失。
延時(shí)隊(duì)列和優(yōu)先級(jí)隊(duì)列
Redis中有個(gè)數(shù)據(jù)類(lèi)型叫Zset,其本質(zhì)就是在數(shù)據(jù)類(lèi)型Set的基礎(chǔ)上加了個(gè)排序的功能而已,除了保存原始的數(shù)據(jù)value之外,還提供另一個(gè)屬性score,這一屬性在添加修改元素時(shí)候可以進(jìn)行指定,每次指定后,Zset會(huì)自動(dòng)重新按新的score值進(jìn)行排序。
如果score字段設(shè)置為消息的優(yōu)先級(jí),優(yōu)先級(jí)最高的消息排在第一位,這樣就能實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)隊(duì)列。
如果score字段代表的是消息想要執(zhí)行時(shí)間的時(shí)間戳,將它插入Zset集合中,便會(huì)按照時(shí)間戳大小進(jìn)行排序,也就是對(duì)執(zhí)行時(shí)間先后進(jìn)行排序,集合中最先要執(zhí)行的消息就會(huì)排在第一位,這樣的話(huà),只需要起一個(gè)死循環(huán)線(xiàn)程不斷獲取集合中的第一個(gè)元素,如果當(dāng)前時(shí)間戳大于等于該元素的score就將它取出來(lái)進(jìn)行消費(fèi)刪除,就可以達(dá)到延時(shí)執(zhí)行的目的,注意不需要遍歷整個(gè)Zset集合,以免造成性能浪費(fèi)。
下面使用redis的zset來(lái)模擬延時(shí)隊(duì)列。
生產(chǎn)者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3 (integer) 0
消費(fèi)者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores 1) "order1" 2) "1" 127.0.0.1:6379> zrem queue:delay order1 (integer) 1
Java代碼如下:
生產(chǎn)者DelayProducer:
package com.morris.redis.demo.queue.delay; import redis.clients.jedis.Jedis; import java.util.Date; import java.util.Random; /** ?* 生產(chǎn)者 ?*/ public class DelayProducer { ? ? public static final String DELAY_QUEUE_NAME = "queue:delay"; ? ? public static void main(String[] args) { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? long now = new Date().getTime(); ? ? ? ? Random random = new Random(); ? ? ? ? for (int i = 0; i < 10; i++) { ? ? ? ? ? ? int second = random.nextInt(30); // 隨機(jī)訂單失效時(shí)間 ? ? ? ? ? ? jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i); ? ? ? ? } ? ? ? ? jedis.close(); ? ? } }
消費(fèi)者:
package com.morris.redis.demo.queue.delay; import redis.clients.jedis.Jedis; import redis.clients.jedis.Tuple; import java.util.Date; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; /** ?* 消費(fèi)者 ?*/ public class DelayConsumer { ? ? public static void main(String[] args) throws InterruptedException { ? ? ? ? Jedis jedis = new Jedis(); ? ? ? ? while (true) { ? ? ? ? ? ? long now = new Date().getTime(); ? ? ? ? ? ? Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0); ? ? ? ? ? ? if(tupleSet.isEmpty()) { ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? for (Tuple tuple : tupleSet) { ? ? ? ? ? ? ? ? ? ? Double score = tuple.getScore(); ? ? ? ? ? ? ? ? ? ? long time = score.longValue(); ? ? ? ? ? ? ? ? ? ? if(time < now) { ? ? ? ? ? ? ? ? ? ? ? ? jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement()); ? ? ? ? ? ? ? ? ? ? ? ? System.out.println("order[" + tuple.getElement() +"] is timeout at " + time); ? ? ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? break; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } }
應(yīng)用場(chǎng)景
延時(shí)隊(duì)列可用于訂單超時(shí)失效的場(chǎng)景
二級(jí)緩存(local+redis)中,當(dāng)有緩存需要更新時(shí),可以使用發(fā)布訂閱模式通知其他服務(wù)器使得本地緩存失效。
到此這篇關(guān)于redis實(shí)現(xiàn)隊(duì)列的阻塞、延時(shí)、發(fā)布和訂閱的文章就介紹到這了,更多相關(guān)redis 隊(duì)列阻塞、延時(shí)、發(fā)布和訂閱內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JSON 與對(duì)象、集合之間的轉(zhuǎn)換的示例
在開(kāi)發(fā)過(guò)程中,經(jīng)常需要和別的系統(tǒng)交換數(shù)據(jù),數(shù)據(jù)交換的格式有XML、JSON等,JSON作為一個(gè)輕量級(jí)的數(shù)據(jù)格式比xml效率要高,本篇文章主要介紹了JSON 與 對(duì)象 、集合 之間的轉(zhuǎn)換,有興趣的可以了解一下。2017-01-01關(guān)于java中可變長(zhǎng)參數(shù)的定義及使用方法詳解
下面小編就為大家?guī)?lái)一篇關(guān)于java中可變長(zhǎng)參數(shù)的定義及使用方法詳解。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12java項(xiàng)目打包成可執(zhí)行jar用log4j將日志寫(xiě)在jar所在目錄操作
這篇文章主要介紹了java項(xiàng)目打包成可執(zhí)行jar用log4j將日志寫(xiě)在jar所在目錄操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08Java實(shí)現(xiàn)經(jīng)典游戲2048的示例代碼
2014年Gabriele Cirulli利用周末的時(shí)間寫(xiě)2048這個(gè)游戲的程序。本文將用java語(yǔ)言實(shí)現(xiàn)這一經(jīng)典游戲,并采用了swing技術(shù)進(jìn)行了界面化處理,需要的可以參考一下2022-02-02Spring單元測(cè)試類(lèi)ApplicationTests錯(cuò)誤的解決
這篇文章主要介紹了Spring單元測(cè)試類(lèi)ApplicationTests錯(cuò)誤的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java8新特性之接口中的默認(rèn)方法和靜態(tài)方法詳解
今天帶大家學(xué)習(xí)的是Java8新特性的相關(guān)知識(shí),文章圍繞著Java接口中的默認(rèn)方法和靜態(tài)方法展開(kāi),文中有非常詳細(xì)的的代碼示例,需要的朋友可以參考下2021-06-06mybatis plus表的創(chuàng)建時(shí)間和修改時(shí)間的操作方法
這篇文章主要介紹了mybatis plus表的創(chuàng)建時(shí)間和修改時(shí)間的實(shí)現(xiàn)方法,本文給大家分享兩種方法,每種方法通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-09-09Springboot項(xiàng)目實(shí)現(xiàn)將類(lèi)從@ComponentScan中排除
這篇文章主要介紹了Springboot項(xiàng)目實(shí)現(xiàn)將類(lèi)從@ComponentScan中排除,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11