python3中http協(xié)議提供文件服務(wù)器功能詳解
http協(xié)議是互聯(lián)網(wǎng)的通用基礎(chǔ)協(xié)議,也可以利用其來(lái)開(kāi)發(fā)文件服務(wù)器,給客戶(hù)提供文件瀏覽,查看,下載,上傳等功能。
1.python3自帶http文件服務(wù)
python3中http.server提供http文件服務(wù),默認(rèn)端口是8000,可以進(jìn)行修改
運(yùn)行命令:
python3 -m http.server 12567
另外python2中可以使用SimpleHTTPServer來(lái)提供http文件服務(wù),默認(rèn)端口是8000,ye可以進(jìn)行修改
運(yùn)行命令:
python -m SimpleHTTPServer 12567運(yùn)行效果如下:
2.python3從頭開(kāi)發(fā)http文件服務(wù)
http_server.py
# -*- coding: UTF-8 -*- import os, time import sys, socket import posixpath #escape:逃跑,用來(lái)讓特殊符號(hào)表示它本來(lái)的意思 try: from html import escape except ImportError: from cgi import escape import shutil import mimetypes import re import signal from io import StringIO, BytesIO import codecs from urllib.parse import quote from urllib.parse import unquote import urllib.parse from http.server import HTTPServer from http.server import BaseHTTPRequestHandler """urlencode quote unquote urllib.parse.urlencode urlencode函數(shù),可以把key-value這樣的鍵值對(duì)轉(zhuǎn)換成我們想要的格式,返回的是a=1&b=2這樣的字符串 urllib.parse.quote quote對(duì)一個(gè)字符串進(jìn)行urlencode轉(zhuǎn)換 urllib.parse.unquote unquote與quote對(duì)應(yīng),用來(lái)解碼quote處理后的結(jié)果 """ """內(nèi)存IO之StringIO和BytesIO 參考博客: https://zhuanlan.zhihu.com/p/332651899 """ """HTTPServer 參考博客 https://www.cnblogs.com/jason-huawen/p/16241405.html """ """ 瀏覽器運(yùn)行: http://127.0.0.1:18081 """ class MyHTTPRequestHandler(BaseHTTPRequestHandler): address = socket.gethostbyname(socket.gethostname()) treefile = "sesssion_cache.txt" mylist = [] myspace = "" def do_GET(self): """處理GET請(qǐng)求 """ print(MyHTTPRequestHandler.address) paths = unquote(self.path) path = str(paths) print("path is ", paths, path) ''' self.send_error(404, "File not found") ''' if self.remove_dir_or_file(path): return fd = self.send_head() if fd: #關(guān)鍵代碼: self.wfile用來(lái)向客戶(hù)端寫(xiě)入數(shù)據(jù) shutil.copyfileobj(fd, self.wfile) if path == "/": self.traversal_file(translate_path(self.path)) self.write_list(MyHTTPRequestHandler.treefile) fd.close() def do_HEAD(self): """處理HEAD請(qǐng)求 """ fd = self.send_head() if fd: fd.close() def do_POST(self): """處理POST請(qǐng)求 """ r, info = self.deal_post_data() #拼裝HTML文本 f = BytesIO() f.write(b'<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') f.write(b"<html>\n<title>Upload Result Page</title>\n") f.write(b"<body>\n<h2>Upload Result Page</h2>\n") f.write(b"<hr>\n") if r: f.write(b"<strong>Success:</strong><br>") else: f.write(b"<strong>Failed:</strong><br>") for i in info: print(r, i, "by: ", self.client_address) f.write(i.encode('utf-8')+b"<br>") f.write(b"<br><a href=\"%s\">back</a>" % self.headers['referer'].encode('ascii')) f.write(b"</body>\n</html>\n") length = f.tell() f.seek(0) self.send_response(200) self.send_header("Content-type", "text/html;charset=utf-8") self.send_header("Content-Length", str(length)) self.end_headers() if f: shutil.copyfileobj(f, self.wfile) f.close() #每次提交post請(qǐng)求之后更新目錄樹(shù)文件 self.p(translate_path(self.path)) self.write_list(MyHTTPRequestHandler.treefile) def str_to_chinese(self,var): not_end = True while not_end: start1 = var.find("\\x") # print start1 if start1 > -1: str1 = var[start1 + 2:start1 + 4] print(str1) start2 = var[start1 + 4:].find("\\x") + start1 + 4 if start2 > -1: str2 = var[start2 + 2:start2 + 4] start3 = var[start2 + 4:].find("\\x") + start2 + 4 if start3 > -1: str3 = var[start3 + 2:start3 + 4] else: not_end = False if start1 > -1 and start2 > -1 and start3 > -1: str_all = str1 + str2 + str3 # print str_all str_all = codecs.decode(str_all, "hex").decode('utf-8') str_re = var[start1:start3 + 4] # print str_all, " " ,str_re var = var.replace(str_re, str_all) # print var.decode('utf-8') return var def deal_post_data(self): boundary = self.headers["Content-Type"].split("=")[1].encode('ascii') print("boundary===", boundary) remain_bytes = int(self.headers['content-length']) print("remain_bytes===", remain_bytes) res = [] line = self.rfile.readline() while boundary in line and str(line, encoding = "utf-8")[-4:] != "--\r\n": #line = self.rfile.readline() remain_bytes -= len(line) if boundary not in line: return False, "Content NOT begin with boundary" line = self.rfile.readline() remain_bytes -= len(line) print("line!!!",line) fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', str(line)) if not fn: return False, "Can't find out file name..." path = translate_path(self.path) fname = fn[0] #fname = fname.replace("\\", "\\\\") fname = self.str_to_chinese(fname) print("------",fname) fn = os.path.join(path, fname) while os.path.exists(fn): fn += "_" print("!!!!",fn) dirname = os.path.dirname(fn) if not os.path.exists(dirname): os.makedirs(dirname) line = self.rfile.readline() remain_bytes -= len(line) line = self.rfile.readline() # b'\r\n' remain_bytes -= len(line) try: out = open(fn, 'wb') except IOError: return False, "Can't create file to write, do you have permission to write?" pre_line = self.rfile.readline() print("pre_line", pre_line) remain_bytes -= len(pre_line) print("remain_bytes", remain_bytes) Flag = True while remain_bytes > 0: line = self.rfile.readline() print("while line", line) if boundary in line: remain_bytes -= len(line) pre_line = pre_line[0:-1] if pre_line.endswith(b'\r'): pre_line = pre_line[0:-1] out.write(pre_line) out.close() res.append("File '%s' upload success!" % fn) Flag = False break else: out.write(pre_line) pre_line = line if pre_line is not None and Flag == True: out.write(pre_line) out.close() res.append("File '%s' upload success!" % fn) return True, res def remove_dir_or_file(self, path): plist = path.split("/", 2) print("plist plist plist plist ", plist) if len(plist) > 2 and plist[1] == "delete": #路徑轉(zhuǎn)化 file = translate_path(plist[2]) print("======>>>>>>>>>> file", file) if os.path.exists(file): fdir = os.path.dirname(file) print("======>>>>>>>>>> ", file, fdir) #刪除文件 os.remove(file) if not os.listdir(fdir): #刪除目錄 os.removedirs(fdir) time.sleep(0.5) # 0.5s后重定向 self.send_response(302) self.send_header('Location', "/") self.end_headers() return True print("======>>>>>>>>>> file not exists ", file) return False def send_head(self): """發(fā)送HTTP頭 """ path = translate_path(self.path) if os.path.isdir(path): if not self.path.endswith('/'): # redirect browser - doing basically what apache does self.send_response(301) self.send_header("Location", self.path + "/") self.end_headers() return None for index in "index.html", "index.htm": index = os.path.join(path, index) if os.path.exists(index): path = index break else: return self.list_directory(path) print("=================================") content_type = self.guess_type(path) try: # Always read in binary mode. Opening files in text mode may cause # newline translations, making the actual size of the content # transmitted *less* than the content-length! f = open(path, 'rb') except IOError: self.send_error(404, "File not found") return None self.send_response(200) self.send_header("Content-type", content_type) fs = os.fstat(f.fileno()) self.send_header("Content-Length", str(fs[6])) self.send_header("Last-Modified", self.date_time_string(fs.st_mtime)) self.end_headers() return f def list_directory(self, path): """Helper to produce a directory listing (absent index.html). Return value is either a file object, or None (indicating an error). In either case, the headers are sent, making the interface the same as for send_head(). """ try: list_dir = os.listdir(path) except os.error: self.send_error(404, "No permission to list directory") return None list_dir.sort(key=lambda a: a.lower()) f = BytesIO() display_path = escape(unquote(self.path)) f.write(b'<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') f.write(b"<html>\n<title>Directory listing for %s</title>\n" % display_path.encode('ascii')) f.write(b"<body>\n<h2>Directory listing for %s</h2>\n" % display_path.encode('ascii')) f.write(b"<hr>\n") #上傳目錄 f.write(b"<h3>Directory Updating</h3>\n") f.write(b"<form ENCTYPE=\"multipart/form-data\" method=\"post\">") #@change=\"handleChange\" @click=\"handelClick\" f.write(b"<input ref=\"input\" webkitdirectory multiple name=\"file\" type=\"file\"/>") f.write(b"<input type=\"submit\" value=\"uploadDir\"/></form>\n") f.write(b"<hr>\n") #上傳文件 f.write(b"<h3>Files Updating</h3>\n") f.write(b"<form ENCTYPE=\"multipart/form-data\" method=\"post\">") f.write(b"<input ref=\"input\" multiple name=\"file\" type=\"file\"/>") f.write(b"<input type=\"submit\" value=\"uploadFiles\"/></form>\n") f.write(b"<hr>\n") #表格 f.write(b"<table with=\"100%\">") f.write(b"<tr><th>path</th>") f.write(b"<th>size(Byte)</th>") f.write(b"<th>modify time</th>") f.write(b"</tr>") # 根目錄下所有的內(nèi)容 for name in list_dir: # 根目錄下的路徑 fullname = os.path.join(path, name) # 目錄名/文件名 display_name = linkname = name print("display_name ==> ", display_name) if display_name.upper() == "HTTP_SERVER.PY": continue # 如果是文件夾的話(huà) if os.path.isdir(fullname): # 遍歷文件夾 for root, dirs, files in os.walk(fullname): # root 表示當(dāng)前正在訪問(wèn)的文件夾路徑 # dirs 表示該文件夾下的子目錄名list # files 表示該文件夾下的文件list # 遍歷文件 for fi in files: print("########", os.path.join(root, fi)) display_name = os.path.join(root, fi) #刪除前面的xx個(gè)字符,取出相對(duì)路徑 relativePath = display_name[len(os.getcwd()):].replace('\\','/') st = os.stat(display_name) fsize = st.st_size fmtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_mtime)) f.write(b"<tr>") f.write(b'<td><a href="%s" rel="external nofollow" rel="external nofollow" rel="external nofollow" >%s</a></td>' % (quote(relativePath).encode('utf-8'), escape(relativePath).encode('utf-8'))) f.write(b"<td>%d</td>" % fsize) f.write(b"<td>%s</td>" % escape(fmtime).encode('ascii')) f.write(b"<td><a href=\"/delete/%s\">delete</a>" % escape(relativePath).encode('utf-8')) f.write(b"</tr>") # 遍歷所有的文件夾名字,其實(shí)在上面walk已經(jīng)遍歷過(guò)了 # for d in dirs: # print(d) # 如果是鏈接文件 elif os.path.islink(fullname): linkname = linkname + "/" print("real===", linkname) display_name = name + "@" # Note: a link to a directory displays with @ and links with / f.write(b'<li><a href="%s" rel="external nofollow" rel="external nofollow" rel="external nofollow" >%s</a>\n' % (quote(linkname).encode('ascii'), escape(display_name).encode('ascii'))) else: #其他直接在根目錄下的文件直接顯示出來(lái) st = os.stat(display_name) fsize = st.st_size fmtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_mtime)) f.write(b"<tr>") f.write(b'<td><a href="%s" rel="external nofollow" rel="external nofollow" rel="external nofollow" >%s</a></td>' % (quote(linkname).encode('utf-8'), escape(display_name).encode('utf-8'))) f.write(b"<td>%d</td>" % fsize) f.write(b"<td>%s</td>" % escape(fmtime).encode('ascii')) f.write(b"<td><a href=\"/delete/%s\">delete</a>" % escape(display_name).encode('utf-8')) f.write(b"</tr>") f.write(b"</table>") f.write(b"\n<hr>\n</body>\n</html>\n") length = f.tell() f.seek(0) self.send_response(200) self.send_header("Content-type", "text/html;charset=utf-8") self.send_header("Content-Length", str(length)) self.end_headers() return f def guess_type(self, path): """Guess the type of a file. Argument is a PATH (a filename). Return value is a string of the form type/subtype, usable for a MIME Content-type header. The default implementation looks the file's extension up in the table self.extensions_map, using application/octet-stream as a default; however it would be permissible (if slow) to look inside the data to make a better guess. """ base, ext = posixpath.splitext(path) if ext in self.extensions_map: return self.extensions_map[ext] ext = ext.lower() if ext in self.extensions_map: return self.extensions_map[ext] else: return self.extensions_map[''] if not mimetypes.inited: mimetypes.init() # try to read system mime.types extensions_map = mimetypes.types_map.copy() extensions_map.update({ '': 'application/octet-stream', # Default '.py': 'text/plain', '.c': 'text/plain', '.h': 'text/plain', }) def traversal_file(self, url): print("url:", url) files = os.listdir(r''+url) for file in files: myfile = url + "http://" + file size = os.path.getsize(myfile) if os.path.isfile(myfile): MyHTTPRequestHandler.mylist.append(str(MyHTTPRequestHandler.myspace)+"|____"+file +" "+ str(size)+"\n") if os.path.isdir(myfile) : MyHTTPRequestHandler.mylist.append(str(MyHTTPRequestHandler.myspace)+"|____"+file + "\n") #get into the sub-directory,add "| " MyHTTPRequestHandler.myspace = MyHTTPRequestHandler.myspace+"| " self.traversal_file(myfile) #when sub-directory of iteration is finished,reduce "| " MyHTTPRequestHandler.myspace = MyHTTPRequestHandler.myspace[:-5] def get_all_file_list(self): listofme = [] for root, dirs, files in os.walk(translate_path(self.path)): files.sort() for fi in files: display_name = os.path.join(root, fi) #刪除前面的XXX個(gè)字符,取出相對(duì)當(dāng)前目錄的路徑 relativePath = display_name[len(os.getcwd()):].replace('\\','/') st = os.stat(display_name) fsize = st.st_size fmtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_mtime)) listofme.append(relativePath+"\t") listofme.append(str(fsize)+"\t") listofme.append(str(fmtime)+"\t\n") return listofme def write_list(self,url): f = open(url,'w') f.write("http://"+str(MyHTTPRequestHandler.address)+":8001/ directory tree\n") MyHTTPRequestHandler.mylist.sort() f.writelines(MyHTTPRequestHandler.mylist) f.write("\nFile Path\tFile Size\tFile Modify Time\n") f.writelines(self.get_all_file_list()) MyHTTPRequestHandler.mylist = [] MyHTTPRequestHandler.myspace = "" f.close() print("write_list end") def translate_path(path): path = path.split('?', 1)[0] path = path.split('#', 1)[0] path = posixpath.normpath(unquote(path)) words = path.split('/') words = filter(None, words) path = os.getcwd() for word in words: drive, word = os.path.splitdrive(word) head, word = os.path.split(word) if word in (os.curdir, os.pardir): continue path = os.path.join(path, word) return path def signal_handler(signal, frame): print(signal, frame) exit() def main(): print('python version:', sys.version_info.major, sys.version_info.minor) if sys.argv[1:]: port = int(sys.argv[1]) else: port = 18081 #server_address = ('', port) address = ('127.0.0.1', port) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGQUIT, signal_handler) #忽略ctrl+z:SIGTSTP(掛起信號(hào)) signal.signal(signal.SIGTSTP, signal.SIG_IGN) server = HTTPServer(address, MyHTTPRequestHandler) server_info = server.socket.getsockname() print("server info: " + str(server_info[0]) + ", port: " + str(server_info[1]) + " ...") server.serve_forever() def test_url_code(): values = {'username': 'hello 你好', 'password': 'pass'} data = urllib.parse.urlencode(values) print("data = ", data) name = '狄仁杰' name = urllib.parse.quote(name) print("name = ", name) dname = urllib.parse.unquote(name) print("dname = " + dname) if __name__ == '__main__': test_url_code() main()
運(yùn)行效果如下:
到此這篇關(guān)于python3中http協(xié)議提供文件服務(wù)器功能詳解的文章就介紹到這了,更多相關(guān)python3 http文件服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python 窗體(tkinter)下拉列表框(Combobox)實(shí)例
這篇文章主要介紹了Python 窗體(tkinter)下拉列表框(Combobox)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03Python中XML轉(zhuǎn)JSON、XML轉(zhuǎn)字典代碼示例
大家都知道python的字典和json類(lèi)似,那么可不可以先將xml轉(zhuǎn)換成json再去做其他的事情呢,下面這篇文章主要給大家介紹了關(guān)于Python中XML轉(zhuǎn)JSON、XML轉(zhuǎn)字典的相關(guān)資料,需要的朋友可以參考下2024-02-02python中迭代器(iterator)用法實(shí)例分析
這篇文章主要介紹了python中迭代器(iterator)用法,實(shí)例分析了Python中迭代器的相關(guān)使用技巧,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2015-04-04Python操作mongodb數(shù)據(jù)庫(kù)的方法詳解
這篇文章主要介紹了Python操作mongodb數(shù)據(jù)庫(kù)的方法,結(jié)合實(shí)例形式詳細(xì)分析了Python下載、安裝pymongo及操作MongoDB數(shù)據(jù)庫(kù)相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2018-12-12Python中asyncore異步模塊的用法及實(shí)現(xiàn)httpclient的實(shí)例
asyncore即是一個(gè)異步的socket封裝,特別是dispatcher類(lèi)中包含了很多異步調(diào)用的socket操作方法,非常犀利,下面我們就來(lái)講解Python中asyncore異步模塊的用法及實(shí)現(xiàn)httpclient的實(shí)例2016-06-06Python實(shí)現(xiàn)移動(dòng)指定圖片到指定目錄
這篇文章主要為大家詳細(xì)介紹了如何使用Python的os和shutil庫(kù)實(shí)現(xiàn)自動(dòng)化查找和移動(dòng)圖片功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下2025-02-02Python 自動(dòng)化處理Excel和Word實(shí)現(xiàn)自動(dòng)辦公
毫無(wú)疑問(wèn),Microsoft Excel 和 Word 是我們?nèi)粘^k公中使用最廣泛的辦公軟件。將反復(fù)、復(fù)雜的工作自動(dòng)化處理,是我們需要思考的問(wèn)題,本篇文章幫你解決這個(gè)問(wèn)題2021-11-11