欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis實現(xiàn)分布式鎖和等待序列的方法示例

 更新時間:2019年06月18日 08:26:13   作者:小小小LIN子  
這篇文章主要介紹了Redis實現(xiàn)分布式鎖和等待序列的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

在集群下,經(jīng)常會因為同時處理發(fā)生資源爭搶和并發(fā)問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說白了在集群下沒啥用。這時我們就需要能在多臺 JVM 之間決定執(zhí)行順序的鎖了,現(xiàn)在分布式鎖主要有 redis 、 Zookeeper 實現(xiàn)的,還有數(shù)據(jù)庫的方式,不過性能太差,也就是需要一個第三方的監(jiān)管。

背景

最近在做一個消費 Kafka 消息的時候發(fā)現(xiàn),由于線上的消費者過多,經(jīng)常會遇到,多個機器同時處理一個主鍵類型的數(shù)據(jù)的情況發(fā)生,如果最后是執(zhí)行更新操作的話,也就是一個更新順序的問題,但是如果恰好都需要插入數(shù)據(jù)的時候,會出現(xiàn)主鍵重復(fù)的問題。這是生產(chǎn)上不被允許的(因為公司有異常監(jiān)管的機制,扣分啥的),這是就需要個分布式鎖了,斟酌后用了 Redis 的實現(xiàn)方式(因為網(wǎng)上例子多)

分析

redis 實現(xiàn)的分布式鎖,實現(xiàn)原理是 set 方法,因為多個線程同時請求的時候,只有一個線程可以成功并返回結(jié)果,還可以設(shè)置有效期,來避免死鎖的發(fā)生,一切都是這么的完美,不過有個問題,在 set 的時候,會直接返回結(jié)果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的線程進(jìn)程處理,有兩種方式

  • 丟棄
  • 等待重試 由于我們的系統(tǒng)需要這些數(shù)據(jù),那么只能重新嘗試獲取。這里使用 redis 的 List 類型實現(xiàn)等待序列的作用

代碼

直接上代碼 其實直接redis的工具類就可以解決了

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis隊列實現(xiàn)方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";

  private static final Long RELEASE_SUCCESS = 1L;

  private RedisUcUitl() {

  }
  /**
   * logger
   **/

  /**
   * 存儲redis隊列順序存儲 在隊列首部存入
   *
   * @param key  字節(jié)類型
   * @param value 字節(jié)類型
   */
  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {

    return jedis.lpush(key, value);
  
  }

  /**
   * 移除列表中最后一個元素 并將改元素添加入另一個列表中 ,當(dāng)列表為空時 將阻塞連接 直到等待超時
   *
   * @param srckey
   * @param dstkey
   * @param timeout 0 表示永不超時
   * @return
   */
  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {

    return jedis.brpoplpush(srckey, dstkey, timeout);

  }

  /**
   * 返回制定的key,起始位置的redis數(shù)據(jù)
   * @param redisKey
   * @param start
   * @param end -1 表示到最后
   * @return
   */
  public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
    
    return jedis.lrange(redisKey, start, end);
  }

  /**
   * 刪除key
   * @param redisKey
   */
  public static void delete(Jedis jedis, final byte[] redisKey) {
    
     return jedis.del(redisKey);
  }

  /**
   * 嘗試加鎖
   * @param lockKey key名稱
   * @param requestId 身份標(biāo)識
   * @param expireTime 過期時間
   * @return
   */
  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    return LOCK_SUCCESS.equals(result);

  }

  /**
   * 釋放鎖
   * @param lockKey key名稱
   * @param requestId 身份標(biāo)識
   * @return
   */
  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    return RELEASE_SUCCESS.equals(result);

  }
}

業(yè)務(wù)邏輯主要代碼如下

1.先消耗隊列中的

while(true){
  // 消費隊列
  try{
    // 被放入redis隊列的數(shù)據(jù) 序列化后的
    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
    if(bytes == null || bytes.isEmpty()){
      // 隊列中沒數(shù)據(jù)時退出
      break;
    }
    // 反序列化對象
    Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
    // 塞入唯一的值 防止被其他線程誤解鎖
    String requestId = UUID.randomUUID().toString();
    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
    if(lockGetFlag){
      // 成功獲取鎖 進(jìn)行業(yè)務(wù)處理
      //TODO
      // 處理完畢釋放鎖 
      boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);

    }else{
      // 未能獲得鎖放入等待隊列
     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
  
    }
    
  }catch(Exception e){
    break;
  }
  
}

2.處理最新接到的數(shù)據(jù)

