Python 工具類實(shí)現(xiàn)大文件斷點(diǎn)續(xù)傳功能詳解
依賴
os、sys、requests
工具代碼
廢話不多說(shuō),上代碼。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Oct 23 13:54:39 2021
@author: huyi
"""
import os
import sys
import requests
def download(url, file_path):
# 重試計(jì)數(shù)
count = 0
# 第一次請(qǐng)求是為了得到文件總大小
r1 = requests.get(url, stream=True, verify=False)
total_size = int(r1.headers['Content-Length'])
# 判斷本地文件是否存在,存在則讀取文件數(shù)據(jù)大小
if os.path.exists(file_path):
temp_size = os.path.getsize(file_path) # 本地已經(jīng)下載的文件大小
else:
temp_size = 0
# 對(duì)比一下,是不是還沒(méi)下完
print(temp_size)
print(total_size)
# 開(kāi)始下載
while count < 10:
if count != 0:
temp_size = os.path.getsize(file_path)
# 文件大小一致,跳出循環(huán)
if temp_size >= total_size:
break
count += 1
print(
"第[{}]次下載文件,已經(jīng)下載數(shù)據(jù)大小:[{}],應(yīng)下載數(shù)據(jù)大小:[{}]".format(
count, temp_size, total_size))
# 重新請(qǐng)求網(wǎng)址,加入新的請(qǐng)求頭的
# 核心部分,這個(gè)是請(qǐng)求下載時(shí),從本地文件已經(jīng)下載過(guò)的后面下載
headers = {"Range": f"bytes={temp_size}-{total_size}"}
# r = requests.get(url, stream=True, verify=False)
r = requests.get(url, stream=True, verify=False, headers=headers)
# "ab"表示追加形式寫(xiě)入文件
with open(file_path, "ab") as f:
if count != 1:
f.seek(temp_size)
for chunk in r.iter_content(chunk_size=1024 * 64):
if chunk:
temp_size += len(chunk)
f.write(chunk)
f.flush()
###這是下載實(shí)現(xiàn)進(jìn)度顯示####
done = int(50 * temp_size / total_size)
sys.stdout.write("\r[%s%s] %d%%" % (
'█' * done, ' ' * (50 - done), 100 * temp_size / total_size))
sys.stdout.flush()
print("\n")
return file_path
代碼說(shuō)明:
1、重試次數(shù)可以自己修改,按照需求來(lái),我這邊是10次。
2、增加了進(jìn)度條的打印,別問(wèn),好看就完了。
驗(yàn)證一下,我們準(zhǔn)備個(gè)文件下載服務(wù)。上文件服務(wù)代碼。代碼對(duì)flask、gevent有依賴。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Oct 23 19:53:18 2021
@author: huyi
"""
from flask import Flask, request, make_response, send_from_directory
from gevent.pywsgi import WSGIServer
from gevent import monkey
# 將python標(biāo)準(zhǔn)的io方法,都替換成gevent中的同名方法,遇到io阻塞gevent自動(dòng)進(jìn)行協(xié)程切換
monkey.patch_all()
app = Flask(__name__)
@app.route("/download", methods=['GET'])
def download_file():
get_data = request.args.to_dict()
file_path = get_data.get('fileName')
response = make_response(
send_from_directory('/Users/huyi/Movies/Videos',file_path,as_attachment=True))
response.headers["Content-Disposition"] = "attachment; filename={}".format(
file_path.encode().decode('latin-1'))
return response
if __name__ == '__main__':
WSGIServer(('0.0.0.0', 8080), app).serve_forever()
啟動(dòng)文件下載服務(wù),測(cè)試下載代碼
download('http://localhost:8080/download?fileName=test.mp4', '/Users/huyi/Downloads/test.mp4')
首先我們下載一部分,然后關(guān)閉,模擬下載一半的情況。

重新執(zhí)行一下,把剩下的執(zhí)行

OK,驗(yàn)證通過(guò)。
總結(jié)
斷點(diǎn)續(xù)傳的這部分代碼,你可以好好理解一下原理,其實(shí)不復(fù)雜。還是不明白可以私信我。
如果本文對(duì)你有幫助,請(qǐng)點(diǎn)個(gè)贊支持一下吧。

到此這篇關(guān)于Python 工具類實(shí)現(xiàn)大文件斷點(diǎn)續(xù)傳功能詳解的文章就介紹到這了,更多相關(guān)Python 文件下載內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Django xadmin 實(shí)現(xiàn)修改時(shí)間選擇器為不可輸入狀態(tài)
這篇文章主要介紹了使用Django xadmin 實(shí)現(xiàn)修改時(shí)間選擇器為不可輸入狀態(tài),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03
postman傳遞當(dāng)前時(shí)間戳實(shí)例詳解
在本篇文章里小編給大家整理的是一篇關(guān)于postman傳遞當(dāng)前時(shí)間戳知識(shí)點(diǎn)相關(guān)內(nèi)容,有需要的朋友們可以學(xué)習(xí)下。2019-09-09
python使用paramiko模塊實(shí)現(xiàn)ssh遠(yuǎn)程登陸上傳文件并執(zhí)行
使用paramiko,ssh遠(yuǎn)程登陸,長(zhǎng)傳文件并執(zhí)行。其中用到了多線程和隊(duì)列,paramiko是用python語(yǔ)言寫(xiě)的一個(gè)模塊,遵循SSH2協(xié)議,支持以加密和認(rèn)證的方式,進(jìn)行遠(yuǎn)程服務(wù)器的連接。2014-01-01
python 對(duì)一幅灰度圖像進(jìn)行直方圖均衡化
這篇文章主要介紹了python 如何對(duì)一幅灰度圖像進(jìn)行直方圖均衡化,幫助大家更好的利用python處理圖像,感興趣的朋友可以了解下2020-10-10
Python光學(xué)仿真學(xué)習(xí)處理高斯光束分布圖像
這篇文章主要為大家介紹了Python光學(xué)仿真學(xué)習(xí)之如何處理高斯光束的分布圖像,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-10-10
Python閉包實(shí)現(xiàn)計(jì)數(shù)器的方法
這篇文章主要介紹了Python閉包實(shí)現(xiàn)計(jì)數(shù)器的方法,分析了閉包的概念及實(shí)現(xiàn)計(jì)數(shù)器的相關(guān)技巧,需要的朋友可以參考下2015-05-05

