欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python使用Redis實(shí)現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡(jiǎn)單)

 更新時(shí)間:2016年03月22日 10:22:09   作者:kongxx  
Redis作為內(nèi)存數(shù)據(jù)庫(kù)的一個(gè)典型代表,已經(jīng)在很多應(yīng)用場(chǎng)景中被使用,這里僅就Redis的pub/sub功能來(lái)說(shuō)說(shuō)怎樣通過(guò)此功能來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的作業(yè)調(diào)度系統(tǒng)。這里只是想展現(xiàn)一個(gè)簡(jiǎn)單的想法,所以還是有很多需要考慮的東西沒(méi)有包括在這個(gè)例子中,比如錯(cuò)誤處理,持久化等

概述

Redis是一個(gè)開(kāi)源,先進(jìn)的key-value存儲(chǔ),并用于構(gòu)建高性能,可擴(kuò)展的Web應(yīng)用程序的完美解決方案。

Redis從它的許多競(jìng)爭(zhēng)繼承來(lái)的三個(gè)主要特點(diǎn):

Redis數(shù)據(jù)庫(kù)完全在內(nèi)存中,使用磁盤僅用于持久性。

相比許多鍵值數(shù)據(jù)存儲(chǔ),Redis擁有一套較為豐富的數(shù)據(jù)類型。

Redis可以將數(shù)據(jù)復(fù)制到任意數(shù)量的從服務(wù)器。

Redis 優(yōu)勢(shì)

異常快速:Redis的速度非??欤棵肽軋?zhí)行約11萬(wàn)集合,每秒約81000+條記錄。

支持豐富的數(shù)據(jù)類型:Redis支持最大多數(shù)開(kāi)發(fā)人員已經(jīng)知道像列表,集合,有序集合,散列數(shù)據(jù)類型。這使得它非常容易解決各種各樣的問(wèn)題,因?yàn)槲覀冎滥男﹩?wèn)題是可以處理通過(guò)它的數(shù)據(jù)類型更好。

操作都是原子性:所有Redis操作是原子的,這保證了如果兩個(gè)客戶端同時(shí)訪問(wèn)的Redis服務(wù)器將獲得更新后的值。

多功能實(shí)用工具:Redis是一個(gè)多實(shí)用的工具,可以在多個(gè)用例如緩存,消息,隊(duì)列使用(Redis原生支持發(fā)布/訂閱),任何短暫的數(shù)據(jù),應(yīng)用程序,如Web應(yīng)用程序會(huì)話,網(wǎng)頁(yè)命中計(jì)數(shù)等。

步入主題:

Redis作為內(nèi)存數(shù)據(jù)庫(kù)的一個(gè)典型代表,已經(jīng)在很多應(yīng)用場(chǎng)景中被使用,這里僅就Redis的pub/sub功能來(lái)說(shuō)說(shuō)怎樣通過(guò)此功能來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的作業(yè)調(diào)度系統(tǒng)。這里只是想展現(xiàn)一個(gè)簡(jiǎn)單的想法,所以還是有很多需要考慮的東西沒(méi)有包括在這個(gè)例子中,比如錯(cuò)誤處理,持久化等。

下面是實(shí)現(xiàn)上的想法

MyMaster:集群的master節(jié)點(diǎn)程序,負(fù)責(zé)產(chǎn)生作業(yè),派發(fā)作業(yè)和獲取執(zhí)行結(jié)果。

MySlave:集群的計(jì)算節(jié)點(diǎn)程序,每個(gè)計(jì)算節(jié)點(diǎn)一個(gè),負(fù)責(zé)獲取作業(yè)并運(yùn)行,并將結(jié)果發(fā)送會(huì)master節(jié)點(diǎn)。

channel CHANNEL_DISPATCH:每個(gè)slave節(jié)點(diǎn)訂閱一個(gè)channel,比如“CHANNEL_DISPATCH_[idx或機(jī)器名]”,master會(huì)向此channel中publish被dispatch的作業(yè)。

channel CHANNEL_RESULT:用來(lái)保存作業(yè)結(jié)果的channel,master和slave共享此channel,master訂閱此channel來(lái)獲取作業(yè)運(yùn)行結(jié)果,每個(gè)slave負(fù)責(zé)將作業(yè)執(zhí)行結(jié)果發(fā)布到此channel中。

Master代碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)

說(shuō)明

MyMaster類 - master主程序,用來(lái)啟動(dòng)dispatch和resulthandler的線程

MyServerDispatchThread類 - 派發(fā)作業(yè)線程,產(chǎn)生作業(yè)并派發(fā)到計(jì)算節(jié)點(diǎn)

MyServerResultHandleThread類 - 作業(yè)運(yùn)行結(jié)果處理線程,從channel里獲取作業(yè)結(jié)果并顯示

Slave代碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)

說(shuō)明

MySlave類 - slave節(jié)點(diǎn)主程序,用來(lái)啟動(dòng)MyJobWorkerThread的線程

MyJobWorkerThread類 - 從channel里獲取派發(fā)的作業(yè)并將運(yùn)行結(jié)果發(fā)送回master

測(cè)試

首先運(yùn)行MySlave來(lái)定義派發(fā)作業(yè)channel。

然后運(yùn)行MyMaster派發(fā)作業(yè)并顯示執(zhí)行結(jié)果。

有關(guān)Python使用Redis實(shí)現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡(jiǎn)單),小編就給大家介紹這么多,希望對(duì)大家有所幫助!

相關(guān)文章

最新評(píng)論