欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python分布式異步任務框架Celery使用教程

 更新時間:2023年05月16日 11:51:24   作者:不 再 熬 夜  
Celery 是由Python 編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統(tǒng),它同時提供操作和維護分布式系統(tǒng)所需的工具

一、Celery架構(gòu)介紹

Celery:芹菜?(跟翻譯沒有任何關(guān)系),分布式異步任務框架(跟其他web框架無關(guān))

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)

celery服務為其他項目服務提供異步解決任務需求的。

架構(gòu):

分為三部分

  • broker:任務中間件,用戶提交的任務,存在這個里面(redis,rabbitmq)
  • worker:任務執(zhí)行者,消費者,真正執(zhí)行任務的進程(真正干活的人)
  • backend:任務結(jié)果存儲,任務執(zhí)行后的結(jié)果(redis,rabbitmq)

celery能夠做的事:

  • 異步任務(區(qū)分同步任務)
  • 延遲任務
  • 定時任務(其他框架做)

怎么更好的理解celery?

會有兩個服務同時運行,一個是項目服務(django服務),一個是celery服務,項目服務將需要異步處理的任務交給celery服務,celery就會在需要時異步完成項目的需求。打個比方,人是一個獨立運行的服務(django) | 醫(yī)院也是一個獨立運行的服務(celery)。正常情況下,人可以完成所有健康情況的動作,不需要醫(yī)院的參與;但當人生病時,就會被醫(yī)院接收,解決人生病問題,人生病的處理方案交給醫(yī)院來解決,所有人不生病時,醫(yī)院獨立運行,人生病時,醫(yī)院就來解決人生病的需求。

注:python有自己的定時任務,感興趣的了解下apscheduler

二、Celery簡單使用

安裝:pip install celery==5.1.2

使用:

1.配置celery

from celery import Celery
# app=Celery('test',)
# backend='redis://:密碼@127.0.0.1:6379/1'  如果有密碼,這么寫
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址
# 1 實例化得到celery對象
app = Celery(__name__, backend=backend, broker=broker)
# 2 寫一堆任務(計算a+b,挖井,砍樹),函數(shù)
# 使用裝飾器包裹任務(函數(shù))
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.提交任務

# from celery_task import app
import celery_task
# 1 同步執(zhí)行
# res = celery_task.add(2, 3)  # 普通的同步任務,同步執(zhí)行任務
# print(res)

2 異步任務:

第一步:提交(使用任務名.apply_async(參數(shù)))

結(jié)果是任務id號,唯一標識這個任務

# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

第二步:讓worker執(zhí)行—>結(jié)果存到redis

通過命令啟動,非windows:

5.x之前這么啟動

命令:celery worker -A celery_task -l info

5.x以后

命令:celery -A celery_task worker -l info

windows:

pip3 install eventlet

5.x之前這么啟動

命令:celery worker -A celery_task -l info -P eventlet

5.x以后

命令:celery -A celery_task worker -l info -P eventlet

3.查看任務執(zhí)行結(jié)果

from celery_task import app
from celery.result import AsyncResult
id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print('任務執(zhí)行成功了')
        result = a.get()  # 異步任務執(zhí)行的結(jié)果
        print(result)
    elif a.failed():
        print('任務失敗')
    elif a.status == 'PENDING':
        print('任務等待中被執(zhí)行')
    elif a.status == 'RETRY':
        print('任務異常后正在重試')
    elif a.status == 'STARTED':
        print('任務已經(jīng)開始被執(zhí)行')

三、Celery包結(jié)構(gòu)

目錄結(jié)構(gòu):

    -celery_task               # 包名
        __init__.py
        celery.py             # app所在py文件
        course_task.py        # 任務
        order_task.py         # 任務
        user_task.py          # 任務
    提交任務.py                # 提交任務
    查看結(jié)果.py                # 查看結(jié)果

創(chuàng)建多個任務:

celery_task /celery.py

from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一個列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    'celery_task.course_task',
    'celery_task.order_task',
    'celery_task.user_task',
])
# 原來,任務寫在這個py文件中
# 后期任務非常多,可能有用戶相關(guān)任務,課程相關(guān)任務,訂單相關(guān)任務。。。

celery_task /任務.py

user_task.py

import time
from .celery import app
# 發(fā)送短信任務
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模擬發(fā)送短信延遲
    print('短信發(fā)送成功,手機號是:%s,驗證碼是:%s' % (phone, code))
    return '短信發(fā)送成功'

order_task.py

from .celery import app
# 生成訂單任務
@app.task()
def make_order():
    with open(r'D:\py18\luffy_api\script\2 celery的包結(jié)構(gòu)\celery_task\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一條訂單\n')
    return True

course_task.py

from .celery import app
@app.task()
def add(a,b):
    return a+b

提交多個任務:

from celery_task import user_task,order_task
# 提交一個發(fā)送短信任務
# res = user_task.send_sms.apply_async(args=['18972374345', '8888'])
# print(res)
# 提交一個生成訂單任務
# res=order_task.make_order.apply_async()
# print(res)

查看結(jié)果:

from celery_task.celery import app
from celery.result import AsyncResult
id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    print(app.conf)
    if a.successful():
        print('任務執(zhí)行成功了')
        result = a.get()  # 異步任務執(zhí)行的結(jié)果
        print(result)
    elif a.failed():
        print('任務失敗')
    elif a.status == 'PENDING':
        print('任務等待中被執(zhí)行')
    elif a.status == 'RETRY':
        print('任務異常后正在重試')
    elif a.status == 'STARTED':
        print('任務已經(jīng)開始被執(zhí)行')

四、Celery延遲任務

# 添加延遲任務方式一:
# from datetime import datetime, timedelta
# datetime.utcnow()  獲取當前的utc時間
# eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc時間
# 10s后,發(fā)送短信
res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta)
print(res)
# 使用第二種方式執(zhí)行異步任務(兩者傳參不同;不寫時間,就表示立即執(zhí)行):
res=user_task.send_sms.delay('12345566677', '8888')
print(res)

五、Celery定時任務

第一步:celery.py中寫入

# 第一步,在包(celery_task)下的celey.py中寫入
###修改celery的配置信息    app.conf整個celery的配置信息
# 時區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
####配置定時任務
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.user_task.send_sms',  # 指定執(zhí)行的是哪個任務
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點
        'args': ('18953675221', '8888'),
    },
    'make_order_every_5_seconds': {
        'task': 'celery_task.order_task.make_order',  # 指定執(zhí)行的是哪個任務
        'schedule': timedelta(seconds=5),
    },
    'add_every_1_seconds': {
        'task': 'celery_task.course_task.add',  # 指定執(zhí)行的是哪個任務
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點
        'args': (3, 5),
    },
}

第二步:啟動worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

如果beat沒有啟動,worker是沒有活干的,需要啟動beat,worker才能干活,和beat啟動順序無先后

第三步:啟動beat

# celery beat -A celery_task -l 
celery -A celery_task beat -l info

六、Django中集成Celery

第一種方式使用django-celery(了解):

第三方把django和celery集成起來,方便我們使用,但是,第三方寫的包的版本,跟celery和django版本完全對應。

我們自己使用包結(jié)構(gòu)集成到django中:

第一步,把寫好的包,直接復制到項目根路徑

第二步,在視圖類中(函數(shù)中)

from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步發(fā)送假設3分支鐘,異步發(fā)送,直接就返回id了,是否成功不知道,后期通過id查詢
    print(res)
    return HttpResponse(res)

到此這篇關(guān)于Python分布式異步任務框架Celery使用教程的文章就介紹到這了,更多相關(guān)Python Celery內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論