欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java分布式鎖幾種常見的實(shí)現(xiàn)方式

 更新時間:2025年07月25日 10:23:52   作者:快樂吃手手??:??)  
在java的分布式系統(tǒng)中,有時候會出現(xiàn)不同的服務(wù)操作同一個資源的情況,如交易系統(tǒng)和充值系統(tǒng)都要操作用戶賬戶,分布式鎖為解決分布式系統(tǒng)中多個應(yīng)用同時訪問同一個資源的問題,這篇文章主要介紹了Java分布式鎖幾種常見的實(shí)現(xiàn)方式,需要的朋友可以參考下

前言

分布式鎖主要用于解決在分布式系統(tǒng)中多個節(jié)點(diǎn)對共享資源進(jìn)行并發(fā)訪問時可能出現(xiàn)的競爭問題。

在Java中實(shí)現(xiàn)分布式鎖的方式主要有以下幾種:

基于數(shù)據(jù)庫的實(shí)現(xiàn)

(1)唯一約束

通過數(shù)據(jù)庫表中設(shè)置唯一鍵約束來保證只有一個客戶端可以獲取到鎖。通常會有一張專門的鎖表,包含鎖名稱和鎖的持有者信息等字段。

CREATE TABLE distributed_locks (
    lock_name VARCHAR(255) PRIMARY KEY,
    owner_info VARCHAR(255) NOT NULL
);

該代碼示例展示了如何使用上述表來獲取和釋放鎖,并執(zhí)行相應(yīng)的業(yè)務(wù)邏輯(例如更新庫存),在請求A對業(yè)務(wù)進(jìn)行操作的時候,假設(shè)請求B也進(jìn)入到此方法,則會由于鎖表的唯一索引lock_name而導(dǎo)致插入失敗,導(dǎo)致其操作被拒絕,而主鍵則需要針對不同業(yè)務(wù)場景設(shè)置,不同業(yè)務(wù)場景不會觸發(fā)鎖機(jī)制。

    public void operateStock(Integer num) {
        String lockName = "product_stock_update_lock"; // 鎖名稱,如:業(yè)務(wù)編碼
        String ownerId = "d2d00005sa5s512"; // 當(dāng)前實(shí)例標(biāo)識符,如:用戶id

        // SQL == "INSERT INTO distributed_locks(lock_name, owner_info) VALUES (?, ?)"
        // 加鎖插入成功返回true
        boolean gotLock = acquireLock(lockName, ownerId);
        if (gotLock) {
            try {
                // 執(zhí)行業(yè)務(wù)邏輯,如更新指定物料庫存
                //SQL == "UPDATE products SET stock = stock - #{num} WHERE product_id = '001' AND stock > 0";
                updateProductStock(num);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // SQL == "DELETE FROM distributed_locks WHERE lock_name = ? AND owner_info = ?"
                releaseLock(lockName, ownerId);
            }
        } else {
            System.out.println("Failed to acquire lock.");
        }
    }

(2)行鎖或表鎖

在查詢語句后面增加for update,數(shù)據(jù)庫會在查詢過程中給數(shù)據(jù)庫表增加排他鎖,當(dāng)某條記錄被加上排他鎖之后,其他線程無法再在該行記錄上增加排他鎖,而事務(wù)提交后會自動釋放鎖。

    @Autowired
    private ProductMapper productMapper;

    @Transactional
    public void updateProductStock(String productId) {
        //查詢并加鎖   @Select("SELECT * FROM products WHERE product_id = #{productId} FOR UPDATE")
        Product product = productMapper.selectForUpdate(productId);
        if (product == null || product.getStock() <= 0) {
            System.out.println("庫存不足或產(chǎn)品不存在");
            return;
        }
        
        // 更新庫存  @Update("UPDATE products SET stock = stock - 1 WHERE product_id = #{productId}")
        int rowsAffected = productMapper.updateStock(productId);
        if (rowsAffected > 0) {
            System.out.println("庫存更新成功");
        } else {
            System.out.println("未能成功更新庫存");
        }
    }

