如何用RabbitMQ和Swoole實(shí)現(xiàn)一個異步任務(wù)系統(tǒng)
系統(tǒng)介紹
從圖中可以看到,我們這個系統(tǒng)是一個基于事件的異步任務(wù)系統(tǒng)。就是說當(dāng)一個事件產(chǎn)生時,生產(chǎn)者將事件拋給調(diào)度器,調(diào)度器負(fù)責(zé)查詢事件下有哪些任務(wù),然后將這些任務(wù)丟到相應(yīng)的隊(duì)列中,最后由消費(fèi)者消費(fèi)任務(wù)隊(duì)列中的任務(wù)。
在整個系統(tǒng)中主要分為三大部分
1.事件生產(chǎn)者,即產(chǎn)生消息事件的一方。
2.任務(wù)調(diào)度器(Scheduler),負(fù)責(zé)注冊事件并調(diào)度任務(wù)。
3.消費(fèi)者(Worker),負(fù)責(zé)消費(fèi)任務(wù)隊(duì)列中的任務(wù)。
事件生產(chǎn)者
事件生產(chǎn)者很簡單,在業(yè)務(wù)系統(tǒng)中直接調(diào)用即可,代碼如下。
<?php require_once DIR.'/../autoload.php'; use Asynclib\Ebats\Event; try{ $event = new Event('order_paied'); //定義事件 $event->setOptions(['order_id' => 'FB138020392193312']); //事件產(chǎn)生的參數(shù) $event->publish(); }catch (Exception $exc){ echo $exc->getMessage(); }
任務(wù)調(diào)度器
調(diào)度器主要做兩件事,一是注冊事件,另一個是調(diào)度任務(wù)。
注冊事件代碼如下:
//注冊事件 EventManager::register('order_create', 'closeOrder', 'demo', 10);//關(guān)閉未付款訂單(延遲任務(wù)) EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動發(fā)貨
這樣就注冊了兩個事件,事件下各有一個任務(wù)。
具體調(diào)度部分代碼很簡單,就不多贅述,有興趣的可以去看代碼。
消費(fèi)者
重頭戲來了,一個異步任務(wù)系統(tǒng)最重要的就是消費(fèi)端了,現(xiàn)在讓我們來看下Worker的流程圖。
可以看到,在這里我們采用了兩個交換器和兩個隊(duì)列,一個負(fù)責(zé)處理正常的任務(wù)即ntask,另一個負(fù)責(zé)處理需要延遲執(zhí)行的任務(wù)即dtask。簡單描述下一個任務(wù)的生命周期。
正常任務(wù)
1、task產(chǎn)生,進(jìn)入正常任務(wù)的交換器Exchange[ebats_core_ntask]
2、交換器根據(jù)topic將任務(wù)分發(fā)到對應(yīng)的隊(duì)列中
3、子進(jìn)程ntask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
4、執(zhí)行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
5、子進(jìn)程ntask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
6、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看
延遲任務(wù)
1、子進(jìn)程dtask阻塞等待成功獲取到task,并執(zhí)行該任務(wù)
2、執(zhí)行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子進(jìn)程dtask捕獲到重試異常將任務(wù)拋給延遲任務(wù)的交換器Exchange[ebats_core_dtask]
4、將任務(wù)執(zhí)行信息回調(diào)給上層開發(fā)者以便保存查看
消費(fèi)者代碼如下:
require_once DIR.'/../autoload.php'; require_once DIR.'/task/TaskDemoModel.php'; use Asynclib\Ebats\Worker; //執(zhí)行結(jié)果回調(diào)函數(shù) $callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){ }; $worker = new Worker($callback); //支持多進(jìn)程消費(fèi)默認(rèn)為1 $worker->setQueue('demo'); //隊(duì)列名和事件的topic一一對應(yīng) $worker->run();
自定義調(diào)度器
一般來說這是一個基于事件的任務(wù)系統(tǒng),那么能不能直接產(chǎn)生任務(wù)呢。答案是肯定的。
只需要創(chuàng)建一個自定義調(diào)度器,由您自行實(shí)現(xiàn)調(diào)度邏輯,最終生成一個任務(wù)即可。代碼如下:
<?php require_once DIR.'/../autoload.php'; use Asynclib\Ebats\Task; use Asynclib\Core\Consumer; use Asynclib\Amq\ExchangeTypes; use Asynclib\Exception\ExceptionInterface; /** * 本示例演示了如何創(chuàng)建一個自定義調(diào)度器,開發(fā)者可以根據(jù)自身需求開發(fā)自己的任務(wù)調(diào)度器 */ try{ $worker = new Consumer(); $worker->setExchange('order_fanout', ExchangeTypes::TOPIC); $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']); $worker->run(function($key, $msg){ $order_data = json_encode($msg); echo " [$key] $order_data \n"; Task::create('demo', 'orderAsync', $msg);//創(chuàng)建任務(wù),之后消息將作為參數(shù)由任務(wù)接管處理 }); }catch (ExceptionInterface $exc){ echo $exc->getMessage(); }
這樣,當(dāng)接收到消息時就會產(chǎn)生一個orderAsync的任務(wù),您只需要啟動一個用來消費(fèi)這個Topic的Worker即可。
也許你會覺得這里直接寫業(yè)務(wù)邏輯的代碼就可以了,實(shí)際上也確實(shí)可以。當(dāng)你可以忍受一個進(jìn)程慢慢消費(fèi)的時候是可以這樣做的。但大多數(shù)情況下我們還是希望它能夠盡快的消費(fèi)掉,所以建議這里只負(fù)責(zé)創(chuàng)建任務(wù),具體任務(wù)的業(yè)務(wù)邏輯由worker去執(zhí)行。
以上就是如何用RabbitMQ和Swoole實(shí)現(xiàn)一個異步任務(wù)系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于用RabbitMQ和Swoole實(shí)現(xiàn)一個異步任務(wù)系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
PHP實(shí)現(xiàn)關(guān)鍵字搜索后描紅功能示例
這篇文章主要介紹了PHP實(shí)現(xiàn)關(guān)鍵字搜索后描紅功能,結(jié)合實(shí)例形式分析了php數(shù)據(jù)庫連接、查詢、字符串轉(zhuǎn)換等相關(guān)操作技巧,需要的朋友可以參考下2019-07-07php 處理上百萬條的數(shù)據(jù)庫如何提高處理查詢速度
php 處理上百萬條的數(shù)據(jù)庫如何提高處理查詢速度2010-02-02php清空(刪除)指定目錄下的文件,不刪除目錄文件夾的實(shí)現(xiàn)代碼
這篇文章主要介紹了php清空(刪除)指定目錄下的文件,不刪除目錄文件夾的實(shí)現(xiàn)代碼,需要的朋友可以參考下2014-09-09