python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
最近在用fastapi框架開發(fā)web后端,由于近幾年python異步編程大火,fastapi憑借高性能也火了起來。本篇介紹了在異步環(huán)境下實(shí)現(xiàn)redis消息隊(duì)列的方法,代碼可以直接拷貝到fastapi中使用。
安裝相關(guān)庫(kù)
pip install aioredis
消息隊(duì)列實(shí)現(xiàn)及使用
我們使用redis的stream類型作為消息隊(duì)列的載體
首先我們創(chuàng)建一個(gè)目錄作為項(xiàng)目目錄:works/
創(chuàng)建配置文件
在項(xiàng)目根目錄下新建文件works/.env
在文件中寫入
export APP_ENV=development export REDIS_URL="192.168.70.130/" export REDIS_USER= export REDIS_PASSWORD= export REDIS_HOST="192.168.70.130" export REDIS_PORT=6379
代碼實(shí)現(xiàn)
在項(xiàng)目目錄下創(chuàng)建py文件works/main.py
import os from dotenv import load_dotenv import aioredis import asyncio load_dotenv() class Redis(): ? ? def __init__(self): ? ? ? ? """initialize ?connection """ ? ? ? ? self.REDIS_URL = os.environ['REDIS_URL'] ? ? ? ? self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD'] ? ? ? ? self.REDIS_USER = os.environ['REDIS_USER'] ? ? ? ? self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}" ? ? ? ? self.REDIS_HOST = os.environ['REDIS_HOST'] ? ? ? ? self.REDIS_PORT = os.environ['REDIS_PORT'] ? ? ? ?? ? ? async def create_connection(self): ? ? ? ? self.connection = aioredis.from_url( ? ? ? ? ? ? self.connection_url, db=0) ? ? ? ? return self.connection class Producer: ? ? def __init__(self, redis_client): ? ? ? ? self.redis_client = redis_client ? ? async def add_to_stream(self, ?data: dict, stream_channel): ? ? ? ? """將一條數(shù)據(jù)添加到隊(duì)列 ? ? ? ? Args: ? ? ? ? ? ? data (dict): _description_ ? ? ? ? ? ? stream_channel (_type_): _description_ ? ? ? ? Returns: ? ? ? ? ? ? _type_: _description_ ? ? ? ? """ ? ? ? ? try: ? ? ? ? ? ? msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data) ? ? ? ? ? ? print(f"Message id {msg_id} added to {stream_channel} stream") ? ? ? ? ? ? return msg_id ? ? ? ? except Exception as e: ? ? ? ? ? ? raise Exception(f"Error sending msg to stream => {e}") class StreamConsumer: ? ? def __init__(self, redis_client): ? ? ? ? self.redis_client = redis_client ? ? async def consume_stream(self, count: int, block: int, ?stream_channel): ? ? ? ? """讀取隊(duì)列中的消息,但是并不刪除 ? ? ? ? Args: ? ? ? ? ? ? count (int): _description_ ? ? ? ? ? ? block (int): _description_ ? ? ? ? ? ? stream_channel (_type_): _description_ ? ? ? ? Returns: ? ? ? ? ? ? _type_: _description_ ? ? ? ? """ ? ? ? ? response = await self.redis_client.xread( ? ? ? ? ? ? streams={stream_channel: ?'0-0'}, count=count, block=block) ? ? ? ? return response ? ? async def delete_message(self, stream_channel, message_id): ? ? ? ? """成功消費(fèi)數(shù)據(jù)后,調(diào)用此函數(shù)刪除隊(duì)列數(shù)據(jù) ? ? ? ? Args: ? ? ? ? ? ? stream_channel (_type_): _description_ ? ? ? ? ? ? message_id (_type_): _description_ ? ? ? ? """ ? ? ? ? await self.redis_client.xdel(stream_channel, message_id) async def main(): ? ? redis_conn = await Redis().create_connection() ? ? produce = Producer(redis_conn) ? ? consumer = StreamConsumer(redis_conn) ? ? # 添加一個(gè)消息到隊(duì)列中 ? ? data = {'xiaoming4':123} ? ? await produce.add_to_stream(data=data,stream_channel='message_channel') ? ?? ? ? # 從隊(duì)列中拿出最新的1條數(shù)據(jù) ? ? data = await consumer.consume_stream(1,block=0,stream_channel='message_channel') ? ? print(data) ? ?? ? ? # 輪詢等待隊(duì)列中的新消息 ? ? response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0) ? ? if response: ? ? ? ? for stream, messagees in response: ? ? ? ? ? ? print('stream:',stream) ? ? ? ? ? ? for message in messagees: ? ? ? ? ? ? ? ? print('message: ',message) ? ? ? ? ? ? ? ? message_id = message[0] ? ? ? ? ? ? ? ? print('message_id: ',message_id) ? ? ? ? ? ? ? ? message_content = message[1] ? ? ? ? ? ? ? ? print('message_content: ',message_content) ? ? ? ? ? ? ? ? print('注意里面的鍵、值都變成了byte類型,需要進(jìn)行解碼:') ? ? ? ? ? ? ? ? message_content:dict ? ? ? ? ? ? ? ? print('message_content_decode: ',{k.decode('utf-8'):v.decode('utf-8') for k,v in message_content.items()}) ? ? # 消費(fèi)成功后刪除隊(duì)列中的消息 ? ? await consumer.delete_message( ? ? ? ? stream_channel='message_channel',message_id=message_id ? ? ) ? ? if __name__ == '__main__': ? ? asyncio.run(main())
非常簡(jiǎn)單好用,啟動(dòng)一下看看吧
到此這篇關(guān)于python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程的文章就介紹到這了,更多相關(guān)python redis消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- 一文詳解消息隊(duì)列中為什么不用redis作為隊(duì)列
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- redis?消息隊(duì)列完成秒殺過期訂單處理方法(一)
- 如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作(二)
- Redis高階使用消息隊(duì)列分布式鎖排行榜等(高階用法)
- Redis消息隊(duì)列的三種實(shí)現(xiàn)方式
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列使用小結(jié)
- 詳解Redis Stream做消息隊(duì)列
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
編寫Python小程序來統(tǒng)計(jì)測(cè)試腳本的關(guān)鍵字
這篇文章主要介紹了編寫Python小程序來統(tǒng)計(jì)測(cè)試腳本的關(guān)鍵字的方法,文中的實(shí)例不僅可以統(tǒng)計(jì)關(guān)鍵字?jǐn)?shù)量,還可以按主關(guān)鍵字來歸類,需要的朋友可以參考下2016-03-03python鏈接Oracle數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了python鏈接Oracle數(shù)據(jù)庫(kù)的方法,實(shí)例分析了Python使用cx_Oracle模塊操作Oracle數(shù)據(jù)庫(kù)的相關(guān)技巧,需要的朋友可以參考下2015-06-06python基礎(chǔ)入門學(xué)習(xí)筆記(Python環(huán)境搭建)
這篇文章主要介紹了python基礎(chǔ)入門學(xué)習(xí)筆記,這是開啟學(xué)習(xí)python基礎(chǔ)知識(shí)的第一篇,夯實(shí)Python基礎(chǔ),才能走的更遠(yuǎn),感興趣的小伙伴們可以參考一下2016-01-01python實(shí)現(xiàn)文件分組復(fù)制到不同目錄的例子
這篇文章主要介紹了python實(shí)現(xiàn)文件按組復(fù)制到不同目錄的例子,需要的朋友可以參考下2014-06-06python獲取的html中都是\\u003e實(shí)現(xiàn)轉(zhuǎn)成正確字符
這篇文章主要介紹了python獲取的html中都是\\u003e實(shí)現(xiàn)轉(zhuǎn)成正確字符方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07django-rest-swagger的優(yōu)化使用方法
今天小編就為大家分享一篇django-rest-swagger的優(yōu)化使用方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-08-08