欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

手把手教你實現(xiàn)Python重試超時裝飾器

 更新時間:2023年05月01日 10:49:07   作者:憶想不到的暉  
這篇文章主要為大家介紹了實現(xiàn)Python重試超時裝飾器教程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪

一、前言

在寫業(yè)務(wù)代碼時候,有許多場景需要重試某塊業(yè)務(wù)邏輯,例如網(wǎng)絡(luò)請求、購物下單等,希望發(fā)生異常的時候多重試幾次。本文分享如何利用Python 的裝飾器來進(jìn)行面向切面(AOP)的方式實現(xiàn)自動重試器。

二、簡單分析

一個重試裝飾器,最重要的就是發(fā)生意外異常處理失敗自動重試,有如下幾點需要注意

  • 失敗不能一直重試,因為可能會出現(xiàn)死循環(huán)浪費資源,因此需要有 最大重試次數(shù) 或者 最大超時時間
  • 不能重試太頻繁,因為太頻繁容易導(dǎo)致重試次數(shù)很快用完,卻沒有成功響應(yīng),需要有 重試時間間隔 來限制,有時可以加大成功概率,例如網(wǎng)絡(luò)請求時有一段時間是堵塞的,或者對方服務(wù)負(fù)載太高導(dǎo)致一段時間無法響應(yīng)等。

簡單分析完,我們的重試裝飾器,就要支持可配置最大重試次數(shù)、最大超時時間、重試間隔,所以裝飾器就要設(shè)計成帶參數(shù)裝飾器。

三、代碼模擬實現(xiàn)

重試裝飾器-初版

分析完畢后,看看第一版的裝飾器

import time
from functools import wraps
def task_retry(max_retry_count: int = 5, time_interval: int = 2):
    """
    任務(wù)重試裝飾器
    Args:
        max_retry_count: 最大重試次數(shù) 默認(rèn)5次
        time_interval: 每次重試間隔 默認(rèn)2s
    """
    def _task_retry(task_func):
        @wraps(task_func)
        def wrapper(*args, **kwargs):
            # 函數(shù)循環(huán)重試
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                try:
                    task_result = task_func(*args, **kwargs)
                    return task_result
                except Exception as e:
                    print(f"fail {str(e)}")
                    time.sleep(time_interval)
        return wrapper
    return _task_retry

裝飾器內(nèi)部閉包,就簡單通過 for 循環(huán) 來執(zhí)行指定重試次數(shù),成功獲取結(jié)果就直接 return 返回,發(fā)生異常則睡眠配置重試間隔時間后繼續(xù)循環(huán)

寫個例子來模擬測試下看看效果

沒有異常正常執(zhí)行,在函數(shù)中模擬一個異常來進(jìn)行重試看看

@task_retry(max_retry_count=3, time_interval=1)
def user_place_order():
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
ret = user_place_order()
print("user place order ret", ret)
>>>out
fail division by zero
execute count 2
fail division by zero
execute count 3
fail division by zero
user place order ret None

可以看到 user_place_order 函數(shù)執(zhí)行了三遍,都發(fā)生了除零異常,最后超過最大執(zhí)行次數(shù),返回了 None 值,我們可以在主邏輯中來判斷返回值是否為 None 來進(jìn)行超過最大重試次數(shù)失敗的業(yè)務(wù)邏輯處理

ret = user_place_order()
print("user place order ret", ret)

if not ret:
    print("user place order failed")
    ...

重試裝飾器-改進(jìn)版

現(xiàn)在只能配置 最大重試次數(shù) 沒有最大超時時間,有時候我們想不但有重試,還得在規(guī)定時間內(nèi)完成,不想浪費太多試錯時間。所以增加一個 最大超時時間配置選項默認(rèn)為None,有值時超過最大超時時間退出重試。

