PHP簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)流程詳解
需求說明
- 當(dāng)用戶申請(qǐng)售后,商家未在n小時(shí)內(nèi)處理,系統(tǒng)自動(dòng)進(jìn)行退款。
- 商家拒絕后,用戶可申請(qǐng)客服介入,客服x天內(nèi)超時(shí)未處理,系統(tǒng)自動(dòng)退款。
- 用戶收到貨物,x天自動(dòng)確認(rèn)收貨
- 等等需要延時(shí)操作的流程……
設(shè)計(jì)思路
- 設(shè)計(jì)一張隊(duì)列表,記錄所有隊(duì)列的參數(shù),執(zhí)行狀態(tài),重試次數(shù)
- 將創(chuàng)建隊(duì)列的
id
存于redis
中,使用zset
有序集合。按照時(shí)間戳進(jìn)行排序 - 使用
croontab
定時(shí)任務(wù)每分鐘執(zhí)行一次
實(shí)現(xiàn)
新建隊(duì)列表
CREATE TABLE `delay_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `params` varchar(512) DEFAULT NULL, `message` varchar(255) DEFAULT '' COMMENT '執(zhí)行結(jié)果', `ext_string` varchar(255) DEFAULT '' COMMENT '擴(kuò)展字符串,可用于快速檢索。取消該隊(duì)列', `retry_times` int(2) DEFAULT '0' COMMENT '重試次數(shù)', `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待執(zhí)行, 10 執(zhí)行成功, 20 執(zhí)行失敗,30取消執(zhí)行', `created` datetime DEFAULT NULL, `modified` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `ext_idx` (`ext_string`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
部分隊(duì)列的操作方法,新增隊(duì)列、取消隊(duì)列、隊(duì)列執(zhí)行成功、隊(duì)列執(zhí)行失敗、隊(duì)列重試【重試時(shí)間間隔抄的微信支付的異步通知時(shí)間】
class DelayQueueService { // 重試時(shí)間,最大重試次數(shù) 15 private static $retryTimes = [ 15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60, ]; /** * @description 增加隊(duì)列至redis * @param $queueId * @param int $delay 需要延遲執(zhí)行的時(shí)間。單位秒 * @return void */ public function addDelayQueue($queueId, int $delay) { $time = time() + $delay; $redis = RedisService::getInstance(); $redis->zAdd("delay_queue_job", $time, $queueId); } // 取消redis 隊(duì)列 public function cancelDelayQueue($ext) { $row = $query->getRow(); // 使用ext_string 快速檢索到相應(yīng)的記錄 if ($row) { $redis = RedisService::getInstance(); $redis->zRem('delay_queue_job', $row->id); $row->status = DelayQueueTable::STATUS_CANCEL; $table->save($row); } } /** * @description 執(zhí)行成功 * @return void */ public static function success($id, $message = null) { $table->update([ 'status' => DelayQueueTable::STATUS_SUCCESS, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 執(zhí)行失敗 * @return void */ public static function failed($id, $message = null) { $table->updateAll([ 'status' => DelayQueueTable::STATUS_FAILED, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 失敗隊(duì)列重試,最大重試15次 * @param $id * @return void */ public static function retry($id) { $info = self::getById($id); if (!$info) { return; } $retryTimes = ++$info['retry_times']; if ($retryTimes > 15) { return; } $entity = [ 'params' => $info['params'], 'ext_string' => $info['ext_string'], 'retry_times' => $retryTimes, ]; $queueId = $table->save($entity); self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]); } }
在命令行進(jìn)行任務(wù)的運(yùn)行
public function execute(Arguments $args, ConsoleIo $io) { $startTimestamp = strtotime("-1 days"); $now = time(); $redis = RedisService::getInstance(); $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now); if ($queueIds) { foreach ($queueIds as $id) { $info = // 按照隊(duì)列id 獲取相應(yīng)的信息 if ($info['status'] === DelayQueueTable::STATUS_PADDING) { $params = unserialize($info['params']); // 創(chuàng)建記錄的時(shí)候,需要試用serialize 將類名,方法,參數(shù)序列化 $class = $params['class']; $method = $params['method']; $data = $params['data']; try { call_user_func_array([$class, $method], [$data]); $redis->zRem('delay_queue_job', $id); $msg = date('Y-m-d H:i:s') . " [info] success: $id"; DelayQueueService::success($id, $msg); $io->success($msg); } catch (Exception $e) { $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}"; DelayQueueService::failed($id, $msg); // 自定義異常code,不進(jìn)行隊(duì)列重試 if (10000 != $e->getCode()) { DelayQueueService::retry($id); } $io->error($msg); } } } } }
最后說點(diǎn)
- 我這邊的系統(tǒng)對(duì)實(shí)時(shí)性要求不高,所以直接使用的是
linux
的crond
服務(wù),每分鐘運(yùn)行一次。如需精確到秒級(jí),可寫一個(gè)shell
,一分鐘循環(huán)執(zhí)行<=60
次 - 因?yàn)槟壳暗臄?shù)據(jù)較少,延時(shí)隊(duì)列加入的只有小部分。所以就在
command
里面直接執(zhí)行更新操作了,后期如果隊(duì)列多,且有比較耗時(shí)的操作,可考慮把耗時(shí)操作單獨(dú)放置一個(gè)隊(duì)列中。本方法只用于將數(shù)據(jù)塞進(jìn)隊(duì)列。
附上 shell
腳本 一分鐘執(zhí)行60次
#!/bin/bash step=2 #間隔的秒數(shù),不能大于60 for (( i = 0; i < 60; i=(i+step) )); do echo $i # do something sleep $step done
到此這篇關(guān)于PHP簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)流程詳解的文章就介紹到這了,更多相關(guān)PHP延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
PHP 實(shí)現(xiàn)explort() 功能的詳解
本篇文章是對(duì)PHP 實(shí)現(xiàn)explort()功能進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06在PHP中養(yǎng)成7個(gè)面向?qū)ο蟮暮昧?xí)慣
如果您尚未打算用 OO 原則創(chuàng)建應(yīng)用程序,則使用 PHP 的面向?qū)ο螅∣O)的語言特性,這 7 個(gè)習(xí)慣將幫助您開始在過程編程與 OO 編程之間進(jìn)行轉(zhuǎn)換。2010-07-07PHP 函數(shù)call_user_func和call_user_func_array用法詳解
下面來和大家分享一下這個(gè)call_user_func_array和call_user_func函數(shù)的用法,另外附贈(zèng)func_get_args()函數(shù)和func_num_args()函數(shù),嘿嘿!!2014-03-03php中sort函數(shù)排序知識(shí)點(diǎn)總結(jié)
在本篇文章里小編給大家分享的是一篇關(guān)于php中sort函數(shù)排序知識(shí)點(diǎn)總結(jié)內(nèi)容,有興趣的朋友們可以參考下。2021-01-01php下的原生ajax請(qǐng)求用法實(shí)例分析
這篇文章主要介紹了php下的原生ajax請(qǐng)求用法,結(jié)合實(shí)例形式分析了前臺(tái)原生ajax請(qǐng)求與后臺(tái)PHP響應(yīng)相關(guān)操作技巧,需要的朋友可以參考下2020-02-02php進(jìn)行md5加密簡(jiǎn)單實(shí)例方法
在本文里小編給大家整理了一篇非常實(shí)用的php如何進(jìn)行md5加密知識(shí)點(diǎn)內(nèi)容,有需要的朋友們可以參考下。2019-09-09