typescript?實(shí)現(xiàn)RabbitMQ死信隊(duì)列和延遲隊(duì)列(訂單10分鐘未付歸還庫(kù)存)的過(guò)程
Manjaro安裝RabbitMQ
安裝
sudo pacman -S rabbitmq rabbitmqadmin
啟動(dòng)管理模塊
sudo rabbitmq-plugins enable rabbitmq_management sudo rabbitmq-server
管理界面
http://127.0.0.1:15672/
默認(rèn)用戶(hù)名和密碼都是guest。
要使用 rabbitmqctl
命令添加用戶(hù)并分配權(quán)限,您可以按照以下步驟進(jìn)行操作:
1.添加用戶(hù):
rabbitmqctl add_user mingcai password
請(qǐng)將 password
替換為您想要設(shè)置的實(shí)際密碼。
2.分配權(quán)限:
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"
這個(gè)命令將用戶(hù) mingcai
授予對(duì)所有虛擬主機(jī)的所有資源的讀、寫(xiě)和管理權(quán)限。如果您只想給予特定權(quán)限,請(qǐng)適當(dāng)調(diào)整正則表達(dá)式 ".*"
,以授予適當(dāng)?shù)臋?quán)限。例如,如果您只想給予讀取權(quán)限,可以使用 "^amq\."
。
3.可選步驟:設(shè)置用戶(hù)角色:
您可以將用戶(hù)分配給不同的角色,以便更好地管理權(quán)限。例如,您可以將用戶(hù)添加到 administrator
角色以獲取管理員權(quán)限:
rabbitmqctl set_user_tags mingcai administrator
這樣,用戶(hù) mingcai
就被賦予了管理員權(quán)限。
請(qǐng)確保您具有適當(dāng)?shù)臋?quán)限來(lái)執(zhí)行這些操作,并確保替換示例中的用戶(hù)名和密碼為您自己的實(shí)際值。
死信隊(duì)列
標(biāo)題:利用RabbitMQ死信隊(duì)列處理消息的三種情況
在消息隊(duì)列的應(yīng)用中,處理異常情況和消息的延遲成為了一項(xiàng)重要的任務(wù)。RabbitMQ作為一款流行的消息隊(duì)列服務(wù),提供了死信隊(duì)列(Dead Letter Exchange)功能,能夠有效地處理消息被拒絕、消息過(guò)期以及隊(duì)列達(dá)到最大長(zhǎng)度等情況。本文將介紹如何利用RabbitMQ的死信隊(duì)列來(lái)處理這三種情況,并提供了TypeScript示例代碼。
1. 消息被拒絕
當(dāng)消費(fèi)者無(wú)法處理某條消息時(shí),可以選擇將其標(biāo)記為“被拒絕”。這種情況下,我們可以配置RabbitMQ,將被拒絕的消息發(fā)送到一個(gè)死信隊(duì)列,以后再處理。
// 引入amqplib庫(kù) import * as amqp from 'amqplib'; // 連接到RabbitMQ服務(wù)器 const connection = await amqp.connect('amqp://localhost'); // 創(chuàng)建Channel const channel = await connection.createChannel(); // 定義隊(duì)列 const queueName = 'my_queue'; await channel.assertQueue(queueName, { // 設(shè)置死信交換機(jī) deadLetterExchange: 'my_dead_letter_exchange' }); // 消費(fèi)消息 channel.consume(queueName, (msg) => { // 處理消息 if (msg) { // 處理失敗,拒絕消息并將其重新放回隊(duì)列 // channel.reject(msg, true); // 第二個(gè)參數(shù)設(shè)為 true 表示將消息重新放回隊(duì)列 // 處理失敗,拒絕消息 channel.reject(msg, false); // 第二個(gè)參數(shù)設(shè)為 false 表示將消息投遞到死信隊(duì)列 // or 處理失敗,拒絕消息并將其重新放回死信隊(duì)列 channel.nack(msg, false, false); // 第二個(gè)參數(shù)設(shè)為 false 表示不將消息重新放回原隊(duì)列,第三個(gè)參數(shù)設(shè)為 false 表示不拒絕當(dāng)前和之前所有未確認(rèn)的消息 } });
2. 消息過(guò)期
有時(shí)候我們希望消息在一定時(shí)間內(nèi)被處理,如果超過(guò)了這個(gè)時(shí)間,就認(rèn)為消息已經(jīng)過(guò)期。RabbitMQ允許我們?cè)O(shè)置消息的過(guò)期時(shí)間,并在消息過(guò)期后將其發(fā)送到死信隊(duì)列。
// 發(fā)布消息 await channel.sendToQueue(queueName, Buffer.from('Hello'), { expiration: '60000' // 設(shè)置過(guò)期時(shí)間為60秒 });
3. 隊(duì)列達(dá)到最大長(zhǎng)度
為了避免隊(duì)列過(guò)載,我們可以限制隊(duì)列的最大長(zhǎng)度。當(dāng)隊(duì)列達(dá)到最大長(zhǎng)度時(shí),新的消息將被拒絕,并發(fā)送到死信隊(duì)列。
// 定義隊(duì)列 await channel.assertQueue(queueName, { maxLength: 100, // 設(shè)置最大隊(duì)列長(zhǎng)度為100 deadLetterExchange: 'my_dead_letter_exchange' });
通過(guò)以上配置,我們可以利用RabbitMQ的死信隊(duì)列來(lái)處理消息被拒絕、消息過(guò)期以及隊(duì)列達(dá)到最大長(zhǎng)度等情況,保證消息系統(tǒng)的穩(wěn)定性和可靠性。
以上是利用TypeScript示例代碼演示了如何在RabbitMQ中使用死信隊(duì)列。希望這篇文章對(duì)你有所幫助!
延時(shí)隊(duì)列
什么是延時(shí)隊(duì)列?顧名思義:首先它要具有隊(duì)列的特性,再給它附加一個(gè)延遲消費(fèi)隊(duì)列消息的功能,也就是說(shuō)可以指定隊(duì)列中的消息在哪個(gè)時(shí)間點(diǎn)被消費(fèi)。
延時(shí)隊(duì)列在項(xiàng)目中的應(yīng)用還是比較多的,尤其像電商類(lèi)平臺(tái):
1、訂單成功后,在30分鐘內(nèi)沒(méi)有支付,自動(dòng)取消訂單
2、外賣(mài)平臺(tái)發(fā)送訂餐通知,下單成功后60s給用戶(hù)推送短信。
3、如果訂單一直處于某一個(gè)未完結(jié)狀態(tài)時(shí),及時(shí)處理關(guān)單,并退還庫(kù)存
4、淘寶新建商戶(hù)一個(gè)月內(nèi)還沒(méi)上傳商品信息,將凍結(jié)商鋪等
npm install amqplib --save npm install @types/amqplib --save-dev
總結(jié)
rabbitmqadmin 使用入門(mén)
rabbitmqadmin
是 RabbitMQ 的命令行管理工具,可以用于執(zhí)行各種管理任務(wù),如創(chuàng)建隊(duì)列、交換機(jī),查看隊(duì)列狀態(tài)等。以下是一些基本的用法示例:
export RABBITMQ_SERVER=127.0.0.1 export RABBITMQ_PORT=5672 export RABBITMQ_USER=mingcai export RABBITMQ_PASSWORD=password rabbitmqadmin list exchanges
查看 RabbitMQ 服務(wù)器信息:
rabbitmqadmin status
列出所有交換機(jī):
rabbitmqadmin list exchanges
列出所有隊(duì)列:
rabbitmqadmin list queues
創(chuàng)建一個(gè)交換機(jī):
rabbitmqadmin declare exchange name=my_exchange type=direct
創(chuàng)建一個(gè)隊(duì)列:
rabbitmqadmin declare queue name=my_queue
綁定隊(duì)列到交換機(jī):
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
發(fā)送消息到指定交換機(jī):
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
獲取隊(duì)列消息:
rabbitmqadmin get queue=my_queue
這些命令只是一些基本用法示例,rabbitmqadmin
工具支持更多功能和選項(xiàng)。你可以通過(guò)運(yùn)行 rabbitmqadmin help
命令來(lái)獲取更詳細(xì)的幫助信息,或者查看官方文檔以了解更多選項(xiàng)和使用方法。
延時(shí)3秒和8秒全部代碼
// delayProducer.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); const exchange = 'routing_exchange'; // 定義 dlx-exchange const dlxExchangeName = 'dlx-exchange'; // 聲明交換機(jī) await channel.assertExchange(exchange, 'direct', {durable: true}); await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丟失 const dlxqueueBindings= [ { dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast', }, { dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow' } ]; for (const binding of dlxqueueBindings) { // 綁定延遲死信隊(duì)列 await channel.assertQueue(binding.dlxQueueName ); //死信交換機(jī)和死信隊(duì)列綁定 Routing key fast 的消息 await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 將 dlx-queue 綁定到死信交換機(jī) } // 定義隊(duì)列和路由鍵的映射 const queueBindings = [ { queue: '3_second_queue', routingKey: 'fast', arguments: { 'x-message-ttl': 3000, // TTL 設(shè)置為 3 秒 消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)。 'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng) } }, { queue: '8_second_queue', routingKey: 'slow', arguments: { 'x-message-ttl': 8000, // TTL 設(shè)置為 8 秒 消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)。 'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng) } } ]; // 聲明隊(duì)列,并將隊(duì)列綁定到交換機(jī)上 for (const binding of queueBindings) { await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments}); await channel.bindQueue(binding.queue, exchange, binding.routingKey); } for (let i = 0; i < 10; i++) { await new Promise((resolve) => { setTimeout(() => { resolve(1) }, 1000) }) const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log('當(dāng)前中國(guó)時(shí)間:', chinaTime); // 發(fā)送消息到交換機(jī),并設(shè)置不同的路由鍵 await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`); await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`); } // 關(guān)閉連接 setTimeout(async () => { await channel.close(); await connection.close(); }, 10000); // 在 10 秒后關(guān)閉連接 } async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) { channel.publish(exchange, routingKey, Buffer.from(message)); console.log(`Sent message '${message}' with routing key '${routingKey}'`); } setupRouting().catch(console.error);
//消費(fèi)者 dlx-3_second_queue.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); let queue = 'dlx-3_second_queue' // 定義隊(duì)列和路由鍵的映射 await channel.consume(queue, (msg) => { if (msg !== null) { const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`); channel.ack(msg); // 確認(rèn)消息已被處理 } }); } setupRouting().catch(console.error); //dlx-8_second_queue.ts import * as amqp from 'amqplib'; async function setupRouting() { const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1'); const channel = await connection.createChannel(); let queue = 'dlx-8_second_queue' // 定義隊(duì)列和路由鍵的映射 await channel.consume(queue, (msg) => { if (msg !== null) { const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }); console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`); channel.ack(msg); // 確認(rèn)消息已被處理 } }); } setupRouting().catch(console.error);
到此這篇關(guān)于typescript 實(shí)現(xiàn)RabbitMQ死信隊(duì)列和延遲隊(duì)列(訂單10分鐘未付歸還庫(kù)存)的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊(duì)列和延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
2020史上最全I(xiàn)DEA插件總結(jié)(推薦收藏)
這篇文章主要介紹了2020史上最全I(xiàn)DEA插件總結(jié),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2020-06-06Keil?uVision5?5.38官方下載、安裝及注冊(cè)超詳細(xì)圖文教程
這篇文章主要介紹了Keil?uVision5?5.38官方下載、安裝及注冊(cè)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-03-03JetBrains發(fā)布java代碼質(zhì)量檢測(cè)工具Qodana早期預(yù)覽版
這篇文章主要介紹了JetBrains發(fā)布java代碼質(zhì)量檢測(cè)工具Qodana早期預(yù)覽版,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01互聯(lián)網(wǎng)科技大佬推薦的12本必讀書(shū)籍
12本互聯(lián)網(wǎng)科技大佬推薦的必讀書(shū)籍,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03Git配置用戶(hù)簽名方式的拓展示例實(shí)現(xiàn)全面講解
這篇文章主要為大家介紹了Git配置用戶(hù)簽名方式的拓展示例實(shí)現(xiàn)全面講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04本地部署DeepSeek開(kāi)源多模態(tài)大模型Janus-Pro-7B實(shí)操教程
文章介紹了Janus-Pro-7B,一個(gè)由DeepSeek開(kāi)發(fā)的開(kāi)源多模態(tài)AI模型,它在文本和圖像處理方面表現(xiàn)出色,并且具有強(qiáng)大的性能和靈活性,詳細(xì)介紹了如何在本地環(huán)境中部署Janus-Pro-7B,并展示了其在圖像理解和生成、文本生成、多模態(tài)推理等任務(wù)中的應(yīng)用效果,感興趣的朋友一起看看吧2025-02-02