欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python中進(jìn)程間通信及設(shè)置狀態(tài)量控制另一個(gè)進(jìn)程

 更新時(shí)間:2022年05月19日 14:25:59   作者:PT、小小馬  
這篇文章主要介紹了python中進(jìn)程間通信及設(shè)置狀態(tài)量控制另一個(gè)進(jìn)程,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下

一、python中進(jìn)程間通信

業(yè)務(wù)場(chǎng)景:在當(dāng)前遇到的業(yè)務(wù)場(chǎng)景中,我們需要啟一個(gè)間隔任務(wù),這個(gè)間隔任務(wù)跑一個(gè)算法,然后把算法的結(jié)果進(jìn)行一些處理,并入庫(kù)。任務(wù)目前間隔是一小時(shí),算法運(yùn)行時(shí)間要50多分鐘,留給結(jié)果處理的時(shí)間并不多,所以有可能會(huì)出現(xiàn)超時(shí)。目前來(lái)說(shuō),優(yōu)化方向在算法上會(huì)更為合理,因?yàn)榻Y(jié)果處理本來(lái)就不用很多時(shí)間。但是在這個(gè)業(yè)務(wù)場(chǎng)景下,想要把結(jié)果處理的時(shí)間進(jìn)行無(wú)限壓縮,壓縮到0,其實(shí)也是可以實(shí)現(xiàn)的,說(shuō)是壓縮為0,實(shí)際上就是在算法執(zhí)行完成后,再啟一個(gè)進(jìn)程去處理,這樣就不會(huì)由于需要進(jìn)行數(shù)據(jù)處理而影響到算法的運(yùn)行,將算法和結(jié)果處理分為兩個(gè)獨(dú)立的進(jìn)程去處理。在最開始的程序中,是把算法運(yùn)行和結(jié)果處理作為一個(gè)周期,而現(xiàn)在是把算法運(yùn)行和結(jié)果處理分為兩個(gè)周期去處理。

技術(shù)實(shí)現(xiàn)方案:

啟動(dòng)二個(gè)進(jìn)程,其中一個(gè)運(yùn)行算法,在算法運(yùn)行結(jié)束后,發(fā)送一個(gè)狀態(tài)值到另外一個(gè)進(jìn)程,另外一個(gè)進(jìn)程在收到狀態(tài)量后啟動(dòng)數(shù)據(jù)處理即可。兩個(gè)進(jìn)程間互不影響即可。其實(shí)也相當(dāng)于算法進(jìn)程控制數(shù)據(jù)處理進(jìn)程

測(cè)試場(chǎng)景構(gòu)造代碼:

from multiprocessing import Process,Pipe
import time
import sys
import os
def send_message(conn):
? ? for i in range(1000):
? ? ? ? print('send_message:%d'%i)
? ? ? ? print(os.getpid())
? ? ? ? conn.send(i)
? ? ? ? time.sleep(3)
def send_message1(conn):
? ? # for i in range(1000):
? ? print(conn.recv())
? ? while True:
? ? ? ? if conn.recv() % 5 == 0:
? ? ? ? ? ? print(' today is nice day')
? ? ? ? time.sleep(1)
if __name__ == '__main__':
? ? ? ? #創(chuàng)建一個(gè)進(jìn)程通信管道
? ? left,right = Pipe()
? ? t1 = Process(target=send_message,args=(left,))
? ? t2 = Process(target=send_message1,args=(right,))
? ? t1.start()
? ? t2.start()

在這個(gè)案例場(chǎng)景下有一些需要注意的點(diǎn):

  • 一、time.sleep()的問題,睡眠指定時(shí)間,總是會(huì)出錯(cuò),具體的出錯(cuò)原因到現(xiàn)在也沒有找到,這是原來(lái)出現(xiàn)的問題,在這里沒有做長(zhǎng)時(shí)間的測(cè)試,所以不一定會(huì)出現(xiàn),但是還是要注意
  • 二、代碼實(shí)現(xiàn)中與上述的描述差異有一些,如未啟用調(diào)度任務(wù),只是啟了一個(gè)間隔運(yùn)行的任務(wù)。
  • 三、數(shù)據(jù)處理進(jìn)程一直處理空跑狀態(tài),會(huì)造成資源的浪費(fèi)(更合理的應(yīng)該是形成阻塞狀態(tài),但是對(duì)于阻塞狀態(tài)的構(gòu)造缺乏認(rèn)知,所以先犧牲資源
  • 四、在上述描述的需求中,在算法運(yùn)行及數(shù)據(jù)處理的上一節(jié)點(diǎn)還有一個(gè)調(diào)度任務(wù)在控制,這里未做出體現(xiàn),其實(shí)應(yīng)該把定時(shí)任務(wù)和數(shù)據(jù)處理作為兩個(gè)周期獨(dú)立出來(lái)才更符合上述描述中的需求。

二、設(shè)置狀態(tài)量控制另一個(gè)進(jìn)程

 業(yè)務(wù)場(chǎng)景:在當(dāng)前遇到的業(yè)務(wù)場(chǎng)景中,我們需要啟一個(gè)間隔任務(wù),這個(gè)間隔任務(wù)跑一個(gè)算法,然后把算法的結(jié)果進(jìn)行一些處理,并入庫(kù)。任務(wù)目前間隔是一小時(shí),算法運(yùn)行時(shí)間要50多分鐘,留給結(jié)果處理的時(shí)間并不多,所以有可能會(huì)出現(xiàn)超時(shí)。目前來(lái)說(shuō),優(yōu)化方向在算法上會(huì)更為合理,因?yàn)榻Y(jié)果處理本來(lái)就不用很多時(shí)間。但是在這個(gè)業(yè)務(wù)場(chǎng)景下,想要把結(jié)果處理的時(shí)間進(jìn)行無(wú)限壓縮,壓縮到0,其實(shí)也是可以實(shí)現(xiàn)的,說(shuō)是壓縮為0,實(shí)際上就是在算法執(zhí)行完成后,再啟一個(gè)進(jìn)程去處理,這樣就不會(huì)由于需要進(jìn)行數(shù)據(jù)處理而影響到算法的運(yùn)行,將算法和結(jié)果處理分為兩個(gè)獨(dú)立的進(jìn)程去處理。在最開始的程序中,是把算法運(yùn)行和結(jié)果處理作為一個(gè)周期,而現(xiàn)在是把算法運(yùn)行和結(jié)果處理分為兩個(gè)周期去處理。

