TP5使用RabbitMQ實現(xiàn)消息隊列的項目實踐
在使用 RabbitMQ 之前,你要安裝好 RabbitMQ 服務,具體安裝方法可以參考 windows下安裝RabbitMQ
1、安裝擴展
進入TP5 更目錄下,輸入命令安裝:
composer require php-amqplib/php-amqplib
2、自定義命令
TP5 的自定義命令,這里也簡單說下。
第一步:
創(chuàng)建命令類文件,新建 application/api/command/Test.php。
<?php namespace app\api\command; use think\console\Command; use think\console\Input; use think\console\Output; /** ?* 自定義命令測試 ?*/ class Test extends Command { ?? ?/** ?? ? * 配置 ?? ? */ ?? ?protected function configure() ? ? { ? ? ?? ?// 設置命令的名稱和描述 ? ? ? ? $this->setName('test')->setDescription('這是一個測試命令'); ? ? } ? ? /** ? ? ?* 執(zhí)行 ? ? ?*/ ? ? protected function execute(Input $input, Output $output) ? ? { ? ? ? ? $output->writeln("測試命令"); ? ? } }
這個文件定義了一個叫test的命令,備注為 這是一個測試命令,執(zhí)行命令會輸出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
<?php return [ ?? ?'app\api\command\Test', ];
第三步:
測試命令,在項目根目錄下輸入命令:
php think test
回車運行之后輸出:
test command
到這里,自定義命令就結(jié)束了,test命令就自定義成功了。
3、rabbitmq服務端
下來我們自定義 RabbitMQ 啟動命令,守護進程運行,啟動 rabbirmq 服務端接收消息。
在 application/api/command 目錄下,新建 Ramq.php 文件,在執(zhí)行命令的方法中,調(diào)用 RabbitMQ 啟動守護進程方法即可。
<?php namespace app\api\command; use PhpAmqpLib\Connection\AMQPStreamConnection; use think\console\Command; use think\console\Input; use think\console\Output; /** ?* RabbitMq 啟動命令 ?*/ class Ramq extends Command { ?? ?protected $consumerTag = 'customer'; ? ? protected $exchange = 'xcuser'; ? ? protected $queue = 'xcmsg'; ?? ?protected function configure() ? ? { ? ? ? ? $this->setName('ramq')->setDescription('rabbitmq'); ? ? } ? ? protected function execute(Input $input, Output $output) ? ? { ? ? ? ? $output->writeln("消息隊列開始"); ? ? ? ? $this->start(); ? ? ? ? // 指令輸出 ? ? ? ? $output->writeln('消費隊列結(jié)束'); ? ? } ? ? /** ? ? ?* 關閉 ? ? ?*/ ? ? function shutdown($channel, $connection) ? ? { ? ? ? ? $channel->close(); ? ? ? ? $connection->close(); ? ? } ? ? /** ? ? ?* 回調(diào)處理信息 ? ? ?*/ ? ? function process_message($message) ? ? { ? ? ? ? if ($message->body !== 'quit') { ? ? ? ? ? ? echo $message->body; ? ? ? ? } ? ? ? ? //手動應答 ? ? ? ? $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); ? ? ? ? if ($message->body === 'quit') { ? ? ? ? ? ? $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); ? ? ? ? } ? ? } ? ? /** ? ? ?* 啟動 守護進程運行 ? ? ?*/ ? ? public function start() ? ? { ? ? ? ? $host = '127.0.0.1'; ? ? ? ? $port = 5672; ? ? ? ? $user = 'guest'; ? ? ? ? $pwd = 'guest'; ? ? ? ? $vhost = '/'; ? ? ? ? $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); ? ? ? ? $channel = $connection->channel(); ? ? ? ? $channel->queue_declare($this->queue, false, true, false, false); ? ? ? ? $channel->exchange_declare($this->exchange, 'direct', false, true, false); ? ? ? ? $channel->queue_bind($this->queue, $this->exchange); ? ? ? ? $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, 'process_message')); ? ? ? ? register_shutdown_function(array($this, 'shutdown'), $channel, $connection); ? ? ? ? while (count($channel->callbacks)) { ? ? ? ? ? ? $channel->wait(); ? ? ? ? } ? ? } }
在application/command.php文件中,添加rabbitmq自定義命令。
return [ 'app\api\command\Ramq',// rabbitmq ];
4、發(fā)送端
最后,我們再寫發(fā)送消息的控制器,實現(xiàn)消息隊列,具體代碼如下:
<?php namespace app\api\controller; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\Controller; /** ?* 發(fā)送端 ?*/ class MessageQueue extends Controller { ?? ?const exchange = 'xcuser'; ? ? const queue = 'xcmsg'; ? ? /** ? ? ?* 發(fā)送消息 ? ? ?*/ ? ? public function pushMessage($data) ? ? { ? ? ? ? $host = '127.0.0.1'; ? ? ? ? $port = 5672; ? ? ? ? $user = 'guest'; ? ? ? ? $pwd = 'guest'; ? ? ? ? $vhost = '/'; ? ? ? ? $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); ? ? ? ? $channel = $connection->channel(); ? ? ? ? $channel->exchange_declare(self::exchange, 'direct', false, true, false); ? ? ? ? $channel->queue_declare(self::queue, false, true, false, false); ? ? ? ? $channel->queue_bind(self::queue, self::exchange); ? ? ? ? $messageBody = $data; ? ? ? ? $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); ? ? ? ? $channel->basic_publish($message, self::exchange); ? ? ? ? $channel->close(); ? ? ? ? $connection->close(); ? ? ? ? echo 'ok'; ? ? } ? ? /** ? ? ?* 執(zhí)行 ? ? ?*/ ? ? public function index() ? ? { ? ? ? ? $data = json_encode(['msg' => '測試數(shù)據(jù)', 'id' => '15']); ? ? ? ? $this->pushMessage($data); ? ? } }
5、驗證
先執(zhí)行自定義命令,啟動 rabbitmq 守護進程。在項目更目錄下打開命令行,輸入下面命令:
php think ramq
然后在瀏覽器訪問發(fā)送信息的方法,http://你的域名/api/message/index,你發(fā)送一次消息,在命令行就會輸出一條消息。這樣我們就用 RabbitMQ 實現(xiàn)了一個簡單的消息隊列。
到此這篇關于TP5使用RabbitMQ實現(xiàn)消息隊列的項目實踐的文章就介紹到這了,更多相關TP5 RabbitMQ消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
php打亂數(shù)組二維數(shù)組多維數(shù)組的簡單實例
下面小編就為大家?guī)硪黄猵hp打亂數(shù)組二維數(shù)組多維數(shù)組的簡單實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-06-06Laravel執(zhí)行migrate命令提示:No such file or directory的解決方法
這篇文章主要介紹了Laravel執(zhí)行migrate命令提示:No such file or directory的解決方法,分析了執(zhí)行migrate命令出現(xiàn)錯誤的原因與相關的解決方法,需要的朋友可以參考下2016-03-03