欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python分布式異步任務(wù)框架Celery使用教程

 更新時(shí)間:2023年05月16日 11:51:24   作者:不 再 熬 夜  
Celery 是由Python 編寫的簡(jiǎn)單,靈活,可靠的用來(lái)處理大量信息的分布式系統(tǒng),它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具

一、Celery架構(gòu)介紹

Celery:芹菜?(跟翻譯沒有任何關(guān)系),分布式異步任務(wù)框架(跟其他web框架無(wú)關(guān))

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)

celery服務(wù)為其他項(xiàng)目服務(wù)提供異步解決任務(wù)需求的。

架構(gòu):

分為三部分

  • broker:任務(wù)中間件,用戶提交的任務(wù),存在這個(gè)里面(redis,rabbitmq)
  • worker:任務(wù)執(zhí)行者,消費(fèi)者,真正執(zhí)行任務(wù)的進(jìn)程(真正干活的人)
  • backend:任務(wù)結(jié)果存儲(chǔ),任務(wù)執(zhí)行后的結(jié)果(redis,rabbitmq)

celery能夠做的事:

  • 異步任務(wù)(區(qū)分同步任務(wù))
  • 延遲任務(wù)
  • 定時(shí)任務(wù)(其他框架做)

怎么更好的理解celery?

會(huì)有兩個(gè)服務(wù)同時(shí)運(yùn)行,一個(gè)是項(xiàng)目服務(wù)(django服務(wù)),一個(gè)是celery服務(wù),項(xiàng)目服務(wù)將需要異步處理的任務(wù)交給celery服務(wù),celery就會(huì)在需要時(shí)異步完成項(xiàng)目的需求。打個(gè)比方,人是一個(gè)獨(dú)立運(yùn)行的服務(wù)(django) | 醫(yī)院也是一個(gè)獨(dú)立運(yùn)行的服務(wù)(celery)。正常情況下,人可以完成所有健康情況的動(dòng)作,不需要醫(yī)院的參與;但當(dāng)人生病時(shí),就會(huì)被醫(yī)院接收,解決人生病問題,人生病的處理方案交給醫(yī)院來(lái)解決,所有人不生病時(shí),醫(yī)院獨(dú)立運(yùn)行,人生病時(shí),醫(yī)院就來(lái)解決人生病的需求。

注:python有自己的定時(shí)任務(wù),感興趣的了解下apscheduler。

二、Celery簡(jiǎn)單使用

安裝:pip install celery==5.1.2

使用:

1.配置celery

from celery import Celery
# app=Celery('test',)
# backend='redis://:密碼@127.0.0.1:6379/1'  如果有密碼,這么寫
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址
# 1 實(shí)例化得到celery對(duì)象
app = Celery(__name__, backend=backend, broker=broker)
# 2 寫一堆任務(wù)(計(jì)算a+b,挖井,砍樹),函數(shù)
# 使用裝飾器包裹任務(wù)(函數(shù))
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.提交任務(wù)

# from celery_task import app
import celery_task
# 1 同步執(zhí)行
# res = celery_task.add(2, 3)  # 普通的同步任務(wù),同步執(zhí)行任務(wù)
# print(res)

2 異步任務(wù):

第一步:提交(使用任務(wù)名.apply_async(參數(shù)))

結(jié)果是任務(wù)id號(hào),唯一標(biāo)識(shí)這個(gè)任務(wù)

# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

第二步:讓worker執(zhí)行—>結(jié)果存到redis

通過命令啟動(dòng),非windows:

5.x之前這么啟動(dòng)

命令:celery worker -A celery_task -l info

5.x以后

命令:celery -A celery_task worker -l info

windows:

pip3 install eventlet

5.x之前這么啟動(dòng)

命令:celery worker -A celery_task -l info -P eventlet

5.x以后

命令:celery -A celery_task worker -l info -P eventlet

3.查看任務(wù)執(zhí)行結(jié)果

from celery_task import app
from celery.result import AsyncResult
id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print('任務(wù)執(zhí)行成功了')
        result = a.get()  # 異步任務(wù)執(zhí)行的結(jié)果
        print(result)
    elif a.failed():
        print('任務(wù)失敗')
    elif a.status == 'PENDING':
        print('任務(wù)等待中被執(zhí)行')
    elif a.status == 'RETRY':
        print('任務(wù)異常后正在重試')
    elif a.status == 'STARTED':
        print('任務(wù)已經(jīng)開始被執(zhí)行')

三、Celery包結(jié)構(gòu)

目錄結(jié)構(gòu):

    -celery_task               # 包名
        __init__.py
        celery.py             # app所在py文件
        course_task.py        # 任務(wù)
        order_task.py         # 任務(wù)
        user_task.py          # 任務(wù)
    提交任務(wù).py                # 提交任務(wù)
    查看結(jié)果.py                # 查看結(jié)果