def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
    """
    任務(wù)重試裝飾器
    Args:
        max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
        time_interval: 每次重試間隔 默認(rèn) 2s
        max_timeout: 最大超時時間,單位s 默認(rèn)為 None,
    """

    def _task_retry(task_func):

        @wraps(task_func)
        def wrapper(*args, **kwargs):
            # 函數(shù)循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time > max_timeout:
                    # 超出最大超時時間
                    print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                    return

                try:
                    return task_func(*args, **kwargs)
                except Exception as e:
                    print(f"fail {str(e)}")
                    time.sleep(time_interval)

        return wrapper

    return _task_retry

看看效果

# 超時
@task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
def user_place_order():
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
>>>out
execute count 1
fail division by zero
execute count 2
fail division by zero
execute count 3
execute timeout, use time 2.010528802871704s, max timeout 2
user place order ret None
# 超過最大重試次數(shù)
@task_retry(max_retry_count=3, time_interval=1)
def user_place_order():
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
>>>out
execute count 1
fail division by zero
execute count 2
fail division by zero
execute count 3
fail division by zero
user place order ret None
# 正常
@task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
def user_place_order():
    # a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
>>>out
execute count 1
user place order success
user place order ret {'code': 0, 'msg': 'ok'}

重試裝飾器-加強(qiáng)版

到這重試裝飾器基本功能就實現(xiàn)了,但還可以加強(qiáng),Python現(xiàn)在支持 async 異步方式寫法,因此要是可以兼容異步寫法那就更好了。先看看裝飾異步函數(shù)會是什么樣的效果

import time
import asyncio
import functools
def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
    """
    任務(wù)重試裝飾器
    Args:
        max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
        time_interval: 每次重試間隔 默認(rèn) 2s
        max_timeout: 最大超時時間,單位s 默認(rèn)為 None,
    """
    def _task_retry(task_func):
        @wraps(task_func)
        def wrapper(*args, **kwargs):
            # 函數(shù)循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time > max_timeout:
                    # 超出最大超時時間
                    print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                    return
                try:
                    return task_func(*args, **kwargs)
                except Exception as e:
                    print(f"fail {str(e)}")
                    time.sleep(time_interval)
        return wrapper
    return _task_retry
@task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
def user_place_order():
    # a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
@task_retry(max_retry_count=3, time_interval=2, max_timeout=5)
async def user_place_order_async():
    """異步函數(shù)重試案例"""
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}
async def main():
    # 同步案例
    # ret = user_place_order()
    # print(f"user place order ret {ret}")
    # 異步案例
    ret = await user_place_order_async()
    print(f"user place order ret {ret}")
if __name__ == '__main__':
    asyncio.run(main())
# 正常時候
execute count 1
user place order success
user place order ret {'code': 0, 'msg': 'ok'}
# 異常時候
>>>out
execute count 1
Traceback (most recent call last):
  File "G:/code/python/py-tools/decorator/base.py", line 138, in <module>
    asyncio.run(main())
  File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete
    return future.result()
  File "G:/code/python/py-tools/decorator/base.py", line 133, in main
    ret = await user_place_order_async()
  File "G:/code/python/py-tools/decorator/base.py", line 121, in user_place_order_async
    a = 1 / 0
ZeroDivisionError: division by zero
Process finished with exit code 1

發(fā)現(xiàn)發(fā)生異常的時候并沒有重試,為什么呢?其實在執(zhí)行 task_func() 它并沒有真正的執(zhí)行內(nèi)部邏輯,而是返回一個 coroutine 協(xié)程對象,并不會報異常,所以再裝飾器中執(zhí)行一遍就成功就出來了,外面 ret = await user_place_order_async(), 后才真正的等待執(zhí)行,然后執(zhí)行函數(shù)內(nèi)的邏輯再報異常就沒有捕獲到。我們可以打斷點驗證下

這樣裝飾器就不支持異步函數(shù)的重試,需要加強(qiáng)它,可以使用 asyncio.iscoroutinefunction() 來進(jìn)行異步函數(shù)的判斷, 然后再加一個異步函數(shù)的閉包就可以實現(xiàn)異步、同步函數(shù)都兼容的重試裝飾器。

