Django使用Celery實現(xiàn)異步發(fā)送郵件
前言
在Django使用Celery異步發(fā)送郵件的過程中,遇到Celery日志提示任務已接收,但實際上任務并沒有執(zhí)行,解決后特此記錄。
使用版本如下:
Django版本:4.1.4
Celery版本:5.2.7
郵箱配置
進入郵箱,找到POP3/SMTP/IMAP項,開啟POP3/SMTP服務,添加客戶端授權碼
Django項目發(fā)送郵件
配置郵件服務器
發(fā)送郵件時需要配置好SMTP服務器的連接信息。在settings.py
中配置郵件服務器信息
# 配置郵件發(fā)送 EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' # 對應郵箱服務器地址 EMAIL_HOST = 'smtp.163.com' # 端口 EMAIL_PORT = 25 #發(fā)送郵件的郵箱 EMAIL_HOST_USER = 'admin@163.com' #在郵箱中設置的客戶端授權密碼 EMAIL_HOST_PASSWORD = 'YS22JE123PAZJ2N' #收件人看到的發(fā)件人 EMAIL_FROM = 'admin<admin@163.com>'
Django發(fā)送郵件模塊
Django自帶了發(fā)送郵件的模塊django.core.mail
,可以方便地使用它來發(fā)送電子郵件
send_mail方法描述:
send_mail(subject, message, from_email, recipient_list, html_message=None)
- subject 主題 郵件標題
- message 普通郵件正文,普通字符串
- from_email 發(fā)件人
- recipient_list 收件人列表
- html_message 多媒體郵件正文,可以是html字符串
基本使用示例:
from django.core.mail import send_mail subject = 'Subject' # 主題 message = 'Message' # 正文 from_email = 'noreply@example.com' # 發(fā)件人地址 recipient_list = ['recipient1@example.com', 'recipient2@example.com'] # 收件人地址列表 send_mail(subject, message, from_email, recipient_list)
Celery
發(fā)送郵箱郵件是耗時操作,所以需要異步發(fā)送郵件,使用Celery實現(xiàn)異步任務。
概述
Celery是一個基于Python的分布式任務隊列,它可以輕松地處理大量的并發(fā)任務。Celery支持多種消息傳輸協(xié)議,如AMQP、Redis等,同時也支持多種后端存儲系統(tǒng),如RabbitMQ、Redis等。通過使用Celery,我們可以將一些耗時的任務放到異步的任務隊列中,從而提高Web應用的響應速度和性能。
Celery的工作原理非常簡單。首先定義一個任務(Task),然后將這個任務加入到任務隊列中。Celery Worker會從任務隊列中取出任務并執(zhí)行,完成后將結果返回給調用方??梢愿鶕?jù)需要對任務進行優(yōu)先級排序、設定任務超時時間等。
除了作為任務隊列之外,Celery還可以用來實現(xiàn)周期性任務調度,比如定時清理緩存、備份數(shù)據(jù)庫等。Celery提供了豐富的API和插件,可以輕松地完成各種復雜的任務處理需求。
官網:https://docs.celeryq.dev/en/stable/index.html
GitHub:https://github.com/celery/celery
工作模式
默認是進程池方式,進程數(shù)以當前機器的CPU核數(shù)為參考,每個CPU開四個進程。
指定進程數(shù):
# proj:celery實例對象文件 celery worker -A proj --concurrency=4
改變進程池方式為協(xié)程方式:
# 安裝eventlet模塊 pip install eventlet # 啟用Eventlet,指定協(xié)程數(shù)目 celery worker -A proj --concurrency=1000 -P eventlet -c 1000
安裝Celery
安裝Celery
pip install -U Celery
Celery的基本使用
創(chuàng)建config.py配置文件
# 設置代理人broker,使用redis的5號庫 broker_url = "redis://127.0.0.1/5" # 設置結果存儲,使用redis的6號庫 result_backend = "redis://127.0.0.1/6" # 任務超時限制 task_time_limit = 10 * 60 # 時區(qū) celery_timezone = 'Asia/Shanghai'
創(chuàng)建Celery實例并加載配置
創(chuàng)建celery_tasks 包,然后創(chuàng)建main.py文件,實現(xiàn)創(chuàng)建Celery實例并加載配置
import os from celery import Celery from celery_tasks import config # 為celery使用django配置文件進行設置,識別和加載django的配置文件 os.environ.setdefault('DJANGO_SETTINGS_MODULE', '項目名.settings') # 創(chuàng)建celery實例,參數(shù)是celery名稱,需保證唯一 celery_app = Celery('celery_tasks') # 加載celery配置,設置broker隊列 celery_app.config_from_object(config)
定義任務
在包celery_tasks
下創(chuàng)建任務包test_task
,并在該包下創(chuàng)建tasks.py
文件,用于定義任務
from celery_tasks.main import celery_app # bind:保證task對象會作為第一個參數(shù)自動傳入 # name:異步任務別名 # retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s # max_retries:異常自動重試次數(shù)的上限 @celery_app.task(bind=True, name='test_task', retry_backoff=3) def celerTest(self, number): try: print("執(zhí)行{}號任務".format(number)) except Exception as e: # 有異常自動重試三次 raise self.retry(exc=e, max_retries=3)
在celery_tasks.main.py文件種進行任務注冊
import os from celery import Celery # 讓celery使用django配置文件,即加載當前工程的配置文件 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "demo.settings") # 創(chuàng)建celery實例,參數(shù)是celery名稱,需保證唯一 celery_app = Celery('demo_celery') # 加載celery配置,指定配置文件路徑,即可設置broker隊列 celery_app.config_from_object('celery_tasks.config') # 自動注冊celery任務,列表元素是任務包路徑 celery_app.autodiscover_tasks(['celery_tasks.test_task'])
啟動Celery服務
# -A 對應的應用程序, 其參數(shù)是項目中Celery實例的位置 # worker 要啟動的worker # -l 日志等級,如info等級 celery -A celery_tasks.main worker -l info
(demo) D:\WorkSpace\Python\demo\demo>celery -A celery_tasks.main worker -l info -------------- celery@Coding v5.2.7 (dawn-chorus) --- ***** ----- -- ******* ---- Windows-10-10.0.22000-SP0 2023-01-16 23:25:49 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: demo_celery:0x1fdba863d00 - ** ---------- .> transport: redis://127.0.0.1:6379/8 - ** ---------- .> results: redis://127.0.0.1/9 - *** --- * --- .> concurrency: 12 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . test_task [2023-01-16 23:25:49,701: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8 [2023-01-16 23:25:49,703: INFO/MainProcess] mingle: searching for neighbors [2023-01-16 23:25:50,095: INFO/SpawnPoolWorker-1] child process 49036 calling self.run() [2023-01-16 23:25:50,114: INFO/SpawnPoolWorker-2] child process 43196 calling self.run() [2023-01-16 23:25:50,136: INFO/SpawnPoolWorker-3] child process 1284 calling self.run() [2023-01-16 23:25:50,154: INFO/SpawnPoolWorker-4] child process 49708 calling self.run() [2023-01-16 23:25:50,183: INFO/SpawnPoolWorker-5] child process 49704 calling self.run() [2023-01-16 23:25:50,208: INFO/SpawnPoolWorker-6] child process 20884 calling self.run() [2023-01-16 23:25:50,221: INFO/SpawnPoolWorker-7] child process 17840 calling self.run() [2023-01-16 23:25:50,242: INFO/SpawnPoolWorker-8] child process 56040 calling self.run() [2023-01-16 23:25:50,275: INFO/SpawnPoolWorker-9] child process 45968 calling self.run() [2023-01-16 23:25:50,291: INFO/SpawnPoolWorker-10] child process 44888 calling self.run() [2023-01-16 23:25:50,313: INFO/SpawnPoolWorker-11] child process 8848 calling self.run() [2023-01-16 23:25:50,318: INFO/SpawnPoolWorker-12] child process 11020 calling self.run() [2023-01-16 23:25:50,727: INFO/MainProcess] mingle: all alone [2023-01-16 23:25:50,740: WARNING/MainProcess] D:\Development\Python\env\demo\lib\site-packages\celery\fixups\django.py:203: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('''Using settings.DEBUG leads to a memory [2023-01-16 23:25:50,741: INFO/MainProcess] celery@Coding ready. [2023-01-16 23:25:51,332: INFO/SpawnPoolWorker-13] child process 4580 calling self.run() [2023-01-16 23:25:51,341: INFO/SpawnPoolWorker-14] child process 44956 calling self.run() [2023-01-16 23:25:51,453: INFO/SpawnPoolWorker-15] child process 46100 calling self.run() [2023-01-16 23:25:51,466: INFO/SpawnPoolWorker-16] child process 46872 calling self.run() [2023-01-16 23:25:52,797: INFO/SpawnPoolWorker-17] child process 2716 calling self.run() [2023-01-16 23:25:52,800: INFO/SpawnPoolWorker-18] child process 49488 calling self.run() [2023-01-16 23:25:52,807: INFO/SpawnPoolWorker-19] child process 3912 calling self.run() [2023-01-16 23:25:53,608: INFO/SpawnPoolWorker-20] child process 40624 calling self.run()
提交任務
from celery_tasks.test_task.tasks import celerTest if __name__ == '__main__': for i in range(1,10): celerTest.delay(i)
異常
提交任務 Celery控制臺日志出現(xiàn)示任務已接收,但并沒有執(zhí)行
INFO/MainProcess] Task send_email[f301b786-af40-4283-a4d6-4a97ae05658f] received
INFO/MainProcess] Task send_email[5997d896-fdb2-4220-92fe-7027291df56d] received
原因:
celery對windows支持不好,需添加組件eventlet 指定協(xié)程
解決辦法
pip install eventlet
celery -A celery_tasks.main worker -l info -P eventlet -c 10
執(zhí)行1號任務
執(zhí)行2號任務
執(zhí)行3號任務
執(zhí)行4號任務
執(zhí)行5號任務
執(zhí)行6號任務
執(zhí)行7號任務
執(zhí)行8號任務
執(zhí)行9號任務
INFO/MainProcess] Task send_email[01457c6c-4571-4b1e-b09c-39df49d50162] received
WARNING/MainProcess] 執(zhí)行1號任務
INFO/MainProcess] Task send_email[01457c6c-4571-4b1e-b09c-39df49d50162] succeeded in 1.2969999999913853s: None
Celery發(fā)送郵件
創(chuàng)建config.py配置文件
# 設置代理人broker,使用redis的5號庫 broker_url = "redis://127.0.0.1/5" # 設置結果存儲,使用redis的6號庫 result_backend = "redis://127.0.0.1/6" # 任務超時限制 celery_task_time_limit = 10 * 60 # 時區(qū) celery_timezone = 'Asia/Shanghai'
創(chuàng)建Celery實例并加載配置
創(chuàng)建定義Celery包:celery_tasks
,然后創(chuàng)建main.py文件,實現(xiàn)創(chuàng)建Celery實例并加載配置
import os from celery import Celery from celery_tasks import config # 為celery使用django配置文件進行設置,識別和加載django的配置文件 os.environ.setdefault('DJANGO_SETTINGS_MODULE', '項目名稱.settings') # 創(chuàng)建celery實例,參數(shù)是celery名稱,需保證唯一 celery_app = Celery('celery_tasks') # 加載celery配置,設置broker隊列 celery_app.config_from_object(config, namespace='CELERY') # celery_app.config_from_object('celery_tasks.config') # 自動注冊celery任務,列表元素是任務包路徑 celery_app.autodiscover_tasks(['celery_tasks.email'])
定義發(fā)送郵件任務
在包celery_tasks
下創(chuàng)建任務包email_task
,并在該包下創(chuàng)建tasks.py
文件,用于定義任務
from django.conf import settings from django.core.mail import send_mail from celery_tasks.main import celery_app # bind:保證task對象會作為第一個參數(shù)自動傳入 # name:異步任務別名 # retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s # max_retries:異常自動重試次數(shù)的上限 @celery_app.task(bind=True, name='send_email', retry_backoff=3) def sendEmail(self, to_email, verify_url): subject = "郵箱驗證" html_message = '<p>尊敬的用戶您好!</p>' \ '<p>您的郵箱為:%s 。請點擊此鏈接激活您的郵箱:</p>' \ '<p><a href="%s" rel="external nofollow" >%s<a></p>' % (to_email, verify_url, verify_url) try: send_mail(subject, "", settings.EMAIL_FROM, [to_email], html_message=html_message) except Exception as e: # 有異常自動重試三次 raise self.retry(exc=e, max_retries=3)
啟動Celery
在使用Celery時,需要啟動worker進程來處理異步任務??梢允褂靡韵旅顏韱觲orker進程:
celery -A celery_tasks.main worker -l info -P eventlet -c 10
調用發(fā)送郵件異步任務
定義請求地址
from django.urls import re_path from . import views urlpatterns = [ re_path(r'^send/$', views.SendView.as_view(), name='send'), ]
定義視圖,并發(fā)送郵件
class SendView(View): def get(self, request): for i in range(1, 2): # 異步發(fā)送驗證郵件 verify_url = 'https://www.admin.com' email = 'admin@qq.com' res = sendEmail.delay(email, verify_url) print(res) return http.JsonResponse({"msg": "OK"})
以上就是Django使用Celery實現(xiàn)異步發(fā)送郵件的詳細內容,更多關于Django Celery異步發(fā)送郵件的資料請關注腳本之家其它相關文章!
相關文章
使用Python編寫Prometheus監(jiān)控的方法
今天小編就為大家分享一篇關于使用Python編寫Prometheus監(jiān)控的方法,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10使用Python獲取愛奇藝電視劇彈幕數(shù)據(jù)的示例代碼
這篇文章主要介紹了用Python獲取愛奇藝電視劇彈幕數(shù)據(jù),本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01python向已存在的excel中新增表,不覆蓋原數(shù)據(jù)的實例
下面小編就為大家分享一篇python向已存在的excel中新增表,不覆蓋原數(shù)據(jù)的實例,具有很好超參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05解決Python 命令行執(zhí)行腳本時,提示導入的包找不到的問題
今天小編就為大家分享一篇解決Python 命令行執(zhí)行腳本時,提示導入的包找不到的問題,具有很好的參考價值,希望對大家有所幫助,一起跟隨小編過來看看吧2019-01-01