同樣是走嘗試獲取鎖,獲取不到放入隊列的流程

一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下

public class ObjectSerialUtil {

  private ObjectSerialUtil() {
//    工具類
  }

  /**
   * 將Object對象序列化為byte[]
   *
   * @param obj 對象
   * @return byte數(shù)組
   * @throws Exception
   */
  public static byte[] objectToBytes(Object obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    byte[] bytes = bos.toByteArray();
    bos.close();
    oos.close();
    return bytes;
  }


  /**
   * 將bytes數(shù)組還原為對象
   *
   * @param bytes
   * @return
   * @throws Exception
   */
  public static Object bytesToObject(byte[] bytes) {
    try {
      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
      ObjectInputStream ois = new ObjectInputStream(bin);
      return ois.readObject();
    } catch (Exception e) {
      throw new BaseException("反序列化出錯!", e);
    }
  }
}

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Redis分布式鎖Redlock的實現(xiàn)

    Redis分布式鎖Redlock的實現(xiàn)

    本文主要介紹了Redis分布式鎖Redlock的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • 編譯安裝redisd的方法示例詳解

    編譯安裝redisd的方法示例詳解

    這篇文章主要介紹了編譯安裝redisd的方法示例詳解,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-02-02
  • Redis中?HyperLogLog數(shù)據(jù)類型使用小結(jié)

    Redis中?HyperLogLog數(shù)據(jù)類型使用小結(jié)

    Redis使用HyperLogLog的主要作用是在大數(shù)據(jù)流(view,IP,城市)的情況下進(jìn)行去重計數(shù),這篇文章主要介紹了Redis中?HyperLogLog數(shù)據(jù)類型使用總結(jié),需要的朋友可以參考下
    2023-03-03
  • Redis分布式鎖的10個坑總結(jié)

    Redis分布式鎖的10個坑總結(jié)

    日常開發(fā)中,經(jīng)常會碰到秒殺搶購等業(yè)務(wù),為了避免并發(fā)請求造成的庫存超賣等問題,我們一般會用到Redis分布式鎖,但是使用Redis分布式鎖,很容易踩坑哦,本文將給大家分析闡述,Redis分布式鎖的10個坑,需要的朋友可以參考下
    2023-05-05
  • redis key過期監(jiān)聽的實現(xiàn)示例

    redis key過期監(jiān)聽的實現(xiàn)示例

    在Redis中,我們可以為Key設(shè)置過期時間,當(dāng)Key的過期時間到達(dá)后,Redis會自動將該Key標(biāo)記為已失效,本文就來介紹一下redis key過期監(jiān)聽的實現(xiàn)示例,感興趣的可以了解一下
    2024-03-03
  • RedisDesktopManager遠(yuǎn)程連接redis的實現(xiàn)

    RedisDesktopManager遠(yuǎn)程連接redis的實現(xiàn)

    本文主要介紹了RedisDesktopManager遠(yuǎn)程連接redis的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-05-05
  • window下創(chuàng)建redis出現(xiàn)問題小結(jié)

    window下創(chuàng)建redis出現(xiàn)問題小結(jié)

    這篇文章主要介紹了window下創(chuàng)建redis出現(xiàn)問題總結(jié),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10
  • 詳解Redis中的List是如何實現(xiàn)的

    詳解Redis中的List是如何實現(xiàn)的

    List 的 Redis 中的 5 種主要數(shù)據(jù)結(jié)構(gòu)之一,它是一種序列集合,可以存儲一個有序的字符串列表,順序是插入的順序,本文將給大家介紹了一下Redis中的List是如何實現(xiàn)的,需要的朋友可以參考下
    2024-05-05
  • 如何使用redis中的zset實現(xiàn)滑動窗口限流

    如何使用redis中的zset實現(xiàn)滑動窗口限流

    滑動窗口限流是一種常見的流量控制方法,它限制了在一定時間窗口內(nèi)的請求數(shù)量,下面是使用Redis ZSet實現(xiàn)滑動窗口限流的一個簡單示例,需要的朋友可以參考下
    2023-09-09
  • Django使用redis配置緩存的方法

    Django使用redis配置緩存的方法

    Redis是一個內(nèi)存數(shù)據(jù)庫由于其性能極高,因此經(jīng)常作為中間件、緩存使用,緩存某些內(nèi)容是為了保存昂貴計算的結(jié)果,這樣就不必在下次執(zhí)行計算,接下來通過本文給大家分享redis配置緩存的方法,感興趣的朋友一起看看吧
    2021-06-06

最新評論