通過(guò)celery異步處理一個(gè)查詢?nèi)蝿?wù)的完整代碼
今天介紹通過(guò)celery實(shí)現(xiàn)一個(gè)異步任務(wù)。有這樣一個(gè)需求,前端發(fā)起一個(gè)查詢的請(qǐng)求,但是發(fā)起查詢后,查詢可能不會(huì)立即返回結(jié)果。這時(shí)候,發(fā)起查詢后,后端可以把這次查詢當(dāng)作一個(gè)task,并立即返回一個(gè)能唯一表明該task的值,如taskID(用戶后面可以通過(guò)這個(gè)taskID 隨時(shí)查看結(jié)果),用戶收到這個(gè)taskID后,可以轉(zhuǎn)去處理其他任務(wù),而不必一直等待查詢結(jié)果。后端API調(diào)用celery來(lái)處理這個(gè)task,并將結(jié)果值保存在一個(gè)csv文件中,后面用戶通過(guò)taskID 查詢時(shí)返回結(jié)果。
def application(environ,start_response):
"""部分代碼省略"""
query_string = environ['QUERY_STRING']
serviceGroupName = ""
for getParam in query_string.split("&"):
params = getParam.split("=")
resultInfo = ""
if params[0] == "type":
alertType = params[1]
elif params[0] == "projectName":
projectName = params[1]
elif params[0] == "serviceGroupName":
serviceGroupName = params[1]
else:
resultInfo = error_info(-1, "GET參數(shù)只能為type=<?>&projectName=<?>&serviceGroupName=<?>;必須指定三個(gè)參數(shù)", {})
return [resultInfo]
taskId = 1
result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort})
result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo")
taskInfo = "任務(wù)已經(jīng)創(chuàng)建,詳情請(qǐng)查看:http://10.4.34.254/api/task?taskId=%s"% (taskId)
return [resultInfo]
getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重點(diǎn)是這一行,apply_async()方法會(huì)返回一個(gè)AsyncResult實(shí)例,通過(guò)這個(gè)實(shí)例可以跟蹤任務(wù)狀態(tài)軌跡。
要使用此功能,需要提供結(jié)果后臺(tái)(result backend),這樣才有地方存儲(chǔ)任務(wù)狀態(tài)等信息。其中,getServiceInfo是自定義的一個(gè)task,后續(xù)會(huì)介紹到,contentInfo是傳遞的一個(gè)參數(shù),queue是指定隊(duì)列名稱。
上面這個(gè)函數(shù)的原型如下:
task.apply_async(args[, kwargs[, …]])
其中 args 和 kwargs 分別是 task 接收的參數(shù),當(dāng)然它也接受額外的參數(shù)對(duì)任務(wù)進(jìn)行控制。
在 Celery 中執(zhí)行任務(wù)的方法一共有三種:
1. delay, 用來(lái)進(jìn)行最簡(jiǎn)單便捷的任務(wù)執(zhí)行(delay在第3小節(jié)的測(cè)試中使用過(guò),它可以看作是apply_async的一個(gè)快捷方式);
2. apply_async, 對(duì)于任務(wù)的執(zhí)行附加額外的參數(shù),對(duì)任務(wù)進(jìn)行控制;
3. app.send_task, 可以執(zhí)行未在 Celery 中進(jìn)行注冊(cè)的任務(wù)。
celery文件配置
在python的庫(kù)存放路徑中(一般是/usr/lib/python2.6/site-packages),創(chuàng)建一個(gè)文件夾proj,進(jìn)入proj目錄,創(chuàng)建三個(gè)文件,init,將proj聲明一個(gè)python包,celepy,其內(nèi)容如下:
#_*_ coding:utf-8 _*_
from __future__ import absolute_import
from celery import Celery
app = Celery("proj",
broker="amqp://user:password@localhost//",
backend="amqp",
include=["proj.tasks"]
)
app.conf.update(
CELERY_ROUTES={
"proj.tasks.getServerInfo":{"queue":"getServerInfo"},
}
)
if __name__=="__main__":
app.start()
這里我們定義了模塊名稱proj以及celery 路由。
還有一個(gè)文件,task.py
#_*_ coding:utf-8 _*_i
from __future__ import absolute_import
from proj.celery import app
import random
import simplejson as json
import types
import time
import MySQLdb
import urllib2
import ConfigParser as cparser
import hmac
import hashlib
import base64
@app.task
def getServiceInfo(contentInfo):
contentInfo = json.loads(contentInfo)
serviceGroupName = contentInfo['serviceGroupName']
dbHost = contentInfo['dbHost']
dbPort = int(contentInfo['dbPort'])
dbUser = contentInfo['dbUser']
dbPasswd = contentInfo['dbPasswd']
msgLib = MessageLib.MessageLib()
Sql = "Your SQL"
#第三步:連接數(shù)據(jù)庫(kù),執(zhí)行代碼邏輯
try:
db_connection = MySQLdb.connect(host=dbHost, port=dbPort, passwd=dbPasswd, db="cmdb", user=dbUser, connect_timeout=2, charset="utf8")
cursor = db_connection.cursor()
cursor.execute(getServiceGroupHostSql)
row = cursor.fetchall()
result = []
for line in row:
...
result.append(tempMysqlHighInfo)
resultInfo = msgLib.success_info(result)
return resultInfo
except Exception, e:
raise
errorInfo = "dbhost:%s, port:%s, error:%s" % (dbHost, dbPort, str(e))
#return getServiceGroupHostSql,errorInfo
return msgLib.error_info(-1, errorInfo, {})
啟動(dòng)celery
celery -A proj worker -Q getServiceInfo -l debug -c 6
最后,寫(xiě)一個(gè)結(jié)果,專門(mén)獲取查詢結(jié)果的結(jié)果,傳入的參數(shù)為taskID,部分代碼如下:
def application(environ,start_response):
status = '400 ERROR'
response_headers = [('Content-type', 'application/json;charset=utf-8')]
start_response(status, response_headers)
status = '200 OK'
response_headers = [('Content-type', 'application/json;charset=utf-8')]
start_response(status, response_headers)
if environ['REQUEST_METHOD'] != "GET":
resultInfo = msgLib.error_info(-1, "http請(qǐng)求類型不是GET", {})
return [resultInfo]
query_string = environ['QUERY_STRING']
serviceGroupName = ""
for getParam in query_string.split("&"):
params = getParam.split("=")
resultInfo = ""
if params[0] == "taskId":
taskId = params[1]
else:
resultInfo = msgLib.error_info(-1, "GET參數(shù)無(wú)比指定taskId這個(gè)參數(shù)", {})
return [resultInfo]
logging.info(query_string)
result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
result = []
try:
with open (result_file_name,'rb') as fp:
lines = csv.reader(fp)
for line in lines :
result.append(line)
resultInfo = msgLib.success_info(result)
return resultInfo
except Exception, e:
errorInfo = "some thing wrong"
return msgLib.error_info(-1, errorInfo, {})
以上這篇通過(guò)celery異步處理一個(gè)查詢?nèi)蝿?wù)的完整代碼就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
PyQtGraph在pyqt中的應(yīng)用及安裝過(guò)程
這篇文章主要介紹了PyQtGraph在pyqt中的應(yīng)用,文中給大家介紹了pyqtgraph的主要用途及PyQtGraph的安裝過(guò)程,需要的朋友可以參考下2019-08-08
使用Python編寫(xiě)文件重復(fù)檢查器的完整代碼
在日常工作中,我們經(jīng)常需要處理大量文件,但有時(shí)候會(huì)遇到文件重復(fù)的情況,為了有效管理文件并避免重復(fù)占用存儲(chǔ)空間,我們可以編寫(xiě)一個(gè)簡(jiǎn)單的Python程序來(lái)檢查文件夾中是否存在重復(fù)文件,本文將介紹如何使用Python和其庫(kù)來(lái)編寫(xiě)一個(gè)文件重復(fù)檢查器2024-08-08
使用jupyter notebook輸出顯示不完全的問(wèn)題及解決
這篇文章主要介紹了使用jupyter notebook輸出顯示不完全的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-02-02
python利用opencv如何實(shí)現(xiàn)答題卡自動(dòng)判卷
由于工作需要,最近在研究關(guān)于如何通過(guò)程序識(shí)別答題卡的客觀題的答案,所以下面這篇文章主要介紹了python利用opencv如何實(shí)現(xiàn)答題卡自動(dòng)判卷的相關(guān)資料,需要的朋友可以參考下2021-08-08
Python機(jī)器學(xué)習(xí)從ResNet到DenseNet示例詳解
ResNet極大地改變了如何參數(shù)化深層網(wǎng)絡(luò)中函數(shù)的觀點(diǎn)。稠密連接網(wǎng)絡(luò)(DenseNet)在某種程度上是ResNet的邏輯擴(kuò)展。讓我們先從數(shù)學(xué)上了解下2021-10-10
python使用itchat模塊給心愛(ài)的人每天發(fā)天氣預(yù)報(bào)
這篇文章主要介紹了python使用itchat模塊給心愛(ài)的人每天發(fā)天氣預(yù)報(bào),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-11-11
基于Python實(shí)現(xiàn)Word轉(zhuǎn)HTML
將Word轉(zhuǎn)換為HTML能將文檔內(nèi)容發(fā)布在網(wǎng)頁(yè)上,這樣,用戶就可以通過(guò)瀏覽器直接查看或閱讀文檔而無(wú)需安裝特定的軟件,下面我們就來(lái)學(xué)習(xí)一下Python是如何實(shí)現(xiàn)Word轉(zhuǎn)HTML的吧2023-12-12
Python中使用插入排序算法的簡(jiǎn)單分析與代碼示例
這篇文章主要介紹了Python使用插入排序算法的簡(jiǎn)單分析與代碼示例,插入算法的平均時(shí)間復(fù)雜度為O(n^2),需要的朋友可以參考下2016-05-05