def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
    """
    任務(wù)重試裝飾器
    Args:
        max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
        time_interval: 每次重試間隔 默認(rèn) 2s
        max_timeout: 最大超時時間,單位s 默認(rèn)為 None,
    """

    def _task_retry(task_func):

        @functools.wraps(task_func)
        def sync_wrapper(*args, **kwargs):
            # 同步循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time &gt; max_timeout:
                    # 超出最大超時時間
                    print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                    return

                try:
                    task_ret = task_func(*args, **kwargs)
                    return task_ret
                except Exception as e:
                    print(f"fail {str(e)}")
                    time.sleep(time_interval)

        @functools.wraps(task_func)
        async def async_wrapper(*args, **kwargs):
            # 異步循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time &gt; max_timeout:
                    # 超出最大超時時間
                    print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                    return

                try:
                    return await task_func(*args, **kwargs)
                except Exception as e:
                    print(f"fail {str(e)}")
                    await asyncio.sleep(time_interval)

        # 異步函數(shù)判斷
        wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
        return wrapper_func

    return _task_retry

注意時間等待 await asyncio.sleep(time_interval) 會導(dǎo)致函數(shù)掛起,程序不會在這里等待,而是去事件循環(huán)loop中執(zhí)行其他的已經(jīng)就緒的任務(wù),如果其他函數(shù)運行時間太久了才切換回來,會導(dǎo)致時間超時,換成 time.sleep()的話其實也沒有用,如果函數(shù)內(nèi)部還有異步函數(shù)執(zhí)行還是會切換出去,因此異步的時候感覺超時參數(shù)意義不大。

模擬測試下

@task_retry(max_retry_count=5, time_interval=2, max_timeout=5)
async def user_place_order_async():
    """異步函數(shù)重試案例"""
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}


async def io_test():
    """模擬io阻塞"""
    print("io test start")
    time.sleep(3)
    print("io test end")
    return "io test end"


async def main():
    # 同步案例
    # ret = user_place_order()
    # print(f"user place order ret {ret}")

    # 異步案例
    # ret = await user_place_order_async()
    # print(f"user place order ret {ret}")

    # 并發(fā)異步
    order_ret, io_ret = await asyncio.gather(
        user_place_order_async(),
        io_test(),
    )
    print(f"io ret {io_ret}")
    print(f"user place order ret {order_ret}")


if __name__ == '__main__':
    asyncio.run(main())
    
    
 &gt;&gt;&gt;out
execute count 1
fail division by zero
io test start
io test end
execute count 2
fail division by zero
execute count 3
execute timeout, use time 5.015768527984619s, max timeout 5
io ret io test end
user place order ret None

可以看出執(zhí)行一遍后自動切換到了 io_test 中執(zhí)行由于 io test 中的 time.sleep(3) 會導(dǎo)致整個線程阻塞,一定要等到io_test執(zhí)行完后才會切換回去,然后再執(zhí)行兩遍就超時了,你可能會說都用異步的庫,是的異步的庫是可以加速,但我想表達(dá)就是這時候統(tǒng)計的耗時是整個程序的而不是單獨一個函數(shù)的。大家可以在評論區(qū)幫我想想有沒有其他的方法,要么就不要用這個超時參數(shù)。

可以兼容異步函數(shù)、然后超時參數(shù)可以不配置,影響不大,O(∩_∩)O~

重試裝飾器-最終版

最終版就是利用拋異常的方式來結(jié)束超過最大重試次數(shù)、最大超時,而不是直接返回None,然后再添加一個可配置捕獲指定異常的參數(shù),當(dāng)發(fā)生特定異常的時候才重試。

import time
import asyncio
import functools
from typing import Type


class MaxRetryException(Exception):
    """最大重試次數(shù)異常"""
    pass


class MaxTimeoutException(Exception):
    """最大超時異常"""
    pass


