PHP實現(xiàn)異步延遲消息隊列的方法詳解
一、前言
需求:電商秒殺場景中,如果用戶下單10分鐘未支付,需要進行庫存歸還
本篇是用PHP+Laravel+RabbitMQ來實現(xiàn)異步延遲消息隊列
二、場景
在電商項目中,當我們下單之后,一般需要 20 分鐘之內(nèi)或者 30 分鐘之內(nèi)付款,否則訂單就會進入異常處理邏輯中,被取消,那么進入到異常處理邏輯中,就可以當成是一個延遲隊列
公司的會議預定系統(tǒng),在會議預定成功后,會在會議開始前半小時通知所有預定該會議的用戶
安全工單超過 24 小時未處理,則自動拉企業(yè)微信群提醒相關(guān)責任人
用戶下單外賣以后,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時
…
很多場景下我們都需要延遲隊列。
本文以 RabbitMQ 為例來和大家聊一聊延遲隊列的玩法。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件來實現(xiàn)定時任務,這種方案較簡單。
三、安裝RabbitMQ延遲隊列插件
官網(wǎng)插件下載地址
我這里直接下載了最新版本,你們根據(jù)自己的rabbitmq版本號進行下載
把下載好的文件移動到rabbitmq的插件plugins下,以我自己的Mac為例子,放到了如下路徑
然后執(zhí)行安裝插件指令,如下
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange
最后重啟rabbitmq服務,并刷新查看exchanges交換機有沒有該插件
如上圖則延遲消息隊列插件安裝完成
四、在Laravel框架中進行使用
新建rabbitmq服務類,包含延遲消息隊列生產(chǎn)消息,和消費消息,如下
代碼如下:
<?php namespace App\Http\Controllers\Service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class RabbitmqServer { private $host = "127.0.0.1"; private $port = 5672; private $user = "guest"; private $password = "guest"; private $msg; private $channel; private $connection; // 過期時間 const TIMEOUT_5_S = 5; // 5s const TIMEOUT_10_S = 10; // 10s private $exchange_logs = "logs"; private $exchange_direct = "direct"; private $exchange_delayed = "delayed"; private $queue_delayed = "delayedQueue"; const EXCHANGETYPE_FANOUT = "fanout"; const EXCHANGETYPE_DIRECT = "direct"; const EXCHANGETYPE_DELAYED = "x-delayed-message"; public function __construct($type = false) { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); // 聲明Exchange $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT])); $this->channel->queue_declare($this->queue_delayed, false, true, false, false); $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed); } /** * delay creat message */ public function createMessageDelay($msg, $time) { $delayConfig = [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => $time * 1000]) ]; $msg = new AMQPMessage($msg, $delayConfig); return $msg; } /** * delay send message */ public function sendDelay($msg, $time = self::TIMEOUT_10_S) { $msg = $this->createMessageDelay($msg, $time);; $this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed); $this->channel->close(); $this->connection->close(); } /** * delay consum */ public function consumDelay() { $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; $this->channel->basic_ack($msg->delivery_info['delivery_tag'], false); }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); } }
比如新建QueueController控制器,進行測試生產(chǎn)消息放到延遲消息隊列中
代碼如下:
<?php namespace App\Http\Controllers\Api\v1; use App\Http\Controllers\Controller; use App\Http\Controllers\Service\RabbitmqServer; use App\Jobs\Queue; use Illuminate\Http\Request; class QueueController extends Controller { // public function index(Request $request) { //比如說現(xiàn)在是下訂單操作 //需求:如果用戶10分鐘之內(nèi)不支付訂單就要取消訂單,并且?guī)齑鏆w還 $msg = $request->post(); $Rabbit = new RabbitmqServer("x-delayed-message"); //第一個參數(shù)發(fā)送的消息,第二個參數(shù)延遲多少秒 $Rabbit->sendDelay(json_encode($msg),5); } }
至此通過接口調(diào)試工具進行模擬生產(chǎn)消息即可
消息生產(chǎn)完畢要進行消費,這里使用的是Laravel的任務調(diào)度,代碼如下
<?php namespace App\Console\Commands; use App\Http\Controllers\Service\RabbitmqServer; use Illuminate\Console\Command; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqConsumerCommand extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'rabbitmq_consumer';//給消費者起個command名稱 /** * The console command description. * * @var string */ protected $description = 'Command description'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * @return int */ public function handle() { $Rabbit = new RabbitmqServer("x-delayed-message"); $Rabbit->consumDelay(); } }
五、執(zhí)行生產(chǎn)消息和消費消息
用postman模擬生產(chǎn)消息,效果如下:
然后消費消息,用一下命令,如果延遲5秒執(zhí)行消費則成功
至此,就完成了rabbitmq異步延遲消息隊列
到此這篇關(guān)于PHP實現(xiàn)異步延遲消息隊列的方法詳解的文章就介紹到這了,更多相關(guān)PHP異步延遲消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
php使用str_replace替換多維數(shù)組的實現(xiàn)方法分析
這篇文章主要介紹了php使用str_replace替換多維數(shù)組的實現(xiàn)方法,結(jié)合具體實例對比分析了php針對多維數(shù)組的遍歷與替換操作相關(guān)實現(xiàn)技巧與注意事項,需要的朋友可以參考下2017-06-06PHP實現(xiàn)大數(shù)(浮點數(shù))取余的方法
這篇文章主要介紹了PHP實現(xiàn)大數(shù)(浮點數(shù))取余的方法,結(jié)合實例形式分析了php數(shù)學運算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02phpExcel導出大量數(shù)據(jù)出現(xiàn)內(nèi)存溢出錯誤的解決方法
我們經(jīng)常會使用phpExcel導入或?qū)離ls文件,但是如果一次導出數(shù)據(jù)比較大就會出現(xiàn)內(nèi)存溢出錯誤,下面我來總結(jié)解決辦法2013-02-02