Python使用socket的UDP協(xié)議實現(xiàn)FTP文件服務(wù)功能
簡介
本示例主要是用Python的socket,使用UDP協(xié)議實現(xiàn)一個FTP服務(wù)端、FTP客戶端,用來實現(xiàn)文件的傳輸。在公司內(nèi)網(wǎng)下,可以不使用U盤的情況下,純粹使用網(wǎng)絡(luò),來實現(xiàn)文件服務(wù)器的搭建,進而實現(xiàn)文件的網(wǎng)絡(luò)傳輸。同時用來理解Python的socket使用。
服務(wù)端運行起來后,會把服務(wù)器上面的指定目錄作為根目錄提供給客戶端使用,即客戶端可以訪問、下載服務(wù)端設(shè)置的根目錄里面的文件內(nèi)容
客戶端和服務(wù)端之間支持的一些簡單命令如下:
- “ll”或者“ls” 查看當(dāng)前目錄下的所有文件或者目錄
- “pwd” 查看當(dāng)前所在的目錄(根目錄是服務(wù)端設(shè)置的“D:\var”目錄)
- “get 文件名” 下載指定的文件到客戶端配置的目錄(客戶端指定的根目錄,在運行時配置)
- “get 目錄” 下載指定的目錄到客戶端配置的目錄
- “get all” 把當(dāng)前所在的目錄的所有文件、目錄下載到客戶端配置的目錄
- “cd” 把客戶端的目錄切換到根目錄
- “cd 目錄” 把客戶端的目錄切換到指定的目錄
- “cd ..” 把客戶端的目錄切換到上一級目錄
客戶端和服務(wù)端之間的通信,是把dict格式使用pickle.dumps()和pickle.loads()轉(zhuǎn)成對應(yīng)的bytes類型進行傳輸?shù)?。dict格式參考代碼。
使用效果示例
先運行服務(wù)端代碼,再運行客戶端代碼。然后再在客戶端輸入響應(yīng)的命令即可
代碼
file_handler.py
該文件就是把對文件的一些操作進行提取出來,供UDP服務(wù)端使用
import os import logging import traceback LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s" logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) _logger = logging.getLogger() class FileHandler: def __init__(self, logger=None): """ 由于設(shè)置成可以支持自定義的logger,因此就沒有設(shè)置成工具類形式,使用的時候,還是要實例化該類的 :param logger: """ self.__logger = logger if logger else _logger def list_dir(self, dir_path: str, return_absolute_path=False) -> list: """ 獲取指定目錄下面的文件或者目錄的列表 返回列表,里面中的每一個元素都是元組,元組的第一個值是文件或者目錄名,第二個值是"d"(代表目錄)或者"f"(代表文件) :param dir_path: :param return_absolute_path: True: 返回值是返回文件或者目錄的絕對路徑; False: 只返回文件名或者目錄名 :return: [('download', 'd'), ('mylog.txt', 'f')] """ ret_list = [] try: if os.path.exists(dir_path): ret = os.listdir(dir_path) for item in ret: if os.path.isdir(os.path.join(dir_path, item)): if return_absolute_path: ret_list.append((os.path.join(dir_path, item), 0, "d")) else: ret_list.append((item, 0, "d")) if os.path.isfile(os.path.join(dir_path, item)): size = os.path.getsize(os.path.join(dir_path, item)) if return_absolute_path: ret_list.append((os.path.join(dir_path, item), size, "f")) else: ret_list.append((item, size, "f")) except Exception: self.__logger.error("Can not list dir: [%s]" % dir_path + traceback.format_exc()) finally: return ret_list def seek_file(self, file_path: str, start_index: int, length=1024) -> tuple: """ 通過二進制格式讀取指定一個文件指定范圍的內(nèi)容 :param file_path: :param start_index: :param length: 讀取的字節(jié)數(shù) :return: """ # 下一次訪問的時候的起始start_index值。 -1代表已經(jīng)訪問到文件結(jié)尾了,不用再訪問該文件了。 content_bytes = b'' next_index = -1 if not os.path.exists(file_path): message = "File[%s] not exists !!!" % file_path self.__logger.error(message) raise Exception(message) file_size = os.path.getsize(file_path) # 文件大小 if start_index >= file_size: return content_bytes, next_index try: # print("### file_size: ", file_size) with open(file_path, "rb") as fh: fh.seek(start_index) # 游標(biāo)跳到指定位置 content_bytes = fh.read(length) # 讀取文件內(nèi)容 # print("content_bytes: ", content_bytes) # print("type(content_bytes): ", type(content_bytes)) if start_index + length < file_size: next_index = start_index + length except Exception: self.__logger.error("Seek file exception !!! " + traceback.format_exc()) finally: return content_bytes, next_index if __name__ == '__main__': file = r"D:\var\download\system.log" file_target = r"D:\var\download\system.txt" file = r"D:\軟件安裝包\NetAssist.exe" file_target = r"D:\軟件安裝包\NetAssist_copy.exe" file_obj = FileHandler() # ret = file_obj.seek_file(file, start_index=17, length=30) # print("ret: ", ret) # file_obj.copy_file(file, file_target, 1024 * 1000)
服務(wù)端代碼
""" 使用socket的udp協(xié)議實現(xiàn)的一個ftp服務(wù)端。 服務(wù)器和客戶端之間傳遞數(shù)據(jù)格式: 1、服務(wù)端和客戶端統(tǒng)一使用Python的字典格式(也就是本例中自定義的"通信協(xié)議"),格式形如: { "type": "cmd", # 支持的值有: "cmd"、"download" "body": "ll", # 在cmd模式下,常用的命令有: ll、ls、cd 指定目錄、pwd "pwd": ["folder1", "folder2"], "status": 1, "uuid": "b93e21e659f711ee9285a46bb6f59f55" # uuid.uuid1().hex,用來保證客戶端和服務(wù)端 } 2、客戶端使用pickle模塊的pickle.dumps(dict類型數(shù)據(jù))把要發(fā)送的數(shù)據(jù)轉(zhuǎn)成bytes類型 3、客戶端使用socket的udp傳輸轉(zhuǎn)成的bytes數(shù)據(jù)給服務(wù)端 4、服務(wù)端接收到從客戶端發(fā)送過來的bytes類型的數(shù)據(jù),再使用pickle.loads(bytes類型數(shù)據(jù))把數(shù)據(jù)轉(zhuǎn)成原始的dict類型。 使用socket的udp,既能接收數(shù)據(jù),又能發(fā)送數(shù)據(jù),因此服務(wù)端和客戶端都是相對的。 """ import os import pickle import sys import socket import logging import time import traceback from file_handler import FileHandler LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s" logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) _logger = logging.getLogger() _HOST = "127.0.0.1" _PORT = 8090 # 服務(wù)器端口號 _ROOT_DIR = r"D:\var" # 服務(wù)端給客戶端展示的可以進行下載的文件路徑 __all__ = ["FTPServerUDP"] class FTPServerUDP: def __init__(self, host="", port=None, root_dir="", logger=None): self.__host = host if host else _HOST self.__port = port if port else _PORT self.__root_dir = root_dir if root_dir else _ROOT_DIR self.__logger = logger if logger else _logger self.__file_handler = FileHandler() self.__socket_obj = self.__get_socket_obj() self.__message_type_unsupported = "Unsupported message type" self.__message_type_server_inner_error = "Server internal error" self.__message_type_path_not_exists = "Target path not exists" self.__message_type_ok = "ok" def __get_socket_obj(self) -> socket.socket: socket_obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP socket_obj.bind((self.__host, self.__port)) # 服務(wù)端必須綁定IP和端口 return socket_obj def __message_handler(self, message_tuple: tuple): # 給客戶端返回的數(shù)據(jù)類型,最終還需要使用pickle.dumps()把字典類型轉(zhuǎn)成bytes類型進行發(fā)送 response_message_dict = { "type": "cmd", "body": self.__message_type_unsupported, "pwd": [], # 客戶端的相對于服務(wù)端self.__root_dir的相對路徑 "status": 0 # 0:代表后端處理結(jié)果異常; 1:代表后端處理結(jié)果正常 } recv_message_bytes = message_tuple[0] client_tuple = message_tuple[1] # 客戶端元組信息,形如 ('127.0.0.1', 59699) # 確保接收到的消息是 pickle.dumps() 轉(zhuǎn)成的bytes類型,并把數(shù)據(jù)轉(zhuǎn)成dict類型 # recv_message_dict 的格式形如上面的response_message_dict格式 recv_message_dict = self.__check_recv_message_type(recv_message_bytes) if not recv_message_dict: response_message_dict["body"] = self.__message_type_unsupported response_message_dict["status"] = 0 self.__send_bytes_message(response_message_dict, client_tuple) return # 把客戶端的進入的目錄,賦值給返回值的目錄。 response_message_dict["pwd"] = recv_message_dict.get("pwd", []) # 把客戶端傳遞進來的uuid,賦值給返回值的目錄 response_message_dict["uuid"] = recv_message_dict.get("uuid", "") # 接收到的消息符合規(guī)范,就需要根據(jù)"type"類型進行分類。 try: print("recv_message_dict: ", recv_message_dict) if recv_message_dict.get("type", "") == "cmd": self.__cmd_handler(recv_message_dict, response_message_dict, client_tuple) elif recv_message_dict.get("type", "") == "download": self.__download_handler(recv_message_dict, response_message_dict, client_tuple) pass except Exception: self.__logger.error("Server message handler exception !!!" + traceback.format_exc()) response_message_dict["status"] = 0 response_message_dict["body"] = self.__message_type_server_inner_error self.__send_bytes_message(response_message_dict, client_tuple) def __check_recv_message_type(self, recv_message_bytes: bytes) -> dict: """ 確保接收到的消息是 pickle.dumps() 轉(zhuǎn)成的bytes類型,并且通過pickle.loads()把bytes類型的數(shù)據(jù)轉(zhuǎn)成dict后, dict類型數(shù)據(jù)中要有: "type"、"body"、"pwd"、"uuid" 等字段 :param recv_message_bytes: :return: """ ret_dict = {} try: message_dict = pickle.loads(recv_message_bytes) except Exception: return ret_dict if not isinstance(message_dict, dict): return ret_dict # 接收到的dict類型的消息中,必須要有如下的字段 if not {"type", "body", "pwd", "uuid"}.issubset(set(message_dict.keys())): return ret_dict return message_dict def __send_bytes_message(self, message_dict: dict, client_tuple: tuple): """ 使用pickle.dumps()把字典格式的消息(message_dict),轉(zhuǎn)成bytes發(fā)送給客戶端(client_tuple) :param message_dict: :param client_tuple: :return: """ try: message_bytes = pickle.dumps(message_dict) self.__socket_obj.sendto(message_bytes, client_tuple) except Exception: self.__logger.error("Send message to client exception !!! " + traceback.format_exc()) message_dict["status"] = 0 message_dict["body"] = self.__message_type_server_inner_error message_bytes = pickle.dumps(message_dict) self.__socket_obj.sendto(message_bytes, client_tuple) def __cmd_handler(self, recv_message_dict: dict, response_message_dict: dict, client_tuple: tuple): """ 處理消息體的type參數(shù)值是"cmd",這種命令行的消息 :param recv_message_dict: :param response_message_dict: :param client_tuple: :return: """ cmd_str = recv_message_dict.get("body", "").lower().strip() if not cmd_str or (not isinstance(cmd_str, str)): # 返回值形如: {'type': 'cmd', 'body': 'Unsupported message type', 'pwd': ['download']} response_message_dict["body"] = self.__message_type_unsupported response_message_dict["status"] = 0 self.__send_bytes_message(response_message_dict, client_tuple) return if cmd_str in ("ls", "ll"): # 查看當(dāng)前目錄的文件目錄列表 # 返回值形如: {'type': 'cmd', 'body': [('folder1', 'd'), ('file1.txt', 'f')]} customer_dir = self.__root_dir if recv_message_dict.get("pwd", []): customer_dir = os.path.join(self.__root_dir, *recv_message_dict.get("pwd", [])) response_message_dict["body"] = self.__file_handler.list_dir(customer_dir) response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return if cmd_str.startswith("cd"): # 切換到下一個目錄 if cmd_str.strip() == "cd": response_message_dict["pwd"] = [] response_message_dict["body"] = self.__message_type_ok response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return target_dir = cmd_str.split(" ")[-1] if target_dir == "..": response_message_dict["pwd"] = response_message_dict["pwd"][0:-1] response_message_dict["body"] = self.__message_type_ok response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return if target_dir == "/": response_message_dict["body"] = self.__message_type_ok response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return if not os.path.exists(os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir)): # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']} response_message_dict["body"] = self.__message_type_path_not_exists response_message_dict["status"] = 0 self.__send_bytes_message(response_message_dict, client_tuple) return elif not (os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir).startswith(self.__root_dir)): # 客戶進入的目錄,必須是以 self.__root_dir 開頭,不能進入到其他目錄 # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']} # response_message_dict["pwd"] = [] response_message_dict["body"] = self.__message_type_path_not_exists response_message_dict["status"] = 0 self.__send_bytes_message(response_message_dict, client_tuple) return elif os.path.isfile(os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir)): # 文件 # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']} response_message_dict["body"] = self.__message_type_path_not_exists response_message_dict["status"] = 0 self.__send_bytes_message(response_message_dict, client_tuple) return else: # 返回值形如: {'type': 'cmd', 'body': 'ok', 'pwd': ['folder1', 'folder2']} response_message_dict["pwd"].append(target_dir) response_message_dict["body"] = self.__message_type_ok response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return if cmd_str == "pwd": # 返回值形如 {'type': 'cmd', 'body': ['folder1'], 'pwd': ['folder1']} response_message_dict["body"] = response_message_dict.get("pwd", []) response_message_dict["status"] = 1 # 處理結(jié)果符合預(yù)期,狀態(tài)設(shè)置成1 self.__send_bytes_message(response_message_dict, client_tuple) return self.__send_bytes_message(response_message_dict, client_tuple) return def __download_handler(self, recv_message_dict: dict, response_message_dict: dict, client_tuple: tuple): """ 處理消息體的type參數(shù)值是"download",這種下載單個文件的命令行的消息 :param recv_message_dict: :param response_message_dict: :param client_tuple: :return: """ response_message_dict["type"] = "download" try: file_name = recv_message_dict["body"]["file_name"] # 要下載的文件的名字 pwd_list = recv_message_dict["pwd"] # 要下載的文件所在的路徑 start_index = recv_message_dict["body"]["start_index"] # 要下載的文件的起始位置索引 length = recv_message_dict["body"]["length"] # 單次下載的字節(jié)數(shù) if length > 1024 * 10: # 單次傳輸?shù)臄?shù)據(jù)太大的話,會報錯。通過測試,單次發(fā)送10KB數(shù)據(jù)比較合適。 length = 1024 * 10 file_name_absolute = os.path.join(self.__root_dir, *pwd_list, file_name) # 要下載的文件的絕對路徑 content_bytes, next_index = self.__file_handler.seek_file(file_path=file_name_absolute, start_index=start_index, length=length) response_message_dict["body"] = { "file_name": file_name, "pwd": pwd_list, "content_bytes": content_bytes, "start_index": start_index, "next_index": next_index } response_message_dict["status"] = 1 self.__send_bytes_message(response_message_dict, client_tuple) except Exception: response_message_dict["body"] = self.__message_type_server_inner_error response_message_dict["status"] = 0 self.__logger.error("Download file exception !!!" + traceback.format_exc()) self.__send_bytes_message(response_message_dict, client_tuple) def run(self): """ 調(diào)用該方法,啟動服務(wù)端 :return: """ self.__logger.info("Server is running at [%s@%s], wating for client ..." % (self.__host, self.__port)) while True: try: # message_tuple 是一個元組類型: (b'消息體', ('127.0.0.1', 61040)),第一個值是bytes類型的消息,第二個值是客戶端信息 message_tuple = self.__socket_obj.recvfrom(1024 * 1024) # 從客戶端接收到的消息 self.__message_handler(message_tuple) except Exception: self.__logger.error("FTP Server Error: " + traceback.format_exc()) time.sleep(1) if __name__ == '__main__': ftp_server_obj = FTPServerUDP(host="127.0.0.1", port=8090, root_dir=r"D:\var" # 服務(wù)端的根目錄,該目錄下的文件及目錄可以供客戶端查看、下載 ) ftp_server_obj.run()
客戶端代碼
""" 使用socket的udp協(xié)議實現(xiàn)的一個ftp客戶端。 服務(wù)器和客戶端之間傳遞數(shù)據(jù)格式: 1、服務(wù)端和客戶端統(tǒng)一使用Python的字典格式(也就是本例中自定義的"通信協(xié)議"),格式形如: { "type": "cmd", # 支持的值有: "cmd"、"download" "body": "ll", # 在cmd模式下,常用的命令有: ll、ls、cd 指定目錄、pwd "pwd": ["folder1", "folder2"], "status": 1, "uuid": "b93e21e659f711ee9285a46bb6f59f55" # uuid.uuid1().hex,用來保證客戶端和服務(wù)端 } 2、客戶端使用pickle模塊的pickle.dumps(dict類型數(shù)據(jù))把要發(fā)送的數(shù)據(jù)轉(zhuǎn)成bytes類型 3、客戶端使用socket的udp傳輸轉(zhuǎn)成的bytes數(shù)據(jù)給服務(wù)端 4、服務(wù)端接收到從客戶端發(fā)送過來的bytes類型的數(shù)據(jù),再使用pickle.loads(bytes類型數(shù)據(jù))把數(shù)據(jù)轉(zhuǎn)成原始的dict類型。 使用socket的udp,既能接收數(shù)據(jù),又能發(fā)送數(shù)據(jù),因此服務(wù)端和客戶端都是相對的。 """ import socket import sys import time import pickle import os import logging import traceback import uuid LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s" logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) _logger = logging.getLogger() class FTPClientUDP: def __init__(self, host="127.0.0.1", port=8090, download_path="D:\my_download", logger=None): self.__host = host # 服務(wù)器的IP地址 self.__port = port # 服務(wù)器的端口號 self.__download_root_path = download_path # 要下載的文件路徑 self.__logger = logger if logger else _logger if not os.path.exists(download_path): os.makedirs(download_path, True) self.__socket_obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP連接對象 self.__is_downloading = False # 用來標(biāo)識是否在下載文件,當(dāng)在下載文件的時候,就不會再接收輸入?yún)?shù) self.__pwd_list = [] # 用來存儲用戶所在的當(dāng)前目錄 self.__server_tuple = (self.__host, self.__port) # 服務(wù)端的地址和端口號組成的元組 def __cmd_handler(self, cmd_dict: dict): """ :param cmd_dict: 形如:{'type': 'cmd', 'body': 'll', 'pwd': [], 'status': 1, 'uuid': 'f5c2f4d263f111ee877aa46bb6f59f55'} :return: """ # print("cmd_dict: ", cmd_dict) cmd_bytes = pickle.dumps(cmd_dict) if cmd_dict["body"] in ["ll", "ls"]: self.__cmd_handler_ls(cmd_dict, cmd_bytes) elif cmd_dict["body"].startswith("cd "): self.__cmd_handler_cd(cmd_dict, cmd_bytes) elif cmd_dict["body"] == "pwd": self.__cmd_handler_pwd(cmd_dict, cmd_bytes) elif cmd_dict["body"].startswith("get "): self.__cmd_handler_get(cmd_dict, cmd_bytes) else: print("Command [%s] not supported !!!" % cmd_dict["body"]) print() def __cmd_handler_ls(self, cmd_dict: dict, cmd_bytes: bytes): """ 發(fā)送類似于"ls -la" 命令給服務(wù)端,并把返回值打印到終端 :param cmd_dict: :param cmd_bytes: :return: """ recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes) self.__print_normal_info(recv_message_dict) def __cmd_handler_cd(self, cmd_dict: dict, cmd_bytes: bytes): """ 發(fā)送類似于"cd 指定目錄"命令給服務(wù)端,并把返回值打印到終端 :param cmd_dict: :param cmd_bytes: :return: """ recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes) self.__print_normal_info(recv_message_dict) if recv_message_dict.get("status", 0): self.__pwd_list = recv_message_dict["pwd"] # 保存客戶端進入的目錄 def __cmd_handler_pwd(self, cmd_dict: dict, cmd_bytes: bytes): """ 發(fā)送 "pwd" 命令給服務(wù)端,并把返回值打印到終端 :param cmd_dict: :param cmd_bytes: :return: """ recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes) self.__print_pwd_info(recv_message_dict) if recv_message_dict.get("status", 0): self.__pwd_list = recv_message_dict["pwd"] # 保存客戶端進入的目錄 def __cmd_handler_get(self, cmd_dict: dict, cmd_bytes: bytes): # 獲取當(dāng)前目錄下面的所有的文件,包括文件夾 if cmd_dict["body"].lower().strip() == "get all": self.__download_handler_all_file() return # 獲取當(dāng)前目錄下面"get "命令后面的一個文件 command = cmd_dict["body"] file_name = cmd_dict["body"].split(" ")[-1] # 字符串類型的要下載的文件名 if not file_name: return # 首先獲取當(dāng)前目錄"ll"的返回值 cmd_dict_ll = {'type': 'cmd', 'body': 'll', 'pwd': cmd_dict["pwd"], 'status': 1, 'uuid': uuid.uuid1().hex } ret_dict = self.__recv_normal_cmd_handler(cmd_dict_ll) # print("111######### ret_dict: ", ret_dict) can_download = False file_size = 0 # 文件大小 pwd = [] # 文件所處路徑 file_type = "f" # f:文件 d:目錄 for item_tuple in ret_dict["body"]: # item_tuple形如 ('aa.txt', 34, 'f') if item_tuple[0] == file_name: can_download = True file_size = item_tuple[1] pwd = ret_dict["pwd"] file_type = item_tuple[-1] if can_download and file_type == "f": self.__download_handler_one_file(file_name, file_size, pwd) print() if can_download and file_type == "d": raw_pwd = self.__pwd_list # 存儲用戶最原始的pwd目錄。 self.__download_handler_one_folder(file_name, pwd) self.__pwd_list = raw_pwd # 把用戶最原始的pwd目錄再重新賦給self.__pwd_list變量中。 else: print() def __download_handler_one_file(self, file_name: str, file_size=0, pwd=[]): # 發(fā)送下載單個文件的命令格式,下載的時候,要來回更新 download_cmd_dict['body']['start_index']的值 download_cmd_dict = { 'type': 'download', 'body': { "file_name": file_name, "pwd": pwd, "start_index": 0, # "length": 1024 * 1024 # 使用UDP傳輸?shù)臄?shù)據(jù),單次數(shù)據(jù)太大的話會報錯 }, 'pwd': pwd, 'status': 1, 'uuid': uuid.uuid1().hex } try: file_absolute = os.path.join(self.__download_root_path, *pwd, file_name) if not os.path.exists(os.path.dirname(file_absolute)): os.mkdir(os.path.dirname(file_absolute)) with open(file_absolute, "wb") as fh: while True: download_cmd_bytes = pickle.dumps(download_cmd_dict) ret_dict = self.__recv_normal_cmd_handler(download_cmd_dict, download_cmd_bytes) if not ret_dict["status"]: # 下載途中失敗了 self.__logger.error("Download exception: " + str(ret_dict["body"])) break fh.write(ret_dict["body"]["content_bytes"]) next_index = ret_dict["body"]["next_index"] if ret_dict["body"]["next_index"] == -1: # 說明已經(jīng)把文件下載完了 break download_cmd_dict["body"]["start_index"] = ret_dict["body"]["next_index"] download_cmd_dict["uuid"] = uuid.uuid1().hex self.__print_download_info(file_absolute, file_size, next_index) self.__print_download_successfully_info(file_absolute) except Exception: self.__logger.error("Download file exception !!!" + traceback.format_exc()) def __download_handler_one_folder(self, file_name: str, pwd=[]): """ 下載一個文件夾。 :param file_name: 即文件夾名字。進入到該方法,默認(rèn)文件名是存在的 :param pwd: :return: """ if not os.path.exists(os.path.join(self.__download_root_path, *pwd, file_name)): os.makedirs(os.path.join(self.__download_root_path, *pwd, file_name)) cmd_dict_ll = {'type': 'cmd', 'body': 'll', 'pwd': pwd, # 該參數(shù)會在后面的代碼中進行替換更新 'status': 1, 'uuid': uuid.uuid1().hex } cmd_dict_cd = {'type': 'cmd', 'body': 'cd ' + file_name, # 進入到的目錄 'pwd': pwd, 'status': 1, 'uuid': uuid.uuid1().hex } # 先進入指定目錄 recv_message_dict_cd = self.__recv_normal_cmd_handler(cmd_dict_cd, None) pwd = recv_message_dict_cd["pwd"] # 保存進入的目錄 # print("#### pwd: ", pwd) # 獲取指定目錄的文件列表 cmd_dict_ll["pwd"] = pwd recv_message_dict_ll = self.__recv_normal_cmd_handler(cmd_dict_ll, None) # print("&&&&&&&v recv_message_dict_ll: ", recv_message_dict_ll) if recv_message_dict_ll["status"] and recv_message_dict_ll["body"]: for item_tuple in recv_message_dict_ll["body"]: # print("@@@@ item_tuple: ", item_tuple) if item_tuple[-1] == "f": self.__download_handler_one_file(file_name=item_tuple[0], file_size=item_tuple[1], pwd=pwd ) elif item_tuple[-1] == "d": self.__download_handler_one_folder(file_name=item_tuple[0], pwd=pwd) def __download_handler_all_file(self): """ 下載當(dāng)前目錄下面的所有文件及文件夾 :return: """ if not os.path.exists(os.path.join(self.__download_root_path, *self.__pwd_list)): os.makedirs(os.path.join(self.__download_root_path, *self.__pwd_list)) cmd_dict_ll = {'type': 'cmd', 'body': 'll', 'pwd': self.__pwd_list, # 該參數(shù)會在后面的代碼中進行替換更新 'status': 1, 'uuid': uuid.uuid1().hex } recv_message_dict_ll = self.__recv_normal_cmd_handler(cmd_dict_ll, None) if recv_message_dict_ll["status"] and recv_message_dict_ll["body"]: for item_tuple in recv_message_dict_ll["body"]: if item_tuple[-1] == "f": self.__download_handler_one_file(file_name=item_tuple[0], file_size=item_tuple[1], pwd=recv_message_dict_ll["pwd"] ) elif item_tuple[-1] == "d": self.__download_handler_one_folder(file_name=item_tuple[0], pwd=recv_message_dict_ll["pwd"] ) def __recv_normal_cmd_handler(self, cmd_dict: dict, cmd_bytes: bytes=None, try_times=3) -> dict: """ 持續(xù)發(fā)送一條命令到客戶端,直到正常接收到數(shù)據(jù)后結(jié)束 :param cmd_dict: 形如 {'type': 'cmd', 'body': 'get aa.txt', 'pwd': [], 'status': 1, 'uuid': '464899225dcb11ee9a91a46bb6f59f55'} :param cmd_bytes: :param try_times: 重試的次數(shù),即 :return: 形如 {'type': 'cmd', 'body': [('log', 0, 'd'), ('mylog.txt', 8, 'f')], 'pwd': [], 'status': 1, 'uuid': '464899235dcb11eebf99a46bb6f59f55'} 或者 {'type': 'cmd', 'body': {'file_name': 'logger.log', 'pwd': ['log'], 'content_bytes': b'2023-', 'start_index': 0, 'next_index': 5}, 'pwd': [], 'status': 1, 'uuid': '464899235dcb11eebf99a46bb6f59f55'} """ ret_dict = {} cmd_bytes = cmd_bytes if cmd_bytes else pickle.dumps(cmd_dict) try: for x in range(try_times): # print("cmd_dict_ll: ", cmd_dict) self.__socket_obj.sendto(cmd_bytes, self.__server_tuple) # 服務(wù)器端的地址 recv_message_tuple = self.__socket_obj.recvfrom(1024 * 1024) recv_message_bytes = recv_message_tuple[0] recv_message_dict = pickle.loads(recv_message_bytes) if cmd_dict.get("uuid", "False") == recv_message_dict.get("uuid", "True"): ret_dict = recv_message_dict break time.sleep(0.1) except Exception: self.__logger.error("Recv normal cmd info exception !!!" + traceback.format_exc()) finally: return ret_dict def __print_normal_info(self, recv_message_dict: dict): message_body = recv_message_dict.get("body", None) if isinstance(message_body, list): for item in message_body: print("%-20s %-20s %-20s" % (item[0], item[1], item[2])) print() return True if isinstance(message_body, str): print("%-20s" % message_body) print() return True return True def __print_pwd_info(self, recv_message_dict: dict): pwd_list = recv_message_dict.get("pwd", []) if not pwd_list: print("%-20s" % "/") else: pwd_str = '/' + '/'.join(pwd_list) print("%-20s" % pwd_str) print() return True def __print_download_info(self, file_name, file_size, next_index): sys.stdout.write("\r[%s] -->: %s" % (file_name, ("%.3f" % (next_index / file_size* 100) + "%"))) def __print_download_successfully_info(self, file_name): """ 最終打印100% :param file_name: :return: """ sys.stdout.write("\r[%s] -->: %s" % (file_name, "100%")) print() def run(self): while True: try: # 沒有下載任務(wù)的話,則接收輸入新的命令 if not self.__is_downloading: cmd = input("Input: ") cmd_dict = { "type": "cmd", "body": cmd, "pwd": self.__pwd_list, "status": 1, "uuid": uuid.uuid1().hex } self.__cmd_handler(cmd_dict) print("================================================") else: time.sleep(1) except Exception: self.__logger.error("Client exception: " + traceback.format_exc()) time.sleep(1) if __name__ == '__main__': ftp_client_obj = FTPClientUDP(host="127.0.0.1", port=8090, download_path="D:\my_download" # 客戶端下載文件的根目錄 ) ftp_client_obj.run()
到此這篇關(guān)于Python使用socket的UDP協(xié)議實現(xiàn)FTP文件服務(wù)的文章就介紹到這了,更多相關(guān)Python UDP協(xié)議 FTP文件服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pyqt 實現(xiàn)QlineEdit 輸入密碼顯示成圓點的方法
今天小編就為大家分享一篇pyqt 實現(xiàn)QlineEdit 輸入密碼顯示成圓點的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06Python編程調(diào)用百度API實現(xiàn)地理位置經(jīng)緯度坐標(biāo)轉(zhuǎn)換示例
這篇文章主要介紹了Python編程調(diào)用百度API來實現(xiàn)地理位置經(jīng)緯度坐標(biāo)轉(zhuǎn)換的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-10-10python獲取當(dāng)前git的repo地址的示例代碼
大家好,當(dāng)談及版本控制系統(tǒng)時,Git是最為廣泛使用的一種,而Python作為一門多用途的編程語言,在處理Git倉庫時也展現(xiàn)了其強大的能力,本文給大家介紹了python獲取當(dāng)前git的repo地址的方法,需要的朋友可以參考下2024-09-09