Flask中嵌套啟動子線程的方法示例詳解
正文
如果你在Flask中啟動過子線程,然后在子線程中讀寫過g
對象或者嘗試從request
對象中讀取url參數(shù),那么,你肯定對下面這個報錯不陌生:RuntimeError: Working outside of request context.
.
例如下面這段Flask代碼:
import threading from flask import Flask, request app = Flask(__name__) def inner_func(): doc_id = request.args.get('doc_id', '') print(f'用戶ID為:{doc_id}') @app.route('/start_thread') def start_thread(): thread = threading.Thread(target=inner_func) thread.start() return {'success': True, 'msg': '獲取用戶ID成功!'}
請求/start_thread
接口就會報錯,如下圖所示:
如果你在網上搜索flask thread RuntimeError: Working outside of request context.
,那么你可能會看到官方文檔或者StackOverFlow上面提供了一個裝飾器@copy_current_request_context
。如下圖所示:
照著它這樣寫,確實能解決問題,如下圖所示:
但無論是官網還是StackOverFlow,它的例子都非常簡單。但是我們知道,啟動線程有很多種方法,例如:
# 方法一,啟動簡單線程 import threading job = threading.Thread(target=函數(shù)名, args=(參數(shù)1, 參數(shù)2), kwargs={'參數(shù)3': xxx, '參數(shù)4': yyy}) job.start() # 方法2,使用類定義線程 import threading class Job(threading.Thread): def __init__(self, 參數(shù)): super().__init__() def run(self): print('子線程開始運行') job = Job(參數(shù)) job.start() # 方法3,使用線程池 from multiprocessing.dummy import Pool pool = Pool(5) # 5個線程的線程池 pool.map(函數(shù)名, 參數(shù)列表)
網上的方法只能解決第一種寫法的問題。如果想使用方法2和方法3啟動子線程,代碼應該怎么寫呢?如果在子線程中又啟動子線程,再用一次@copy_current_request_context
還行嗎?
相信我,你在網上搜索一下午,只有兩種結果:一是找不到答案,二是找到的答案是晚于2023年1月14日的,因為是別人看了我這篇文章以后,再寫的。
解答上面的問題前,還是說明一下我對于在后端啟動子線程這個行為的觀點。例如有些人喜歡在后端掛一個爬蟲,請求接口以后,通過線程啟動爬蟲,爬蟲開始爬數(shù)據。又或者,有些人在后端上面掛了一些復雜的程序代碼,請求接口以后,后端啟動子線程,在子線程中運行這些代碼。
我一向是不建議在后端又啟動子線程去做復雜操作的。無論你使用的是Flask還是Django還是FastAPI。正確的做法應該是使用消息隊列,后端只是把觸發(fā)任務的相關參數(shù)發(fā)送到消息隊列中。下游真正的運行程序從消息隊列讀取到觸發(fā)參數(shù)以后,開始運行。
但有時候,你可能綜合考慮性價比,覺得再增加一個消息隊列,成本太高;或者干脆是要趕工期,不得不先暫時使用多線程來解決問題,那么這篇文章將會極大幫助到你。
盡量不要在子線程中讀取請求相關的參數(shù)
如果你的子線程不需要讀寫g
對象,也不需要從請求中讀取各種參數(shù),那么你就可以關閉這篇文章了。因為你的子線程可以直接運行,不會遇到什么的問題,例如:
所以最好的解決方法,就是在啟動子線程之前,提前先獲取到子線程需要的每一個參數(shù),然后把這些參數(shù)在啟動子線程的時候作為函數(shù)參數(shù)傳進去。如果你是從零開始寫代碼,那么一開始這樣做,就可以幫你避免很多麻煩。
但如果你是修改已有的代碼,并且嵌套太深,已經沒有辦法一層一層傳入參數(shù),或者代碼量太大,不知道哪些地方悄悄調用了g
對象或者讀寫了請求上下文,那么你可以繼續(xù)往下看。
裝飾閉包函數(shù)而不是一級函數(shù)
上面的簡單多線程寫法,有一個地方需要特別注意,被@copy_current_request_context
裝飾的子線程入口函數(shù)inner_func
,必須是閉包函數(shù),不能是一級函數(shù)。如下圖所示:
如果不小心裝飾了一級函數(shù),就會報如下的錯誤:
線程池復制請求上下文
當我們使用multiprocessing.dummy
來實現(xiàn)線程池時,代碼如下:
from multiprocessing.dummy import Pool from flask import Flask, request, copy_current_request_context, g app = Flask(__name__) @app.route('/start_thread', methods=['POST']) def start_thread(): @copy_current_request_context def crawl(doc_id): url_template = request.json.get('url_template', '') url = url_template.format(doc_id=doc_id) print(f'開始爬?。簕url}') doc_id_list = [123, 456, 789, 111, 222, 333, 444] pool = Pool(3) pool.map(crawl, doc_id_list) return {'success': True, 'msg': '爬取文章成功!'}
運行效果如下圖所示:
寫法上整體跟threading.Thread
啟動簡單線程的方法差不多。
用類定義線程時復制請求上下文
當我們額外定義了一個線程類時,需要把被裝飾的閉包函數(shù)傳入到子線程中,然后在子線程的run()
方法中運行:
import threading from flask import Flask, request, copy_current_request_context app = Flask(__name__) class Job(threading.Thread): def __init__(self, func): super().__init__() self.func = func def run(self): self.func() @app.route('/start_thread', methods=['POST']) def start_thread(): @copy_current_request_context def runner(): doc_id = request.json.get('doc_id', '') print(f'docId的值是:{doc_id}') job = Job(runner) job.start() return {'success': True, 'msg': '讀取文章成功!'}
運行效果如下圖所示:
嵌套子線程復制請求上下文
有時候,我們先創(chuàng)建了一個子線程,然后在子線程中,又需要創(chuàng)建孫線程。并且在孫線程中讀取請求上下文。例如下面的代碼:
import threading from multiprocessing.dummy import Pool from flask import Flask, request, copy_current_request_context app = Flask(__name__) def deep_func_runner(doc_id_list): @copy_current_request_context def deep_func(doc_id): category = request.args.get('category', '') url = f'https://www.kingname.info/{category}/{doc_id}' print(f'開始爬取:{url}') pool = Pool(3) pool.map(deep_func, doc_id_list) @app.route('/start_thread', methods=['POST']) def start_thread(): @copy_current_request_context def runner(): doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999] deep_func_runner(doc_id_list) job = threading.Thread(target=runner) job.start() return {'success': True, 'msg': '讀取文章成功!'}
此時使用@copy_current_request_context
就會報您一個錯誤:ValueError: <Token var=<ContextVar name='flask.request_ctx' at 0x103ef69a0> at 0x104446700> was created in a different Context
。如下圖所示:
這個時候,我們就需要額外再創(chuàng)建一個裝飾器:
def copy_current_app_context(f): from flask.globals import _app_ctx_stack appctx = _app_ctx_stack.top def _(*args, **kwargs): with appctx: return f(*args, **kwargs) return _
@copy_current_app_context
這個裝飾器需要放到孫線程里面@copy_current_request_context
的上面。完整的代碼為:
import threading from multiprocessing.dummy import Pool from flask import Flask, request, copy_current_request_context app = Flask(__name__) def copy_current_app_context(f): from flask.globals import _app_ctx_stack appctx = _app_ctx_stack.top def _(*args, **kwargs): with appctx: return f(*args, **kwargs) return _ def deep_func_runner(doc_id_list): @copy_current_app_context @copy_current_request_context def deep_func(doc_id): category = request.args.get('category', '') url = f'https://www.kingname.info/{category}/{doc_id}' print(f'開始爬取:{url}') pool = Pool(3) pool.map(deep_func, doc_id_list) @app.route('/start_thread', methods=['POST']) def start_thread(): @copy_current_request_context def runner(): doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999] deep_func_runner(doc_id_list) job = threading.Thread(target=runner) job.start() return {'success': True, 'msg': '讀取文章成功!'}
運行效果如下圖所示,孫線程也正常啟動了:
總結
- 非必要不在后端中創(chuàng)建子線程
- 創(chuàng)建子線程時,如果能把參數(shù)從外面?zhèn)魅?,就不要讓子線程自己去Flask的上下文讀取
@copy_current_request_context
需要裝飾閉包函數(shù),不能裝飾一級函數(shù)- 嵌套子線程需要同時使用
@copy_current_app_context
和@copy_current_request_context
兩個裝飾器來裝飾孫線程的閉包函數(shù)
以上就是Flask中嵌套啟動子線程的方法示例詳解的詳細內容,更多關于Flask嵌套啟動子線程的資料請關注腳本之家其它相關文章!