基于Spring接口集成Caffeine+Redis兩級(jí)緩存
前言
在上一篇文章Redis+Caffeine兩級(jí)緩存的實(shí)現(xiàn)中,我們介紹了3種整合Caffeine
和Redis
作為兩級(jí)緩存使用的方法,雖然說(shuō)能夠?qū)崿F(xiàn)功能,但實(shí)現(xiàn)手法還是太粗糙了,并且遺留了一些問(wèn)題沒(méi)有處理。本文將在上一篇的基礎(chǔ)上,圍繞兩個(gè)方面進(jìn)行進(jìn)一步的改造:
JSR107
定義了緩存使用規(guī)范,spring中提供了基于這個(gè)規(guī)范的接口,所以我們可以直接使用spring中的接口進(jìn)行Caffeine
和Redis
兩級(jí)緩存的整合改造- 在分布式環(huán)境下,如果一臺(tái)主機(jī)的本地緩存進(jìn)行修改,需要通知其他主機(jī)修改本地緩存,解決分布式環(huán)境下本地緩存一致性問(wèn)題
好了,在明確了需要的改進(jìn)問(wèn)題后,下面我們開(kāi)始正式修改。
改造
在上篇文章的v3
版本中,我們使用自定義注解的方式實(shí)現(xiàn)了兩級(jí)緩存通過(guò)一個(gè)注解管理的功能。本文我們換一種方式,直接通過(guò)擴(kuò)展spring提供的接口來(lái)實(shí)現(xiàn)這個(gè)功能,在進(jìn)行整合之前,我們需要簡(jiǎn)單了解一下JSR107
緩存規(guī)范。
JSR107 規(guī)范
在JSR107
緩存規(guī)范中定義了5個(gè)核心接口,分別是CachingProvider
,CacheManager
,Cache
, Entry
和Expiry
,參考下面這張圖,可以看到除了Entry
和Expiry
以外,從上到下都是一對(duì)多的包含關(guān)系。
從上面這張圖我們可以看出,一個(gè)應(yīng)用可以創(chuàng)建并管理多個(gè)CachingProvider
,同樣一個(gè)CachingProvider
也可以管理多個(gè)CacheManager
,緩存管理器CacheManager
中則維護(hù)了多個(gè)Cache
。
Cache
是一個(gè)類似Map
的數(shù)據(jù)結(jié)構(gòu),Entry
就是其中存儲(chǔ)的每一個(gè)key-value
數(shù)據(jù)對(duì),并且每個(gè)Entry
都有一個(gè)過(guò)期時(shí)間Expiry
。而我們?cè)谑褂胹pring集成第三方的緩存時(shí),只需要實(shí)現(xiàn)Cache
和CacheManager
這兩個(gè)接口就可以了,下面分別具體來(lái)看一下。
Cache
spring中的Cache
接口規(guī)范了緩存組件的定義,包含了緩存的各種操作,實(shí)現(xiàn)具體緩存操作的管理。例如我們熟悉的RedisCache
、EhCacheCache
等,都實(shí)現(xiàn)了這個(gè)接口。
在Cache
接口中,定義了get
、put
、evict
、clear
等方法,分別對(duì)應(yīng)緩存的存入、取出、刪除、清空操作。不過(guò)我們這里不直接使用Cache
接口,上面這張圖中的AbstractValueAdaptingCache
是一個(gè)抽象類,它已經(jīng)實(shí)現(xiàn)了Cache
接口,是spring在Cache
接口的基礎(chǔ)上幫助我們進(jìn)行了一層封裝,所以我們直接繼承這個(gè)類就可以。
繼承AbstractValueAdaptingCache
抽象類后,除了創(chuàng)建Cache
的構(gòu)造方法外,還需要實(shí)現(xiàn)下面的幾個(gè)方法:
// 在緩存中實(shí)際執(zhí)行查找的操作,父類的get()方法會(huì)調(diào)用這個(gè)方法 protected abstract Object lookup(Object key); // 通過(guò)key獲取緩存值,如果沒(méi)有找到,會(huì)調(diào)用valueLoader的call()方法 public <T> T get(Object key, Callable<T> valueLoader); // 將數(shù)據(jù)放入緩存中 public void put(Object key, Object value); // 刪除緩存 public void evict(Object key); // 清空緩存中所有數(shù)據(jù) public void clear(); // 獲取緩存名稱,一般在CacheManager創(chuàng)建時(shí)指定 String getName(); // 獲取實(shí)際使用的緩存 Object getNativeCache();
因?yàn)橐?code>RedisTemplate和Caffeine
的Cache
,所以這些都需要在緩存的構(gòu)造方法中傳入,除此之外構(gòu)造方法中還需要再傳出緩存名稱cacheName
,以及在配置文件中實(shí)際配置的一些緩存參數(shù)。先看一下構(gòu)造方法的實(shí)現(xiàn):
public class DoubleCache extends AbstractValueAdaptingCache { private String cacheName; private RedisTemplate<Object, Object> redisTemplate; private Cache<Object, Object> caffeineCache; private DoubleCacheConfig doubleCacheConfig; protected DoubleCache(boolean allowNullValues) { super(allowNullValues); } public DoubleCache(String cacheName,RedisTemplate<Object, Object> redisTemplate, Cache<Object, Object> caffeineCache, DoubleCacheConfig doubleCacheConfig){ super(doubleCacheConfig.getAllowNull()); this.cacheName=cacheName; this.redisTemplate=redisTemplate; this.caffeineCache=caffeineCache; this.doubleCacheConfig=doubleCacheConfig; } //... }
抽象父類的構(gòu)造方法中只有一個(gè)boolean
類型的參數(shù)allowNullValues
,表示是否允許緩存對(duì)象為null
。除此之外,AbstractValueAdaptingCache
中還定義了兩個(gè)包裝方法來(lái)配合這個(gè)參數(shù)進(jìn)行使用,分別是toStoreValue
和fromStoreValue
,特殊用途是用于在緩存null
對(duì)象時(shí)進(jìn)行包裝、以及在獲取時(shí)進(jìn)行解析并返回。
我們之后會(huì)在CacheManager
中調(diào)用后面這個(gè)自己實(shí)現(xiàn)的構(gòu)造方法,來(lái)實(shí)例化Cache
對(duì)象,參數(shù)中DoubleCacheConfig
是使用@ConfigurationProperties
讀取的yml配置文件封裝的數(shù)據(jù)對(duì)象,會(huì)在后面使用。
當(dāng)一個(gè)方法添加了@Cacheable
注解時(shí),執(zhí)行時(shí)會(huì)先調(diào)用父類AbstractValueAdaptingCache
中的get(key)
方法,它會(huì)再調(diào)用我們自己實(shí)現(xiàn)的lookup
方法。在實(shí)際執(zhí)行查找操作的lookup
方法中,我們的邏輯仍然是先查找Caffeine
、沒(méi)有找到時(shí)再查找Redis
:
@Override protected Object lookup(Object key) { // 先從caffeine中查找 Object obj = caffeineCache.getIfPresent(key); if (Objects.nonNull(obj)){ log.info("get data from caffeine"); return obj; } //再?gòu)膔edis中查找 String redisKey=this.name+":"+ key; obj = redisTemplate.opsForValue().get(redisKey); if (Objects.nonNull(obj)){ log.info("get data from redis"); caffeineCache.put(key,obj); } return obj; }
如果lookup
方法的返回結(jié)果不為null
,那么就會(huì)直接返回結(jié)果給調(diào)用方。如果返回為null
時(shí),就會(huì)執(zhí)行原方法,執(zhí)行完成后調(diào)用put
方法,將數(shù)據(jù)放入緩存中。接下來(lái)我們實(shí)現(xiàn)put
方法:
@Override public void put(Object key, Object value) { if(!isAllowNullValues() && Objects.isNull(value)){ log.error("the value NULL will not be cached"); return; } //使用 toStoreValue(value) 包裝,解決caffeine不能存null的問(wèn)題 caffeineCache.put(key,toStoreValue(value)); // null對(duì)象只存在caffeine中一份就夠了,不用存redis了 if (Objects.isNull(value)) return; String redisKey=this.cacheName +":"+ key; Optional<Long> expireOpt = Optional.ofNullable(doubleCacheConfig) .map(DoubleCacheConfig::getRedisExpire); if (expireOpt.isPresent()){ redisTemplate.opsForValue().set(redisKey,toStoreValue(value), expireOpt.get(), TimeUnit.SECONDS); }else{ redisTemplate.opsForValue().set(redisKey,toStoreValue(value)); } }
上面我們對(duì)于是否允許緩存空對(duì)象進(jìn)行了判斷,能夠緩存空對(duì)象的好處之一就是可以避免緩存穿透。需要注意的是,Caffeine
中是不能直接緩存null
的,因此可以使用父類提供的toStoreValue()
方法,將它包裝成一個(gè)NullValue
類型。在取出對(duì)象時(shí),如果是NullValue
,也不用我們自己再去調(diào)用fromStoreValue()
將這個(gè)包裝類型還原,父類的get
方法中已經(jīng)幫我們做好了。
另外,上面在put
方法中緩存空對(duì)象時(shí),只在Caffeine
緩存中一份即可,可以不用在Redis
中再存一份。
緩存的刪除方法evict()
和清空方法clear()
的實(shí)現(xiàn)就比較簡(jiǎn)單了,直接刪除一跳或全部數(shù)據(jù)即可:
@Override public void evict(Object key) { redisTemplate.delete(this.cacheName +":"+ key); caffeineCache.invalidate(key); } @Override public void clear() { Set<Object> keys = redisTemplate.keys(this.cacheName.concat(":*")); for (Object key : keys) { redisTemplate.delete(String.valueOf(key)); } caffeineCache.invalidateAll(); }
獲取緩存cacheName
和實(shí)際緩存的方法實(shí)現(xiàn):
@Override public String getName() { return this.cacheName; } @Override public Object getNativeCache() { return this; }
最后,我們?cè)賮?lái)看一下帶有兩個(gè)參數(shù)的get
方法,為什么把這個(gè)方法放到最后來(lái)說(shuō)呢,因?yàn)槿绻覀冎皇鞘褂米⒔鈦?lái)管理緩存的話,那么這個(gè)方法不會(huì)被調(diào)用到,簡(jiǎn)單看一下實(shí)現(xiàn):
@Override public <T> T get(Object key, Callable<T> valueLoader) { ReentrantLock lock=new ReentrantLock(); try{ lock.lock();//加鎖 Object obj = lookup(key); if (Objects.nonNull(obj)){ return (T)obj; } //沒(méi)有找到 obj = valueLoader.call(); put(key,obj);//放入緩存 return (T)obj; }catch (Exception e){ log.error(e.getMessage()); }finally { lock.unlock(); } return null; }
方法的實(shí)現(xiàn)比較容易理解,還是先調(diào)用lookup
方法尋找是否已經(jīng)緩存了對(duì)象,如果沒(méi)有找到那么就調(diào)用Callable
中的call
方法進(jìn)行獲取,并在獲取完成后存入到緩存中去。至于這個(gè)方法如何使用,具體代碼我們放在后面使用這一塊再看。
需要注意的是,這個(gè)方法的接口注釋中強(qiáng)調(diào)了需要我們自己來(lái)保證方法同步,因此這里使用了ReentrantLock
進(jìn)行了加鎖操作。到這里,Cache
的實(shí)現(xiàn)就完成了,下面我們接著看另一個(gè)重要的接口CacheManager
。
CacheManager
從名字就可以看出,CacheManager
是一個(gè)緩存管理器,它可以被用來(lái)管理一組Cache
。在上一篇文章的v2版本中,我們使用的CaffeineCacheManager
就實(shí)現(xiàn)了這個(gè)接口,除此之外還有RedisCacheManager
、EhCacheCacheManager
等也都是通過(guò)這個(gè)接口實(shí)現(xiàn)。
下面我們要自定義一個(gè)類實(shí)現(xiàn)CacheManager
接口,管理上面實(shí)現(xiàn)的DoubleCache
作為spring中的緩存使用。接口中需要實(shí)現(xiàn)的方法只有下面兩個(gè):
//根據(jù)cacheName獲取Cache實(shí)例,不存在時(shí)進(jìn)行創(chuàng)建 Cache getCache(String name); //返回管理的所有cacheName Collection<String> getCacheNames();
在自定義的緩存管理器中,我們要使用ConcurrentHashMap
維護(hù)一組不同的Cache
,再定義一個(gè)構(gòu)造方法,在參數(shù)中傳入已經(jīng)在spring中配置好的RedisTemplate
,以及相關(guān)的緩存配置參數(shù):
public class DoubleCacheManager implements CacheManager { Map<String, Cache> cacheMap = new ConcurrentHashMap<>(); private RedisTemplate<Object, Object> redisTemplate; private DoubleCacheConfig dcConfig; public DoubleCacheManager(RedisTemplate<Object, Object> redisTemplate, DoubleCacheConfig doubleCacheConfig) { this.redisTemplate = redisTemplate; this.dcConfig = doubleCacheConfig; } //... }
然后實(shí)現(xiàn)getCache
方法,邏輯很簡(jiǎn)單,先根據(jù)name
從Map
中查找對(duì)應(yīng)的Cache
,如果找到則直接返回,這個(gè)參數(shù)name
就是上一篇文章中提到的cacheName
,CacheManager
根據(jù)它實(shí)現(xiàn)不同Cache
的隔離。
如果沒(méi)有根據(jù)名稱找到緩存的話,那么新建一個(gè)DoubleCache
對(duì)象,并放入Map
中。這里使用的ConcurrentHashMap
的putIfAbsent()
方法放入,避免重復(fù)創(chuàng)建Cache
以及造成Cache
內(nèi)數(shù)據(jù)的丟失。具體代碼如下:
@Override public Cache getCache(String name) { Cache cache = cacheMap.get(name); if (Objects.nonNull(cache)) { return cache; } cache = new DoubleCache(name, redisTemplate, createCaffeineCache(), dcConfig); Cache oldCache = cacheMap.putIfAbsent(name, cache); return oldCache == null ? cache : oldCache; }
在上面創(chuàng)建DoubleCache
對(duì)象的過(guò)程中,需要先創(chuàng)建一個(gè)Caffeine
的Cache
對(duì)象作為參數(shù)傳入,這一過(guò)程主要是根據(jù)實(shí)際項(xiàng)目的配置文件中的具體參數(shù)進(jìn)行初始化,代碼如下:
private com.github.benmanes.caffeine.cache.Cache createCaffeineCache(){ Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder(); Optional<DoubleCacheConfig> dcConfigOpt = Optional.ofNullable(this.dcConfig); dcConfigOpt.map(DoubleCacheConfig::getInit) .ifPresent(init->caffeineBuilder.initialCapacity(init)); dcConfigOpt.map(DoubleCacheConfig::getMax) .ifPresent(max->caffeineBuilder.maximumSize(max)); dcConfigOpt.map(DoubleCacheConfig::getExpireAfterWrite) .ifPresent(eaw->caffeineBuilder.expireAfterWrite(eaw,TimeUnit.SECONDS)); dcConfigOpt.map(DoubleCacheConfig::getExpireAfterAccess) .ifPresent(eaa->caffeineBuilder.expireAfterAccess(eaa,TimeUnit.SECONDS)); dcConfigOpt.map(DoubleCacheConfig::getRefreshAfterWrite) .ifPresent(raw->caffeineBuilder.refreshAfterWrite(raw,TimeUnit.SECONDS)); return caffeineBuilder.build(); }
getCacheNames
方法很簡(jiǎn)單,直接返回Map
的keySet
就可以了,代碼如下:
@Override public Collection<String> getCacheNames() { return cacheMap.keySet(); }
配置&使用
在application.yml
文件中配置緩存的參數(shù),代碼中使用@ConfigurationProperties
接收到DoubleCacheConfig
類中:
doublecache: allowNull: true init: 128 max: 1024 expireAfterWrite: 30 #Caffeine過(guò)期時(shí)間 redisExpire: 60 #Redis緩存過(guò)期時(shí)間
配置自定義的DoubleCacheManager
作為默認(rèn)的緩存管理器:
@Configuration public class CacheConfig { @Autowired DoubleCacheConfig doubleCacheConfig; @Bean public DoubleCacheManager cacheManager(RedisTemplate<Object,Object> redisTemplate, DoubleCacheConfig doubleCacheConfig){ return new DoubleCacheManager(redisTemplate,doubleCacheConfig); } }
Service
中的代碼還是老樣子,不需要在代碼中手動(dòng)操作緩存,只要直接在方法上使用@Cache
相關(guān)注解即可:
@Service @Slf4j @AllArgsConstructor public class OrderServiceImpl implements OrderService { private final OrderMapper orderMapper; @Cacheable(value = "order",key = "#id") public Order getOrderById(Long id) { Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>() .eq(Order::getId, id)); return myOrder; } @CachePut(cacheNames = "order",key = "#order.id") public Order updateOrder(Order order) { orderMapper.updateById(order); return order; } @CacheEvict(cacheNames = "order",key = "#id") public void deleteOrder(Long id) { orderMapper.deleteById(id); } //沒(méi)有注解,使用get(key,callable)方法 public Order getOrderById2(Long id) { DoubleCacheManager cacheManager = SpringContextUtil.getBean(DoubleCacheManager.class); Cache cache = cacheManager.getCache("order"); Order order =(Order) cache.get(id, (Callable<Object>) () -> { log.info("get data from database"); Order myOrder = orderMapper.selectOne(new LambdaQueryWrapper<Order>() .eq(Order::getId, id)); return myOrder; }); return order; } }
注意最后這個(gè)沒(méi)有添加任何注解的方法,只有以這種方式調(diào)用時(shí)才會(huì)執(zhí)行我們?cè)?code>DoubleCache中自己實(shí)現(xiàn)的get(key,callable)
方法。到這里,基于JSR107
規(guī)范和spring接口的兩級(jí)緩存改造就完成了,下面我們看一下遺漏的第二個(gè)問(wèn)題。
分布式環(huán)境改造
前面我們說(shuō)了,在分布式環(huán)境下,可能會(huì)存在各個(gè)主機(jī)上一級(jí)緩存不一致的問(wèn)題。當(dāng)一臺(tái)主機(jī)修改了本地緩存后,其他主機(jī)是沒(méi)有感知的,仍然保持了之前的緩存,那么這種情況下就可能取到臟數(shù)據(jù)。既然我們?cè)陧?xiàng)目中已經(jīng)使用了Redis
,那么就可以使用它的發(fā)布/訂閱功能來(lái)使各個(gè)節(jié)點(diǎn)的緩存進(jìn)行同步。
定義消息體
在使用Redis
發(fā)送消息前,需要先定義一個(gè)消息對(duì)象。其中的數(shù)據(jù)包括消息要作用于的Cache
名稱、操作類型、數(shù)據(jù)以及發(fā)出消息的源主機(jī)標(biāo)識(shí):
@Data @NoArgsConstructor @AllArgsConstructor public class CacheMassage implements Serializable { private static final long serialVersionUID = -3574997636829868400L; private String cacheName; private CacheMsgType type; //標(biāo)識(shí)更新或刪除操作 private Object key; private Object value; private String msgSource; //源主機(jī)標(biāo)識(shí),用來(lái)避免重復(fù)操作 }
定義一個(gè)枚舉來(lái)標(biāo)識(shí)消息的類型,是要進(jìn)行更新還是刪除操作:
public enum CacheMsgType { UPDATE, DELETE; }
消息體中的msgSource
是添加的一個(gè)消息源主機(jī)的標(biāo)識(shí),添加這個(gè)是為了避免收到當(dāng)前主機(jī)發(fā)送的消息后,再進(jìn)行重復(fù)操作,也就是說(shuō)收到本機(jī)發(fā)出的消息直接丟掉什么都不做就可以了。源主機(jī)標(biāo)識(shí)這里使用的是主機(jī)ip加項(xiàng)目端口的方式,獲取方法如下:
public static String getMsgSource() throws UnknownHostException { String host = InetAddress.getLocalHost().getHostAddress(); Environment env = SpringContextUtil.getBean(Environment.class); String port = env.getProperty("server.port"); return host+":"+port; }
這樣消息體的定義就完成了,之后只要調(diào)用redisTemplate
的convertAndSend
方法就可以把這個(gè)對(duì)象發(fā)布到指定的主題上了。
Redis消息配置
要使用Redis的消息監(jiān)聽(tīng)功能,需要配置兩項(xiàng)內(nèi)容:
MessageListenerAdapter
:消息監(jiān)聽(tīng)適配器,可以在其中指定自定義的監(jiān)聽(tīng)代理類,并且可以自定義使用哪個(gè)方法處理監(jiān)聽(tīng)邏輯RedisMessageListenerContainer
: 一個(gè)可以為消息監(jiān)聽(tīng)器提供異步行為的容器,并且提供消息轉(zhuǎn)換和分派等底層功能
@Configuration public class MessageConfig { public static final String TOPIC="cache.msg"; @Bean RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter, RedisConnectionFactory redisConnectionFactory){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic(TOPIC)); return container; } @Bean MessageListenerAdapter adapter(RedisMessageReceiver receiver){ return new MessageListenerAdapter(receiver,"receive"); } }
在上面的監(jiān)聽(tīng)適配器MessageListenerAdapter
中,我們傳入了一個(gè)自定義的RedisMessageReceiver
接收并處理消息,并指定使用它的receive
方法來(lái)處理監(jiān)聽(tīng)到的消息,下面我們就來(lái)看看它如何接收消息并消費(fèi)。
消息消費(fèi)邏輯
定義一個(gè)類RedisMessageReceiver
來(lái)接收并消費(fèi)消息,需要在它的方法中實(shí)現(xiàn)以下功能:
- 反序列化接收到的消息,轉(zhuǎn)換為前面定義的
CacheMassage
類型對(duì)象 - 根據(jù)消息的主機(jī)標(biāo)識(shí)判斷這條消息是不是本機(jī)發(fā)出的,如果是那么直接丟棄,只有接收到其他主機(jī)發(fā)出的消息才進(jìn)行處理
- 使用
cacheName
得到具體使用的那一個(gè)DoubleCache
實(shí)例 - 根據(jù)消息的類型判斷要執(zhí)行的是更新還是刪除操作,調(diào)用對(duì)應(yīng)的方法
@Slf4j @Component @AllArgsConstructor public class RedisMessageReceiver { private final RedisTemplate redisTemplate; private final DoubleCacheManager manager; //接收通知,進(jìn)行處理 public void receive(String message) throws UnknownHostException { CacheMassage msg = (CacheMassage) redisTemplate .getValueSerializer().deserialize(message.getBytes()); log.info(msg.toString()); //如果是本機(jī)發(fā)出的消息,那么不進(jìn)行處理 if (msg.getMsgSource().equals(MessageSourceUtil.getMsgSource())){ log.info("收到本機(jī)發(fā)出的消息,不做處理"); return; } DoubleCache cache = (DoubleCache) manager.getCache(msg.getCacheName()); if (msg.getType()== CacheMsgType.UPDATE) { cache.updateL1Cache(msg.getKey(),msg.getValue()); log.info("更新本地緩存"); } if (msg.getType()== CacheMsgType.DELETE) { log.info("刪除本地緩存"); cache.evictL1Cache(msg.getKey()); } } }
在上面的代碼中,調(diào)用了DoubleCache
中更新一級(jí)緩存方法updateL1Cache
、刪除一級(jí)緩存方法evictL1Cache
,我們會(huì)后面在DoubleCache
中進(jìn)行添加。
修改DoubleCache
在DoubleCache
中先添加上面提到的兩個(gè)方法,由CacheManager
獲取到具體緩存后調(diào)用,進(jìn)行一級(jí)緩存的更新或刪除操作:
// 更新一級(jí)緩存 public void updateL1Cache(Object key,Object value){ caffeineCache.put(key,value); } // 刪除一級(jí)緩存 public void evictL1Cache(Object key){ caffeineCache.invalidate(key); }
好了,完事具備只欠東風(fēng),我們要在什么場(chǎng)合發(fā)送消息呢?答案是在DoubleCache
中存入緩存的put
方法和移除緩存的evict
方法中。首先修改put
方法,方法中前面的邏輯不變,在最后添加發(fā)送消息通知其他節(jié)點(diǎn)更新一級(jí)緩存的邏輯:
public void put(Object key, Object value) { // 省略前面的不變代碼... //發(fā)送信息通知其他節(jié)點(diǎn)更新一級(jí)緩存 CacheMassage cacheMassage = new CacheMassage(this.cacheName, CacheMsgType.UPDATE, key,value, MessageSourceUtil.getMsgSource()); redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage); }
然后修改evict
方法,同樣保持前面的邏輯不變,在最后添加發(fā)送消息的代碼:
public void evict(Object key) { // 省略前面的不變代碼... //發(fā)送信息通知其他節(jié)點(diǎn)刪除一級(jí)緩存 CacheMassage cacheMassage = new CacheMassage(this.cacheName, CacheMsgType.DELETE, key,null, MessageSourceUtil.getMsgSource()); redisTemplate.convertAndSend(MessageConfig.TOPIC,cacheMassage); }
適配分布式環(huán)境的改造工作到此結(jié)束,下面進(jìn)行一下簡(jiǎn)單的測(cè)試工作。
測(cè)試
我們可以用idea
的Allow parallel run
功能同時(shí)啟動(dòng)兩個(gè)一樣的springboot項(xiàng)目,來(lái)模擬分布式環(huán)境下的兩臺(tái)主機(jī),注意在啟動(dòng)參數(shù)中添加-Dserver.port
參數(shù)來(lái)啟動(dòng)到不同端口。
首先測(cè)試更新操作,使用接口修改某一個(gè)主機(jī)的本地緩存,可以看到發(fā)出消息的主機(jī)在收到消息后,直接丟棄不做任何處理:
查看另一臺(tái)主機(jī)的日志,收到消息并更新了本地緩存:
再看一下緩存的刪除情況,同樣本地刪除后再收到消息不做處理:
看另一臺(tái)主機(jī)收到消息后,會(huì)刪除本地的一級(jí)緩存:
可以看到,分布式環(huán)境下本地緩存通過(guò)Redis
消息的發(fā)布訂閱機(jī)制保證了一級(jí)緩存的一致性。
另外,如果更加嚴(yán)謹(jǐn)一些的話,其實(shí)還應(yīng)該處理一下緩存更新失敗的情況,這里留個(gè)坑以后再填。簡(jiǎn)單說(shuō)一下思路,我們應(yīng)該在代碼中捕獲緩存更新失敗的異常,然后刪除二級(jí)緩存、本機(jī)以及其他主機(jī)的一級(jí)緩存,再等待下一次訪問(wèn)時(shí)直接拉取最新的數(shù)據(jù)進(jìn)行緩存。同樣,要想實(shí)現(xiàn)緩存失效同時(shí)作用于所有單機(jī)節(jié)點(diǎn)的本地緩存這一功能,也可以使用上面的發(fā)布訂閱來(lái)實(shí)現(xiàn)。
總結(jié)
好了,這次縫縫補(bǔ)補(bǔ)的填坑之旅到這里就要結(jié)束了??梢钥吹绞褂没?code>JSR107規(guī)范的spring接口進(jìn)行修改后,代碼看起來(lái)舒服了很多,并且支持直接使用spring的@Cache
相關(guān)注解。如果想在項(xiàng)目中使用的話,自己封裝一個(gè)簡(jiǎn)單的starter
就可以了,使用起來(lái)也非常簡(jiǎn)單。
以上就是基于Spring接口集成Caffeine+Redis兩級(jí)緩存的詳細(xì)內(nèi)容,更多關(guān)于集成Caffeine+Redis兩級(jí)緩存的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Cloud 整合Apache-SkyWalking實(shí)現(xiàn)鏈路跟蹤的方法
這篇文章主要介紹了Spring Cloud 整合Apache-SkyWalking鏈路跟蹤的示例代碼,代碼簡(jiǎn)單易懂,通過(guò)圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06SpringBoot使用Hibernate攔截器實(shí)現(xiàn)時(shí)間自動(dòng)注入的操作代碼
這篇文章主要介紹了SpringBoot使用Hibernate攔截器實(shí)現(xiàn)時(shí)間自動(dòng)注入的操作代碼,主要包括hibernate攔截器的相關(guān)知識(shí),結(jié)合實(shí)例代碼給大家講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-10-10關(guān)于gradle多模塊項(xiàng)目依賴管理方式
這篇文章主要介紹了關(guān)于gradle多模塊項(xiàng)目依賴管理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04Spring Boot 數(shù)據(jù)校驗(yàn)@Valid+統(tǒng)一異常處理的實(shí)現(xiàn)
這篇文章主要介紹了Spring Boot 數(shù)據(jù)校驗(yàn)@Valid+統(tǒng)一異常處理的實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-04-04idea創(chuàng)建SpringBoot項(xiàng)目時(shí)Type選maven?project和maven?pom有何區(qū)別
Maven是一個(gè)Java工程的管理工具,跟其相同功能的工具如Gradle,下面這篇文章主要給大家介紹了關(guān)于idea創(chuàng)建SpringBoot項(xiàng)目時(shí)Type選maven?project和maven?pom有何區(qū)別的相關(guān)資料,需要的朋友可以參考下2023-02-02