Python使用進(jìn)程池并發(fā)執(zhí)行SQL語(yǔ)句的操作代碼
這段代碼使用了 Python 的 multiprocessing
模塊來(lái)實(shí)現(xiàn)真正的并行處理,繞過(guò) Python 的全局解釋器鎖(GIL)限制,從而在多核 CPU 上并發(fā)執(zhí)行多個(gè) SQL 語(yǔ)句。
from pyhive import hive import multiprocessing # 建立連接 conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password") # SQL 語(yǔ)句列表 sql_statements = [ "INSERT INTO table1 VALUES (1, 'value1')", "INSERT INTO table1 VALUES (2, 'value2')", "INSERT INTO table1 VALUES (3, 'value3')" ] # 定義執(zhí)行函數(shù) def execute_sql(sql): with conn.cursor() as cursor: cursor.execute(sql) # 確保多進(jìn)程代碼只在主進(jìn)程中執(zhí)行 if __name__ == '__main__': # 使用進(jìn)程池并發(fā)執(zhí)行 with multiprocessing.Pool() as pool: pool.map(execute_sql, sql_statements) # 關(guān)閉連接 conn.close()
1. 導(dǎo)入模塊
from pyhive import hive import multiprocessing
pyhive
: 這是用于連接和操作 Hive 數(shù)據(jù)庫(kù)的 Python 庫(kù)。hive.Connection
用于建立與 Hive 數(shù)據(jù)庫(kù)的連接。multiprocessing
: 這是 Python 的標(biāo)準(zhǔn)庫(kù),用于創(chuàng)建和管理進(jìn)程。通過(guò)multiprocessing
,我們可以繞過(guò) Python 的 GIL(全局解釋器鎖)限制,實(shí)現(xiàn)真正的并行處理。
2. 建立數(shù)據(jù)庫(kù)連接
conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")
- 這里我們使用
hive.Connection
建立一個(gè)到 Hive 數(shù)據(jù)庫(kù)的連接。 - 參數(shù):
host
: HiveServer2 的主機(jī)地址,通常是localhost
或 HiveServer2 運(yùn)行的服務(wù)器 IP。port
: HiveServer2 的端口號(hào),默認(rèn)是10000
。username
: 連接 Hive 使用的用戶名。password
: 連接 Hive 使用的密碼。
這個(gè)連接對(duì)象 conn
將在后續(xù)的代碼中用于創(chuàng)建游標(biāo)(cursor),并通過(guò)游標(biāo)執(zhí)行 SQL 語(yǔ)句。
3. 定義 SQL 語(yǔ)句列表
sql_statements = [ "INSERT INTO table1 VALUES (1, 'value1')", "INSERT INTO table1 VALUES (2, 'value2')", "INSERT INTO table1 VALUES (3, 'value3')" ]
- 這里定義了一個(gè)包含多個(gè) SQL 語(yǔ)句的列表
sql_statements
。每個(gè)語(yǔ)句都是一個(gè)插入操作,將數(shù)據(jù)插入到 Hive 表table1
中。 - 你可以根據(jù)實(shí)際需求修改這些 SQL 語(yǔ)句。
4. 定義執(zhí)行函數(shù)
def execute_sql(sql): with conn.cursor() as cursor: cursor.execute(sql)
execute_sql
函數(shù)是用于執(zhí)行單個(gè) SQL 語(yǔ)句的函數(shù)。with conn.cursor() as cursor
:為當(dāng)前數(shù)據(jù)庫(kù)連接創(chuàng)建一個(gè)游標(biāo)對(duì)象cursor
,這個(gè)游標(biāo)用于執(zhí)行 SQL 語(yǔ)句。cursor.execute(sql)
:執(zhí)行傳入的 SQL 語(yǔ)句。
- 這個(gè)函數(shù)會(huì)被進(jìn)程池中的每個(gè)進(jìn)程調(diào)用,每個(gè)進(jìn)程都會(huì)獨(dú)立執(zhí)行一個(gè) SQL 語(yǔ)句。
5. 使用進(jìn)程池并發(fā)執(zhí)行
with multiprocessing.Pool() as pool: pool.map(execute_sql, sql_statements)
multiprocessing.Pool()
:創(chuàng)建一個(gè)進(jìn)程池。進(jìn)程池可以管理一組工作進(jìn)程,并將任務(wù)分配給這些進(jìn)程。- 默認(rèn)情況下,
Pool()
會(huì)根據(jù)系統(tǒng)的 CPU 核心數(shù)創(chuàng)建相應(yīng)數(shù)量的工作進(jìn)程。 - 你可以通過(guò)參數(shù)指定池中的進(jìn)程數(shù)量,例如
Pool(4)
表示創(chuàng)建 4 個(gè)工作進(jìn)程。
- 默認(rèn)情況下,
pool.map(execute_sql, sql_statements)
:pool.map
方法會(huì)將execute_sql
函數(shù)應(yīng)用到sql_statements
列表中的每個(gè)元素上。pool.map
方法會(huì)自動(dòng)將 SQL 語(yǔ)句列表分配給進(jìn)程池中的工作進(jìn)程,每個(gè)進(jìn)程獨(dú)立執(zhí)行一個(gè) SQL 語(yǔ)句。- 這個(gè)過(guò)程是并行的,多個(gè)進(jìn)程可以同時(shí)執(zhí)行不同的 SQL 語(yǔ)句,從而提高執(zhí)行效率。
6. 關(guān)閉數(shù)據(jù)庫(kù)連接
conn.close()
- 在所有 SQL 語(yǔ)句執(zhí)行完畢后,我們關(guān)閉數(shù)據(jù)庫(kù)連接,釋放資源。
進(jìn)程池的工作原理
multiprocessing.Pool
提供了一種方便的方式來(lái)并行化執(zhí)行函數(shù)。其工作原理如下:
- 創(chuàng)建進(jìn)程池:當(dāng)你創(chuàng)建一個(gè)
Pool
對(duì)象時(shí),會(huì)啟動(dòng)多個(gè)工作進(jìn)程(數(shù)量可以指定,或默認(rèn)根據(jù) CPU 核心數(shù)決定)。 - 任務(wù)分配:當(dāng)你調(diào)用
pool.map
時(shí),進(jìn)程池會(huì)將任務(wù)(在這里是execute_sql
函數(shù))分配給空閑的工作進(jìn)程。 - 并行執(zhí)行:每個(gè)工作進(jìn)程獨(dú)立執(zhí)行分配給它的任務(wù),互不干擾。
- 結(jié)果收集:
pool.map
會(huì)收集所有工作進(jìn)程的執(zhí)行結(jié)果,并按照原始任務(wù)列表的順序返回結(jié)果。
為什么使用進(jìn)程池而不是線程池?
- GIL 限制:Python 的全局解釋器鎖(GIL)限制了多線程的并行執(zhí)行能力,尤其是在 CPU 密集型任務(wù)中,多線程并不能充分利用多核 CPU。
- 進(jìn)程并行:
multiprocessing
模塊通過(guò)創(chuàng)建多個(gè)進(jìn)程來(lái)繞過(guò) GIL 限制,每個(gè)進(jìn)程都有自己的 Python 解釋器和內(nèi)存空間,因此可以實(shí)現(xiàn)真正的并行執(zhí)行。 - 適用場(chǎng)景:
- 線程池:適合 I/O 密集型任務(wù)(例如,等待數(shù)據(jù)庫(kù)查詢結(jié)果)。
- 進(jìn)程池:適合 CPU 密集型任務(wù)(例如,并行計(jì)算、數(shù)據(jù)處理等),或者你需要繞過(guò) GIL 限制時(shí)。
注意事項(xiàng)
- 數(shù)據(jù)庫(kù)連接:在多進(jìn)程環(huán)境中,每個(gè)進(jìn)程都有自己的內(nèi)存空間,因此每個(gè)進(jìn)程需要獨(dú)立的數(shù)據(jù)庫(kù)連接。在上述代碼中,每個(gè)進(jìn)程都通過(guò)
conn.cursor()
創(chuàng)建了自己的游標(biāo)。 - 進(jìn)程開銷:創(chuàng)建和銷毀進(jìn)程有一定的開銷,因此對(duì)于非常短小的任務(wù),進(jìn)程池可能不會(huì)顯著提高性能。在這種情況下,可以考慮調(diào)整進(jìn)程池的大小或使用其他優(yōu)化手段。
- 連接池:如果你的程序需要頻繁訪問數(shù)據(jù)庫(kù),可以考慮使用數(shù)據(jù)庫(kù)連接池來(lái)復(fù)用數(shù)據(jù)庫(kù)連接,減少連接建立和關(guān)閉的開銷。
總結(jié)
- 進(jìn)程池:通過(guò)
multiprocessing.Pool
實(shí)現(xiàn),可以繞過(guò) Python 的 GIL 限制,實(shí)現(xiàn)真正的并行處理。 - 適用場(chǎng)景:適合 CPU 密集型任務(wù)或需要并行執(zhí)行多個(gè)獨(dú)立任務(wù)的場(chǎng)景。
- 代碼結(jié)構(gòu):
- 建立數(shù)據(jù)庫(kù)連接。
- 定義 SQL 語(yǔ)句列表。
- 定義執(zhí)行函數(shù)
execute_sql
。 - 使用進(jìn)程池并發(fā)執(zhí)行 SQL 語(yǔ)句。
- 關(guān)閉數(shù)據(jù)庫(kù)連接。
通過(guò)這種方式,你可以充分利用多核 CPU 的優(yōu)勢(shì),并發(fā)執(zhí)行多個(gè) SQL 語(yǔ)句,從而提高程序的執(zhí)行效率。
解決多進(jìn)程報(bào)錯(cuò)
你遇到的錯(cuò)誤是 RuntimeError
,這是因?yàn)槟阍谑褂?nbsp;multiprocessing
時(shí)沒有正確地保護(hù)代碼的入口點(diǎn)。具體來(lái)說(shuō),在 Windows 系統(tǒng)上(以及其他非 fork 的啟動(dòng)方式),你必須將多進(jìn)程相關(guān)的代碼放在 if __name__ == '__main__':
語(yǔ)句塊中,以避免子進(jìn)程在啟動(dòng)時(shí)重新導(dǎo)入主模塊并執(zhí)行不必要的代碼。
錯(cuò)誤原因:
在 Windows 系統(tǒng)中,Python 的 multiprocessing
模塊使用 spawn 啟動(dòng)子進(jìn)程,這意味著子進(jìn)程會(huì)重新導(dǎo)入當(dāng)前腳本。如果不加以保護(hù),子進(jìn)程會(huì)再次執(zhí)行主模塊中的代碼,導(dǎo)致遞歸創(chuàng)建進(jìn)程并拋出錯(cuò)誤。
解決方案:
你需要將多進(jìn)程相關(guān)的代碼放在 if __name__ == '__main__':
語(yǔ)句塊中,確保只有主進(jìn)程會(huì)執(zhí)行這些代碼,而子進(jìn)程不會(huì)。
修改后的代碼:
import multiprocessing data = [ "1", "2", "3" ] # 定義執(zhí)行函數(shù) def print_str(data): print(data) # 確保多進(jìn)程代碼只在主進(jìn)程中執(zhí)行 if __name__ == '__main__': # 使用進(jìn)程池并發(fā)執(zhí)行 with multiprocessing.Pool() as pool: pool.map(print_str, data)
解釋:
if __name__ == '__main__':
確保了只有在直接運(yùn)行當(dāng)前腳本時(shí),才會(huì)執(zhí)行其中的多進(jìn)程代碼。子進(jìn)程不會(huì)執(zhí)行這個(gè)代碼塊,從而避免了遞歸創(chuàng)建進(jìn)程的問題。- 在 Windows 系統(tǒng)上,這是使用
multiprocessing
時(shí)必須遵循的慣用寫法。
其他注意事項(xiàng):
- 如果你打算將腳本打包成可執(zhí)行文件(例如使用
pyinstaller
),你還需要調(diào)用multiprocessing.freeze_support()
,不過(guò)在大多數(shù)腳本運(yùn)行的情況下,這個(gè)調(diào)用不是必須的。
例如:
if __name__ == '__main__': multiprocessing.freeze_support() # 如果需要打包成可執(zhí)行文件,可以加上這行 with multiprocessing.Pool() as pool: pool.map(print_str, data)
執(zhí)行sql 簡(jiǎn)單示例
import multiprocessing data = [ ] # 定義執(zhí)行函數(shù) def print_str(data): print(data) # 確保多進(jìn)程代碼只在主進(jìn)程中執(zhí)行 if __name__ == '__main__': data2 = [ "1", "2", "3" ] for i in data2: data_str = f""" inset into {i} """ data.append(data_str) # 使用進(jìn)程池并發(fā)執(zhí)行 with multiprocessing.Pool() as pool: pool.map(print_str, data)
到此這篇關(guān)于Python使用進(jìn)程池并發(fā)執(zhí)行SQL語(yǔ)句的操作代碼的文章就介紹到這了,更多相關(guān)Python進(jìn)程池執(zhí)行SQL語(yǔ)句內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python設(shè)計(jì)足球聯(lián)賽賽程表程序的思路與簡(jiǎn)單實(shí)現(xiàn)示例
足球聯(lián)賽的賽制就是一支隊(duì)伍在一個(gè)賽季中主客場(chǎng)分別面對(duì)聯(lián)賽中除了自身以外的球隊(duì)一次,對(duì)此我們可以使用一種循環(huán)算法來(lái)實(shí)現(xiàn),接下來(lái)就一起來(lái)看一下Python設(shè)計(jì)足球聯(lián)賽賽程表程序的思路與簡(jiǎn)單實(shí)現(xiàn)示例:2016-06-06在Python dataframe中出生日期轉(zhuǎn)化為年齡的實(shí)現(xiàn)方法
這篇文章主要介紹了在Python dataframe中出生日期轉(zhuǎn)化為年齡的實(shí)現(xiàn)方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-10-10Python隨機(jī)函數(shù)random隨機(jī)獲取數(shù)字、字符串、列表等使用詳解
這篇文章主要介紹了Python隨機(jī)函數(shù)random使用詳解包含了Python隨機(jī)數(shù)字,Python隨機(jī)字符串,Python隨機(jī)列表等,需要的朋友可以參考下2021-04-04python實(shí)現(xiàn)簡(jiǎn)單flappy bird
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)簡(jiǎn)單flappy bird小游戲,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-12-12Python利用函數(shù)式編程實(shí)現(xiàn)優(yōu)化代碼
函數(shù)式編程(Functional Programming)是一種編程范式,它將計(jì)算視為函數(shù)的求值,并且避免使用可變狀態(tài)和循環(huán),在Python中還可以利用它的簡(jiǎn)潔和高效來(lái)解決實(shí)際問題,下面我們就來(lái)學(xué)習(xí)一下它的具體用法吧2023-11-11Python鍵盤輸入轉(zhuǎn)換為列表的實(shí)例
今天小編就為大家分享一篇Python鍵盤輸入轉(zhuǎn)換為列表的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06在Python中過(guò)濾Windows文件名中的非法字符方法
今天小編就為大家分享一篇在Python中過(guò)濾Windows文件名中的非法字符方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-06-06