Python定義一個Actor任務(wù)
問題
你想定義跟actor模式中類似“actors”角色的任務(wù)
解決方案
actor模式是一種最古老的也是最簡單的并行和分布式計算解決方案。 事實(shí)上,它天生的簡單性是它如此受歡迎的重要原因之一。 簡單來講,一個actor就是一個并發(fā)執(zhí)行的任務(wù),只是簡單的執(zhí)行發(fā)送給它的消息任務(wù)。 響應(yīng)這些消息時,它可能還會給其他actor發(fā)送更進(jìn)一步的消息。 actor之間的通信是單向和異步的。因此,消息發(fā)送者不知道消息是什么時候被發(fā)送, 也不會接收到一個消息已被處理的回應(yīng)或通知。
結(jié)合使用一個線程和一個隊列可以很容易的定義actor,例如:
from queue import Queue from threading import Thread, Event # Sentinel used for shutdown class ActorExit(Exception): pass class Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): ''' Send a message to the actor ''' self._mailbox.put(msg) def recv(self): ''' Receive an incoming message ''' msg = self._mailbox.get() if msg is ActorExit: raise ActorExit() return msg def close(self): ''' Close the actor, thus shutting it down ''' self.send(ActorExit) def start(self): ''' Start concurrent execution ''' self._terminated = Event() t = Thread(target=self._bootstrap) t.daemon = True t.start() def _bootstrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): ''' Run method to be implemented by the user ''' while True: msg = self.recv() # Sample ActorTask class PrintActor(Actor): def run(self): while True: msg = self.recv() print('Got:', msg) # Sample use p = PrintActor() p.start() p.send('Hello') p.send('World') p.close() p.join()
這個例子中,你使用actor實(shí)例的 send()
方法發(fā)送消息給它們。 其機(jī)制是,這個方法會將消息放入一個隊里中, 然后將其轉(zhuǎn)交給處理被接受消息的一個內(nèi)部線程。 close()
方法通過在隊列中放入一個特殊的哨兵值(ActorExit)來關(guān)閉這個actor。 用戶可以通過繼承Actor并定義實(shí)現(xiàn)自己處理邏輯run()方法來定義新的actor。 ActorExit
異常的使用就是用戶自定義代碼可以在需要的時候來捕獲終止請求 (異常被get()方法拋出并傳播出去)。
如果你放寬對于同步和異步消息發(fā)送的要求, 類actor對象還可以通過生成器來簡化定義。例如:
def print_actor(): while True: try: msg = yield # Get a message print('Got:', msg) except GeneratorExit: print('Actor terminating') # Sample use p = print_actor() next(p) # Advance to the yield (ready to receive) p.send('Hello') p.send('World') p.close()
討論
actor模式的魅力就在于它的簡單性。 實(shí)際上,這里僅僅只有一個核心操作 send()
. 甚至,對于在基于actor系統(tǒng)中的“消息”的泛化概念可以已多種方式被擴(kuò)展。 例如,你可以以元組形式傳遞標(biāo)簽消息,讓actor執(zhí)行不同的操作,如下:
class TaggedActor(Actor): def run(self): while True: tag, *payload = self.recv() getattr(self,'do_'+tag)(*payload) # Methods correponding to different message tags def do_A(self, x): print('Running A', x) def do_B(self, x, y): print('Running B', x, y) # Example a = TaggedActor() a.start() a.send(('A', 1)) # Invokes do_A(1) a.send(('B', 2, 3)) # Invokes do_B(2,3) a.close() a.join()
作為另外一個例子,下面的actor允許在一個工作者中運(yùn)行任意的函數(shù), 并且通過一個特殊的Result對象返回結(jié)果:
from threading import Event class Result: def __init__(self): self._evt = Event() self._result = None def set_result(self, value): self._result = value self._evt.set() def result(self): self._evt.wait() return self._result class Worker(Actor): def submit(self, func, *args, **kwargs): r = Result() self.send((func, args, kwargs, r)) return r def run(self): while True: func, args, kwargs, r = self.recv() r.set_result(func(*args, **kwargs)) # Example use worker = Worker() worker.start() r = worker.submit(pow, 2, 3) worker.close() worker.join() print(r.result())
最后,“發(fā)送”一個任務(wù)消息的概念可以被擴(kuò)展到多進(jìn)程甚至是大型分布式系統(tǒng)中去。 例如,一個類actor對象的 send()
方法可以被編程讓它能在一個套接字連接上傳輸數(shù)據(jù) 或通過某些消息中間件(比如AMQP、ZMQ等)來發(fā)送。
以上就是Python定義一個Actor任務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Python actor任務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas創(chuàng)建series的三種方法小結(jié)
這篇文章主要介紹了pandas創(chuàng)建series的三種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05Python模塊學(xué)習(xí) datetime介紹
Python提供了多個內(nèi)置模塊用于操作日期時間,像calendar,time,datetime。time模塊我在之前的文章已經(jīng)有所介紹,它提供的接口與C標(biāo)準(zhǔn)庫time.h基本一致2012-08-08Python自動生成代碼 使用tkinter圖形化操作并生成代碼框架
這篇文章主要為大家詳細(xì)介紹了Python自動生成代碼,使用tkinter圖形化操作并生成代碼框架,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-09-09使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問題
這篇文章主要介紹了使用scipy.optimize的fsolve,root函數(shù)求解非線性方程問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12Python+Selenium實(shí)現(xiàn)短視頻自動上傳與發(fā)布的實(shí)踐
本文主要介紹了Python+Selenium實(shí)現(xiàn)短視頻自動上傳與發(fā)布的實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04淺談Pycharm調(diào)用同級目錄下的py腳本bug
今天小編就為大家分享一篇淺談Pycharm調(diào)用同級目錄下的py腳本bug,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12pycharm中選中一個單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法
這篇文章主要介紹了pycharm中選中一個單詞替換所有重復(fù)單詞的實(shí)現(xiàn)方法,類似于sublime 里的ctrl+D功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-11-11Python遠(yuǎn)程創(chuàng)建docker容器的方法
這篇文章主要介紹了Python遠(yuǎn)程創(chuàng)建docker容器的方法,如果docker??ps找不到該容器,可以使用?docker?ps?-a查看所有的,然后看剛才創(chuàng)建的容器的STATUS是EXIT0還是EXIT1如果是1,那應(yīng)該是有報錯,使用?docker?logs?容器id命令來查看日志,根據(jù)日志進(jìn)行解決,需要的朋友可以參考下2024-04-04