(3)version樂觀鎖

樂觀鎖是一種處理并發(fā)控制的策略,它假設(shè)數(shù)據(jù)沖突的概率較低,因此不會在讀取數(shù)據(jù)時加鎖。相反,它會在更新數(shù)據(jù)時檢查數(shù)據(jù)是否被其他事務(wù)修改過。這通常通過一個版本號(version)字段或時間戳來實(shí)現(xiàn)。

讀取數(shù)據(jù):當(dāng)一個事務(wù)讀取數(shù)據(jù)時,同時獲取該記錄的版本號或時間戳。

修改數(shù)據(jù):當(dāng)事務(wù)嘗試更新數(shù)據(jù)時,它會使用版本號作為條件的一部分進(jìn)行更新操作。

CREATE TABLE products (
    product_id VARCHAR(255) PRIMARY KEY,
    stock INT NOT NULL,
    version INT DEFAULT 0
);
    @Autowired
    private ProductMapper productMapper;

    @Transactional
    public void updateProductStock(String productId) {
        // 查詢并加鎖
        // @Select("SELECT product_id,stock,version FROM products WHERE product_id = #{productId} FOR UPDATE")
        Product product = productMapper.selectForUpdate(productId);
        if (product == null || product.getStock() <= 0) {
            System.out.println("庫存不足或產(chǎn)品不存在");
            return;
        }

        // 嘗試更新庫存,并檢查版本號
        // @Update("UPDATE products SET stock = stock - 1, version = version + 1 WHERE product_id = #{productId} AND version = #{version}")
        int rowsAffected = productMapper.updateStockWithVersion(productId, product.getVersion());
        if (rowsAffected > 0) {
            System.out.println("庫存更新成功");
        } else {
            System.out.println("庫存更新失敗,可能已被其他事務(wù)更新");
            // 這里可以根據(jù)業(yè)務(wù)需求選擇重試或者拋出異常等處理方式
        }
    }

基于Redis的實(shí)現(xiàn)

使用Redis實(shí)現(xiàn)分布式鎖是一種高效且廣泛采用的方法,特別適合于需要高吞吐量和低延遲的場景。Redis通過其原子操作命令提供了一種簡單而有效的機(jī)制來實(shí)現(xiàn)分布式鎖。

SET resource_name my_random_value NX PX 30000

NX 表示僅在鍵不存在時設(shè)置鍵。

PX 30000 設(shè)置鍵的過期時間為30秒,防止死鎖(如果客戶端崩潰或網(wǎng)絡(luò)問題導(dǎo)致無法釋放鎖)。

RedisTemplate實(shí)現(xiàn)分布式鎖

編寫工具類

@Component
public class MyRedisLock {

    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    // 自Spring 4.3起,如果只有一個構(gòu)造函數(shù),可以省略@Autowired注解
    public MyRedisLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // Lua 腳本用于釋放鎖
    private static final String UNLOCK_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end";

    // Lua 腳本用于續(xù)期鎖
    private static final String RENEW_LOCK_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('PEXPIRE', KEYS[1], ARGV[2]) else return 0 end";

    // 鎖的前綴,用于區(qū)分不同的鎖
    private static final String LOCK_PREFIX = "lock:";

    // 續(xù)期鎖的時間間隔(毫秒)
    private static final long RENEW_INTERVAL_MS = 2000;


