Python自定義主從分布式架構實例分析
本文實例講述了Python自定義主從分布式架構。分享給大家供大家參考,具體如下:
環(huán)境:Win7 x64,Python 2.7,APScheduler 2.1.2。
原理圖如下:

代碼部分:
(1)、中心節(jié)點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心節(jié)點(主要功能是分配任務)
import SocketServer, socket, Queue
CenterIP = '127.0.0.1' #中心節(jié)點IP
CenterListenPort = 9999 #中心節(jié)點監(jiān)聽端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心節(jié)點用于發(fā)送網(wǎng)絡消息的socket
TaskQueue = Queue.Queue() #任務隊列
#獲取任務隊列
def GetTaskQueue():
for i in range(1, 11):
TaskQueue.put(str(i))
#CenterServer的回調函數(shù),在接受到udp報文是觸發(fā)
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(data)
if data.startswith('wait'):
vec = data.split(':')
if len(vec) != 3:
print('Error: len(vec) != 3')
else:
nodeIP = vec[1]
nodeListenPort = vec[2]
nodeID = nodeIP + ':' + nodeListenPort
if not TaskQueue.empty():
task = TaskQueue.get()
print('send task ' + task + ' to ' + nodeID)
CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
else:
print('TaskQueue is empty!')
GetTaskQueue() #獲取任務隊列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()
(2)、任務節(jié)點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任務節(jié)點(請求/接收/執(zhí)行任務)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1' #中心節(jié)點IP
CenterListenPort = 9999 #中心節(jié)點監(jiān)聽端口
NodeIP = socket.gethostbyname(socket.gethostname()) #任務節(jié)點自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #任務節(jié)點用于發(fā)送網(wǎng)絡消息的socket
#任務:發(fā)送網(wǎng)絡信息
def jobSendNetMsg():
msg = ''
if NodeServer.TaskState == 'wait':
msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
elif NodeServer.TaskState == 'exec':
msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
print(msg)
NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并啟動定時任務
def InitTimer():
sched = Scheduler()
sched.add_interval_job(jobSendNetMsg, seconds=1)
sched.start()
#執(zhí)行任務
def ExecTask(task):
print('ExecTask ' + task + ' ...')
time.sleep(2)
print('ExecTask ' + task + ' over')
#NodeServer的回調函數(shù),在接受到udp報文是觸發(fā)
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print('recv data: ' + data)
if data.startswith('task'):
vec = data.split(':')
if len(vec) != 2:
print('Error: len(vec) != 2')
else:
task = vec[1]
self.server.TaskState = 'exec'
ExecTask(task)
self.server.TaskState = 'wait'
InitTimer()
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python URL操作技巧總結》、《Python圖片操作技巧總結》、《Python數(shù)據(jù)結構與算法教程》、《Python Socket編程技巧總結》、《Python函數(shù)使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
相關文章
詳解pyppeteer(python版puppeteer)基本使用
這篇文章主要介紹了詳解pyppeteer(python版puppeteer)基本使用 ,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-06-06
淺談keras 的抽象后端(from keras import backend as K)
這篇文章主要介紹了淺談keras 的抽象后端(from keras import backend as K),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06
Django中reverse反轉并且傳遞參數(shù)的方法
今天小編就為大家分享一篇Django中reverse反轉并且傳遞參數(shù)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
pytorch動態(tài)神經(jīng)網(wǎng)絡(擬合)實現(xiàn)
這篇文章主要介紹了pytorch動態(tài)神經(jīng)網(wǎng)絡(擬合)實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03
python實現(xiàn)給微信公眾號發(fā)送消息的方法
這篇文章主要介紹了python實現(xiàn)給微信公眾號發(fā)送消息的方法,結合實例形式分析了Python針對微信公眾號接口操作的相關技巧,需要的朋友可以參考下2017-06-06