創(chuàng)建多個(gè)任務(wù):

celery_task /celery.py

from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一個(gè)列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    'celery_task.course_task',
    'celery_task.order_task',
    'celery_task.user_task',
])
# 原來(lái),任務(wù)寫在這個(gè)py文件中
# 后期任務(wù)非常多,可能有用戶相關(guān)任務(wù),課程相關(guān)任務(wù),訂單相關(guān)任務(wù)。。。

celery_task /任務(wù).py

user_task.py

import time
from .celery import app
# 發(fā)送短信任務(wù)
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模擬發(fā)送短信延遲
    print('短信發(fā)送成功,手機(jī)號(hào)是:%s,驗(yàn)證碼是:%s' % (phone, code))
    return '短信發(fā)送成功'

order_task.py

from .celery import app
# 生成訂單任務(wù)
@app.task()
def make_order():
    with open(r'D:\py18\luffy_api\script\2 celery的包結(jié)構(gòu)\celery_task\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一條訂單\n')
    return True

course_task.py

from .celery import app
@app.task()
def add(a,b):
    return a+b

提交多個(gè)任務(wù):

from celery_task import user_task,order_task
# 提交一個(gè)發(fā)送短信任務(wù)
# res = user_task.send_sms.apply_async(args=['18972374345', '8888'])
# print(res)
# 提交一個(gè)生成訂單任務(wù)
# res=order_task.make_order.apply_async()
# print(res)

查看結(jié)果:

from celery_task.celery import app
from celery.result import AsyncResult
id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    print(app.conf)
    if a.successful():
        print('任務(wù)執(zhí)行成功了')
        result = a.get()  # 異步任務(wù)執(zhí)行的結(jié)果
        print(result)
    elif a.failed():
        print('任務(wù)失敗')
    elif a.status == 'PENDING':
        print('任務(wù)等待中被執(zhí)行')
    elif a.status == 'RETRY':
        print('任務(wù)異常后正在重試')
    elif a.status == 'STARTED':
        print('任務(wù)已經(jīng)開始被執(zhí)行')

四、Celery延遲任務(wù)

# 添加延遲任務(wù)方式一:
# from datetime import datetime, timedelta
# datetime.utcnow()  獲取當(dāng)前的utc時(shí)間
# eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc時(shí)間
# 10s后,發(fā)送短信
res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta)
print(res)
# 使用第二種方式執(zhí)行異步任務(wù)(兩者傳參不同;不寫時(shí)間,就表示立即執(zhí)行):
res=user_task.send_sms.delay('12345566677', '8888')
print(res)

五、Celery定時(shí)任務(wù)

第一步:celery.py中寫入

# 第一步,在包(celery_task)下的celey.py中寫入
###修改celery的配置信息    app.conf整個(gè)celery的配置信息
# 時(shí)區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
####配置定時(shí)任務(wù)
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.user_task.send_sms',  # 指定執(zhí)行的是哪個(gè)任務(wù)
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點(diǎn)
        'args': ('18953675221', '8888'),
    },
    'make_order_every_5_seconds': {
        'task': 'celery_task.order_task.make_order',  # 指定執(zhí)行的是哪個(gè)任務(wù)
        'schedule': timedelta(seconds=5),
    },
    'add_every_1_seconds': {
        'task': 'celery_task.course_task.add',  # 指定執(zhí)行的是哪個(gè)任務(wù)
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點(diǎn)
        'args': (3, 5),
    },
}

第二步:?jiǎn)?dòng)worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

如果beat沒有啟動(dòng),worker是沒有活干的,需要啟動(dòng)beat,worker才能干活,和beat啟動(dòng)順序無(wú)先后

第三步:?jiǎn)?dòng)beat

# celery beat -A celery_task -l 
celery -A celery_task beat -l info

六、Django中集成Celery

第一種方式使用django-celery(了解):

第三方把django和celery集成起來(lái),方便我們使用,但是,第三方寫的包的版本,跟celery和django版本完全對(duì)應(yīng)。

我們自己使用包結(jié)構(gòu)集成到django中:

第一步,把寫好的包,直接復(fù)制到項(xiàng)目根路徑

第二步,在視圖類中(函數(shù)中)

from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步發(fā)送假設(shè)3分支鐘,異步發(fā)送,直接就返回id了,是否成功不知道,后期通過id查詢
    print(res)
    return HttpResponse(res)

到此這篇關(guān)于Python分布式異步任務(wù)框架Celery使用教程的文章就介紹到這了,更多相關(guān)Python Celery內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論