PHP實現(xiàn)異步延遲消息隊列的方法詳解
一、前言
需求:電商秒殺場景中,如果用戶下單10分鐘未支付,需要進行庫存歸還
本篇是用PHP+Laravel+RabbitMQ來實現(xiàn)異步延遲消息隊列
二、場景
在電商項目中,當我們下單之后,一般需要 20 分鐘之內(nèi)或者 30 分鐘之內(nèi)付款,否則訂單就會進入異常處理邏輯中,被取消,那么進入到異常處理邏輯中,就可以當成是一個延遲隊列
公司的會議預(yù)定系統(tǒng),在會議預(yù)定成功后,會在會議開始前半小時通知所有預(yù)定該會議的用戶
安全工單超過 24 小時未處理,則自動拉企業(yè)微信群提醒相關(guān)責(zé)任人
用戶下單外賣以后,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時
…
很多場景下我們都需要延遲隊列。
本文以 RabbitMQ 為例來和大家聊一聊延遲隊列的玩法。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件來實現(xiàn)定時任務(wù),這種方案較簡單。
三、安裝RabbitMQ延遲隊列插件
官網(wǎng)插件下載地址

我這里直接下載了最新版本,你們根據(jù)自己的rabbitmq版本號進行下載

把下載好的文件移動到rabbitmq的插件plugins下,以我自己的Mac為例子,放到了如下路徑

然后執(zhí)行安裝插件指令,如下
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange

最后重啟rabbitmq服務(wù),并刷新查看exchanges交換機有沒有該插件

如上圖則延遲消息隊列插件安裝完成
四、在Laravel框架中進行使用
新建rabbitmq服務(wù)類,包含延遲消息隊列生產(chǎn)消息,和消費消息,如下

代碼如下:
<?php
namespace App\Http\Controllers\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitmqServer
{
private $host = "127.0.0.1";
private $port = 5672;
private $user = "guest";
private $password = "guest";
private $msg;
private $channel;
private $connection;
// 過期時間
const TIMEOUT_5_S = 5; // 5s
const TIMEOUT_10_S = 10; // 10s
private $exchange_logs = "logs";
private $exchange_direct = "direct";
private $exchange_delayed = "delayed";
private $queue_delayed = "delayedQueue";
const EXCHANGETYPE_FANOUT = "fanout";
const EXCHANGETYPE_DIRECT = "direct";
const EXCHANGETYPE_DELAYED = "x-delayed-message";
public function __construct($type = false)
{
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
$this->channel = $this->connection->channel();
// 聲明Exchange
$this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false, false, false, new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
$this->channel->queue_declare($this->queue_delayed, false, true, false, false);
$this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $this->queue_delayed);
}
/**
* delay creat message
*/
public function createMessageDelay($msg, $time)
{
$delayConfig = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
];
$msg = new AMQPMessage($msg, $delayConfig);
return $msg;
}
/**
* delay send message
*/
public function sendDelay($msg, $time = self::TIMEOUT_10_S)
{
$msg = $this->createMessageDelay($msg, $time);;
$this->channel->basic_publish($msg, $this->exchange_delayed, $this->queue_delayed);
$this->channel->close();
$this->connection->close();
}
/**
* delay consum
*/
public function consumDelay()
{
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
$this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
$this->channel->close();
$this->connection->close();
}
}比如新建QueueController控制器,進行測試生產(chǎn)消息放到延遲消息隊列中

代碼如下:
<?php
namespace App\Http\Controllers\Api\v1;
use App\Http\Controllers\Controller;
use App\Http\Controllers\Service\RabbitmqServer;
use App\Jobs\Queue;
use Illuminate\Http\Request;
class QueueController extends Controller
{
//
public function index(Request $request)
{
//比如說現(xiàn)在是下訂單操作
//需求:如果用戶10分鐘之內(nèi)不支付訂單就要取消訂單,并且?guī)齑鏆w還
$msg = $request->post();
$Rabbit = new RabbitmqServer("x-delayed-message");
//第一個參數(shù)發(fā)送的消息,第二個參數(shù)延遲多少秒
$Rabbit->sendDelay(json_encode($msg),5);
}
}至此通過接口調(diào)試工具進行模擬生產(chǎn)消息即可
消息生產(chǎn)完畢要進行消費,這里使用的是Laravel的任務(wù)調(diào)度,代碼如下

<?php
namespace App\Console\Commands;
use App\Http\Controllers\Service\RabbitmqServer;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'rabbitmq_consumer';//給消費者起個command名稱
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
* @return int
*/
public function handle()
{
$Rabbit = new RabbitmqServer("x-delayed-message");
$Rabbit->consumDelay();
}
}五、執(zhí)行生產(chǎn)消息和消費消息
用postman模擬生產(chǎn)消息,效果如下:

然后消費消息,用一下命令,如果延遲5秒執(zhí)行消費則成功

至此,就完成了rabbitmq異步延遲消息隊列
到此這篇關(guān)于PHP實現(xiàn)異步延遲消息隊列的方法詳解的文章就介紹到這了,更多相關(guān)PHP異步延遲消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
php使用str_replace替換多維數(shù)組的實現(xiàn)方法分析
這篇文章主要介紹了php使用str_replace替換多維數(shù)組的實現(xiàn)方法,結(jié)合具體實例對比分析了php針對多維數(shù)組的遍歷與替換操作相關(guān)實現(xiàn)技巧與注意事項,需要的朋友可以參考下2017-06-06
PHP實現(xiàn)大數(shù)(浮點數(shù))取余的方法
這篇文章主要介紹了PHP實現(xiàn)大數(shù)(浮點數(shù))取余的方法,結(jié)合實例形式分析了php數(shù)學(xué)運算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02
phpExcel導(dǎo)出大量數(shù)據(jù)出現(xiàn)內(nèi)存溢出錯誤的解決方法
我們經(jīng)常會使用phpExcel導(dǎo)入或?qū)離ls文件,但是如果一次導(dǎo)出數(shù)據(jù)比較大就會出現(xiàn)內(nèi)存溢出錯誤,下面我來總結(jié)解決辦法2013-02-02

