Python編寫一個優(yōu)美的下載器
更新時間:2018年04月15日 14:09:58 作者:ccpw_cn
這篇文章主要教大家如何使用Python編寫一個優(yōu)美的下載器,具有一定的參考價值,感興趣的小伙伴們可以參考一下
本文實例為大家分享了Python編寫下載器的具體代碼,供大家參考,具體內(nèi)容如下
#!/bin/python3 # author: lidawei # create: 2016-07-11 # version: 1.0 # 功能說明: # 從指定的URL將文件取回本地 ##################################################### import http.client import os import threading import time import logging import unittest from queue import Queue from urllib.parse import urlparse logging.basicConfig(level = logging.DEBUG, format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt = '%a, %d %b %Y %H:%M:%S', filename = 'Downloader_%s.log' % (time.strftime('%Y-%m-%d')), filemode = 'a') class Downloader(object): '''''文件下載器''' url = '' filename = '' def __init__(self, full_url_str, filename): '''''初始化''' self.url = urlparse(full_url_str) self.filename = filename def download(self): '''''執(zhí)行下載,返回True或False''' if self.url == '' or self.url == None or self.filename == '' or self.filename == None: logging.error('Invalid parameter for Downloader') return False successed = False conn = None if self.url.scheme == 'https': conn = http.client.HTTPSConnection(self.url.netloc) else: conn = http.client.HTTPConnection(self.url.netloc) conn.request('GET', self.url.path) response = conn.getresponse() if response.status == 200: total_size = response.getheader('Content-Length') total_size = (int)(total_size) if total_size > 0: finished_size = 0 file = open(self.filename, 'wb') if file: progress = Progress() progress.start() while not response.closed: buffers = response.read(1024) file.write(buffers) finished_size += len(buffers) progress.update(finished_size, total_size) if finished_size >= total_size: break # ... end while statment file.close() progress.stop() progress.join() else: logging.error('Create local file %s failed' % (self.filename)) # ... end if statment else: logging.error('Request file %s size failed' % (self.filename)) # ... end if statment else: logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status)) # ... end if statment conn.close() return successed # ... end download() method # ... end Downloader class class DataWriter(threading.Thread): filename = '' data_dict = {'offset' : 0, 'buffers_byte' : b''} queue = Queue(128) __stop = False def __init__(self, filename): self.filename = filename threading.Thread.__init__(self) #Override def run(self): while not self.__stop: self.queue.get(True, 1) def put_data(data_dict): '''''將data_dict的數(shù)據(jù)放入隊列,data_dict是一個字典,有兩個元素:offset是偏移量,buffers_byte是二進制字節(jié)串''' self.queue.put(data_dict) def stop(self): self.__stop = True class Progress(threading.Thread): interval = 1 total_size = 0 finished_size = 0 old_size = 0 __stop = False def __init__(self, interval = 0.5): self.interval = interval threading.Thread.__init__(self) #Override def run(self): # logging.info(' Total Finished Percent Speed') print(' Total Finished Percent Speed') while not self.__stop: time.sleep(self.interval) if self.total_size > 0: percent = self.finished_size / self.total_size * 100 speed = (self.finished_size - self.old_size) / self.interval msg = '%12d %12d %10.2f%% %12d' % (self.total_size, self.finished_size, percent, speed) # logging.info(msg) print(msg) self.old_size = self.finished_size else: logging.error('Total size is zero') # ... end while statment # ... end run() method def stop(self): self.__stop = True def update(self, finished_size, total_size): self.finished_size = finished_size self.total_size = total_size class TestDownloaderFunctions(unittest.TestCase): def setUp(self): print('setUp') def test_download(self): url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe' filename = 'QQ8.4.exe' dl = Downloader(url, filename) dl.download() def tearDown(self): print('tearDown') if __name__ == '__main__': unittest.main()
這是測試結(jié)果:
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
基于python+pandoc實現(xiàn)html批量轉(zhuǎn)word
pandoc是一個強大的文檔格式轉(zhuǎn)換工具,支持豐富的格式轉(zhuǎn)換,并盡可能的保留原來的排版,號稱文檔格式轉(zhuǎn)換的瑞士軍刀,本文將給大家介紹一下使用python搭配pandoc實現(xiàn)html批量轉(zhuǎn)word,感興趣的朋友可以參考閱讀下2023-09-09TensorFlow——Checkpoint為模型添加檢查點的實例
今天小編就為大家分享一篇TensorFlow——Checkpoint為模型添加檢查點的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01用Python實現(xiàn)BP神經(jīng)網(wǎng)絡(luò)(附代碼)
這篇文章主要介紹了用Python實現(xiàn)BP神經(jīng)網(wǎng)絡(luò)(附代碼),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07使用Python Fast API發(fā)布API服務(wù)的過程詳解
這篇文章主要介紹了使用Python Fast API發(fā)布API服務(wù),本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-04-04