欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis+threading實(shí)現(xiàn)多線程消息隊(duì)列的使用示例

 更新時(shí)間:2023年12月18日 11:36:04   作者:Asura_____  
Redis多線程消息隊(duì)列是一種使用Redis作為存儲(chǔ)后端的消息隊(duì)列實(shí)現(xiàn),它利用Redis的線程并發(fā)處理能力來(lái)提高消息隊(duì)列的處理效率,本文主要介紹了Redis+threading實(shí)現(xiàn)多線程消息隊(duì)列的使用示例,感興趣的可以了解一下

列表

lpush左插入、rpush右插入、lrange查詢(xún)集合

127.0.0.1:6379> lpush list v1
(integer) 1
127.0.0.1:6379> lpush list v2
(integer) 2
127.0.0.1:6379> lpush list v3
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1
1) "v3"
2) "v2"
3) "v1"
127.0.0.1:6379> LRANGE list 0 1
1) "v3"
2) "v2"
127.0.0.1:6379> LRANGE list 0 0
1) "v3"
127.0.0.1:6379> rpush list rv0
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "v3"
2) "v2"
3) "v1"
4) "rv0"

lpop左移除、rpop右移除

127.0.0.1:6379> lrange list 0 -1
1) "v3"
2) "v2"
3) "v1"
4) "rv0"
127.0.0.1:6379> lpop list
"v3"
127.0.0.1:6379> lrange list 0 -1
1) "v2"
2) "v1"
3) "rv0"
127.0.0.1:6379> rpop list
"rv0"
127.0.0.1:6379> lrange list 0 -1
1) "v2"
2) "v1"

lindex下標(biāo)查詢(xún)、llen長(zhǎng)度查詢(xún)

127.0.0.1:6379> lrange list 0 -1
1) "v4"
2) "v3"
3) "v2"
4) "v1"
127.0.0.1:6379> lindex list 1
"v3"
127.0.0.1:6379> lindex list 0
"v4"
127.0.0.1:6379> llen list
(integer) 4

blpop、brpop

BRPOP 是 Redis 的一個(gè)阻塞式列表彈出命令,用于從指定的一個(gè)或多個(gè)列表中彈出最后一個(gè)元素。它和BLPOP 不同之處在于它是從列表的尾部彈出元素,而不是從頭部。
這種阻塞式彈出操作通常用于實(shí)現(xiàn)消息隊(duì)列。如果列表為空,就會(huì)阻塞等待直到有消息可供處理。timeout: 阻塞超時(shí)時(shí)間,如果所有指定的列表都為空,命令將阻塞直到有元素可彈出或超時(shí)。

# 連接到本地 Redis 服務(wù)器
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
a = ['item1','item2','item3','item4','item5','item6']
# 將元素推入列表
r.rpush('my_queue',*a)
# 使用 blpop 彈出元素
result = r.blpop('my_queue', timeout=5)  # 設(shè)置超時(shí)時(shí)間為 5 秒

['item1', 'item2', 'item3', 'item4', 'item5', 'item6']
['item2', 'item3', 'item4', 'item5', 'item6']

字符串

set、incr遞增、decr遞減

  • 雖然輸入是int類(lèi)型,但是set會(huì)自動(dòng)轉(zhuǎn)換為string
r.set('my_queue', 5)

# 對(duì) key 為 'my_queue' 的值執(zhí)行遞減操作
value = r.decr('my_queue')
# 獲取遞減后的值
print(f'New value: {value}')

new_value = r.incr('my_queue')

print(f'New value: {new_value}')

New value: 4
New value: 5

可以看到就算是賦值也已經(jīng)改變了my_queue鍵的值。

keys取鍵、get取值、delete

# 將元素推入列表
r.setnx('my_queue:ddd:count',123)
r.setnx('my_queue:aaa:count',456)
r.setnx('my_queue',789)
r.setnx('my_queue',101)
r.set('my_queue:kkk:count',159)

print(r.keys('*'))
keys = r.keys('my_queue:*:count')
# 打印匹配的鍵列表
print(keys)
value = r.get(keys[0])
print(value)
r.delete('my_queue:ddd:count')
print(r.keys('*'))

['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count', 'my_queue:ddd:count']
['my_queue:kkk:count', 'my_queue:aaa:count', 'my_queue:ddd:count']
159
['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count']

