欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PHP簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)流程詳解

 更新時(shí)間:2022年11月21日 08:54:40   作者:i_zane  
普通的隊(duì)列是先進(jìn)先出,但是延時(shí)隊(duì)列并不是,而是加上了時(shí)間這一權(quán)重。希望到達(dá)時(shí)間點(diǎn)的先執(zhí)行。從某種意義上來講,延遲隊(duì)列的結(jié)構(gòu)并不像一個(gè)隊(duì)列,而更像是一種以時(shí)間為權(quán)重的有序堆結(jié)構(gòu)

需求說明

  • 當(dāng)用戶申請(qǐng)售后,商家未在n小時(shí)內(nèi)處理,系統(tǒng)自動(dòng)進(jìn)行退款。
  • 商家拒絕后,用戶可申請(qǐng)客服介入,客服x天內(nèi)超時(shí)未處理,系統(tǒng)自動(dòng)退款。
  • 用戶收到貨物,x天自動(dòng)確認(rèn)收貨
  • 等等需要延時(shí)操作的流程……

設(shè)計(jì)思路

  • 設(shè)計(jì)一張隊(duì)列表,記錄所有隊(duì)列的參數(shù),執(zhí)行狀態(tài),重試次數(shù)
  • 將創(chuàng)建隊(duì)列的id 存于redis 中,使用zset有序集合。按照時(shí)間戳進(jìn)行排序
  • 使用croontab定時(shí)任務(wù)每分鐘執(zhí)行一次

實(shí)現(xiàn)

新建隊(duì)列表

CREATE TABLE `delay_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `params` varchar(512) DEFAULT NULL,
  `message` varchar(255) DEFAULT '' COMMENT '執(zhí)行結(jié)果',
  `ext_string` varchar(255) DEFAULT '' COMMENT '擴(kuò)展字符串,可用于快速檢索。取消該隊(duì)列',
  `retry_times` int(2) DEFAULT '0' COMMENT '重試次數(shù)',
  `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待執(zhí)行, 10 執(zhí)行成功, 20 執(zhí)行失敗,30取消執(zhí)行',
  `created` datetime DEFAULT NULL,
  `modified` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `ext_idx` (`ext_string`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

部分隊(duì)列的操作方法,新增隊(duì)列、取消隊(duì)列、隊(duì)列執(zhí)行成功、隊(duì)列執(zhí)行失敗、隊(duì)列重試【重試時(shí)間間隔抄的微信支付的異步通知時(shí)間】

class DelayQueueService
{
    // 重試時(shí)間,最大重試次數(shù) 15
    private static $retryTimes = [
        15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60,
        3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60,
    ];
	/**
	 * @description 增加隊(duì)列至redis
	 * @param $queueId
	 * @param int $delay 需要延遲執(zhí)行的時(shí)間。單位秒
	 * @return void
	 */
	public function addDelayQueue($queueId, int $delay)
	{
	    $time = time() + $delay;
	    $redis = RedisService::getInstance();
	    $redis->zAdd("delay_queue_job", $time, $queueId);
	}
	// 取消redis 隊(duì)列
	public function cancelDelayQueue($ext)
	{
	    $row = $query->getRow(); // 使用ext_string 快速檢索到相應(yīng)的記錄
	    if ($row) {
	        $redis = RedisService::getInstance();
	        $redis->zRem('delay_queue_job', $row->id);
	        $row->status = DelayQueueTable::STATUS_CANCEL;
	        $table->save($row);
	    }
	}
	/**
	 * @description 執(zhí)行成功
	 * @return void
	 */
	public static function success($id, $message = null)
	{
	    $table->update([
	        'status' => DelayQueueTable::STATUS_SUCCESS,
	        'message' => $message ?? '',
	        'modified' => date('Y-m-d H:i:s'),
	    ], [
	        'id' => $id,
	    ]);
	}
	/**
	 * @description 執(zhí)行失敗
	 * @return void
	 */
	public static function failed($id, $message = null)
	{
	    $table->updateAll([
	        'status' => DelayQueueTable::STATUS_FAILED,
	        'message' => $message ?? '',
	        'modified' => date('Y-m-d H:i:s'),
	    ], [
	        'id' => $id,
	    ]);
	}
	/**
	 * @description 失敗隊(duì)列重試,最大重試15次
	 * @param $id
	 * @return void
	 */
	public static function retry($id)
	{
	    $info = self::getById($id);
	    if (!$info) {
	        return;
	    }
	    $retryTimes = ++$info['retry_times'];
	    if ($retryTimes > 15) {
	        return;
	    }
	    $entity = [
	        'params' => $info['params'],
	        'ext_string' => $info['ext_string'],
	        'retry_times' => $retryTimes,
	    ];
	    $queueId = $table->save($entity);
	    self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]);
	}
}

在命令行進(jìn)行任務(wù)的運(yùn)行

public function execute(Arguments $args, ConsoleIo $io)
{
    $startTimestamp = strtotime("-1 days");
    $now = time();
    $redis = RedisService::getInstance();
    $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now);
    if ($queueIds) {
        foreach ($queueIds as $id) {
            $info = // 按照隊(duì)列id 獲取相應(yīng)的信息
            if ($info['status'] === DelayQueueTable::STATUS_PADDING) {
                $params = unserialize($info['params']); // 創(chuàng)建記錄的時(shí)候,需要試用serialize 將類名,方法,參數(shù)序列化
                $class = $params['class'];
                $method = $params['method'];
                $data = $params['data'];
                try {
                    call_user_func_array([$class, $method], [$data]);
                    $redis->zRem('delay_queue_job', $id);
                    $msg = date('Y-m-d H:i:s') . " [info] success: $id";
                    DelayQueueService::success($id, $msg);
                    $io->success($msg);
                } catch (Exception $e) {
                    $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}";
                        DelayQueueService::failed($id, $msg);
                        // 自定義異常code,不進(jìn)行隊(duì)列重試
                        if (10000 != $e->getCode()) {
                            DelayQueueService::retry($id);
                        }
                        $io->error($msg);
                }
            }
        }
    }
}

最后說點(diǎn)

  • 我這邊的系統(tǒng)對(duì)實(shí)時(shí)性要求不高,所以直接使用的是linuxcrond 服務(wù),每分鐘運(yùn)行一次。如需精確到秒級(jí),可寫一個(gè)shell,一分鐘循環(huán)執(zhí)行<=60
  • 因?yàn)槟壳暗臄?shù)據(jù)較少,延時(shí)隊(duì)列加入的只有小部分。所以就在command 里面直接執(zhí)行更新操作了,后期如果隊(duì)列多,且有比較耗時(shí)的操作,可考慮把耗時(shí)操作單獨(dú)放置一個(gè)隊(duì)列中。本方法只用于將數(shù)據(jù)塞進(jìn)隊(duì)列。

附上 shell 腳本 一分鐘執(zhí)行60次

#!/bin/bash
step=2 #間隔的秒數(shù),不能大于60
for (( i = 0; i < 60; i=(i+step) )); do
   echo $i # do something
   sleep $step
done 

到此這篇關(guān)于PHP簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)流程詳解的文章就介紹到這了,更多相關(guān)PHP延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論