欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

通過celery異步處理一個查詢任務的完整代碼

 更新時間:2019年11月19日 15:58:40   作者:spur_man  
今天小編就為大家分享一篇通過celery異步處理一個查詢任務的完整代碼,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

今天介紹通過celery實現一個異步任務。有這樣一個需求,前端發(fā)起一個查詢的請求,但是發(fā)起查詢后,查詢可能不會立即返回結果。這時候,發(fā)起查詢后,后端可以把這次查詢當作一個task,并立即返回一個能唯一表明該task的值,如taskID(用戶后面可以通過這個taskID 隨時查看結果),用戶收到這個taskID后,可以轉去處理其他任務,而不必一直等待查詢結果。后端API調用celery來處理這個task,并將結果值保存在一個csv文件中,后面用戶通過taskID 查詢時返回結果。

def application(environ,start_response):
  """部分代碼省略"""
  query_string = environ['QUERY_STRING']
  serviceGroupName = ""
  for getParam in query_string.split("&"):
    params = getParam.split("=")
    resultInfo = ""
    if params[0] == "type":
      alertType = params[1]
    elif params[0] == "projectName":
      projectName = params[1]
    elif params[0] == "serviceGroupName":
      serviceGroupName = params[1]
    else:
      resultInfo = error_info(-1, "GET參數只能為type=<?>&projectName=<?>&serviceGroupName=<?>;必須指定三個參數", {})
    return [resultInfo]  
  taskId = 1
  result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
  contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort})
  result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo")
  taskInfo = "任務已經創(chuàng)建,詳情請查看:http://10.4.34.254/api/task?taskId=%s"% (taskId)
  return [resultInfo]

getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重點是這一行,apply_async()方法會返回一個AsyncResult實例,通過這個實例可以跟蹤任務狀態(tài)軌跡。

要使用此功能,需要提供結果后臺(result backend),這樣才有地方存儲任務狀態(tài)等信息。其中,getServiceInfo是自定義的一個task,后續(xù)會介紹到,contentInfo是傳遞的一個參數,queue是指定隊列名稱。

上面這個函數的原型如下:

task.apply_async(args[, kwargs[, …]])

其中 args 和 kwargs 分別是 task 接收的參數,當然它也接受額外的參數對任務進行控制。

在 Celery 中執(zhí)行任務的方法一共有三種:

1. delay, 用來進行最簡單便捷的任務執(zhí)行(delay在第3小節(jié)的測試中使用過,它可以看作是apply_async的一個快捷方式);

2. apply_async, 對于任務的執(zhí)行附加額外的參數,對任務進行控制;

3. app.send_task, 可以執(zhí)行未在 Celery 中進行注冊的任務。

celery文件配置

在python的庫存放路徑中(一般是/usr/lib/python2.6/site-packages),創(chuàng)建一個文件夾proj,進入proj目錄,創(chuàng)建三個文件,init,將proj聲明一個python包,celepy,其內容如下:

#_*_ coding:utf-8 _*_
from __future__ import absolute_import
from celery import Celery

app = Celery("proj",
broker="amqp://user:password@localhost//",
backend="amqp",
include=["proj.tasks"]
)
app.conf.update(
CELERY_ROUTES={
"proj.tasks.getServerInfo":{"queue":"getServerInfo"},
}
)
if __name__=="__main__":
  app.start()

這里我們定義了模塊名稱proj以及celery 路由。

還有一個文件,task.py

#_*_ coding:utf-8 _*_i
from __future__ import absolute_import
from proj.celery import app
import random
import simplejson as json
import types
import time
import MySQLdb
import urllib2
import ConfigParser as cparser
import hmac
import hashlib
import base64
@app.task
def getServiceInfo(contentInfo):
  contentInfo = json.loads(contentInfo)
  serviceGroupName = contentInfo['serviceGroupName']

  dbHost = contentInfo['dbHost']
  dbPort = int(contentInfo['dbPort'])
  dbUser = contentInfo['dbUser']
  dbPasswd = contentInfo['dbPasswd']
  msgLib = MessageLib.MessageLib()
  Sql = "Your SQL"
  #第三步:連接數據庫,執(zhí)行代碼邏輯
  try:
    db_connection = MySQLdb.connect(host=dbHost, port=dbPort, passwd=dbPasswd, db="cmdb", user=dbUser, connect_timeout=2, charset="utf8")
    cursor = db_connection.cursor()
    cursor.execute(getServiceGroupHostSql)
    row = cursor.fetchall()
    result = []
    for line in row:
      ...
      result.append(tempMysqlHighInfo)

  resultInfo = msgLib.success_info(result)
  return resultInfo
  except Exception, e:
    raise
    errorInfo = "dbhost:%s, port:%s, error:%s" % (dbHost, dbPort, str(e))
    #return getServiceGroupHostSql,errorInfo
    return msgLib.error_info(-1, errorInfo, {})