    /**
     嘗試獲取鎖
     @param lockKey   鎖的鍵名
     @param expireMs  鎖的過期時間(毫秒)
     @param operateId 鎖的值
     @return   如果成功獲取鎖,返回 true;否則返回 false
     **/
    public boolean tryLock(String lockKey, long expireMs, String operateId) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS);     // setIfAbsent實(shí)現(xiàn)上鎖
        return result != null && result;
    }

    /**
     嘗試獲取鎖并自動續(xù)期
     @param lockKey   鎖的鍵名
     @param expireMs  鎖的過期時間(毫秒)
     @param operateId 鎖的值
     @return   如果成功獲取鎖,返回鎖的唯一標(biāo)識符;否則返回 null
     **/
    public boolean tryLockWithRenewal(String lockKey, long expireMs, String operateId) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, operateId, expireMs, TimeUnit.MILLISECONDS);        // setIfAbsent實(shí)現(xiàn)上鎖
        if (result != null && result) {
            // 啟動續(xù)期線程
            startRenewalThread(lockKey, operateId, expireMs);
            return true;
        }
        return false;
    }

    /**
     釋放鎖
     @param lockKey   鎖的鍵名
     @param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶端)
     @return  如果成功釋放鎖,返回 true;否則返回 false
     **/
    public boolean unlock(String lockKey, String operateId) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);
        // 執(zhí)行l(wèi)ua腳本,參數(shù)解釋下:
        // 第一個參數(shù)script為lua腳本
        // 第二個參數(shù)為key的集合,會依次替換lua腳本中的KEYS[]數(shù)組的數(shù)據(jù),默認(rèn)1開始
        // 第三個參數(shù)為參數(shù)集合,會依次替換lua腳本中的ARGVS[]數(shù)組的數(shù)據(jù),默認(rèn)1開始
        Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId);
        return result != null && result == 1L;
    }

    /**
     自動續(xù)期鎖
     @param lockKey   鎖的鍵名
     @param operateId 鎖的值(用于驗(yàn)證是否是持有鎖的客戶端)
     @param expireMs  鎖的過期時間(毫秒)
     @return  如果成功續(xù)期,返回 true;否則返回 false
     **/
    public boolean renewLock(String lockKey, String operateId, long expireMs) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(LOCK_PREFIX + lockKey), operateId, String.valueOf(expireMs));
        return result != null && result == 1L;
    }

    /**
     啟動續(xù)期線程
     @param lockKey   鎖的鍵名
     @param operateId 鎖的值
     @param expireMs  鎖的過期時間(毫秒)
     **/
    private void startRenewalThread(final String lockKey, final String operateId, final long expireMs) {
        Thread renewalThread = new Thread(() -> {
            try {
                while (true) {
                    // 每隔一段時間續(xù)期一次,需要確保間隔時間小于過期時間,過期或釋放鎖將無法續(xù)費(fèi)
                    Thread.sleep(RENEW_INTERVAL_MS);
                    if (!renewLock(lockKey, operateId, expireMs)) {  // 續(xù)鎖操作
                        // 如果續(xù)期失敗,直接結(jié)束守護(hù)線程,停止鎖續(xù)期行為。
                        // 這里說明下,刪除鎖和續(xù)鎖都需要驗(yàn)證lockValue,這個上鎖時通過uuid創(chuàng)建的,其他線程肯定獲取的都不一致,這樣確保續(xù)鎖行為只能是自己的守護(hù)線程才可以操作;如果續(xù)鎖失敗了,則說明是主線程完成任務(wù)刪除了key鎖,所以這里守護(hù)線程也可以結(jié)束了
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        renewalThread.setDaemon(true);  // 設(shè)置為守護(hù)線程
        renewalThread.start();
    }
}

代碼示例:常規(guī)鎖

    @Autowired
    private MyRedisLock redisLock;

    // 鎖的默認(rèn)過期時間(毫秒)
    private static final long DEFAULT_EXPIRE_TIME_MS = 5000;

    public void testLock() throws InterruptedException {
        String lockKey = "my_distributed_lock";
        //進(jìn)程標(biāo)識ID
        String operateId = java.util.UUID.randomUUID().toString();
        if (redisLock.tryLock(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
            System.out.println(" 獲取到鎖,開始執(zhí)行任務(wù)...");
            try {
                // 執(zhí)行業(yè)務(wù)邏輯
                Thread.sleep(5000); // 模擬耗時操作
                System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
            } finally {
                if (redisLock.unlock(lockKey, operateId)) {
                    System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶端刪除");
                }
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
        }
    }

代碼示例:續(xù)費(fèi)線程鎖

    @Autowired
    private MyRedisLock redisLock;

    // 鎖的默認(rèn)過期時間(毫秒)
    private static final long DEFAULT_EXPIRE_TIME_MS = 5000;

    public void testLock() throws InterruptedException {
        String lockKey = "my_distributed_lock";
        //進(jìn)程標(biāo)識ID
        String operateId = java.util.UUID.randomUUID().toString();
        // 嘗試獲取鎖并自動續(xù)期
        if ( redisLock.tryLockWithRenewal(lockKey, DEFAULT_EXPIRE_TIME_MS, operateId)) {
            try {
                // 執(zhí)行業(yè)務(wù)邏輯
                Thread.sleep(8000); 
                System.out.println(Thread.currentThread().getName() + " 任務(wù)執(zhí)行完成");
            } finally {
                // 釋放鎖
                boolean unlockSuccess = redisLock.unlock(lockKey, operateId);
                if (unlockSuccess) {
                    System.out.println(Thread.currentThread().getName() + " 成功釋放鎖");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 釋放鎖失敗,鎖可能已被其他客戶端刪除");
                }
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 未能獲取到鎖");
        }
    }

基于Zookeeper的實(shí)現(xiàn)

基本原理

  • 創(chuàng)建臨時順序節(jié)點(diǎn):客戶端嘗試在特定路徑下創(chuàng)建一個臨時順序節(jié)點(diǎn)(例如/locks/lock-),以表示對鎖的競爭。
  • 判斷是否獲得鎖:檢查自己創(chuàng)建的節(jié)點(diǎn)是否是該路徑下所有子節(jié)點(diǎn)中最小的一個。如果是,則表示獲得了鎖;如果不是,則監(jiān)聽前一個節(jié)點(diǎn)(即比自己小的那個節(jié)點(diǎn))的變化。
  • 監(jiān)聽前一節(jié)點(diǎn)刪除事件:如果當(dāng)前節(jié)點(diǎn)不是最小節(jié)點(diǎn),則需要等待直到前一節(jié)點(diǎn)被刪除(意味著前一客戶端釋放了鎖),然后重新檢查是否可以獲得鎖。
  • 釋放鎖:當(dāng)業(yè)務(wù)邏輯執(zhí)行完畢后,客戶端可以主動刪除自己創(chuàng)建的節(jié)點(diǎn)來釋放鎖。此外,由于使用的是臨時節(jié)點(diǎn),如果客戶端崩潰或與ZooKeeper斷開連接,該節(jié)點(diǎn)也會自動被刪除。

引入依賴(一個用于簡化ZooKeeper操作的框架)

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

代碼示例

    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String LOCK_PATH = "/distributed_lock_example";

    public void operate() throws Exception {
        // 創(chuàng)建CuratorFramework實(shí)例
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
        client.start();
        // 使用InterProcessMutex作為分布式鎖
        InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
        try {
            // 獲取鎖
            lock.acquire(); 
            //業(yè)務(wù)操作代碼
            this.performBusinessLogic();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (lock.isAcquiredInThisProcess()) {
                try {
                    // 釋放鎖
                    lock.release(); 
                    System.out.println("Lock released.");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 關(guān)閉客戶端
            client.close(); 
        }
    }

    private static void performBusinessLogic() throws InterruptedException {
        // 模擬業(yè)務(wù)邏輯處理
        System.out.println("Performing some operations...");
        Thread.sleep(5000); // 暫停5秒模擬長時間操作
    }

基于etcd的實(shí)現(xiàn)(僅了解)

類似于Zookeeper,etcd也提供了類似的分布式協(xié)調(diào)服務(wù),可以通過創(chuàng)建租約(lease)并附加到關(guān)鍵路徑上來實(shí)現(xiàn)分布式鎖。Etcd支持事務(wù)、watch機(jī)制等功能,使得它同樣適用于構(gòu)建分布式鎖。

總結(jié)

到此這篇關(guān)于Java分布式鎖幾種常見的實(shí)現(xiàn)方式的文章就介紹到這了,更多相關(guān)Java分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論