欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

typescript?實(shí)現(xiàn)RabbitMQ死信隊(duì)列和延遲隊(duì)列(訂單10分鐘未付歸還庫(kù)存)的過(guò)程

 更新時(shí)間:2024年03月29日 10:35:10   作者:熊明才  
RabbitMQ作為一款流行的消息隊(duì)列服務(wù),提供了死信隊(duì)列(Dead?Letter?Exchange)功能,能夠有效地處理消息被拒絕、消息過(guò)期以及隊(duì)列達(dá)到最大長(zhǎng)度等情況,本文將介紹如何利用RabbitMQ的死信隊(duì)列來(lái)處理這三種情況,并提供了TypeScript示例代碼,需要的朋友可以參考下

Manjaro安裝RabbitMQ

安裝

sudo pacman -S rabbitmq rabbitmqadmin

啟動(dòng)管理模塊

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默認(rèn)用戶(hù)名和密碼都是guest。
要使用 rabbitmqctl 命令添加用戶(hù)并分配權(quán)限,您可以按照以下步驟進(jìn)行操作:

1.添加用戶(hù)

rabbitmqctl add_user mingcai password

請(qǐng)將 password 替換為您想要設(shè)置的實(shí)際密碼。

2.分配權(quán)限

rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

這個(gè)命令將用戶(hù) mingcai 授予對(duì)所有虛擬主機(jī)的所有資源的讀、寫(xiě)和管理權(quán)限。如果您只想給予特定權(quán)限,請(qǐng)適當(dāng)調(diào)整正則表達(dá)式 ".*",以授予適當(dāng)?shù)臋?quán)限。例如,如果您只想給予讀取權(quán)限,可以使用 "^amq\."。

3.可選步驟:設(shè)置用戶(hù)角色

您可以將用戶(hù)分配給不同的角色,以便更好地管理權(quán)限。例如,您可以將用戶(hù)添加到 administrator 角色以獲取管理員權(quán)限:

rabbitmqctl set_user_tags mingcai administrator

這樣,用戶(hù) mingcai 就被賦予了管理員權(quán)限。

請(qǐng)確保您具有適當(dāng)?shù)臋?quán)限來(lái)執(zhí)行這些操作,并確保替換示例中的用戶(hù)名和密碼為您自己的實(shí)際值。

死信隊(duì)列

標(biāo)題:利用RabbitMQ死信隊(duì)列處理消息的三種情況

在消息隊(duì)列的應(yīng)用中,處理異常情況和消息的延遲成為了一項(xiàng)重要的任務(wù)。RabbitMQ作為一款流行的消息隊(duì)列服務(wù),提供了死信隊(duì)列(Dead Letter Exchange)功能,能夠有效地處理消息被拒絕、消息過(guò)期以及隊(duì)列達(dá)到最大長(zhǎng)度等情況。本文將介紹如何利用RabbitMQ的死信隊(duì)列來(lái)處理這三種情況,并提供了TypeScript示例代碼。

1. 消息被拒絕

當(dāng)消費(fèi)者無(wú)法處理某條消息時(shí),可以選擇將其標(biāo)記為“被拒絕”。這種情況下,我們可以配置RabbitMQ,將被拒絕的消息發(fā)送到一個(gè)死信隊(duì)列,以后再處理。

// 引入amqplib庫(kù)
import * as amqp from 'amqplib';
// 連接到RabbitMQ服務(wù)器
const connection = await amqp.connect('amqp://localhost');
// 創(chuàng)建Channel
const channel = await connection.createChannel();
// 定義隊(duì)列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {
  // 設(shè)置死信交換機(jī)
  deadLetterExchange: 'my_dead_letter_exchange'
});
// 消費(fèi)消息
channel.consume(queueName, (msg) => {
  // 處理消息
  if (msg) {
    // 處理失敗,拒絕消息并將其重新放回隊(duì)列
    // channel.reject(msg, true); // 第二個(gè)參數(shù)設(shè)為 true 表示將消息重新放回隊(duì)列
    // 處理失敗,拒絕消息
    channel.reject(msg, false); // 第二個(gè)參數(shù)設(shè)為 false 表示將消息投遞到死信隊(duì)列
     // or 處理失敗,拒絕消息并將其重新放回死信隊(duì)列
    channel.nack(msg, false, false); // 第二個(gè)參數(shù)設(shè)為 false 表示不將消息重新放回原隊(duì)列,第三個(gè)參數(shù)設(shè)為 false 表示不拒絕當(dāng)前和之前所有未確認(rèn)的消息
  }
});

2. 消息過(guò)期

有時(shí)候我們希望消息在一定時(shí)間內(nèi)被處理,如果超過(guò)了這個(gè)時(shí)間,就認(rèn)為消息已經(jīng)過(guò)期。RabbitMQ允許我們?cè)O(shè)置消息的過(guò)期時(shí)間,并在消息過(guò)期后將其發(fā)送到死信隊(duì)列。

// 發(fā)布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {
  expiration: '60000' // 設(shè)置過(guò)期時(shí)間為60秒
});

