欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能

 更新時(shí)間:2024年03月24日 10:50:42   作者:ayzen1988  
這篇文章主要介紹了如何使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用PHP具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前言

今天我們來(lái)做個(gè)小試驗(yàn),用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能。

前期準(zhǔn)備,需要安裝好docker、docker-compose的運(yùn)行環(huán)境。

需要安裝RabbitMQ的可以看下面這篇文章。

使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列功能_php技巧_腳本之家 (jb51.net)

一、安裝RabbitMQ延遲插件

1、打開rabbitmq插件官網(wǎng)。

地址如下:Community Plugins | RabbitMQ

找到對(duì)應(yīng)的延遲插件,rabbitmq_delayed_message_exchange,如下圖所示。

2、進(jìn)入RabbitMQ容器,下載對(duì)應(yīng)插件,執(zhí)行如下命令。

docker exec -ti rabbitmq bash
cd /opt/rabbitmq/plugins/
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

如下圖所示,找到自己RabbitMQ對(duì)應(yīng)的版本,下載.ez文件。

3、啟用插件,執(zhí)行如下命令。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重啟RabbitMQ服務(wù)。

5、檢查RabbitMQ已啟用哪些插件,執(zhí)行如下命令。

rabbitmq-plugins list -e

正常會(huì)返回如下內(nèi)容。

上圖說(shuō)明延遲插件已啟用。

6、至此,RabbitMQ的延遲插件已安裝完成。

二、安裝php-amqplib

1、安裝php composer,執(zhí)行如下命令。

curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer

2、編寫composer.json,內(nèi)容如下,這里下載php-amqplib的版本是3.6。

vim composer.json
{
    "require": {
        "php-amqplib/php-amqplib": "3.6.*"
    }
}

3、下載包,執(zhí)行如下命令。

composer install

正常情況下,安裝完成的話,當(dāng)前目錄會(huì)多一個(gè)vendor目錄,如下圖所示。

4、至此php-amqplib已安裝完成。

三、測(cè)試驗(yàn)證

1、編寫生產(chǎn)者,代碼內(nèi)容如下。

vim producer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
// 連接到RabbitMQ服務(wù)器
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();
 
// 聲明一個(gè)具有延遲插件的自定義交換機(jī)
$args = new \PhpAmqpLib\Wire\AMQPTable([
    'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設(shè)我們使用 direct 類型的交換機(jī)
]);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
 
$messageBody = 'Hello Max!';
$delay = 5000; // 延遲5秒,單位是毫秒
$headers = new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => $delay]);
$message = new AMQPMessage($messageBody, ['delivery_mode' => 2]);
$message->set('application_headers', $headers);
 
// 發(fā)布消息到交換機(jī)
$channel->basic_publish($message, 'delayed_exchange', 'delayed_key');
 
echo "Sent {$messageBody} with delay {$delay}ms\n";
$datetime = date('Y/m/d H:i:s');
echo "成功發(fā)送延遲消息 : {$messageBody} , {$datetime} \n";
 
// 關(guān)閉連接
$channel->close();
$connection->close();

2、編寫消費(fèi)者,代碼內(nèi)容如下。

vim consumer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
// 連接到RabbitMQ服務(wù)器
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();
 
// 聲明一個(gè)具有延遲插件的自定義交換機(jī)
$args = new \PhpAmqpLib\Wire\AMQPTable([
    'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設(shè)我們使用 direct 類型的交換機(jī)
]);
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args);
 
// 聲明死信隊(duì)列
$channel->queue_declare(
    'delayed_queue',
    false,
    true,
    false,
    false,
    false,
    new \PhpAmqpLib\Wire\AMQPTable([
        'x-dead-letter-exchange' => 'delayed'
    ])
);
 
// 綁定隊(duì)列到交換機(jī)
$channel->queue_bind('delayed_queue', 'delayed_exchange', 'delayed_key');
 
echo "正在等待延遲隊(duì)列消息, waiting... \n";
 
$callback = function (AMQPMessage $message) {
    //$headers = $message->get('application_headers');
    //$nativeData = $headers->getNativeData();
    echo $message->body . '-------' . date('Y/m/d H:i:s') . "\n";
    $message->ack();
};
 
$channel->basic_consume(
    'delayed_queue',
    '',
    false,
    false,
    false,
    false,
    $callback
);
 
while ($channel->is_consuming()) {
    $channel->wait();
}
 
// 關(guān)閉連接
$channel->close();
$connection->close();

3、啟動(dòng)消費(fèi)端,執(zhí)行如下命令。

php consumer.php

正常情況會(huì)返回如下內(nèi)容,等等消息。

4、運(yùn)行生產(chǎn)端代,執(zhí)行如下命令。

php producer.php

正常情況會(huì)返回如下內(nèi)容。

5、再看消費(fèi)端接收到的消息,正常返回如下內(nèi)容。

從上面截圖可以看出時(shí)間剛好是5秒鐘。發(fā)送時(shí)間是08:44:49,消費(fèi)時(shí)間是08:44:54。

6、至此,延遲隊(duì)列的測(cè)試驗(yàn)證已完成。

總結(jié)

用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能,其實(shí)依靠的是RabbitMQ的一個(gè)延遲插件,主要有以下幾個(gè)步驟。

1、安裝RabbitMQ延遲插件。

2、安裝PHP的AMQP擴(kuò)展、php-amqplib代碼包。

3、編寫生產(chǎn)者、消費(fèi)者進(jìn)行驗(yàn)證。

上面的代碼只是做個(gè)簡(jiǎn)單的示例,如果運(yùn)用到實(shí)際的項(xiàng)目當(dāng)中需要做進(jìn)一步的優(yōu)化。

到此這篇關(guān)于使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能的文章就介紹到這了,更多相關(guān)PHP RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論