redis實現(xiàn)延遲任務(wù)的項目實踐
一、實現(xiàn)思路
問題思路
1.為什么任務(wù)需要存儲在數(shù)據(jù)庫中?
延遲任務(wù)是一個通用的服務(wù),任何需要延遲得任務(wù)都可以調(diào)用該服務(wù),需要考慮數(shù)據(jù)持久化的問題,存儲數(shù)據(jù)庫中是一種數(shù)據(jù)安全的考慮。
2.為什么redis中使用兩種數(shù)據(jù)類型,list和zset?
效率問題,算法的時間復(fù)雜度
3.在添加zset數(shù)據(jù)的時候,為什么不需要預(yù)加載?
任務(wù)模塊是一個通用的模塊,項目中任何需要延遲隊列的地方,都可以調(diào)用這個接口,要考慮到數(shù)據(jù)量的問題,如果數(shù)據(jù)量特別大,為了防止阻塞,只需要把未來幾分鐘要執(zhí)行的數(shù)據(jù)存入緩存即可。
二、延遲任務(wù)服務(wù)實現(xiàn)
1、搭建heima-leadnews-schedule模塊
leadnews-schedule是一個通用的服務(wù),單獨(dú)創(chuàng)建模塊來管理任何類型的延遲任務(wù)
①:導(dǎo)入資料文件夾下的heima-leadnews-schedule模塊到heima-leadnews-service下。
如下圖所示:
②:添加bootstrap.yml
server: port: 51701 spring: application: name: leadnews-schedule cloud: nacos: discovery: server-addr: 192.168.200.130:8848 config: server-addr: 192.168.200.130:8848 file-extension: yml
③:在nacos中添加對應(yīng)配置,并添加數(shù)據(jù)庫及mybatis-plus的配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: root # 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名 type-aliases-package: com.heima.model.schedule.pojos
2、數(shù)據(jù)庫準(zhǔn)備
導(dǎo)入資料中l(wèi)eadnews_schedule數(shù)據(jù)庫
taskinfo 任務(wù)表
實體類
package com.heima.model.schedule.pojos; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.io.Serializable; import java.util.Date; /** * <p> * * </p> * * @author itheima */ @Data @TableName("taskinfo") public class Taskinfo implements Serializable { private static final long serialVersionUID = 1L; /** * 任務(wù)id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 執(zhí)行時間 */ @TableField("execute_time") private Date executeTime; /** * 參數(shù) */ @TableField("parameters") private byte[] parameters; /** * 優(yōu)先級 */ @TableField("priority") private Integer priority; /** * 任務(wù)類型 */ @TableField("task_type") private Integer taskType; }
taskinfo_logs 任務(wù)日志表
實體類
package com.heima.model.schedule.pojos; import com.baomidou.mybatisplus.annotation.*; import lombok.Data; import java.io.Serializable; import java.util.Date; /** * <p> * * </p> * * @author itheima */ @Data @TableName("taskinfo_logs") public class TaskinfoLogs implements Serializable { private static final long serialVersionUID = 1L; /** * 任務(wù)id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 執(zhí)行時間 */ @TableField("execute_time") private Date executeTime; /** * 參數(shù) */ @TableField("parameters") private byte[] parameters; /** * 優(yōu)先級 */ @TableField("priority") private Integer priority; /** * 任務(wù)類型 */ @TableField("task_type") private Integer taskType; /** * 版本號,用樂觀鎖 */ @Version private Integer version; /** * 狀態(tài) 0=int 1=EXECUTED 2=CANCELLED */ @TableField("status") private Integer status; }
樂觀鎖支持:
/** * mybatis-plus樂觀鎖支持 * @return */ @Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; }
3、安裝redis
①拉取鏡像
docker pull redis
② 創(chuàng)建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
③鏈接測試
打開資料中的Redis Desktop Manager,輸入host、port、password鏈接測試
能鏈接成功,即可
4、項目集成redis
①:在項目導(dǎo)入redis相關(guān)依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- redis依賴commons-pool 這個依賴一定要添加 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
②:在heima-leadnews-schedule中集成redis,添加以下nacos配置,鏈接上redis
spring: redis: host: 192.168.200.130 password: leadnews port: 6379
③:拷貝資料文件夾下的類:CacheService到heima-leadnews-common模塊下,并添加自動配置
cacheService.java
package com.heima.common.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.*; import org.springframework.data.redis.core.*; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import org.springframework.data.redis.core.types.Expiration; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @Component public class CacheService extends CachingConfigurerSupport { @Autowired private StringRedisTemplate stringRedisTemplate; public StringRedisTemplate getstringRedisTemplate() { return this.stringRedisTemplate; } /** -------------------key相關(guān)操作--------------------- */ /** * 刪除key * * @param key */ public void delete(String key) { stringRedisTemplate.delete(key); } /** * 批量刪除key * * @param keys */ public void delete(Collection<String> keys) { stringRedisTemplate.delete(keys); } /** * 序列化key * * @param key * @return */ public byte[] dump(String key) { return stringRedisTemplate.dump(key); } /** * 是否存在key * * @param key * @return */ public Boolean exists(String key) { return stringRedisTemplate.hasKey(key); } /** * 設(shè)置過期時間 * * @param key * @param timeout * @param unit * @return */ public Boolean expire(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.expire(key, timeout, unit); } /** * 設(shè)置過期時間 * * @param key * @param date * @return */ public Boolean expireAt(String key, Date date) { return stringRedisTemplate.expireAt(key, date); } /** * 查找匹配的key * * @param pattern * @return */ public Set<String> keys(String pattern) { return stringRedisTemplate.keys(pattern); } /** * 將當(dāng)前數(shù)據(jù)庫的 key 移動到給定的數(shù)據(jù)庫 db 當(dāng)中 * * @param key * @param dbIndex * @return */ public Boolean move(String key, int dbIndex) { return stringRedisTemplate.move(key, dbIndex); } /** * 移除 key 的過期時間,key 將持久保持 * * @param key * @return */ public Boolean persist(String key) { return stringRedisTemplate.persist(key); } /** * 返回 key 的剩余的過期時間 * * @param key * @param unit * @return */ public Long getExpire(String key, TimeUnit unit) { return stringRedisTemplate.getExpire(key, unit); } /** * 返回 key 的剩余的過期時間 * * @param key * @return */ public Long getExpire(String key) { return stringRedisTemplate.getExpire(key); } /** * 從當(dāng)前數(shù)據(jù)庫中隨機(jī)返回一個 key * * @return */ public String randomKey() { return stringRedisTemplate.randomKey(); } /** * 修改 key 的名稱 * * @param oldKey * @param newKey */ public void rename(String oldKey, String newKey) { stringRedisTemplate.rename(oldKey, newKey); } /** * 僅當(dāng) newkey 不存在時,將 oldKey 改名為 newkey * * @param oldKey * @param newKey * @return */ public Boolean renameIfAbsent(String oldKey, String newKey) { return stringRedisTemplate.renameIfAbsent(oldKey, newKey); } /** * 返回 key 所儲存的值的類型 * * @param key * @return */ public DataType type(String key) { return stringRedisTemplate.type(key); } /** -------------------string相關(guān)操作--------------------- */ /** * 設(shè)置指定 key 的值 * @param key * @param value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 獲取指定 key 的值 * @param key * @return */ public String get(String key) { return stringRedisTemplate.opsForValue().get(key); } /** * 返回 key 中字符串值的子字符 * @param key * @param start * @param end * @return */ public String getRange(String key, long start, long end) { return stringRedisTemplate.opsForValue().get(key, start, end); } /** * 將給定 key 的值設(shè)為 value ,并返回 key 的舊值(old value) * * @param key * @param value * @return */ public String getAndSet(String key, String value) { return stringRedisTemplate.opsForValue().getAndSet(key, value); } /** * 對 key 所儲存的字符串值,獲取指定偏移量上的位(bit) * * @param key * @param offset * @return */ public Boolean getBit(String key, long offset) { return stringRedisTemplate.opsForValue().getBit(key, offset); } /** * 批量獲取 * * @param keys * @return */ public List<String> multiGet(Collection<String> keys) { return stringRedisTemplate.opsForValue().multiGet(keys); } /** * 設(shè)置ASCII碼, 字符串'a'的ASCII碼是97, 轉(zhuǎn)為二進(jìn)制是'01100001', 此方法是將二進(jìn)制第offset位值變?yōu)関alue * * @param key * @param * @param value * 值,true為1, false為0 * @return */ public boolean setBit(String key, long offset, boolean value) { return stringRedisTemplate.opsForValue().setBit(key, offset, value); } /** * 將值 value 關(guān)聯(lián)到 key ,并將 key 的過期時間設(shè)為 timeout * * @param key * @param value * @param timeout * 過期時間 * @param unit * 時間單位, 天:TimeUnit.DAYS 小時:TimeUnit.HOURS 分鐘:TimeUnit.MINUTES * 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS */ public void setEx(String key, String value, long timeout, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, value, timeout, unit); } /** * 只有在 key 不存在時設(shè)置 key 的值 * * @param key * @param value * @return 之前已經(jīng)存在返回false,不存在返回true */ public boolean setIfAbsent(String key, String value) { return stringRedisTemplate.opsForValue().setIfAbsent(key, value); } /** * 用 value 參數(shù)覆寫給定 key 所儲存的字符串值,從偏移量 offset 開始 * * @param key * @param value * @param offset * 從指定位置開始覆寫 */ public void setRange(String key, String value, long offset) { stringRedisTemplate.opsForValue().set(key, value, offset); } /** * 獲取字符串的長度 * * @param key * @return */ public Long size(String key) { return stringRedisTemplate.opsForValue().size(key); } /** * 批量添加 * * @param maps */ public void multiSet(Map<String, String> maps) { stringRedisTemplate.opsForValue().multiSet(maps); } /** * 同時設(shè)置一個或多個 key-value 對,當(dāng)且僅當(dāng)所有給定 key 都不存在 * * @param maps * @return 之前已經(jīng)存在返回false,不存在返回true */ public boolean multiSetIfAbsent(Map<String, String> maps) { return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps); } /** * 增加(自增長), 負(fù)數(shù)則為自減 * * @param key * @param * @return */ public Long incrBy(String key, long increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * * @param key * @param * @return */ public Double incrByFloat(String key, double increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * 追加到末尾 * * @param key * @param value * @return */ public Integer append(String key, String value) { return stringRedisTemplate.opsForValue().append(key, value); } /** -------------------hash相關(guān)操作------------------------- */ /** * 獲取存儲在哈希表中指定字段的值 * * @param key * @param field * @return */ public Object hGet(String key, String field) { return stringRedisTemplate.opsForHash().get(key, field); } /** * 獲取所有給定字段的值 * * @param key * @return */ public Map<Object, Object> hGetAll(String key) { return stringRedisTemplate.opsForHash().entries(key); } /** * 獲取所有給定字段的值 * * @param key * @param fields * @return */ public List<Object> hMultiGet(String key, Collection<Object> fields) { return stringRedisTemplate.opsForHash().multiGet(key, fields); } public void hPut(String key, String hashKey, String value) { stringRedisTemplate.opsForHash().put(key, hashKey, value); } public void hPutAll(String key, Map<String, String> maps) { stringRedisTemplate.opsForHash().putAll(key, maps); } /** * 僅當(dāng)hashKey不存在時才設(shè)置 * * @param key * @param hashKey * @param value * @return */ public Boolean hPutIfAbsent(String key, String hashKey, String value) { return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value); } /** * 刪除一個或多個哈希表字段 * * @param key * @param fields * @return */ public Long hDelete(String key, Object... fields) { return stringRedisTemplate.opsForHash().delete(key, fields); } /** * 查看哈希表 key 中,指定的字段是否存在 * * @param key * @param field * @return */ public boolean hExists(String key, String field) { return stringRedisTemplate.opsForHash().hasKey(key, field); } /** * 為哈希表 key 中的指定字段的整數(shù)值加上增量 increment * * @param key * @param field * @param increment * @return */ public Long hIncrBy(String key, Object field, long increment) { return stringRedisTemplate.opsForHash().increment(key, field, increment); } /** * 為哈希表 key 中的指定字段的整數(shù)值加上增量 increment * * @param key * @param field * @param delta * @return */ public Double hIncrByFloat(String key, Object field, double delta) { return stringRedisTemplate.opsForHash().increment(key, field, delta); } /** * 獲取所有哈希表中的字段 * * @param key * @return */ public Set<Object> hKeys(String key) { return stringRedisTemplate.opsForHash().keys(key); } /** * 獲取哈希表中字段的數(shù)量 * * @param key * @return */ public Long hSize(String key) { return stringRedisTemplate.opsForHash().size(key); } /** * 獲取哈希表中所有值 * * @param key * @return */ public List<Object> hValues(String key) { return stringRedisTemplate.opsForHash().values(key); } /** * 迭代哈希表中的鍵值對 * * @param key * @param options * @return */ public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) { return stringRedisTemplate.opsForHash().scan(key, options); } /** ------------------------list相關(guān)操作---------------------------- */ /** * 通過索引獲取列表中的元素 * * @param key * @param index * @return */ public String lIndex(String key, long index) { return stringRedisTemplate.opsForList().index(key, index); } /** * 獲取列表指定范圍內(nèi)的元素 * * @param key * @param start * 開始位置, 0是開始位置 * @param end * 結(jié)束位置, -1返回所有 * @return */ public List<String> lRange(String key, long start, long end) { return stringRedisTemplate.opsForList().range(key, start, end); } /** * 存儲在list頭部 * * @param key * @param value * @return */ public Long lLeftPush(String key, String value) { return stringRedisTemplate.opsForList().leftPush(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, String... value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * 當(dāng)list存在的時候才加入 * * @param key * @param value * @return */ public Long lLeftPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().leftPushIfPresent(key, value); } /** * 如果pivot存在,再pivot前面添加 * * @param key * @param pivot * @param value * @return */ public Long lLeftPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().leftPush(key, pivot, value); } /** * * @param key * @param value * @return */ public Long lRightPush(String key, String value) { return stringRedisTemplate.opsForList().rightPush(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, String... value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * 為已存在的列表添加值 * * @param key * @param value * @return */ public Long lRightPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().rightPushIfPresent(key, value); } /** * 在pivot元素的右邊添加值 * * @param key * @param pivot * @param value * @return */ public Long lRightPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().rightPush(key, pivot, value); } /** * 通過索引設(shè)置列表元素的值 * * @param key * @param index * 位置 * @param value */ public void lSet(String key, long index, String value) { stringRedisTemplate.opsForList().set(key, index, value); } /** * 移出并獲取列表的第一個元素 * * @param key * @return 刪除的元素 */ public String lLeftPop(String key) { return stringRedisTemplate.opsForList().leftPop(key); } /** * 移出并獲取列表的第一個元素, 如果列表沒有元素會阻塞列表直到等待超時或發(fā)現(xiàn)可彈出元素為止 * * @param key * @param timeout * 等待時間 * @param unit * 時間單位 * @return */ public String lBLeftPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().leftPop(key, timeout, unit); } /** * 移除并獲取列表最后一個元素 * * @param key * @return 刪除的元素 */ public String lRightPop(String key) { return stringRedisTemplate.opsForList().rightPop(key); } /** * 移出并獲取列表的最后一個元素, 如果列表沒有元素會阻塞列表直到等待超時或發(fā)現(xiàn)可彈出元素為止 * * @param key * @param timeout * 等待時間 * @param unit * 時間單位 * @return */ public String lBRightPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPop(key, timeout, unit); } /** * 移除列表的最后一個元素,并將該元素添加到另一個列表并返回 * * @param sourceKey * @param destinationKey * @return */ public String lRightPopAndLeftPush(String sourceKey, String destinationKey) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey); } /** * 從列表中彈出一個值,將彈出的元素插入到另外一個列表中并返回它; 如果列表沒有元素會阻塞列表直到等待超時或發(fā)現(xiàn)可彈出元素為止 * * @param sourceKey * @param destinationKey * @param timeout * @param unit * @return */ public String lBRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit); } /** * 刪除集合中值等于value得元素 * * @param key * @param index * index=0, 刪除所有值等于value的元素; index>0, 從頭部開始刪除第一個值等于value的元素; * index<0, 從尾部開始刪除第一個值等于value的元素; * @param value * @return */ public Long lRemove(String key, long index, String value) { return stringRedisTemplate.opsForList().remove(key, index, value); } /** * 裁剪list * * @param key * @param start * @param end */ public void lTrim(String key, long start, long end) { stringRedisTemplate.opsForList().trim(key, start, end); } /** * 獲取列表長度 * * @param key * @return */ public Long lLen(String key) { return stringRedisTemplate.opsForList().size(key); } /** --------------------set相關(guān)操作-------------------------- */ /** * set添加元素 * * @param key * @param values * @return */ public Long sAdd(String key, String... values) { return stringRedisTemplate.opsForSet().add(key, values); } /** * set移除元素 * * @param key * @param values * @return */ public Long sRemove(String key, Object... values) { return stringRedisTemplate.opsForSet().remove(key, values); } /** * 移除并返回集合的一個隨機(jī)元素 * * @param key * @return */ public String sPop(String key) { return stringRedisTemplate.opsForSet().pop(key); } /** * 將元素value從一個集合移到另一個集合 * * @param key * @param value * @param destKey * @return */ public Boolean sMove(String key, String value, String destKey) { return stringRedisTemplate.opsForSet().move(key, value, destKey); } /** * 獲取集合的大小 * * @param key * @return */ public Long sSize(String key) { return stringRedisTemplate.opsForSet().size(key); } /** * 判斷集合是否包含value * * @param key * @param value * @return */ public Boolean sIsMember(String key, Object value) { return stringRedisTemplate.opsForSet().isMember(key, value); } /** * 獲取兩個集合的交集 * * @param key * @param otherKey * @return */ public Set<String> sIntersect(String key, String otherKey) { return stringRedisTemplate.opsForSet().intersect(key, otherKey); } /** * 獲取key集合與多個集合的交集 * * @param key * @param otherKeys * @return */ public Set<String> sIntersect(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().intersect(key, otherKeys); } /** * key集合與otherKey集合的交集存儲到destKey集合中 * * @param key * @param otherKey * @param destKey * @return */ public Long sIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey, destKey); } /** * key集合與多個集合的交集存儲到destKey集合中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys, destKey); } /** * 獲取兩個集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, String otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * 獲取key集合與多個集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * key集合與otherKey集合的并集存儲到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey); } /** * key集合與多個集合的并集存儲到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey); } /** * 獲取兩個集合的差集 * * @param key * @param otherKey * @return */ public Set<String> sDifference(String key, String otherKey) { return stringRedisTemplate.opsForSet().difference(key, otherKey); } /** * 獲取key集合與多個集合的差集 * * @param key * @param otherKeys * @return */ public Set<String> sDifference(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().difference(key, otherKeys); } /** * key集合與otherKey集合的差集存儲到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sDifference(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey, destKey); } /** * key集合與多個集合的差集存儲到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sDifference(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys, destKey); } /** * 獲取集合所有元素 * * @param key * @param * @param * @return */ public Set<String> setMembers(String key) { return stringRedisTemplate.opsForSet().members(key); } /** * 隨機(jī)獲取集合中的一個元素 * * @param key * @return */ public String sRandomMember(String key) { return stringRedisTemplate.opsForSet().randomMember(key); } /** * 隨機(jī)獲取集合中count個元素 * * @param key * @param count * @return */ public List<String> sRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().randomMembers(key, count); } /** * 隨機(jī)獲取集合中count個元素并且去除重復(fù)的 * * @param key * @param count * @return */ public Set<String> sDistinctRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count); } /** * * @param key * @param options * @return */ public Cursor<String> sScan(String key, ScanOptions options) { return stringRedisTemplate.opsForSet().scan(key, options); } /**------------------zSet相關(guān)操作--------------------------------*/ /** * 添加元素,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @param score * @return */ public Boolean zAdd(String key, String value, double score) { return stringRedisTemplate.opsForZSet().add(key, value, score); } /** * * @param key * @param values * @return */ public Long zAdd(String key, Set<TypedTuple<String>> values) { return stringRedisTemplate.opsForZSet().add(key, values); } /** * * @param key * @param values * @return */ public Long zRemove(String key, Object... values) { return stringRedisTemplate.opsForZSet().remove(key, values); } public Long zRemove(String key, Collection<String> values) { if(values!=null&&!values.isEmpty()){ Object[] objs = values.toArray(new Object[values.size()]); return stringRedisTemplate.opsForZSet().remove(key, objs); } return 0L; } /** * 增加元素的score值,并返回增加后的值 * * @param key * @param value * @param delta * @return */ public Double zIncrementScore(String key, String value, double delta) { return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta); } /** * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @return 0表示第一位 */ public Long zRank(String key, Object value) { return stringRedisTemplate.opsForZSet().rank(key, value); } /** * 返回元素在集合的排名,按元素的score值由大到小排列 * * @param key * @param value * @return */ public Long zReverseRank(String key, Object value) { return stringRedisTemplate.opsForZSet().reverseRank(key, value); } /** * 獲取集合的元素, 從小到大排序 * * @param key * @param start * 開始位置 * @param end * 結(jié)束位置, -1查詢所有 * @return */ public Set<String> zRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().range(key, start, end); } /** * 獲取zset集合的所有元素, 從小到大排序 * */ public Set<String> zRangeAll(String key) { return zRange(key,0,-1); } /** * 獲取集合元素, 并且把score值也獲取 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end); } /** * 根據(jù)Score值查詢集合元素 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<String> zRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max); } /** * 根據(jù)Score值查詢集合元素, 從小到大排序 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end); } /** * 獲取集合的元素, 從大到小排序 * * @param key * @param start * @param end * @return */ public Set<String> zReverseRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRange(key, start, end); } public Set<String> zReverseRangeByScore(String key, long min, long max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 獲取集合的元素, 從大到小排序, 并返回score值 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zReverseRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start, end); } /** * 根據(jù)Score值查詢集合元素, 從大到小排序 * * @param key * @param min * @param max * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 根據(jù)Score值查詢集合元素, 從大到小排序 * * @param key * @param min * @param max * @return */ public Set<TypedTuple<String>> zReverseRangeByScoreWithScores( String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max, start, end); } /** * 根據(jù)score值獲取集合元素數(shù)量 * * @param key * @param min * @param max * @return */ public Long zCount(String key, double min, double max) { return stringRedisTemplate.opsForZSet().count(key, min, max); } /** * 獲取集合大小 * * @param key * @return */ public Long zSize(String key) { return stringRedisTemplate.opsForZSet().size(key); } /** * 獲取集合大小 * * @param key * @return */ public Long zZCard(String key) { return stringRedisTemplate.opsForZSet().zCard(key); } /** * 獲取集合中value元素的score值 * * @param key * @param value * @return */ public Double zScore(String key, Object value) { return stringRedisTemplate.opsForZSet().score(key, value); } /** * 移除指定索引位置的成員 * * @param key * @param start * @param end * @return */ public Long zRemoveRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().removeRange(key, start, end); } /** * 根據(jù)指定的score值的范圍來移除成員 * * @param key * @param min * @param max * @return */ public Long zRemoveRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max); } /** * 獲取key和otherKey的并集并存儲在destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long zUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey); } /** * * @param key * @param otherKeys * @param destKey * @return */ public Long zUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet() .unionAndStore(key, otherKeys, destKey); } /** * 交集 * * @param key * @param otherKey * @param destKey * @return */ public Long zIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey, destKey); } /** * 交集 * * @param key * @param otherKeys * @param destKey * @return */ public Long zIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys, destKey); } /** * * @param key * @param options * @return */ public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) { return stringRedisTemplate.opsForZSet().scan(key, options); } /** * 掃描主鍵,建議使用 * @param patten * @return */ public Set<String> scan(String patten){ Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> { Set<String> result = new HashSet<>(); try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder() .match(patten).count(10000).build())) { while (cursor.hasNext()) { result.add(new String(cursor.next())); } } catch (IOException e) { e.printStackTrace(); } return result; }); return keys; } /** * 管道技術(shù),提高性能 * @param type * @param values * @return */ public List<Object> lRightPushPipeline(String type,Collection<String> values){ List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection)connection; //集合轉(zhuǎn)換數(shù)組 String[] strings = values.toArray(new String[values.size()]); //直接批量發(fā)送 stringRedisConn.rPush(type, strings); return null; } }); return results; } public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){ List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection; String[] strings = values.toArray(new String[values.size()]); stringRedisConnection.rPush(topic_key,strings); stringRedisConnection.zRem(future_key,strings); return null; } }); return objects; } /** * 加鎖 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //參考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; } }
④:測試
package com.heima.schedule.test; import com.heima.common.redis.CacheService; import com.heima.schedule.ScheduleApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Set; @SpringBootTest(classes = ScheduleApplication.class) @RunWith(SpringRunner.class) public class RedisTest { @Autowired private CacheService cacheService; @Test public void testList(){ //在list的左邊添加元素 // cacheService.lLeftPush("list_001","hello,redis"); //在list的右邊獲取元素,并刪除 String list_001 = cacheService.lRightPop("list_001"); System.out.println(list_001); } @Test public void testZset(){ //添加數(shù)據(jù)到zset中 分值 /*cacheService.zAdd("zset_key_001","hello zset 001",1000); cacheService.zAdd("zset_key_001","hello zset 002",8888); cacheService.zAdd("zset_key_001","hello zset 003",7777); cacheService.zAdd("zset_key_001","hello zset 004",999999);*/ //按照分值獲取數(shù)據(jù) Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888); System.out.println(zset_key_001); } }
5、添加任務(wù)
①:拷貝mybatis-plus生成的文件,mapper
②:創(chuàng)建task類,用于接收添加任務(wù)的參數(shù)
package com.heima.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { /** * 任務(wù)id */ private Long taskId; /** * 類型 */ private Integer taskType; /** * 優(yōu)先級 */ private Integer priority; /** * 執(zhí)行id */ private long executeTime; /** * task參數(shù) */ private byte[] parameters; }
③:創(chuàng)建TaskService
package com.heima.schedule.service; import com.heima.model.schedule.dtos.Task; /** * 對外訪問接口 */ public interface TaskService { /** * 添加任務(wù) * @param task 任務(wù)對象 * @return 任務(wù)id */ public long addTask(Task task) ; }
實現(xiàn):
package com.heima.schedule.service.impl; import com.alibaba.fastjson.JSON; import com.heima.common.constants.ScheduleConstants; import com.heima.common.redis.CacheService; import com.heima.model.schedule.dtos.Task; import com.heima.model.schedule.pojos.Taskinfo; import com.heima.model.schedule.pojos.TaskinfoLogs; import com.heima.schedule.mapper.TaskinfoLogsMapper; import com.heima.schedule.mapper.TaskinfoMapper; import com.heima.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Calendar; import java.util.Date; @Service @Transactional @Slf4j public class TaskServiceImpl implements TaskService { /** * 添加延遲任務(wù) * * @param task * @return */ @Override public long addTask(Task task) { //1.添加任務(wù)到數(shù)據(jù)庫中 boolean success = addTaskToDb(task); if (success) { //2.添加任務(wù)到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private CacheService cacheService; /** * 把任務(wù)添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //獲取5分鐘之后的時間 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任務(wù)的執(zhí)行時間小于等于當(dāng)前時間,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任務(wù)的執(zhí)行時間大于當(dāng)前時間 && 小于等于預(yù)設(shè)時間(未來5分鐘) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } } @Autowired private TaskinfoMapper taskinfoMapper; @Autowired private TaskinfoLogsMapper taskinfoLogsMapper; /** * 添加任務(wù)到數(shù)據(jù)庫中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任務(wù)表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //設(shè)置taskID task.setTaskId(taskinfo.getTaskId()); //保存任務(wù)日志數(shù)據(jù) TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } }
ScheduleConstants常量類
package com.heima.common.constants; public class ScheduleConstants { //task狀態(tài) public static final int SCHEDULED=0; //初始化狀態(tài) public static final int EXECUTED=1; //已執(zhí)行狀態(tài) public static final int CANCELLED=2; //已取消狀態(tài) public static String FUTURE="future_"; //未來數(shù)據(jù)key前綴 public static String TOPIC="topic_"; //當(dāng)前數(shù)據(jù)key前綴 }
④:測試
6、取消任務(wù)
在TaskService中添加方法
/** * 取消任務(wù) * @param taskId 任務(wù)id * @return 取消結(jié)果 */ public boolean cancelTask(long taskId);
實現(xiàn)
/** * 取消任務(wù) * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //刪除任務(wù),更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //刪除redis的數(shù)據(jù) if(task != null){ removeTaskFromCache(task); flag = true; } return false; } /** * 刪除redis中的任務(wù)數(shù)據(jù) * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"_"+task.getPriority(); if(task.getExecuteTime()<=System.currentTimeMillis()){ cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task)); } } /** * 刪除任務(wù),更新任務(wù)日志狀態(tài) * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //刪除任務(wù) taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid={}",taskId); } return task; }
測試
7、消費(fèi)任務(wù)
在TaskService中添加方法
/** * 按照類型和優(yōu)先級來拉取任務(wù) * @param type * @param priority * @return */ public Task poll(int type,int priority);
實現(xiàn)
/** * 按照類型和優(yōu)先級拉取任務(wù) * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"_"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); //更新數(shù)據(jù)庫信息 updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; }
8、未來數(shù)據(jù)定時刷新
1.reids key值匹配
方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很強(qiáng)大,但是在生產(chǎn)環(huán)境需要慎用!開發(fā)中使用keys的模糊匹配卻發(fā)現(xiàn)redis的CPU使用率極高,所以公司的redis生產(chǎn)環(huán)境將keys命令禁用了!redis是單線程,會被堵塞。
方案2:scan
SCAN 命令是一個基于游標(biāo)的迭代器,SCAN命令每次被調(diào)用之后, 都會向用戶返回一個新的游標(biāo), 用戶在下次迭代時需要使用這個新游標(biāo)作為SCAN命令的游標(biāo)參數(shù), 以此來延續(xù)之前的迭代過程。
代碼案例:
@Test public void testKeys(){ Set<String> keys = cacheService.keys("future_*"); System.out.println(keys); Set<String> scan = cacheService.scan("future_*"); System.out.println(scan); }
2.reids管道
普通redis客戶端和服務(wù)器交互模式
Pipeline請求模型
官方測試結(jié)果數(shù)據(jù)對比
測試案例對比:
//耗時6151 @Test public void testPiple1(){ long start =System.currentTimeMillis(); for (int i = 0; i <10000 ; i++) { Task task = new Task(); task.setTaskType(1001); task.setPriority(1); task.setExecuteTime(new Date().getTime()); cacheService.lLeftPush("1001_1", JSON.toJSONString(task)); } System.out.println("耗時"+(System.currentTimeMillis()- start)); } @Test public void testPiple2(){ long start = System.currentTimeMillis(); //使用管道技術(shù) List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { for (int i = 0; i <10000 ; i++) { Task task = new Task(); task.setTaskType(1001); task.setPriority(1); task.setExecuteTime(new Date().getTime()); redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes()); } return null; } }); System.out.println("使用管道技術(shù)執(zhí)行10000次自增操作共耗時:"+(System.currentTimeMillis()-start)+"毫秒"); }
3.未來數(shù)據(jù)定時刷新-功能完成
在TaskService中添加方法
@Scheduled(cron = "0 */1 * * * ?") public void refresh() { System.out.println(System.currentTimeMillis() / 1000 + "執(zhí)行了定時任務(wù)"); // 獲取所有未來數(shù)據(jù)集合的key值 Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_* for (String futureKey : futureKeys) { // future_250_250 String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //獲取該組key下當(dāng)前需要消費(fèi)的任務(wù)數(shù)據(jù) Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { //將這些任務(wù)數(shù)據(jù)添加到消費(fèi)者隊列中 cacheService.refreshWithPipeline(futureKey, topicKey, tasks); System.out.println("成功的將" + futureKey + "下的當(dāng)前需要執(zhí)行的任務(wù)數(shù)據(jù)刷新到" + topicKey + "下"); } } }
在引導(dǎo)類中添加開啟任務(wù)調(diào)度注解:@EnableScheduling
9、分布式鎖解決集群下的方法搶占執(zhí)行
1.問題描述
啟動兩臺heima-leadnews-schedule服務(wù),每臺服務(wù)都會去執(zhí)行refresh定時任務(wù)方法
2.分布式鎖
分布式鎖:控制分布式系統(tǒng)有序的去對共享資源進(jìn)行操作,通過互斥來保證數(shù)據(jù)的一致性。
解決方案:
3.redis分布式鎖
sexnx (SET if Not eXists) 命令在指定的 key 不存在時,為 key 設(shè)置指定的值。
這種加鎖的思路是,如果 key 不存在則為 key 設(shè)置 value,如果 key 已存在則 SETNX 命令不做任何操作
- 客戶端A請求服務(wù)器設(shè)置key的值,如果設(shè)置成功就表示加鎖成功
- 客戶端B也去請求服務(wù)器設(shè)置key的值,如果返回失敗,那么就代表加鎖失敗
- 客戶端A執(zhí)行代碼完成,刪除鎖
- 客戶端B在等待一段時間后再去請求設(shè)置key的值,設(shè)置成功
- 客戶端B執(zhí)行代碼完成,刪除鎖
4.在工具類CacheService中添加方法
/** * 加鎖 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //參考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }
修改未來數(shù)據(jù)定時刷新的方法,如下:
/** * 未來數(shù)據(jù)定時刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未來數(shù)據(jù)定時刷新---定時任務(wù)"); //獲取所有未來數(shù)據(jù)的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //獲取當(dāng)前數(shù)據(jù)的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查詢符合條件的數(shù)據(jù) Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步數(shù)據(jù) if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的將"+futureKey+"刷新到了"+topicKey); } } } }
10、數(shù)據(jù)庫同步到redis
@Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("數(shù)據(jù)庫數(shù)據(jù)同步到緩存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未來5分鐘的所有任務(wù) List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime())); if(allTasks != null && allTasks.size() > 0){ for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache(){ // 刪除緩存中未來數(shù)據(jù)集合和當(dāng)前消費(fèi)者隊列的所有key Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); }
三、延遲隊列解決精準(zhǔn)時間發(fā)布文章
1、延遲隊列服務(wù)提供對外接口
提供遠(yuǎn)程的feign接口,在heima-leadnews-feign-api編寫類如下:
package com.heima.apis.schedule; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { /** * 添加任務(wù) * @param task 任務(wù)對象 * @return 任務(wù)id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); /** * 取消任務(wù) * @param taskId 任務(wù)id * @return 取消結(jié)果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); /** * 按照類型和優(yōu)先級來拉取任務(wù) * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }
在heima-leadnews-schedule微服務(wù)下提供對應(yīng)的實現(xiàn)
package com.heima.schedule.feign; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; /** * 添加任務(wù) * @param task 任務(wù)對象 * @return 任務(wù)id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } /** * 取消任務(wù) * @param taskId 任務(wù)id * @return 取消結(jié)果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } /** * 按照類型和優(yōu)先級來拉取任務(wù) * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }
2、發(fā)布文章集成添加延遲隊列接口
在創(chuàng)建WmNewsTaskService
package com.heima.wemedia.service; import com.heima.model.wemedia.pojos.WmNews; public interface WmNewsTaskService { /** * 添加任務(wù)到延遲隊列中 * @param id 文章的id * @param publishTime 發(fā)布的時間 可以做為任務(wù)的執(zhí)行時間 */ public void addNewsToTask(Integer id, Date publishTime); }
實現(xiàn):
package com.heima.wemedia.service.impl; import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.enums.TaskTypeEnum; import com.heima.model.schedule.dtos.Task; import com.heima.model.wemedia.pojos.WmNews; import com.heima.utils.common.ProtostuffUtil; import com.heima.wemedia.service.WmNewsTaskService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; /** * 添加任務(wù)到延遲隊列中 * @param id 文章的id * @param publishTime 發(fā)布的時間 可以做為任務(wù)的執(zhí)行時間 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任務(wù)到延遲服務(wù)中----begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任務(wù)到延遲服務(wù)中----end"); } }
枚舉類:
package com.heima.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定時審核"), REMOTEERROR(1002, 2,"第三方接口調(diào)用失敗,重試"); private final int taskType; //對應(yīng)具體業(yè)務(wù) private final int priority; //業(yè)務(wù)不同級別 private final String desc; //描述信息 }
序列化工具對比
- JdkSerialize:java內(nèi)置的序列化能將實現(xiàn)了Serilazable接口的對象進(jìn)行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化對象生成字節(jié)數(shù)組
- Protostuff:google開源的protostuff采用更為緊湊的二進(jìn)制數(shù)組,表現(xiàn)更加優(yōu)異,然后使用protostuff的編譯工具生成pojo類
拷貝資料中的兩個類到heima-leadnews-utils下
JdkSerializeUtil.java
package com.heima.utils.common; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** * jdk序列化 */ public class JdkSerializeUtil { /** * 序列化 * @param obj * @param <T> * @return */ public static <T> byte[] serialize(T obj) { if (obj == null){ throw new NullPointerException(); } ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); return bos.toByteArray(); } catch (Exception ex) { ex.printStackTrace(); } return new byte[0]; } /** * 反序列化 * @param data * @param clazz * @param <T> * @return */ public static <T> T deserialize(byte[] data, Class<T> clazz) { ByteArrayInputStream bis = new ByteArrayInputStream(data); try { ObjectInputStream ois = new ObjectInputStream(bis); T obj = (T)ois.readObject(); return obj; } catch (Exception ex) { ex.printStackTrace(); } return null; } }
ProtostuffUtil.java
package com.heima.utils.common; import com.heima.model.wemedia.pojos.WmNews; import io.protostuff.LinkedBuffer; import io.protostuff.ProtostuffIOUtil; import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; public class ProtostuffUtil { /** * 序列化 * @param t * @param <T> * @return */ public static <T> byte[] serialize(T t){ Schema schema = RuntimeSchema.getSchema(t.getClass()); return ProtostuffIOUtil.toByteArray(t,schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)); } /** * 反序列化 * @param bytes * @param c * @param <T> * @return */ public static <T> T deserialize(byte []bytes,Class<T> c) { T t = null; try { t = c.newInstance(); Schema schema = RuntimeSchema.getSchema(t.getClass()); ProtostuffIOUtil.mergeFrom(bytes,t,schema); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return t; } /** * jdk序列化與protostuff序列化對比 * @param args */ public static void main(String[] args) { long start =System.currentTimeMillis(); for (int i = 0; i <1000000 ; i++) { WmNews wmNews =new WmNews(); JdkSerializeUtil.serialize(wmNews); } System.out.println(" jdk 花費(fèi) "+(System.currentTimeMillis()-start)); start =System.currentTimeMillis(); for (int i = 0; i <1000000 ; i++) { WmNews wmNews =new WmNews(); ProtostuffUtil.serialize(wmNews); } System.out.println(" protostuff 花費(fèi) "+(System.currentTimeMillis()-start)); } }
Protostuff需要引導(dǎo)依賴:
<dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.6.0</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.6.0</version> </dependency>
修改發(fā)布文章代碼:
把之前的異步調(diào)用修改為調(diào)用延遲任務(wù)
@Autowired private WmNewsTaskService wmNewsTaskService; /** * 發(fā)布修改文章或保存為草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.條件判斷 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //屬性拷貝 屬性名詞和類型相同才能拷貝 BeanUtils.copyProperties(dto,wmNews); //封面圖片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果當(dāng)前封面類型為自動 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判斷是否為草稿 如果為草稿結(jié)束當(dāng)前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章內(nèi)容圖片與素材的關(guān)系 //獲取到文章內(nèi)容中的圖片信息 List<String> materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面圖片與素材的關(guān)系,如果當(dāng)前布局是自動,需要匹配封面圖片 saveRelativeInfoForCover(dto,wmNews,materials); //審核文章 // wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
3、消費(fèi)任務(wù)進(jìn)行審核文章
WmNewsTaskService中添加方法
/** * 消費(fèi)延遲隊列數(shù)據(jù) */ public void scanNewsByTask();
實現(xiàn)
@Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService; /** * 消費(fèi)延遲隊列數(shù)據(jù) */ @Scheduled(fixedRate = 1000) @Override @SneakyThrows public void scanNewsByTask() { log.info("文章審核---消費(fèi)任務(wù)執(zhí)行---begin---"); ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } log.info("文章審核---消費(fèi)任務(wù)執(zhí)行---end---"); }
到此這篇關(guān)于redis實現(xiàn)延遲任務(wù)的項目實踐的文章就介紹到這了,更多相關(guān)redis 延遲任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
redis在Linux系統(tǒng)下的環(huán)境配置和redis的全局命令大全
在Linux系統(tǒng)中我們經(jīng)常使用Redis作為高性能的緩存數(shù)據(jù)庫,然而有時候我們需要在系統(tǒng)中多個地方使用Redis命令,這就需要將Redis的全局命令設(shè)置好,這篇文章主要給大家介紹了關(guān)于redis在Linux系統(tǒng)下的環(huán)境配置和redis的全局命令大全的相關(guān)資料,需要的朋友可以參考下2024-05-05Redis實現(xiàn)數(shù)據(jù)的交集、并集、補(bǔ)集的示例
本文主要介紹了Redis實現(xiàn)數(shù)據(jù)的交集、并集、補(bǔ)集的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08