def task_retry(
        max_retry_count: int = 5,
        time_interval: int = 2,
        max_timeout: int = None,
        catch_exc: Type[BaseException] = Exception
):
    """
    任務(wù)重試裝飾器
    Args:
        max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
        time_interval: 每次重試間隔 默認(rèn) 2s
        max_timeout: 最大超時時間,單位s 默認(rèn)為 None,
        catch_exc: 指定捕獲的異常類用于特定的異常重試 默認(rèn)捕獲 Exception
    """

    def _task_retry(task_func):

        @functools.wraps(task_func)
        def sync_wrapper(*args, **kwargs):
            # 函數(shù)循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time &gt; max_timeout:
                    # 超出最大超時時間
                    raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")

                try:
                    task_ret = task_func(*args, **kwargs)
                    return task_ret
                except catch_exc as e:
                    print(f"fail {str(e)}")
                    time.sleep(time_interval)
            else:
                # 超過最大重試次數(shù), 拋異常終止
                raise MaxRetryException(f"超過最大重試次數(shù)失敗, max_retry_count {max_retry_count}")

        @functools.wraps(task_func)
        async def async_wrapper(*args, **kwargs):
            # 異步循環(huán)重試
            start_time = time.time()
            for retry_count in range(max_retry_count):
                print(f"execute count {retry_count + 1}")
                use_time = time.time() - start_time
                if max_timeout and use_time &gt; max_timeout:
                    # 超出最大超時時間
                    raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")

                try:
                    return await task_func(*args, **kwargs)
                except catch_exc as e:
                    print(f"fail {str(e)}")
                    await asyncio.sleep(time_interval)
            else:
                # 超過最大重試次數(shù), 拋異常終止
                raise MaxRetryException(f"超過最大重試次數(shù)失敗, max_retry_count {max_retry_count}")

        # 異步函數(shù)判斷
        wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
        return wrapper_func

    return _task_retry


@task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError,max_timeout=5)
def user_place_order():
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}


@task_retry(max_retry_count=5, time_interval=2, max_timeout=5)
async def user_place_order_async():
    """異步函數(shù)重試案例"""
    a = 1 / 0
    print("user place order success")
    return {"code": 0, "msg": "ok"}


async def io_test():
    """模擬io阻塞"""
    print("io test start")
    time.sleep(3)
    print("io test end")
    return "io test end"


async def main():
    # 同步案例
    try:
        ret = user_place_order()
        print(f"user place order ret {ret}")
    except MaxRetryException as e:
        # 超過最大重試次數(shù)處理
        print("MaxRetryException", e)
    except MaxTimeoutException as e:
        # 超過最大超時處理
        print("MaxTimeoutException", e)

    # 異步案例
    # ret = await user_place_order_async()
    # print(f"user place order ret {ret}")

    # 并發(fā)異步
    # order_ret, io_ret = await asyncio.gather(
    #     user_place_order_async(),
    #     io_test(),
    # )
    # print(f"io ret {io_ret}")
    # print(f"user place order ret {order_ret}")


if __name__ == '__main__':
    asyncio.run(main())

測試捕獲指定異常

# 指定捕獲除零錯誤,正常捕獲重試
@task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError)
def user_place_order():
    a = 1 / 0
    # a = []
    # b = a[0]
    print("user place order success")
    return {"code": 0, "msg": "ok"}
 

# out
execute count 1
fail division by zero
execute count 2
fail division by zero
execute count 3
fail division by zero
MaxRetryException 超過最大重試次數(shù)失敗, max_retry_count 3


# 指定捕獲除零錯誤,報索引越界錯誤,未正常捕獲重試,直接退出
@task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError)
def user_place_order():
    # a = 1 / 0
    a = []
    b = a[0]
    print("user place order success")
    return {"code": 0, "msg": "ok"}
 

# out
Traceback (most recent call last):
  File "G:/code/python/py-tools/decorator/base.py", line 184, in &lt;module&gt;
    asyncio.run(main())
  File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete
    return future.result()
  File "G:/code/python/py-tools/decorator/base.py", line 161, in main
    ret = user_place_order()
  File "G:/code/python/py-tools/decorator/base.py", line 97, in sync_wrapper
    task_ret = task_func(*args, **kwargs)
  File "G:/code/python/py-tools/decorator/base.py", line 137, in user_place_order
    b = a[0]
