使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能
前言
今天我們來(lái)做個(gè)小試驗(yàn),用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能。
前期準(zhǔn)備,需要安裝好docker、docker-compose的運(yùn)行環(huán)境。
需要安裝RabbitMQ的可以看下面這篇文章。
使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列功能_php技巧_腳本之家 (jb51.net)
一、安裝RabbitMQ延遲插件
1、打開rabbitmq插件官網(wǎng)。
地址如下:Community Plugins | RabbitMQ
找到對(duì)應(yīng)的延遲插件,rabbitmq_delayed_message_exchange,如下圖所示。
2、進(jìn)入RabbitMQ容器,下載對(duì)應(yīng)插件,執(zhí)行如下命令。
docker exec -ti rabbitmq bash cd /opt/rabbitmq/plugins/ wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
如下圖所示,找到自己RabbitMQ對(duì)應(yīng)的版本,下載.ez文件。
3、啟用插件,執(zhí)行如下命令。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、重啟RabbitMQ服務(wù)。
5、檢查RabbitMQ已啟用哪些插件,執(zhí)行如下命令。
rabbitmq-plugins list -e
正常會(huì)返回如下內(nèi)容。
上圖說(shuō)明延遲插件已啟用。
6、至此,RabbitMQ的延遲插件已安裝完成。
二、安裝php-amqplib
1、安裝php composer,執(zhí)行如下命令。
curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer
2、編寫composer.json,內(nèi)容如下,這里下載php-amqplib的版本是3.6。
vim composer.json { "require": { "php-amqplib/php-amqplib": "3.6.*" } }
3、下載包,執(zhí)行如下命令。
composer install
正常情況下,安裝完成的話,當(dāng)前目錄會(huì)多一個(gè)vendor目錄,如下圖所示。
4、至此php-amqplib已安裝完成。
三、測(cè)試驗(yàn)證
1、編寫生產(chǎn)者,代碼內(nèi)容如下。
vim producer.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 連接到RabbitMQ服務(wù)器 $connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明一個(gè)具有延遲插件的自定義交換機(jī) $args = new \PhpAmqpLib\Wire\AMQPTable([ 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設(shè)我們使用 direct 類型的交換機(jī) ]); $channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); $messageBody = 'Hello Max!'; $delay = 5000; // 延遲5秒,單位是毫秒 $headers = new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => $delay]); $message = new AMQPMessage($messageBody, ['delivery_mode' => 2]); $message->set('application_headers', $headers); // 發(fā)布消息到交換機(jī) $channel->basic_publish($message, 'delayed_exchange', 'delayed_key'); echo "Sent {$messageBody} with delay {$delay}ms\n"; $datetime = date('Y/m/d H:i:s'); echo "成功發(fā)送延遲消息 : {$messageBody} , {$datetime} \n"; // 關(guān)閉連接 $channel->close(); $connection->close();
2、編寫消費(fèi)者,代碼內(nèi)容如下。
vim consumer.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 連接到RabbitMQ服務(wù)器 $connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明一個(gè)具有延遲插件的自定義交換機(jī) $args = new \PhpAmqpLib\Wire\AMQPTable([ 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設(shè)我們使用 direct 類型的交換機(jī) ]); $channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); // 聲明死信隊(duì)列 $channel->queue_declare( 'delayed_queue', false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([ 'x-dead-letter-exchange' => 'delayed' ]) ); // 綁定隊(duì)列到交換機(jī) $channel->queue_bind('delayed_queue', 'delayed_exchange', 'delayed_key'); echo "正在等待延遲隊(duì)列消息, waiting... \n"; $callback = function (AMQPMessage $message) { //$headers = $message->get('application_headers'); //$nativeData = $headers->getNativeData(); echo $message->body . '-------' . date('Y/m/d H:i:s') . "\n"; $message->ack(); }; $channel->basic_consume( 'delayed_queue', '', false, false, false, false, $callback ); while ($channel->is_consuming()) { $channel->wait(); } // 關(guān)閉連接 $channel->close(); $connection->close();
3、啟動(dòng)消費(fèi)端,執(zhí)行如下命令。
php consumer.php
正常情況會(huì)返回如下內(nèi)容,等等消息。
4、運(yùn)行生產(chǎn)端代,執(zhí)行如下命令。
php producer.php
正常情況會(huì)返回如下內(nèi)容。
5、再看消費(fèi)端接收到的消息,正常返回如下內(nèi)容。
從上面截圖可以看出時(shí)間剛好是5秒鐘。發(fā)送時(shí)間是08:44:49,消費(fèi)時(shí)間是08:44:54。
6、至此,延遲隊(duì)列的測(cè)試驗(yàn)證已完成。
總結(jié)
用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能,其實(shí)依靠的是RabbitMQ的一個(gè)延遲插件,主要有以下幾個(gè)步驟。
1、安裝RabbitMQ延遲插件。
2、安裝PHP的AMQP擴(kuò)展、php-amqplib代碼包。
3、編寫生產(chǎn)者、消費(fèi)者進(jìn)行驗(yàn)證。
上面的代碼只是做個(gè)簡(jiǎn)單的示例,如果運(yùn)用到實(shí)際的項(xiàng)目當(dāng)中需要做進(jìn)一步的優(yōu)化。
到此這篇關(guān)于使用PHP和RabbitMQ實(shí)現(xiàn)消息隊(duì)列的延遲功能的文章就介紹到這了,更多相關(guān)PHP RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
PHP實(shí)現(xiàn)cookie跨域session共享的方法分析
這篇文章主要介紹了PHP實(shí)現(xiàn)cookie跨域session共享的方法,結(jié)合實(shí)例形式分析了php操作cookie的有效期、跨域、session存儲(chǔ)等相關(guān)操作技巧,需要的朋友可以參考下2019-08-08PHP刪除二維數(shù)組中相同元素及數(shù)組重復(fù)值的方法示例
這篇文章主要介紹了PHP刪除二維數(shù)組中相同元素及數(shù)組重復(fù)值的方法,涉及php針對(duì)數(shù)組的遍歷、判斷、比較等相關(guān)操作技巧,需要的朋友可以參考下2017-05-05php使用ob_start()實(shí)現(xiàn)圖片存入變量的方法
這篇文章主要介紹了php使用ob_start()實(shí)現(xiàn)圖片存入變量的方法,是對(duì)緩存的靈活運(yùn)用,具有既定的參考借鑒價(jià)值,需要的朋友可以參考下2014-11-11PHP面向?qū)ο笪宕笤瓌t之單一職責(zé)原則(SRP)詳解
這篇文章主要介紹了PHP面向?qū)ο笪宕笤瓌t之單一職責(zé)原則(SRP),結(jié)合實(shí)例形式詳細(xì)分析了單一職責(zé)原則(SRP)的概念、原理、定于與使用方法,需要的朋友可以參考下2018-04-04PHP數(shù)據(jù)庫(kù)操作四:mongodb用法分析
這篇文章主要介紹了PHP數(shù)據(jù)庫(kù)操作mongodb用法,結(jié)合實(shí)例形式較為詳細(xì)的分析了MongoDB的功能、安裝、基本命令、使用方法及相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-08-08