redis redisson 集合的使用案例(RList、Rset、RMap)
redis redisson 集合操作
相關(guān)類及接口
Rlist:鏈表
public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<List<V>>, RandomAccess { List<V> get(int... var1); //獲取指定的節(jié)點值 int addAfter(V var1, V var2); //在var1前添加var2 int addBefore(V var1, V var2); //在var1后添加var2 void fastSet(int var1, V var2); //修改var1處的只為var2 List<V> readAll(); //獲取鏈表的所有值 RList<V> subList(int var1, int var2); //獲取var1到var2的子鏈表 List<V> range(int var1); //返回var1往后的鏈表 List<V> range(int var1, int var2); //返回var1到var2的鏈表 void trim(int var1, int var2); //保留var1到var2處的鏈表,其余刪除 void fastRemove(int var1); //刪除var1處的值 boolean remove(Object var1, int var2); //判斷元素是否刪除 <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce(); //mapreduce操作 }
RSet:無序集合
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> { V removeRandom(); Set<V> removeRandom(int var1); //刪除對象 V random(); Set<V> random(int var1); //隨機返回對象 boolean move(String var1, V var2); //判斷集合var1中是否存在var2,類似contains()方法 Set<V> readAll(); //獲取所有對象 int union(String... var1); //集合并集對象個數(shù) Set<V> readUnion(String... var1); //集合并集 int diff(String... var1); //集合差集對象個數(shù) Set<V> readDiff(String... var1); //集合差集 int intersection(String... var1); //集合交集的對象個數(shù) Set<V> readIntersection(String... var1); //集合交集 Iterator<V> iterator(int var1); Iterator<V> iterator(String var1, int var2); Iterator<V> iterator(String var1); //遍歷集合 <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce(); RSemaphore getSemaphore(V var1); RCountDownLatch getCountDownLatch(V var1); RPermitExpirableSemaphore getPermitExpirableSemaphore(V var1); //信號量 RLock getLock(V var1); RLock getFairLock(V var1); RReadWriteLock getReadWriteLock(V var1); //鎖操作 Stream<V> stream(int var1); Stream<V> stream(String var1, int var2); Stream<V> stream(String var1); //流操作 }
RMap:鍵值對
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> { void loadAll(boolean var1, int var2); void loadAll(Set<? extends K> var1, boolean var2, int var3); V get(Object var1); //獲取var1的值 V put(K var1, V var2); //添加對象 V putIfAbsent(K var1, V var2); //對象不存在則設(shè)置 V replace(K var1, V var2); //替換對象 boolean replace(K var1, V var2, V var3); //替換對象 V remove(Object var1); //移除對象 boolean remove(Object var1, Object var2); //移除對象 void putAll(Map<? extends K, ? extends V> var1); void putAll(Map<? extends K, ? extends V> var1, int var2); //添加對象 Map<K, V> getAll(Set<K> var1); //獲取key在集合var1中的鍵值對 int valueSize(K var1); //key為var1的value大小 V addAndGet(K var1, Number var2); //key為var1的value加var2 long fastRemove(K... var1); //移除對象 boolean fastPut(K var1, V var2); //添加對象 boolean fastReplace(K var1, V var2); //替換key為var1的值為var2 boolean fastPutIfAbsent(K var1, V var2); //如果不存在則設(shè)置 Set<K> readAllKeySet(); //獲取所有key,以set形式返回 Collection<V> readAllValues(); //獲取所有value,以collection返回 Set<Entry<K, V>> readAllEntrySet(); //遍歷鍵值對 Map<K, V> readAllMap(); //集合形式轉(zhuǎn)換為map類型 Set<K> keySet(); Set<K> keySet(int var1); Set<K> keySet(String var1, int var2); Set<K> keySet(String var1); //獲取key集合 Collection<V> values(); Collection<V> values(String var1); Collection<V> values(String var1, int var2); Collection<V> values(int var1); //獲取所有value Set<Entry<K, V>> entrySet(); Set<Entry<K, V>> entrySet(String var1); Set<Entry<K, V>> entrySet(String var1, int var2); Set<Entry<K, V>> entrySet(int var1); //遍歷鍵值對 <KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce(); RSemaphore getSemaphore(K var1); RCountDownLatch getCountDownLatch(K var1); RPermitExpirableSemaphore getPermitExpirableSemaphore(K var1); //信號量操作 RLock getLock(K var1); RLock getFairLock(K var1); RReadWriteLock getReadWriteLock(K var1); //鎖操作 }
使用示例
public class MyTest { public static void main(String[] args){ Config config=new Config(); config.useSingleServer().setAddress("redis://******:6379").setPassword("123456"); RedissonClient client= Redisson.create(config); RList<String> list=client.getList("list"); for (int i=0;i<10;i++){ list.add("瓜田李下 "+i); } list.readAll().forEach(System.out::println); System.out.println("list的數(shù)量為:"+list.size()+"\n"); RSet<String> set=client.getSet("set"); for (int i=0;i<10;i++){ set.add("瓜田李下 "+i); } for (String s : set) { System.out.println(s); } System.out.println("set的大小為:"+set.size()+"\n"); RMap<Integer,String> map=client.getMap("map"); for (int i=0;i<10;i++){ map.put(i,"瓜田李下 "+i); } for (Map.Entry<Integer,String> entry:map.entrySet()){ System.out.println(entry.getKey()+" ==> "+entry.getValue()); } System.out.println("map的大小為:"+map.size()); } }
控制臺輸出
瓜田李下 0
瓜田李下 1
瓜田李下 2
瓜田李下 3
瓜田李下 4
瓜田李下 5
瓜田李下 6
瓜田李下 7
瓜田李下 8
瓜田李下 9
list的數(shù)量為:10
瓜田李下 0
瓜田李下 1
瓜田李下 7
瓜田李下 3
瓜田李下 5
瓜田李下 4
瓜田李下 9
瓜田李下 8
瓜田李下 6
瓜田李下 2
set的大小為:10
0 ==> 瓜田李下 0
1 ==> 瓜田李下 1
2 ==> 瓜田李下 2
3 ==> 瓜田李下 3
4 ==> 瓜田李下 4
5 ==> 瓜田李下 5
6 ==> 瓜田李下 6
7 ==> 瓜田李下 7
8 ==> 瓜田李下 8
9 ==> 瓜田李下 9
map的大小為:10
Redisson使用注意事項
Redisson 是一個在 Redis 的基礎(chǔ)上實現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格,相較于暴露底層操作的Jedis,Redisson提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務(wù)。
特性 & 功能:
- 支持 Redis 單節(jié)點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式
- 程序接口調(diào)用方式采用異步執(zhí)行和異步流執(zhí)行兩種方式
- 數(shù)據(jù)序列化,Redisson 的對象編碼類是用于將對象進行序列化和反序列化,以實現(xiàn)對該對象在 Redis 里的讀取和存儲
- 單個集合數(shù)據(jù)分片,在集群模式下,Redisson 為單個 Redis 集合類型提供了自動分片的功能
- 提供多種分布式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
- 提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
- 分布式鎖和同步器的實現(xiàn),可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯(lián)鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等
- 提供先進的分布式服務(wù),如分布式遠程服務(wù)(Remote Service),分布式實時對象(Live Object)服務(wù),分布式執(zhí)行服務(wù)(Executor Service),分布式調(diào)度任務(wù)服務(wù)(Schedule Service)和分布式映射歸納服務(wù)(MapReduce)
- 更多特性和功能,請關(guān)注官網(wǎng):http://redisson.org
實現(xiàn)原理
redis本身是不支持上述的分布式對象和集合,Redisson是通過利用redis的特性在客戶端實現(xiàn)了高級數(shù)據(jù)結(jié)構(gòu)和特性,例如優(yōu)先隊列的實現(xiàn),是通過客戶端排序整理后再存入redis。
客戶端實現(xiàn),意味著當(dāng)沒有任何客戶端在線時,這些所有的數(shù)據(jù)結(jié)構(gòu)和特性都不會保留,也不會自動生效,例如過期事件的觸發(fā)或原來優(yōu)先隊列的元素增加。
注意事項
實時性
RMap中有一個功能是可以設(shè)置鍵值對的過期時間的,并可以注冊鍵值對的事件監(jiān)聽器
- 元素淘汰功能(Eviction)
- Redisson的分布式的RMapCache Java對象在基于RMap的前提下實現(xiàn)了針對單個元素的淘汰機制。同時仍然保留了元素的插入順序。由于RMapCache是基于RMap實現(xiàn)的,使它同時繼承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于這樣的功能來實現(xiàn)的。
- 目前的Redis自身并不支持散列(Hash)當(dāng)中的元素淘汰,因此所有過期元素都是通過org.redisson.EvictionScheduler實例來實現(xiàn)定期清理的。為了保證資源的有效利用,每次運行最多清理300個過期元素。任務(wù)的啟動時間將根據(jù)上次實際清理數(shù)量自動調(diào)整,間隔時間趨于1秒到1小時之間。比如該次清理時刪除了300條元素,那么下次執(zhí)行清理的時間將在1秒以后(最小間隔時間)。一旦該次清理數(shù)量少于上次清理數(shù)量,時間間隔將增加1.5倍。
正如官方wiki所述,這個功能是通過后臺線程定時去清理的, 所以這個是非實時的(issue-1234:on expired event is not executed in real-time.),延遲在5秒到2小時之間,因此對實時性要求比較高的場景就得自己衡量了。
由于過期時間的非實時性,所以導(dǎo)致過期事件的發(fā)生也是非實時的,相應(yīng)的監(jiān)聽器可能會延遲了一會兒才收到通知,在我的測試中,ttl設(shè)置在秒級誤差是比較大的,分鐘級別的ttl倒還好(左側(cè)設(shè)置值,右側(cè)實際耗時):
1s _ 5s
3s _ 5s
4s _ 5s
5s _ 9s
6s _ 10s
10s _ 15s
1m _ 1m11s
序列化
由Redisson默認的編碼器為JsonJacksonCodec,JsonJackson在序列化有雙向引用的對象時,會出現(xiàn)無限循環(huán)異常。而fastjson在檢查出雙向引用后會自動用引用符$ref替換,終止循環(huán)。
在我的情況中,我序列化了一個service,這個service已被spring托管,而且和另一個service之間也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環(huán)異常。
為了序列化后的內(nèi)容可見,所以不用redission其他自帶的二進制編碼器,自行實現(xiàn)編碼器:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import org.redisson.client.codec.BaseCodec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import java.io.IOException; public class FastjsonCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in,SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }
訂閱發(fā)布
Redisson對訂閱發(fā)布的封裝是RTopic,這也是Redisson中很多事件監(jiān)聽的實現(xiàn)原理(例如鍵值對的事件監(jiān)聽)。
使用單元測試時發(fā)現(xiàn),在事件發(fā)布后,訂閱方需要延時一下才能收到事件。具體原因待查
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot實現(xiàn)分布式任務(wù)調(diào)度的詳細步驟
隨著互聯(lián)網(wǎng)應(yīng)用的規(guī)模和復(fù)雜度不斷增加,單節(jié)點任務(wù)調(diào)度系統(tǒng)已經(jīng)難以滿足高并發(fā)、大數(shù)據(jù)量的處理需求,分布式任務(wù)調(diào)度成為了解決這一問題的重要手段,本文將介紹如何在Spring Boot中實現(xiàn)分布式任務(wù)調(diào)度,需要的朋友可以參考下2024-08-08Java中的CountDownLatch、CyclicBarrier和semaphore實現(xiàn)原理解讀
這篇文章主要介紹了Java中的CountDownLatch、CyclicBarrier和semaphore實現(xiàn)原理詳解,CountDownLatch中調(diào)用await方法線程需要等待所有調(diào)用countDown方法的線程執(zhí)行,這就很適合一個業(yè)務(wù)需要一些準(zhǔn)備條件,等準(zhǔn)備條件準(zhǔn)備好之后再繼續(xù)執(zhí)行,需要的朋友可以參考下2023-12-12Java中l(wèi)ist集合的clear方法及空字符串的區(qū)別
這篇文章主要介紹了Java中l(wèi)ist集合的clear方法及空字符串的區(qū)別,在使用list?結(jié)合的時候習(xí)慣了?list=null?;在創(chuàng)建這樣的方式,但是發(fā)現(xiàn)使用list的clear?方法很不錯,尤其是有大量循環(huán)的時候<BR>list.clear()與list?=?null?區(qū)別,需要的朋友可以參考下2023-08-08java并發(fā)編程synchronized底層實現(xiàn)原理
這篇文章主要介紹了java并發(fā)編程synchronized底層實現(xiàn)原理2022-02-02Java的增強for循環(huán)修改數(shù)組元素的問題小結(jié)
增強for循環(huán)的元素變量x,就是一個局部變量,它是引用數(shù)組當(dāng)前元素引用的副本(就相當(dāng)于上文所說的你復(fù)刻朋友的鑰匙),或者是基本數(shù)據(jù)類型的值的副本,這篇文章主要介紹了Java的增強for循環(huán)修改數(shù)組元素的問題小結(jié),需要的朋友可以參考下2024-02-02Java中實現(xiàn)Comparator接口和用法實例(簡明易懂)
這篇文章主要介紹了Java中實現(xiàn)Comparator接口和用法實例(簡明易懂),本文給出實現(xiàn)Comparator接口的實例和使用這個接口的代碼實例,需要的朋友可以參考下2015-05-05