IndexError: list index out of range

Process finished with exit code 1

用拋異常的方式推出更好知道是因為什么原因退出來進(jìn)行對應(yīng)的業(yè)務(wù)邏輯處理。乍一看兼容異步、同步函數(shù)導(dǎo)致這個裝飾器有點代碼冗余,看看大家有沒有更好的辦法。

到這就結(jié)束了,雖然是一個簡單的裝飾器,但本文還是引伸出很多知識點,望可以幫助到大家。

修改記錄

2023-05-01

  • 把重試?yán)锏某瑫r計算單獨抽離出來,這樣功能不會太藕合,分兩個裝飾實現(xiàn)
def set_timeout(timeout: int, use_signal=False):
    """
    超時處理裝飾器
    Args:
        timeout: 超時時間,單位秒
        use_signal: 使用信號量機(jī)制只能在 unix內(nèi)核上使用,默認(rèn)False
    Raises:
        TimeoutException
    """
    def _timeout(func: Callable):
        def _handle_timeout(signum, frame):
            raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            # 同步函數(shù)處理超時
            if use_signal:
                # 使用信號量計算超時
                signal.signal(signal.SIGALRM, _handle_timeout)
                signal.alarm(timeout)
                try:
                    return func(*args, **kwargs)
                finally:
                    signal.alarm(0)
            else:
                # 使用線程
                with ThreadPoolExecutor() as executor:
                    future = executor.submit(func, *args, **kwargs)
                    try:
                        return future.result(timeout)
                    except TimeoutError:
                        raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # 異步函數(shù)處理超時
            try:
                ret = await asyncio.wait_for(func(*args, **kwargs), timeout)
                return ret
            except asyncio.TimeoutError:
                raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return _timeout
def retry(
        max_count: int = 5,
        interval: int = 2,
        catch_exc: Type[BaseException] = Exception
):
    """
    重試裝飾器
    Args:
        max_count: 最大重試次數(shù) 默認(rèn) 5 次
        interval: 每次異常重試間隔 默認(rèn) 2s
        catch_exc: 指定捕獲的異常類用于特定的異常重試 默認(rèn)捕獲 Exception
    Raises:
        MaxRetryException
    """
    def _retry(task_func):
        @functools.wraps(task_func)
        def sync_wrapper(*args, **kwargs):
            # 函數(shù)循環(huán)重試
            for retry_count in range(max_count):
                logger.info(f"{task_func} execute count {retry_count + 1}")
                try:
                    return task_func(*args, **kwargs)
                except catch_exc:
                    logger.error(f"fail {traceback.print_exc()}")
                    if retry_count < max_count - 1:
                        # 最后一次異常不等待
                        time.sleep(interval)
            # 超過最大重試次數(shù), 拋異常終止
            raise MaxRetryException(f"超過最大重試次數(shù)失敗, max_retry_count {max_count}")
        @functools.wraps(task_func)
        async def async_wrapper(*args, **kwargs):
            # 異步循環(huán)重試
            for retry_count in range(max_count):
                logger.info(f"{task_func} execute count {retry_count + 1}")
                try:
                    return await task_func(*args, **kwargs)
                except catch_exc as e:
                    logger.error(f"fail {str(e)}")
                    if retry_count < max_count - 1:
                        await asyncio.sleep(interval)
            # 超過最大重試次數(shù), 拋異常終止
            raise MaxRetryException(f"超過最大重試次數(shù)失敗, max_retry_count {max_count}")
        # 異步函數(shù)判斷
        wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
        return wrapper_func
    return _retry

源代碼

HuiDBK/py-tools: 打造 Python 開發(fā)常用的工具,讓Coding變得更簡單 (github.com)

以上就是手把手教你實現(xiàn)Python重試超時裝飾器,的詳細(xì)內(nèi)容,更多關(guān)于Python重試超時裝飾器的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論