
Example of a file synchronization server implemented in Python

Updated: 2015-06-02 10:21:21   Author: tianmo2010
This article describes a file synchronization server implemented in Python, covering how the server works and the implementation techniques used on both the client and the server side. It may be a useful reference for anyone who needs something similar.

This article presents an example file synchronization server written in Python, shared here for your reference. The implementation is as follows:

The server side is built on asyncore; files received from clients are saved to local disk.

The client uses pyinotify to watch a directory for changes and sends any changed files to the server.

Key points:

1. The file's metadata (path, name, size) is packed with the struct module and sent first; the server unpacks this header and uses it to receive the file body that the client sends next (a small sketch of the header format follows this list).

2. The client is multithreaded: when pyinotify detects a file change it puts the event onto a queue, and a separate thread takes events off the queue and sends the files.
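
The header uses the fixed-size format "!LL128s128sL": path length, name length, a 128-byte path field, a 128-byte name field, and the file size. Below is a minimal sketch (Python 2, with made-up example values) of how one side packs the header and the other side unpacks it and trims the padded fields back to their real lengths:

# Sketch of the fixed-size header shared by client and server (Python 2).
# The example path/name/size values are made up for illustration.
import struct

head_packet_format = "!LL128s128sL"   # path len, name len, path, name, file size

filepath = "host1"
filename = "load.rrd"
filesize = 4096

packed = struct.pack(head_packet_format,
                     len(filepath), len(filename),
                     filepath, filename, filesize)
# the header is always 4 + 4 + 128 + 128 + 4 = 268 bytes
assert len(packed) == struct.calcsize(head_packet_format)

# The receiver unpacks the same format and trims the 128-byte fields
# back to their real lengths before using them.
path_len, name_len, path, name, size = struct.unpack(head_packet_format, packed)
print path[:path_len], name[:name_len], size   # host1 load.rrd 4096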

On to the code:

Server:

#!/usr/bin/python 
# coding: utf-8 
# Receive files from clients and store them locally, using asyncore. 
import asyncore 
import socket 
import errno 
import logging 
import time 
import sys 
import struct 
import os 
import fcntl 
import threading 
from rrd_graph import MakeGraph 
try: 
  import rrdtool 
except (ImportError, ImportWarning): 
  print "Hope this information can help you:" 
  print "Can not find rrdtool module in sys path, just run [apt-get install python-rrdtool] in ubuntu." 
  sys.exit(1) 
class RequestHandler(asyncore.dispatcher): 
  def __init__(self, sock, map=None, chunk_size=1024): 
    self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname()))) 
    self.chunk_size = chunk_size 
    asyncore.dispatcher.__init__(self,sock,map) 
    self.data_to_write = list() 
  def readable(self): 
    #self.logger.debug("readable() called.") 
    return True 
  def writable(self): 
    response = (not self.connected) or len(self.data_to_write) 
    #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write))) 
    return response 
  def handle_write(self): 
    data = self.data_to_write.pop() 
    #self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data)) 
    sent = self.send(data[:self.chunk_size]) 
    if sent < len(data): 
      remaining = data[sent:] 
      self.data_to_write.append(remaining) 
  def handle_read(self): 
    self.writen_size = 0 
    nagios_perfdata = '../perfdata' 
    head_packet_format = "!LL128s128sL" 
    head_packet_size = struct.calcsize(head_packet_format) 
    data = self.recv(head_packet_size) 
    if not data: 
      return 
    filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data) 
    filepath = os.path.join(nagios_perfdata, filepath[:filepath_len]) 
    filename = filename[:filename_len] 
    self.logger.debug("update file: %s" % filepath + '/' + filename)
    try: 
      if not os.path.exists(filepath): 
        os.makedirs(filepath) 
    except OSError: 
      pass 
    self.fd = open(os.path.join(filepath,filename), 'wb') 
    #self.fd = open(filename,'w') 
    if filesize > self.chunk_size: 
      times = filesize / self.chunk_size 
      first_part_size = times * self.chunk_size 
      second_part_size = filesize % self.chunk_size 
      while 1: 
        try: 
          data = self.recv(self.chunk_size) 
          #self.logger.debug("handle_read()->%s size.",len(data)) 
        except socket.error,e: 
          if e.args[0] == errno.EWOULDBLOCK: 
            print "EWOULDBLOCK" 
            time.sleep(1) 
          else: 
            #self.logger.debug("Error happend while receive data: %s" % e) 
            break 
        else: 
          self.fd.write(data) 
          self.fd.flush() 
          self.writen_size += len(data) 
          if self.writen_size == first_part_size: 
            break 
      #receive the remaining bytes (the last partial chunk) 
      while 1: 
        try: 
          data = self.recv(second_part_size) 
          #self.logger.debug("handle_read()->%s size.",len(data)) 
        except socket.error,e: 
          if e.args[0] == errno.EWOULDBLOCK: 
            print "EWOULDBLOCK" 
            time.sleep(1) 
          else: 
            #self.logger.debug("Error happend while receive data: %s" % e) 
            break 
        else: 
          self.fd.write(data) 
          self.fd.flush() 
          self.writen_size += len(data) 
          if len(data) == second_part_size: 
            break 
    elif filesize <= self.chunk_size: 
      while 1: 
        try: 
          data = self.recv(filesize) 
          #self.logger.debug("handle_read()->%s size.",len(data)) 
        except socket.error,e: 
          if e.args[0] == errno.EWOULDBLOCK: 
            print "EWOULDBLOCK" 
            time.sleep(1) 
          else: 
            #self.logger.debug("Error happend while receive data: %s" % e) 
            break 
        else: 
          self.fd.write(data) 
          self.fd.flush() 
          self.writen_size += len(data) 
          if len(data) == filesize: 
            break 
    self.logger.debug("File size: %s" % self.writen_size) 
class SyncServer(asyncore.dispatcher): 
  def __init__(self,host,port): 
    asyncore.dispatcher.__init__(self) 
    self.debug = True 
    self.logger = logging.getLogger(self.__class__.__name__) 
    self.create_socket(socket.AF_INET,socket.SOCK_STREAM) 
    self.set_reuse_addr() 
    self.bind((host,port)) 
    self.listen(2000) 
  def handle_accept(self): 
    client_socket = self.accept() 
    if client_socket is None: 
      pass 
    else: 
      sock, addr = client_socket 
      #self.logger.debug("Incoming connection from %s" % repr(addr)) 
      handler = RequestHandler(sock=sock) 
class RunServer(threading.Thread): 
  def __init__(self): 
    super(RunServer,self).__init__() 
    self.daemon = False 
  def run(self): 
    server = SyncServer('',9999) 
    asyncore.loop(use_poll=True) 
def StartServer(): 
  logging.basicConfig(level=logging.DEBUG, 
            format='%(name)s: %(message)s', 
            ) 
  RunServer().start() 
  #MakeGraph().start() 
if __name__ == '__main__': 
  StartServer()
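
Before wiring up the inotify client below, the server can be exercised with any sender that follows the same protocol: connect, send the packed header, then stream the raw file body. A minimal test sender might look like this (Python 2; the directory and file names here are made up):

# Minimal test sender for the server above (Python 2).
# 'test_dir' and 'test.rrd' are hypothetical; point them at any small file.
import os
import socket
import struct

def send_one_file(addr, filepath, filename):
  fullpath = os.path.join(filepath, filename)
  filesize = os.stat(fullpath).st_size
  header = struct.pack("!LL128s128sL",
                       len(filepath), len(filename),
                       filepath, filename, filesize)
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.connect(addr)
  sock.sendall(header)               # fixed-size header first
  with open(fullpath, 'rb') as fd:   # then the raw file body
    while True:
      chunk = fd.read(1024)
      if not chunk:
        break
      sock.sendall(chunk)
  sock.close()

if __name__ == '__main__':
  send_one_file(('127.0.0.1', 9999), 'test_dir', 'test.rrd')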

Client:

# Monitor a path with inotify (the pyinotify module) and send changed files to the remote server. 
# Use sendfile(2) instead of socket.send() when python-sendfile is installed. 
import socket 
import time 
import os 
import sys 
import struct 
import threading 
import Queue 
try: 
   import pyinotify 
except (ImportError, ImportWarning): 
   print "Hope this information can help you:" 
   print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu." 
   sys.exit(1) 
try: 
   from sendfile import sendfile 
except (ImportError, ImportWarning): 
   pass 
filetype_filter = [".rrd",".xml"] 
def check_filetype(pathname): 
   for suffix_name in filetype_filter: 
     if pathname[-4:] == suffix_name: 
       return True 
   try: 
     end_string = pathname.rsplit('.')[-1:][0] 
     end_int = int(end_string) 
   except: 
     pass 
   else: 
     # means pathname endwith digit 
     return False 
class sync_file(threading.Thread): 
   def __init__(self, addr, events_queue): 
     super(sync_file,self).__init__() 
     self.daemon = False 
     self.queue = events_queue 
     self.addr = addr 
     self.chunk_size = 1024 
   def run(self): 
     while 1: 
       event = self.queue.get() 
       if check_filetype(event.pathname): 
         print time.asctime(),event.maskname, event.pathname 
         filepath = event.path.split('/')[-1:][0] 
         filename = event.name 
         filesize = os.stat(os.path.join(event.path, filename)).st_size 
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
         filepath_len = len(filepath) 
         filename_len = len(filename) 
         sock.connect(self.addr) 
         offset = 0 
         data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize) 
         fd = open(event.pathname,'rb') 
         sock.sendall(data) 
         if "sendfile" in sys.modules: 
           # print "use sendfile(2)" 
           while 1: 
             sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size) 
             if sent == 0: 
               break 
             offset += sent 
         else: 
           # print "use original send function" 
           while 1: 
             data = fd.read(self.chunk_size) 
             if not data: break 
             sock.send(data) 
         sock.close() 
         fd.close() 
class EventHandler(pyinotify.ProcessEvent): 
   def __init__(self, events_queue): 
     super(EventHandler,self).__init__() 
     self.events_queue = events_queue 
   def my_init(self): 
     pass 
   def process_IN_CLOSE_WRITE(self,event): 
     self.events_queue.put(event) 
   def process_IN_MOVED_TO(self,event): 
     self.events_queue.put(event) 
def start_notify(path, mask, sync_server): 
   events_queue = Queue.Queue() 
   sync_thread_pool = list() 
   for i in range(500): 
     sync_thread_pool.append(sync_file(sync_server, events_queue)) 
   for i in sync_thread_pool: 
     i.start() 
   wm = pyinotify.WatchManager() 
   notifier = pyinotify.Notifier(wm,EventHandler(events_queue)) 
   wdd = wm.add_watch(path,mask,rec=True) 
   notifier.loop() 
def do_notify(): 
   perfdata_path = '/var/lib/pnp4nagios/perfdata' 
   mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO 
   sync_server = ('127.0.0.1',9999) 
   start_notify(perfdata_path,mask,sync_server) 
if __name__ == '__main__': 
   do_notify()

Monitoring a thread pool in Python

#!/usr/bin/python 
import threading 
import time 
class Monitor(threading.Thread): 
  def __init__(self, *args,**kwargs): 
    super(Monitor,self).__init__() 
    self.daemon = False 
    self.args = args 
    self.kwargs = kwargs 
    self.pool_list = [] 
  def run(self): 
    print self.args 
    print self.kwargs 
    for name,value in self.kwargs.items(): 
      obj = value[0] 
      temp = {} 
      temp[name] = obj 
      self.pool_list.append(temp) 
    while 1: 
      print self.pool_list 
      for name,value in self.kwargs.items(): 
        obj = value[0] 
        parameters = value[1:] 
        died_threads = self.cal_died_thread(self.pool_list,name)
        print "died_threads", died_threads 
        if died_threads >0: 
          for i in range(died_threads): 
            print "start %s thread..." % name 
            t = obj[0].__class__(*parameters) 
            t.start() 
            self.add_to_pool_list(t,name) 
        else: 
          # no dead threads in this pool; check the next one 
          continue 
      time.sleep(0.5) 
  def cal_died_thread(self,pool_list,name): 
    i = 0 
    lists = [] 
    for item in self.pool_list: 
      for k,v in item.items(): 
        if name == k: 
          lists = v 
    for t in lists: 
      if not t.isAlive(): 
        self.remove_from_pool_list(t) 
        i +=1 
    return i 
  def add_to_pool_list(self,obj,name): 
    for item in self.pool_list: 
      for k,v in item.items(): 
        if name == k: 
          v.append(obj) 
  def remove_from_pool_list(self, obj): 
    for item in self.pool_list: 
      for k,v in item.items(): 
        try: 
          v.remove(obj) 
        except: 
          pass 
        else: 
          return

Usage:

rrds_queue = Queue.Queue() 
make_rrds_pool = [] 
for i in range(5): 
  make_rrds_pool.append(MakeRrds(rrds_queue)) 
for i in make_rrds_pool: 
  i.start() 
make_graph_pool = [] 
for i in range(5): 
  make_graph_pool.append(MakeGraph(rrds_queue)) 
for i in make_graph_pool: 
  i.start() 
monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), 
                  make_graph_pool=(make_graph_pool, rrds_queue)) 
monitor.start()

Explanation:

1. Monitor accepts keyword arguments whose values are tuples: the first element is a thread pool (a list of threads), and the remaining elements are the arguments needed to construct a new thread of that kind.
2. Every 0.5 seconds it checks how many threads in each pool have died; if any have, it records the count and starts the same number of new threads.
3. If no threads have died, it does nothing.
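
MakeRrds and MakeGraph are not shown in this article, so here is a minimal, self-contained sketch (Python 2) with a made-up Worker thread that simply exits after a random delay, which makes the Monitor's restart behaviour visible. Worker lifetimes are spread out so the pool is never completely empty between two checks, since the Monitor uses the first element of the pool as the template for restarts.

# Self-contained sketch of using the Monitor above (Python 2).
# Worker, work_queue and the pool size of 3 are made up for illustration;
# Monitor is the class defined above (same file or importable).
import Queue
import random
import threading
import time

class Worker(threading.Thread):
  def __init__(self, queue):
    super(Worker, self).__init__()
    self.queue = queue
  def run(self):
    # pretend to work for a while, then exit -- i.e. the thread "dies"
    time.sleep(random.uniform(2.0, 8.0))

if __name__ == '__main__':
  work_queue = Queue.Queue()
  worker_pool = []
  for i in range(3):
    worker_pool.append(Worker(work_queue))
  for w in worker_pool:
    w.start()

  # keyword name -> (pool list, constructor arguments...), as Monitor expects
  monitor = Monitor(worker_pool=(worker_pool, work_queue))
  monitor.start()   # prints the pool every 0.5s and restarts dead Workers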

Calling Django modules from an external script

import os 
import sys 
sys.path.insert(0,'/data/cloud_manage') 
from django.core.management import setup_environ 
import settings 
setup_environ(settings) 
from common.monitor import Monitor 
from django.db import connection, transaction

The prerequisite is that you have created a Django project; here we created one named cloud_manage.
With this setup you can call not only Django's own modules but also anything defined inside the project itself.
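
One caveat: setup_environ was removed in Django 1.7. On newer Django versions the equivalent standalone bootstrap is django.setup(), roughly as sketched below (this assumes settings.py still lives directly under /data/cloud_manage, matching the snippet above):

# Rough equivalent of the snippet above for Django >= 1.7 (setup_environ is gone).
# Assumes settings.py sits directly under /data/cloud_manage.
import os
import sys

sys.path.insert(0, '/data/cloud_manage')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

import django
django.setup()

# After setup() the ORM and the project's own modules can be imported as usual.
from django.db import connection
cursor = connection.cursor()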

Hopefully this article has been helpful for your Python programming.