啟動celery

celery -A proj worker -Q getServiceInfo -l debug -c 6

最后,寫一個結果,專門獲取查詢結果的結果,傳入的參數為taskID,部分代碼如下:

def application(environ,start_response):
  status = '400 ERROR'
  response_headers = [('Content-type', 'application/json;charset=utf-8')]
  start_response(status, response_headers)

  status = '200 OK'
  response_headers = [('Content-type', 'application/json;charset=utf-8')]
  start_response(status, response_headers)

  if environ['REQUEST_METHOD'] != "GET":
    resultInfo = msgLib.error_info(-1, "http請求類型不是GET", {})
  return [resultInfo]

  query_string = environ['QUERY_STRING']
  serviceGroupName = ""
  for getParam in query_string.split("&"):
    params = getParam.split("=")
    resultInfo = ""
    if params[0] == "taskId":
      taskId = params[1]
    else:
      resultInfo = msgLib.error_info(-1, "GET參數無比指定taskId這個參數", {})
    return [resultInfo]
  logging.info(query_string)
  result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
  result = []
  try:
    with open (result_file_name,'rb') as fp:
    lines = csv.reader(fp)
    for line in lines :
    result.append(line)
    resultInfo = msgLib.success_info(result)
  return resultInfo
  except Exception, e:
  errorInfo = "some thing wrong"
  return msgLib.error_info(-1, errorInfo, {})

以上這篇通過celery異步處理一個查詢任務的完整代碼就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • PyQtGraph在pyqt中的應用及安裝過程

    PyQtGraph在pyqt中的應用及安裝過程

    這篇文章主要介紹了PyQtGraph在pyqt中的應用,文中給大家介紹了pyqtgraph的主要用途及PyQtGraph的安裝過程,需要的朋友可以參考下
    2019-08-08
  • python基礎知識之索引與切片詳解

    python基礎知識之索引與切片詳解

    在python的學習過程,有些同學對索引和切換會感到困惑,今天我們就來弄清楚它,下面這篇文章主要給大家介紹了關于python基礎知識之索引與切片的相關資料,需要的朋友可以參考下
    2022-05-05
  • 使用Python編寫文件重復檢查器的完整代碼

    使用Python編寫文件重復檢查器的完整代碼

    在日常工作中,我們經常需要處理大量文件,但有時候會遇到文件重復的情況,為了有效管理文件并避免重復占用存儲空間,我們可以編寫一個簡單的Python程序來檢查文件夾中是否存在重復文件,本文將介紹如何使用Python和其庫來編寫一個文件重復檢查器
    2024-08-08
  • 使用jupyter notebook輸出顯示不完全的問題及解決

    使用jupyter notebook輸出顯示不完全的問題及解決

    這篇文章主要介紹了使用jupyter notebook輸出顯示不完全的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • python利用opencv如何實現答題卡自動判卷

    python利用opencv如何實現答題卡自動判卷

    由于工作需要,最近在研究關于如何通過程序識別答題卡的客觀題的答案,所以下面這篇文章主要介紹了python利用opencv如何實現答題卡自動判卷的相關資料,需要的朋友可以參考下
    2021-08-08
  • Python機器學習從ResNet到DenseNet示例詳解

    Python機器學習從ResNet到DenseNet示例詳解

    ResNet極大地改變了如何參數化深層網絡中函數的觀點。稠密連接網絡(DenseNet)在某種程度上是ResNet的邏輯擴展。讓我們先從數學上了解下
    2021-10-10
  • python使用itchat模塊給心愛的人每天發(fā)天氣預報

    python使用itchat模塊給心愛的人每天發(fā)天氣預報

    這篇文章主要介紹了python使用itchat模塊給心愛的人每天發(fā)天氣預報,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-11-11
  • 基于python的字節(jié)編譯詳解

    基于python的字節(jié)編譯詳解

    下面小編就為大家?guī)硪黄趐ython的字節(jié)編譯詳解。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • 基于Python實現Word轉HTML

    基于Python實現Word轉HTML

    將Word轉換為HTML能將文檔內容發(fā)布在網頁上,這樣,用戶就可以通過瀏覽器直接查看或閱讀文檔而無需安裝特定的軟件,下面我們就來學習一下Python是如何實現Word轉HTML的吧
    2023-12-12
  • Python中使用插入排序算法的簡單分析與代碼示例

    Python中使用插入排序算法的簡單分析與代碼示例

    這篇文章主要介紹了Python使用插入排序算法的簡單分析與代碼示例,插入算法的平均時間復雜度為O(n^2),需要的朋友可以參考下
    2016-05-05

最新評論