關(guān)于為什么delete了還能取到值(

delete只是刪除了redis隊(duì)列的鍵值對(duì),keys是已經(jīng)賦過(guò)值了所以不受影響。

r.setnx(f'rtp_task:{1}:{123}:count',123)
r.setnx(f'rtp_task:{2}:{456}:count',456)
r.setnx(f'rtp_task:{3}:{789}:count',789)
r.setnx(f'rtp_task:{4}:{101}:count',101)
r.setnx(f'rtp_task:{5}:{159}:count',159)


keys = r.keys('rtp_task:*:count')
r.delete('rtp_task:1:123:count')
print(r.keys("*"))
print(keys)

['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:5:159:count']
['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:1:123:count', 'rtp_task:5:159:count']

setnx

含義(setnx = SET if Not eXists):如果不存在,則set。

r.setnx('my_queue:ddd:count',123)
print(r.get('my_queue:ddd:count'))
123
r.setnx('my_queue:ddd:count',123)
r.setnx('my_queue:ddd:count',456)
print(r.get('my_queue:ddd:count'))
123

threading

Thread

創(chuàng)建線程

在創(chuàng)建線程時(shí),傳遞參數(shù)需要是一個(gè)可迭代的對(duì)象,如果只有一個(gè)參數(shù),需要在參數(shù)后面添加逗號(hào),以表示它是一個(gè)元組而不是一個(gè)單一的值。

如果寫(xiě)成 args=(a,),它會(huì)被解釋為一個(gè)包含單一元素的元組,而如果你寫(xiě)成 args=(a),它會(huì)被解釋為 args=a,這樣就不再是一個(gè)元組。

a = "this is message"

def iptest(message):
    print(message)

t = threading.Thread(target=iptest, args=(a,))

start、join

a = "this is message"

def iptest(message):
    print(message)

t = threading.Thread(target=iptest, args=(a,))

t.start()

this is message

join方法的作用是確保thread子線程執(zhí)行完畢后才能執(zhí)行下一個(gè)線程。

沒(méi)加join前:

def iptest():
    print("message1\n")
    for i in range(10):
        # time.sleep() 函數(shù)推遲調(diào)用線程的運(yùn)行,可通過(guò)參數(shù)secs指秒數(shù),表示進(jìn)程掛起的時(shí)間。
        time.sleep(0.1)
    print('message2')
def main():
    add_thread = threading.Thread(target=iptest, name="T2")
    add_thread.start()
    print("done")
    
if __name__ == '__main__':
    main()


message1
done

message2

加join后

message1

message2
done

消息隊(duì)列

import redis
import threading
import time
import json

# 連接到本地 Redis 服務(wù)器
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

def producer(queue_name):
    # 生產(chǎn)者線程,模擬向隊(duì)列中推送任務(wù)
    for i in range(5):
        message = {'task_id': i, 'data': f'Task {i}'}
        r.rpush(queue_name, json.dumps(message))
        time.sleep(1)

def consumer(queue_name, worker_id):
    while True:
        # 消費(fèi)者線程,使用 blpop 從隊(duì)列中阻塞獲取任務(wù)
        message = r.blpop(queue_name, timeout=10)
        if message:
            task = json.loads(message[1])
            print(f"Worker {worker_id} processing task: {task} \n")
            # 模擬任務(wù)處理時(shí)間
            time.sleep(2)
            # 模擬任務(wù)處理完成后,更新任務(wù)計(jì)數(shù)
            r.decr(f'rtp_task:{task["task_id"]}:count')

            num_task = r.get(f'rtp_task:{task["task_id"]}:count')
            print(f'taskid{task["task_id"]},num_task{num_task}')

if __name__ == '__main__':
    # 設(shè)置初始任務(wù)計(jì)數(shù)
    for i in range(5):
        r.setnx(f'rtp_task:{i}:count', 3)

    # 創(chuàng)建一個(gè)生產(chǎn)者線程
    producer_thread = threading.Thread(target=producer, args=('product',))
    producer_thread.start()

    # 創(chuàng)建多個(gè)消費(fèi)者線程
    num_consumers = 3
    consumer_threads = []
    for i in range(num_consumers):
        consumer_thread = threading.Thread(target=consumer, args=('product', i + 1))
        consumer_threads.append(consumer_thread)
        consumer_thread.start()

    # 等待生產(chǎn)者線程和消費(fèi)者線程完成
    producer_thread.join()
    for consumer_thread in consumer_threads:
        consumer_thread.join()

Worker 2 processing task: {'task_id': 0, 'data': 'Task 0'} 

Worker 1 processing task: {'task_id': 1, 'data': 'Task 1'} 

Worker 3 processing task: {'task_id': 2, 'data': 'Task 2'} 

taskid0,num_task2
taskid1,num_task2
Worker 2 processing task: {'task_id': 3, 'data': 'Task 3'} 

taskid2,num_task2
Worker 1 processing task: {'task_id': 4, 'data': 'Task 4'} 

taskid3,num_task2
taskid4,num_task2

到此這篇關(guān)于Redis+threading實(shí)現(xiàn)多線程消息隊(duì)列的使用示例的文章就介紹到這了,更多相關(guān)Redis threading多線程消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • Redis常見(jiàn)數(shù)據(jù)類(lèi)型List列表使用詳解

    Redis常見(jiàn)數(shù)據(jù)類(lèi)型List列表使用詳解

    Redis的List是一種有序的字符串集合,支持兩端高效插入和刪除,適用于隊(duì)列和棧,這篇文章主要介紹了Redis常見(jiàn)數(shù)據(jù)類(lèi)型List列表使用的相關(guān)資料,需要的朋友可以參考下
    2024-12-12
  • Redis?如何清空所有數(shù)據(jù)

    Redis?如何清空所有數(shù)據(jù)

    這篇文章主要介紹了Redis?如何清空所有數(shù)據(jù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • 解析高可用Redis服務(wù)架構(gòu)分析與搭建方案

    解析高可用Redis服務(wù)架構(gòu)分析與搭建方案

    我們按照由簡(jiǎn)至繁的步驟,搭建一個(gè)最小型的高可用的Redis服務(wù)。 本文通過(guò)四種方案給大家介紹包含每種方案的優(yōu)缺點(diǎn)及詳細(xì)解說(shuō),具體內(nèi)容詳情跟隨小編一起看看吧
    2021-06-06
  • Redis本地/遠(yuǎn)程(外部)連接失敗問(wèn)題及解決

    Redis本地/遠(yuǎn)程(外部)連接失敗問(wèn)題及解決

    這篇文章主要介紹了Redis本地/遠(yuǎn)程(外部)連接失敗問(wèn)題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2025-03-03
  • 基于session?Redis實(shí)現(xiàn)登錄

    基于session?Redis實(shí)現(xiàn)登錄

    這篇文章主要介紹了基于session?Redis實(shí)現(xiàn)登錄的相關(guān)資料,需要的朋友可以參考下
    2023-10-10
  • Redis實(shí)現(xiàn)分布式鎖的示例代碼

    Redis實(shí)現(xiàn)分布式鎖的示例代碼

    日常開(kāi)發(fā)中,秒殺下單、搶紅包等等業(yè)務(wù)場(chǎng)景,都需要用到分布式鎖,本文主要介紹了Redis實(shí)現(xiàn)分布式鎖的示例代碼,感興趣的可以了解一下
    2023-10-10
  • 如何監(jiān)聽(tīng)Redis中Key值的變化(SpringBoot整合)

    如何監(jiān)聽(tīng)Redis中Key值的變化(SpringBoot整合)

    測(cè)試過(guò)程中我們有一部分常量值放入redis,共大部分應(yīng)用調(diào)用,但在測(cè)試過(guò)程中經(jīng)常有人會(huì)清空redis,回歸測(cè)試,下面這篇文章主要給大家介紹了關(guān)于如何監(jiān)聽(tīng)Redis中Key值變化的相關(guān)資料,需要的朋友可以參考下
    2024-03-03
  • Redis實(shí)戰(zhàn)之商城購(gòu)物車(chē)功能的實(shí)現(xiàn)代碼

    Redis實(shí)戰(zhàn)之商城購(gòu)物車(chē)功能的實(shí)現(xiàn)代碼

    這篇文章主要介紹了Redis實(shí)戰(zhàn)之商城購(gòu)物車(chē)功能的實(shí)現(xiàn)代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-02-02
  • 查看Redis內(nèi)存信息的命令

    查看Redis內(nèi)存信息的命令

    Redis 是一個(gè)開(kāi)源、高性能的Key-Value數(shù)據(jù)庫(kù),被廣泛應(yīng)用在服務(wù)器各種場(chǎng)景中。本文介紹幾個(gè)查看Redis內(nèi)存信息的命令,包括常用的info memory、info keyspace、bigkeys等。
    2020-09-09
  • Redis與緩存解讀

    Redis與緩存解讀

    文章介紹了Redis作為緩存層的優(yōu)勢(shì)和缺點(diǎn),并分析了六種緩存更新策略,包括超時(shí)剔除、先刪緩存再更新數(shù)據(jù)庫(kù)、旁路緩存、先更新數(shù)據(jù)庫(kù)再刪緩存、先更新數(shù)據(jù)庫(kù)再更新緩存、讀寫(xiě)穿透和異步緩存寫(xiě)入模式,還討論了緩存常見(jiàn)問(wèn)題
    2025-01-01

最新評(píng)論