Redis如何使用Pipeline實(shí)現(xiàn)批處理操作
在正常情況下,我們每次發(fā)送 Redis 命令時(shí),客戶端會(huì)等待 Redis 服務(wù)器的響應(yīng),直到接收到結(jié)果后,才會(huì)發(fā)送下一個(gè)命令。這種方式雖然保證了操作的順序性,但在執(zhí)行大量命令時(shí)會(huì)產(chǎn)生很大的網(wǎng)絡(luò)延遲。
通過(guò) Pipeline 技術(shù),我們的客戶端可以將多個(gè)命令同時(shí)發(fā)送給 Redis 服務(wù)器,并且不需要等待每個(gè)命令的返回結(jié)果,直到所有命令都被執(zhí)行完畢,客戶端再一起獲取返回值。這樣能減少每個(gè)命令的等待時(shí)間,大幅提高執(zhí)行效率。
Redis Pipeline 是一種優(yōu)化 Redis 操作的機(jī)制,通過(guò)將多個(gè)命令打包發(fā)送到 Redis 服務(wù)器,減少客戶端與服務(wù)器之間的網(wǎng)絡(luò)往返時(shí)間(RTT),從而顯著提升性能。
在默認(rèn)情況下,Redis 客戶端與服務(wù)器之間的通信是請(qǐng)求-響應(yīng)模式,即:
1客戶端發(fā)送一個(gè)命令到服務(wù)器。
2.服務(wù)器執(zhí)行命令并返回結(jié)果。
3.客戶端等待響應(yīng)后再發(fā)送下一個(gè)命令。
這種模式在命令數(shù)量較少時(shí)沒(méi)有問(wèn)題,但在需要執(zhí)行大量命令時(shí),網(wǎng)絡(luò)往返時(shí)間(RTT)會(huì)成為性能瓶頸。所以我們需要實(shí)現(xiàn)下面目的:
1.將多個(gè)命令打包發(fā)送到服務(wù)器。
2.服務(wù)器依次執(zhí)行這些命令,并將結(jié)果一次性返回給客戶端。
3.減少網(wǎng)絡(luò)開(kāi)銷,提升性能。
以下是一個(gè)簡(jiǎn)單的 Java 示例,展示了如何使用 Jedis(Redis 的一個(gè) Java 客戶端)執(zhí)行 Pipeline:
注意:批處理時(shí)不建議一次攜帶太多命令,并且Pipeline的多個(gè)命令之間不具備原子性。
// 創(chuàng)建 Jedis 實(shí)例 Jedis jedis = new Jedis("localhost", 6379); // 使用 pipelining 方式批量執(zhí)行命令 Pipeline pipeline = jedis.pipelined(); // 批量操作:使用 pipeline 來(lái)緩存命令 for (int i = 0; i < 1000; i++) { pipeline.set("key" + i, "value" + i); } // 同步執(zhí)行所有命令 pipeline.sync();
pipelined() 方法: 創(chuàng)建一個(gè) Pipeline 對(duì)象,它緩存所有要執(zhí)行的命令。
批量設(shè)置命令: 通過(guò) pipeline.set() 將多個(gè) SET 命令放入管道中,但命令并不會(huì)立即執(zhí)行。
sync() 方法: 通過(guò)調(diào)用 sync() 方法,客戶端將會(huì)把所有緩存的命令一次性發(fā)送給 Redis,并等待它們完成執(zhí)行。
但是這些都是在單機(jī)模式下的批處理,那對(duì)于集群來(lái)說(shuō)該如何使用呢?
向MSet或Pipeline這樣的批處理需要在一次請(qǐng)求中攜帶多條命令,而此時(shí)如何Redis是一個(gè)集群,那批處理命令的多個(gè)key必須落在同一個(gè)插槽中,否則就會(huì)導(dǎo)致執(zhí)行失敗。
一般推薦使用并行插槽來(lái)解決,如果使用hash_tag,可能會(huì)出現(xiàn)大量的key分配同一插槽導(dǎo)致數(shù)據(jù)傾斜,而并行插槽不會(huì)。
那么這里我們模擬一下并行插槽實(shí)現(xiàn):
將多個(gè)鍵值對(duì)按照Redis集群的槽位進(jìn)行分組,然后分別使用jedisCluster.mset()方法按組設(shè)置鍵值對(duì)。
public class JedisClusterTest { // 聲明一個(gè)JedisCluster對(duì)象,用于與Redis集群進(jìn)行交互 private JedisCluster jedisCluster; // 在每個(gè)測(cè)試方法執(zhí)行之前,初始化JedisCluster連接 @BeforeEach void setUp() { // 配置Jedis連接池 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(8); poolConfig.setMaxIdle(8); poolConfig.setMinIdle(0); poolConfig.setMaxWaitMillis(1000); // 創(chuàng)建一個(gè)HashSet,用于存儲(chǔ)Redis集群的節(jié)點(diǎn)信息 HashSet<HostAndPort> nodes = new HashSet<>(); // 添加Redis集群的節(jié)點(diǎn)信息(IP和端口) nodes.add(new HostAndPort("192.168.150.101", 7001)); nodes.add(new HostAndPort("192.168.150.101", 7002)); nodes.add(new HostAndPort("192.168.150.101", 7003)); nodes.add(new HostAndPort("192.168.150.101", 8001)); nodes.add(new HostAndPort("192.168.150.101", 8002)); nodes.add(new HostAndPort("192.168.150.101", 8003)); // 使用配置的連接池和節(jié)點(diǎn)信息初始化JedisCluster對(duì)象 jedisCluster = new JedisCluster(nodes, poolConfig); } // 測(cè)試方法:使用mset命令一次性設(shè)置多個(gè)鍵值對(duì) @Test void testMSet() { // 使用JedisCluster的mset方法,一次性設(shè)置多個(gè)鍵值對(duì) // 但是jedisCluster默認(rèn)是無(wú)法解決批處理問(wèn)題的,需要我們手動(dòng)解決 jedisCluster.mset("name", "Jack", "age", "21", "sex", "male"); } // 測(cè)試方法:使用mset命令按槽位分組設(shè)置多個(gè)鍵值對(duì) @Test void testMSet2() { // 創(chuàng)建一個(gè)HashMap,用于存儲(chǔ)多個(gè)鍵值對(duì) Map<String, String> map = new HashMap<>(3); map.put("name", "Jack"); map.put("age", "21"); map.put("sex", "Male"); // 將map中的鍵值對(duì)按照Redis集群的槽位進(jìn)行分組 Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet() .stream() .collect(Collectors.groupingBy( // 使用ClusterSlotHashUtil計(jì)算每個(gè)鍵對(duì)應(yīng)的槽位 entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())) ); // 遍歷按哈希槽分組后的結(jié)果 for (List<Map.Entry<String, String>> list : result.values()) { // 創(chuàng)建一個(gè)數(shù)組用于批量設(shè)置Redis的鍵值對(duì) String[] arr = new String[list.size() * 2]; // 每個(gè)鍵值對(duì)包含兩個(gè)元素 int j = 0; // 索引變量,用于在數(shù)組中定位位置 for (int i = 0; i < list.size(); i++) { j = i << 1; // 通過(guò)位移計(jì)算數(shù)組中的位置 Map.Entry<String, String> e = list.get(i); // 獲取當(dāng)前的鍵值對(duì) arr[j] = e.getKey(); // 將鍵放入數(shù)組中 arr[j + 1] = e.getValue(); // 將值放入數(shù)組中 } // 批量設(shè)置Redis集群中的鍵值對(duì) jedisCluster.mset(arr); } } // 在每個(gè)測(cè)試方法執(zhí)行之后,關(guān)閉JedisCluster連接 @AfterEach void tearDown() { // 如果JedisCluster對(duì)象不為空,則關(guān)閉連接 if (jedisCluster != null) { jedisCluster.close(); } } }
而在Redis集群環(huán)境下,如果需要批量獲取多個(gè)鍵的值,可以使用multiGet方法。multiGet是RedisTemplate提供的一個(gè)方法,用于一次性獲取多個(gè)鍵的值。然而,需要注意的是,multiGet在集群環(huán)境下要求所有鍵必須位于同一個(gè)槽位(slot),否則會(huì)拋出異常。
@Service public class RedisService { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 跨槽位批量獲取多個(gè)鍵的值 */ public Map<String, Object> batchGetCrossSlot(List<String> keys) { // 按槽位分組 Map<Integer, List<String>> slotKeyMap = keys.stream() .collect(Collectors.groupingBy(ClusterSlotHashUtil::calculateSlot)); // 存儲(chǔ)最終結(jié)果 Map<String, Object> result = new HashMap<>(); // 對(duì)每個(gè)槽位的鍵分別調(diào)用multiGet for (Map.Entry<Integer, List<String>> entry : slotKeyMap.entrySet()) { List<String> slotKeys = entry.getValue(); List<Object> slotValues = redisTemplate.opsForValue().multiGet(slotKeys); // 將結(jié)果存入Map for (int i = 0; i < slotKeys.size(); i++) { result.put(slotKeys.get(i), slotValues.get(i)); } } return result; } /** * 測(cè)試跨槽位批量獲取方法 */ public void testBatchGetCrossSlot() { List<String> keys = Arrays.asList("name", "age", "sex"); Map<String, Object> values = batchGetCrossSlot(keys); // 打印結(jié)果 values.forEach((key, value) -> { System.out.println("Key: " + key + ", Value: " + value); }); } }
到此這篇關(guān)于Redis如何使用Pipeline實(shí)現(xiàn)批處理操作的文章就介紹到這了,更多相關(guān)Redis Pipeline批處理操作內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis 中的熱點(diǎn)鍵和數(shù)據(jù)傾斜示例詳解
熱點(diǎn)鍵是指在 Redis 中被頻繁訪問(wèn)的特定鍵,這些鍵由于其高訪問(wèn)頻率,可能導(dǎo)致 Redis 服務(wù)器的性能問(wèn)題,尤其是在高并發(fā)場(chǎng)景下,本文給大家介紹Redis 中的熱點(diǎn)鍵和數(shù)據(jù)傾斜,感興趣的朋友一起看看吧2025-03-03reids自定義RedisTemplate以及亂碼問(wèn)題解決
本文主要介紹了reids自定義RedisTemplate以及亂碼問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-04-04Redis集群的三種部署方式及三種應(yīng)用問(wèn)題的處理
這篇文章主要介紹了Redis集群的三種部署方式及三種應(yīng)用問(wèn)題的處理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04使用JMeter插件Redis Data Set如何實(shí)現(xiàn)高性能數(shù)據(jù)驅(qū)動(dòng)測(cè)試
RedisDataSet插件是JMeter的一個(gè)插件,可以實(shí)現(xiàn)從Redis中動(dòng)態(tài)加載數(shù)據(jù),并將其用作測(cè)試參數(shù),本文詳細(xì)介紹如何在JMeter中使用RedisDataSet插件,幫助你實(shí)現(xiàn)高效的數(shù)據(jù)驅(qū)動(dòng)測(cè)試2025-01-01Redis實(shí)現(xiàn)附近商鋪的項(xiàng)目實(shí)戰(zhàn)
本文主要介紹了Redis實(shí)現(xiàn)附近商鋪的項(xiàng)目實(shí)戰(zhàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01windows下通過(guò)批處理腳本啟動(dòng)redis的操作
本文主要給大家介紹了windows下通過(guò)批處理腳本啟動(dòng)redis的操作,windows下redis啟動(dòng),需要進(jìn)入redis安裝目錄,然后shift+右鍵,選擇“在此處打開(kāi)命令窗口”,然后輸入redis-server.exe redis.conf,就可以啟動(dòng)redis了,文中有詳細(xì)的圖文參考,感興趣的朋友可以參考下2023-12-12redis使用watch秒殺搶購(gòu)實(shí)現(xiàn)思路
這篇文章主要為大家詳細(xì)介紹了redis使用watch秒殺搶購(gòu)的實(shí)現(xiàn)思路,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02redis執(zhí)行l(wèi)ua腳本的實(shí)現(xiàn)
本文主要介紹了redis執(zhí)行l(wèi)ua腳本的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-10-10