上面的解決方案中只涉及到了啟用兩個(gè)進(jìn)程去運(yùn)行兩個(gè)任務(wù),并未涉及到啟用定時(shí)任務(wù)框架,所以可能會(huì)顯得和上述的業(yè)務(wù)場(chǎng)景不一致,所以在這里重新解決一下。上面也是沒有問題的,只是把定時(shí)任務(wù)框架也作為一個(gè)任務(wù)去處理即可。然后在定時(shí)任務(wù)運(yùn)行完程后,向另外一個(gè)進(jìn)程傳入一個(gè)參數(shù),作為啟動(dòng)另一個(gè)進(jìn)程的狀態(tài)量即可。當(dāng)然,在這里,兩個(gè)進(jìn)程還是完全占滿的,即處理阻塞狀態(tài)。對(duì)于資源的利用還是沒有完全達(dá)到最好。后續(xù)再考慮使用進(jìn)程池的方式,看是否可以讓其中的一個(gè)進(jìn)程運(yùn)行完后直接釋放資源。

技術(shù)解決方案如下:

from multiprocessing import Process,Pipe
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# schedule = BackgroundScheduler()
schedule = BlockingScheduler(timezone="Asia/Shanghai")
# schedule = AsyncIOScheduler(timezone="Asia/Shanghai")
def algorithm(conn):
? ? print('start_run')
? ? conn.send('please run')
? ? # time.sleep(5)
def worth_result(conn):
? ? while True:
? ? ? ? if conn.recv() == 'please run':
? ? ? ? ? ? print(conn.recv() + ' very nice!')
def time_job(conns):
? ? schedule.add_job(func=algorithm,trigger='interval',seconds=5,args=(conns,))
? ? schedule.start()
if __name__ == '__main__':
? ? left,right = Pipe()
? ? t1 = Process(target=time_job,args=(left,))
? ? t2 = Process(target=worth_result,args=(right,))
? ? t1.start()
? ? t2.start()

在這里還有一些點(diǎn)需要說(shuō)明,定時(shí)任務(wù)選擇那一種類型其實(shí)都沒有關(guān)系,阻塞和非阻塞其實(shí)沒有關(guān)系,因?yàn)槲覀冊(cè)谶@里是直接啟了兩個(gè)進(jìn)程,每個(gè)進(jìn)程間是相互獨(dú)立的,并非是在定時(shí)任務(wù)下啟用的兩個(gè)進(jìn)程,所以不會(huì)影響的。

關(guān)于這個(gè)解決方案還有的問題:

  • 一、上述所說(shuō),兩個(gè)進(jìn)程是占滿的,所以對(duì)于資源來(lái)說(shuō),兩個(gè)進(jìn)程的利用率一直很高
  • 二、擴(kuò)展性不足,如果在這個(gè)程序中還有其他需要處理的過程,就需要再添加進(jìn)程,或者把他添加到當(dāng)前的進(jìn)程之下,代碼重構(gòu)會(huì)比較麻煩一些
  • 三、整個(gè)任務(wù)的控制不足,需要加以完善。比如對(duì)于運(yùn)行狀態(tài)一些控制及查看,一般程序如果運(yùn)行時(shí)間較長(zhǎng)的話,我們應(yīng)該添加這樣的接口,否則啟動(dòng)后如果沒有出結(jié)果,我們是不知道其運(yùn)行狀態(tài),有一點(diǎn)被動(dòng)
  • 四、關(guān)于三,使用logging庫(kù),應(yīng)該是可以直接去輸出其日志,但是日志庫(kù)作為第三方庫(kù),相當(dāng)于是對(duì)整個(gè)運(yùn)行狀態(tài)進(jìn)行監(jiān)控,會(huì)不會(huì)再占用一個(gè)進(jìn)程,這個(gè)需要去測(cè)試
  • 五、完備性及容災(zāi)處理,如果程序由于資源等其他問題掛掉后,會(huì)有一些數(shù)據(jù)冗余下來(lái),也就是一些算法未進(jìn)行處理,這個(gè)時(shí)候需要考慮怎么樣去補(bǔ)數(shù)據(jù)?原始文件如果沒有保留下來(lái)呢?而且如果這些數(shù)據(jù)是極重要的數(shù)據(jù)該怎么處理?如果程序掛掉后,應(yīng)該如何快速的去處理呢?直接重啟嗎?
  • 六、如果數(shù)據(jù)處理的進(jìn)程所用的時(shí)間比算法還多,那該怎么辦?目前的業(yè)務(wù)來(lái)看,是遠(yuǎn)低于的,但是如果是遠(yuǎn)高于呢?可否將處理工作進(jìn)行分配,利用多臺(tái)機(jī)器來(lái)處理,然后再把結(jié)果合并起來(lái)?

分布式處理的思想越來(lái)越濃。

到此這篇關(guān)于python中進(jìn)程間通信及設(shè)置狀態(tài)量控制另一個(gè)進(jìn)程的文章就介紹到這了,更多相關(guān)python進(jìn)程通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論