3. 隊(duì)列達(dá)到最大長(zhǎng)度

為了避免隊(duì)列過(guò)載,我們可以限制隊(duì)列的最大長(zhǎng)度。當(dāng)隊(duì)列達(dá)到最大長(zhǎng)度時(shí),新的消息將被拒絕,并發(fā)送到死信隊(duì)列。

// 定義隊(duì)列
await channel.assertQueue(queueName, {
  maxLength: 100, // 設(shè)置最大隊(duì)列長(zhǎng)度為100
  deadLetterExchange: 'my_dead_letter_exchange'
});

通過(guò)以上配置,我們可以利用RabbitMQ的死信隊(duì)列來(lái)處理消息被拒絕、消息過(guò)期以及隊(duì)列達(dá)到最大長(zhǎng)度等情況,保證消息系統(tǒng)的穩(wěn)定性和可靠性。

以上是利用TypeScript示例代碼演示了如何在RabbitMQ中使用死信隊(duì)列。希望這篇文章對(duì)你有所幫助!

延時(shí)隊(duì)列

什么是延時(shí)隊(duì)列?顧名思義:首先它要具有隊(duì)列的特性,再給它附加一個(gè)延遲消費(fèi)隊(duì)列消息的功能,也就是說(shuō)可以指定隊(duì)列中的消息在哪個(gè)時(shí)間點(diǎn)被消費(fèi)。
延時(shí)隊(duì)列在項(xiàng)目中的應(yīng)用還是比較多的,尤其像電商類(lèi)平臺(tái):
1、訂單成功后,在30分鐘內(nèi)沒(méi)有支付,自動(dòng)取消訂單
2、外賣(mài)平臺(tái)發(fā)送訂餐通知,下單成功后60s給用戶(hù)推送短信。
3、如果訂單一直處于某一個(gè)未完結(jié)狀態(tài)時(shí),及時(shí)處理關(guān)單,并退還庫(kù)存
4、淘寶新建商戶(hù)一個(gè)月內(nèi)還沒(méi)上傳商品信息,將凍結(jié)商鋪等

npm install amqplib --save
npm install @types/amqplib --save-dev

總結(jié)

rabbitmqadmin 使用入門(mén)

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于執(zhí)行各種管理任務(wù),如創(chuàng)建隊(duì)列、交換機(jī),查看隊(duì)列狀態(tài)等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=password
rabbitmqadmin list exchanges

查看 RabbitMQ 服務(wù)器信息

rabbitmqadmin status

列出所有交換機(jī)

rabbitmqadmin list exchanges

列出所有隊(duì)列

rabbitmqadmin list queues

創(chuàng)建一個(gè)交換機(jī)

rabbitmqadmin declare exchange name=my_exchange type=direct

創(chuàng)建一個(gè)隊(duì)列

rabbitmqadmin declare queue name=my_queue

綁定隊(duì)列到交換機(jī)

rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key

發(fā)送消息到指定交換機(jī)

rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"

獲取隊(duì)列消息

rabbitmqadmin get queue=my_queue

這些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和選項(xiàng)。你可以通過(guò)運(yùn)行 rabbitmqadmin help 命令來(lái)獲取更詳細(xì)的幫助信息,或者查看官方文檔以了解更多選項(xiàng)和使用方法。

延時(shí)3秒和8秒全部代碼

// delayProducer.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    const exchange = 'routing_exchange';
    // 定義 dlx-exchange
    const dlxExchangeName = 'dlx-exchange';
    // 聲明交換機(jī)
    await channel.assertExchange(exchange, 'direct', {durable: true});
    await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丟失
    const dlxqueueBindings= [
        {
            dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',
        },
        {
            dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'
        }
    ];
    for (const binding of dlxqueueBindings) {
        // 綁定延遲死信隊(duì)列
        await channel.assertQueue(binding.dlxQueueName );
        //死信交換機(jī)和死信隊(duì)列綁定 Routing key fast 的消息
        await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 將 dlx-queue 綁定到死信交換機(jī)
    }
    // 定義隊(duì)列和路由鍵的映射
    const queueBindings = [
        {
            queue: '3_second_queue', routingKey: 'fast', arguments: {
                'x-message-ttl': 3000, // TTL 設(shè)置為 3 秒 消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)
            }
        },
        {
            queue: '8_second_queue', routingKey: 'slow', arguments: {
                'x-message-ttl': 8000, // TTL 設(shè)置為 8 秒 消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒絕或過(guò)期時(shí)將重新發(fā)布到的交換器的可選名稱(chēng)
            }
        }
    ];
    // 聲明隊(duì)列,并將隊(duì)列綁定到交換機(jī)上
    for (const binding of queueBindings) {
        await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});
        await channel.bindQueue(binding.queue, exchange, binding.routingKey);
    }
    for (let i = 0; i < 10; i++) {
        await new Promise((resolve) => {
            setTimeout(() => {
                resolve(1)
            }, 1000)
        })
        const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
        console.log('當(dāng)前中國(guó)時(shí)間:', chinaTime);
        // 發(fā)送消息到交換機(jī),并設(shè)置不同的路由鍵
        await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);
        await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);
    }
    // 關(guān)閉連接
    setTimeout(async () => {
        await channel.close();
        await connection.close();
    }, 10000); // 在 10 秒后關(guān)閉連接
}
async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {
    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}
setupRouting().catch(console.error);
//消費(fèi)者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-3_second_queue'
    // 定義隊(duì)列和路由鍵的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 確認(rèn)消息已被處理
        }
    });
}
setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';
async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();
    let queue = 'dlx-8_second_queue'
    // 定義隊(duì)列和路由鍵的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 確認(rèn)消息已被處理
        }
    });
}
setupRouting().catch(console.error);

到此這篇關(guān)于typescript 實(shí)現(xiàn)RabbitMQ死信隊(duì)列和延遲隊(duì)列(訂單10分鐘未付歸還庫(kù)存)的文章就介紹到這了,更多相關(guān)RabbitMQ死信隊(duì)列和延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 軟件測(cè)試面試如何測(cè)試一個(gè)杯子

    軟件測(cè)試面試如何測(cè)試一個(gè)杯子

    本文主要介紹軟件測(cè)試面試如何測(cè)試一個(gè)杯子,這里幫大家整理了詳細(xì)的面試資料,和面試需要準(zhǔn)備的知識(shí)點(diǎn),有興趣的小伙伴可以參考下
    2016-08-08
  • 2020史上最全I(xiàn)DEA插件總結(jié)(推薦收藏)

    2020史上最全I(xiàn)DEA插件總結(jié)(推薦收藏)

    這篇文章主要介紹了2020史上最全I(xiàn)DEA插件總結(jié),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2020-06-06
  • Keil?uVision5?5.38官方下載、安裝及注冊(cè)超詳細(xì)圖文教程

    Keil?uVision5?5.38官方下載、安裝及注冊(cè)超詳細(xì)圖文教程

    這篇文章主要介紹了Keil?uVision5?5.38官方下載、安裝及注冊(cè)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-03-03
  • JetBrains發(fā)布java代碼質(zhì)量檢測(cè)工具Qodana早期預(yù)覽版

    JetBrains發(fā)布java代碼質(zhì)量檢測(cè)工具Qodana早期預(yù)覽版

    這篇文章主要介紹了JetBrains發(fā)布java代碼質(zhì)量檢測(cè)工具Qodana早期預(yù)覽版,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01
  • XMind?2021激活碼及安裝步驟

    XMind?2021激活碼及安裝步驟

    XMind?是一款非常實(shí)用的商業(yè)思維導(dǎo)圖,應(yīng)用Eclipse?RCP?開(kāi)發(fā)架構(gòu),打造易用、高效的可視化思維,強(qiáng)調(diào)該功能的可擴(kuò)展、跨平臺(tái)、穩(wěn)定性和性能,致力于幫助用戶(hù)提高生產(chǎn)率。本文給大家?guī)?lái)了XMind?2021激活碼,需要的朋友可以參考下
    2021-12-12
  • Git配置用戶(hù)簽名方式及原因說(shuō)明

    Git配置用戶(hù)簽名方式及原因說(shuō)明

    這篇文章主要為大家介紹了Git配置用戶(hù)簽名方式及原因說(shuō)明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-04-04
  • 互聯(lián)網(wǎng)科技大佬推薦的12本必讀書(shū)籍

    互聯(lián)網(wǎng)科技大佬推薦的12本必讀書(shū)籍

    12本互聯(lián)網(wǎng)科技大佬推薦的必讀書(shū)籍,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • 谷歌師兄的算法刷題筆記

    谷歌師兄的算法刷題筆記

    這篇文章主要介紹了谷歌師兄的算法刷題筆記的相關(guān)資料,需要的朋友可以參考下
    2021-02-02
  • Git配置用戶(hù)簽名方式的拓展示例實(shí)現(xiàn)全面講解

    Git配置用戶(hù)簽名方式的拓展示例實(shí)現(xiàn)全面講解

    這篇文章主要為大家介紹了Git配置用戶(hù)簽名方式的拓展示例實(shí)現(xiàn)全面講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-04-04
  • 本地部署DeepSeek開(kāi)源多模態(tài)大模型Janus-Pro-7B實(shí)操教程

    本地部署DeepSeek開(kāi)源多模態(tài)大模型Janus-Pro-7B實(shí)操教程

    文章介紹了Janus-Pro-7B,一個(gè)由DeepSeek開(kāi)發(fā)的開(kāi)源多模態(tài)AI模型,它在文本和圖像處理方面表現(xiàn)出色,并且具有強(qiáng)大的性能和靈活性,詳細(xì)介紹了如何在本地環(huán)境中部署Janus-Pro-7B,并展示了其在圖像理解和生成、文本生成、多模態(tài)推理等任務(wù)中的應(yīng)用效果,感興趣的朋友一起看看吧
    2025-02-02

最新評(píng)論