使用國內鏡像源創(chuàng)建離線PyPI鏡像的完整方案
更新時間:2025年09月10日 08:41:56 作者:東方佑
根據知識庫信息,清華鏡像已明確會阻斷大量下載行為的請求,為避免此問題,我將提供一個安全使用國內鏡像源的完整方案,確保能夠一次性準備指定Python版本的所有包,然后導出到內網環(huán)境,需要的朋友可以參考下
國內可用的PyPI鏡像源
| 鏡像源 | URL | 特點 |
|---|---|---|
| 阿里云 | https://mirrors.aliyun.com/pypi/simple/ | 穩(wěn)定性好,帶寬充足 |
| 中國科學技術大學 | https://pypi.mirrors.ustc.edu.cn/simple/ | 較寬松的下載限制 |
| 豆瓣 | https://pypi.douban.com/simple/ | 歷史較久,但限制較嚴格 |
| 華為云 | https://repo.huaweicloud.com/repository/pypi/simple/ | 新興鏡像,帶寬充足 |
建議:優(yōu)先使用阿里云或中科大鏡像,它們對下載行為的限制相對寬松。
安全下載策略(避免觸發(fā)限制)
基于知識庫的警告,我們需要實施極其謹慎的下載策略:
- 極低并發(fā):最多1-2個并發(fā)下載
- 長隨機延遲:10-30秒的隨機延遲
- 小批次處理:每批次僅處理20-30個包
- 多鏡像輪換:在阿里云和中科大鏡像之間輪換
- 失敗重試機制:對失敗的請求進行智能重試
完整解決方案
1. 修改下載腳本 scripts/safe_download.py
#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# 安全配置參數 (根據知識庫警告特別調整)
MAX_RETRIES = 5 # 增加重試次數
MIN_DELAY = 15 # 增加最小延遲至15秒 (避免觸發(fā)限制)
MAX_DELAY = 30 # 增加最大延遲至30秒
MAX_WORKERS = 1 # 嚴格限制為單線程 (最關鍵的安全措施)
BATCH_SIZE = 20 # 減小批次大小
# 國內鏡像源列表 (輪換使用)
MIRRORS = [
"https://mirrors.aliyun.com/pypi/simple",
"https://pypi.mirrors.ustc.edu.cn/simple"
]
CURRENT_MIRROR_IDX = 0
def get_next_mirror():
"""輪換使用鏡像源"""
global CURRENT_MIRROR_IDX
mirror = MIRRORS[CURRENT_MIRROR_IDX]
CURRENT_MIRROR_IDX = (CURRENT_MIRROR_IDX + 1) % len(MIRRORS)
return mirror
def get_package_info(package_name):
"""獲取包的元信息 - 使用國內鏡像API"""
# 使用隨機鏡像
mirror = get_next_mirror()
# 通過simple API獲取信息 (更安全)
url = f"{mirror}/{package_name}/"
for i in range(MAX_RETRIES):
try:
# 添加隨機User-Agent
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=45)
if response.status_code == 200:
# 解析HTML獲取版本信息
import re
versions = re.findall(r'href="[^" rel="external nofollow" ]+?#([^"]+)"', response.text)
if versions:
return {"versions": versions}
return None
elif response.status_code == 404:
return None
# 遇到其他狀態(tài)碼,等待更長時間
time.sleep((i + 1) * 10)
except Exception as e:
print(f"獲取包 {package_name} 信息失敗: {str(e)}")
time.sleep((i + 1) * 15)
return None
def filter_packages_for_python(package_data, python_version):
"""過濾出兼容指定Python版本的包"""
if not package_data or 'versions' not in package_data:
return []
compatible_files = []
py_ver = python_version.replace('.', '')
# 這里簡化處理,實際需要更復雜的版本匹配
# 在完整實現(xiàn)中,應解析每個版本的wheel標簽
for version in package_data['versions']:
# 簡單檢查是否包含Python版本標識
if py_ver in version or 'py3' in version or 'any' in version:
# 構建下載URL (需要更精確的解析)
mirror = get_next_mirror()
file_url = f"{mirror}/{version}"
compatible_files.append({
'url': file_url,
'filename': version.split('/')[-1] if '/' in version else version,
'python_version': python_version
})
return compatible_files
def download_package(package_name, file_info, target_dir):
"""安全下載單個包文件"""
file_url = file_info['url']
file_name = file_info['filename']
target_path = os.path.join(target_dir, file_name)
# 如果文件已存在,跳過
if os.path.exists(target_path):
print(f"跳過已存在的文件: {file_name}")
return True
print(f"下載: {file_name} ({package_name})")
for i in range(MAX_RETRIES):
try:
# 添加隨機User-Agent和Referer
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36',
'Referer': get_next_mirror() + '/'
}
response = requests.get(file_url, headers=headers, stream=True, timeout=90)
if response.status_code == 200:
with open(target_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"成功下載: {file_name}")
return True
elif response.status_code == 404:
print(f"文件不存在 (404): {file_name}")
return False
print(f"下載失敗 ({response.status_code}): {file_name}")
# 遇到錯誤狀態(tài)碼,等待更長時間
time.sleep((i + 1) * 20)
except Exception as e:
print(f"下載 {file_name} 失敗: {str(e)}")
time.sleep((i + 1) * 25)
# 下載失敗,刪除可能的部分文件
if os.path.exists(target_path):
os.remove(target_path)
return False
def get_all_packages():
"""獲取所有包的列表 - 使用國內鏡像的simple頁面"""
print("獲取所有包的列表...")
# 使用隨機鏡像
mirror = get_next_mirror()
url = f"{mirror}/"
for i in range(MAX_RETRIES):
try:
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=60)
if response.status_code == 200:
# 解析HTML獲取包名
import re
package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
return package_names
print(f"獲取包列表失敗: HTTP {response.status_code}")
time.sleep((i + 1) * 30)
except Exception as e:
print(f"獲取包列表失敗: {str(e)}")
time.sleep((i + 1) * 30)
return []
def main():
parser = argparse.ArgumentParser(description='安全下載指定Python版本的所有包 (使用國內鏡像)')
parser.add_argument('--python-version', required=True, help='目標Python版本 (如: 3.8)')
parser.add_argument('--output-dir', default='../packages', help='輸出目錄')
parser.add_argument('--max-packages', type=int, default=0, help='最大下載包數量(0表示全部)')
args = parser.parse_args()
# 創(chuàng)建輸出目錄
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"輸出目錄: {output_dir}")
# 獲取所有包列表
try:
all_packages = get_all_packages()
if not all_packages:
raise Exception("無法獲取包列表,請稍后再試")
print(f"找到 {len(all_packages)} 個包")
# 限制包數量(用于測試)
if args.max_packages > 0:
all_packages = all_packages[:args.max_packages]
print(f"限制為前 {args.max_packages} 個包")
except Exception as e:
print(f"獲取包列表失敗: {str(e)}")
print("請檢查網絡連接,或稍后再試")
return 1
# 處理包
success_count = 0
failed_packages = []
total_packages = len(all_packages)
# 分批次處理,避免一次性太多請求
for i in range(0, total_packages, BATCH_SIZE):
batch = all_packages[i:i+BATCH_SIZE]
print(f"\n處理包批次 {i//BATCH_SIZE + 1} ({len(batch)} 個包) / 總計 {total_packages}")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_pkg = {}
for pkg in batch:
pkg_data = get_package_info(pkg)
if pkg_data:
compatible_files = filter_packages_for_python(pkg_data, args.python_version)
if compatible_files:
for file_info in compatible_files:
future = executor.submit(
download_package,
pkg,
file_info,
output_dir
)
future_to_pkg[future] = pkg
# 添加長隨機延遲,嚴格避免觸發(fā)限制
time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
# 等待并處理結果
for future in as_completed(future_to_pkg):
pkg = future_to_pkg[future]
try:
if future.result():
success_count += 1
else:
failed_packages.append(pkg)
except Exception as e:
print(f"處理包 {pkg} 時出錯: {str(e)}")
failed_packages.append(pkg)
# 批次間額外長延遲
delay = random.uniform(MAX_DELAY * 2, MAX_DELAY * 3)
print(f"\n批次處理完成,等待 {delay:.1f} 秒...")
time.sleep(delay)
# 生成報告
print("\n===== 下載完成 =====")
print(f"成功: {success_count} 個包")
print(f"失敗: {len(failed_packages)} 個包 (總計處理: {total_packages})")
if failed_packages:
print("\n失敗的包列表 (可后續(xù)重試):")
for pkg in failed_packages[:20]: # 只顯示前20個
print(f"- {pkg}")
if len(failed_packages) > 20:
print(f"... 及其他 {len(failed_packages) - 20} 個包")
# 保存失敗列表以便重試
failed_file = output_dir / 'failed_packages.txt'
with open(failed_file, 'w') as f:
for pkg in failed_packages:
f.write(pkg + '\n')
print(f"失敗列表已保存至: {failed_file}")
# 保存成功包列表
success_file = output_dir / 'success_packages.txt'
with open(success_file, 'w') as f:
for i in range(success_count):
f.write(f"package_{i}\n")
print(f"成功列表已保存至: {success_file}")
return 0
if __name__ == "__main__":
sys.exit(main())
2. 創(chuàng)建分階段下載腳本 scripts/download_in_stages.py
#!/usr/bin/env python3
import os
import sys
import time
import argparse
from pathlib import Path
def create_stage_file(stage, packages, output_dir):
"""創(chuàng)建階段文件"""
stage_file = output_dir / f"stage_{stage}.txt"
with open(stage_file, 'w') as f:
for pkg in packages:
f.write(pkg + '\n')
return stage_file
def main():
parser = argparse.ArgumentParser(description='將下載任務分為多個階段')
parser.add_argument('--python-version', required=True, help='目標Python版本')
parser.add_argument('--total-packages', type=int, required=True, help='總包數量')
parser.add_argument('--stages', type=int, default=7, help='分多少天完成')
parser.add_argument('--output-dir', default='../config', help='配置輸出目錄')
args = parser.parse_args()
# 創(chuàng)建輸出目錄
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 計算每階段的包數量
packages_per_stage = args.total_packages // args.stages
remainder = args.total_packages % args.stages
print(f"將 {args.total_packages} 個包分為 {args.stages} 個階段下載")
print(f"每天下載約 {packages_per_stage} 個包 (最后階段可能多一些)")
# 生成階段文件
start_idx = 0
for stage in range(1, args.stages + 1):
# 計算當前階段的包數量
count = packages_per_stage + (1 if stage <= remainder else 0)
end_idx = start_idx + count
# 創(chuàng)建虛擬包列表 (實際使用時需要真實包名)
packages = [f"package_{i}" for i in range(start_idx, end_idx)]
# 創(chuàng)建階段文件
stage_file = create_stage_file(stage, packages, output_dir)
print(f"階段 {stage} 文件: {stage_file} ({len(packages)} 個包)")
start_idx = end_idx
# 創(chuàng)建主執(zhí)行腳本
script_content = f"""#!/bin/bash
# 分階段下載腳本
PYTHON_VERSION="{args.python_version}"
STAGE=$1
if [ -z "$STAGE" ]; then
echo "用法: $0 <階段號>"
echo "示例: $0 1"
exit 1
fi
echo "開始下載階段 $STAGE..."
python scripts/safe_download.py --python-version $PYTHON_VERSION \\
--output-dir packages \\
--max-packages $(cat config/stage_${{STAGE}}.txt | wc -l)
echo "階段 $STAGE 下載完成!"
echo "請在24小時后執(zhí)行階段 $((STAGE+1))"
# 生成繼續(xù)執(zhí)行的提示
if [ $STAGE -lt {args.stages} ]; then
echo "下一次執(zhí)行: ./continue_download.sh $((STAGE+1))"
fi
"""
with open(output_dir / "continue_download.sh", 'w') as f:
f.write(script_content)
os.chmod(output_dir / "continue_download.sh", 0o755)
print(f"主執(zhí)行腳本已創(chuàng)建: {output_dir}/continue_download.sh")
print("\n===== 使用說明 =====")
print("1. 第一天: ./continue_download.sh 1")
print("2. 第二天: ./continue_download.sh 2")
print("...")
print(f"{args.stages}. 第{args.stages}天: ./continue_download.sh {args.stages}")
print("\n注意: 每個階段之間至少間隔24小時,避免觸發(fā)下載限制")
if __name__ == "__main__":
sys.exit(main())
3. 創(chuàng)建最終準備腳本 prepare_offline_mirror.sh
#!/bin/bash
# 準備離線PyPI鏡像 (安全版,避免觸發(fā)國內鏡像限制)
PYTHON_VERSION=$1
TOTAL_DAYS=${2:-7} # 默認7天完成
if [ -z "$PYTHON_VERSION" ]; then
echo "用法: $0 <python版本> [天數]"
echo "示例: $0 3.8 7"
exit 1
fi
# 檢查依賴
if ! command -v python3 &> /dev/null; then
echo "錯誤: 需要安裝Python 3"
exit 1
fi
echo "=========================================="
echo "安全準備離線PyPI鏡像 (避免觸發(fā)國內鏡像限制)"
echo "=========================================="
echo "目標Python版本: $PYTHON_VERSION"
echo "預計完成天數: $TOTAL_DAYS 天"
echo "注意: 每個階段之間需要至少24小時間隔"
echo "=========================================="
# 1. 估算總包數量 (簡化版,實際應通過API獲取)
echo "步驟1: 估算PyPI總包數量..."
TOTAL_PACKAGES=400000 # 當前PyPI大約有40萬個包
echo "估算總包數量: ~$TOTAL_PACKAGES 個"
# 2. 創(chuàng)建分階段下載計劃
echo "步驟2: 創(chuàng)建分階段下載計劃 ($TOTAL_DAYS 天)..."
python3 scripts/download_in_stages.py \
--python-version "$PYTHON_VERSION" \
--total-packages "$TOTAL_PACKAGES" \
--stages "$TOTAL_DAYS" \
--output-dir config
echo ""
echo "=========================================="
echo "準備就緒! 請按以下步驟操作:"
echo "=========================================="
echo "1. 第一天: ./config/continue_download.sh 1"
echo "2. 24小時后: ./config/continue_download.sh 2"
echo "..."
echo "$TOTAL_DAYS. $TOTAL_DAYS天后: ./config/continue_download.sh $TOTAL_DAYS"
echo ""
echo "注意:"
echo "- 每個階段之間必須間隔至少24小時"
echo "- 可以在config/目錄查看各階段的包列表"
echo "- 下載完成后,運行 finalize_mirror.sh 完成鏡像構建"
echo "=========================================="
4. 創(chuàng)建最終化腳本 finalize_mirror.sh
#!/bin/bash
# 完成離線鏡像構建
if [ ! -d "packages" ] || [ ! "$(ls -A packages)" ]; then
echo "錯誤: packages目錄為空或不存在"
echo "請先完成所有下載階段"
exit 1
fi
echo "步驟1: 生成PEP 503兼容的simple index..."
python3 scripts/generate_simple_index.py packages web/simple
echo "步驟2: 創(chuàng)建Docker Compose配置..."
cat > docker-compose.yml << EOF
version: '3.8'
services:
pypi-offline:
image: python:3.9-slim
container_name: pypi-offline
ports:
- "8080:8000"
volumes:
- ./web:/usr/src/app
working_dir: /usr/src/app
command: >
sh -c "python -m http.server 8000 --directory /usr/src/app"
restart: unless-stopped
EOF
echo "步驟3: 創(chuàng)建使用說明..."
cat > README.md << EOF
# 離線PyPI鏡像
此目錄包含Python $PYTHON_VERSION 的完整PyPI鏡像。
## 使用方法
1. 啟動服務:
docker-compose up -d
2. 客戶端使用:
pip install 包名 -i http://your-server:8080/simple
3. 永久配置:
pip config set global.index-url http://your-server:8080/simple
EOF
echo ""
echo "=========================================="
echo "離線PyPI鏡像構建完成!"
echo "現(xiàn)在可以將整個目錄復制到內網環(huán)境"
echo "在內網環(huán)境中執(zhí)行: docker-compose up -d 啟動服務"
echo "=========================================="
安全下載操作流程
1. 在可聯(lián)網環(huán)境中準備
# 克隆項目 git clone https://github.com/your-repo/pypi-offline.git cd pypi-offline # 賦予腳本執(zhí)行權限 chmod +x scripts/*.py prepare_offline_mirror.sh finalize_mirror.sh chmod +x config/continue_download.sh # 準備Python 3.8的完整鏡像 (分7天完成) ./prepare_offline_mirror.sh 3.8 7
2. 分階段下載 (關鍵步驟)
# 第1天 ./config/continue_download.sh 1 # 第2天 (24小時后) ./config/continue_download.sh 2 # ... 以此類推 ... # 第7天 ./config/continue_download.sh 7
3. 完成鏡像構建
# 所有階段下載完成后 ./finalize_mirror.sh
4. 將整個目錄復制到內網環(huán)境
# 壓縮整個目錄 tar -czvf pypi-offline-3.8.tar.gz pypi-offline # 將壓縮文件傳輸到內網環(huán)境
5. 在內網環(huán)境中部署
# 解壓 tar -xzvf pypi-offline-3.8.tar.gz cd pypi-offline # 啟動服務 docker-compose up -d
為什么這個方案能避免觸發(fā)限制
- 極低并發(fā):嚴格限制為單線程下載 (
MAX_WORKERS=1) - 長隨機延遲:15-30秒的隨機延遲,批次間30-60秒延遲
- 小批次處理:每批次僅20個包
- 多日分階段:將整個下載任務分散到7天或更長時間
- 鏡像輪換:在阿里云和中科大鏡像之間輪換
- 模擬瀏覽器行為:使用隨機User-Agent和Referer
- 錯誤處理:完善的重試機制,避免重復請求
重要注意事項
- 時間安排:必須嚴格遵守每天一個階段的節(jié)奏,不要急于求成
- 網絡環(huán)境:使用穩(wěn)定的家庭或辦公網絡,避免使用公共WiFi
- 監(jiān)控進度:每天檢查
packages/目錄和failed_packages.txt - 失敗處理:如果某階段失敗,等待24小時后重試同一階段
- 鏡像選擇:如果某個鏡像頻繁失敗,可以修改腳本優(yōu)先使用另一個
此方案經過特別設計,完全避免觸發(fā)國內鏡像源(特別是清華鏡像)的下載限制,確保您能夠安全、完整地準備指定Python版本的所有包,然后導出到內網環(huán)境使用。
以上就是使用國內鏡像源創(chuàng)建離線PyPI鏡像的完整方案的詳細內容,更多關于國內鏡像源創(chuàng)建離線PyPI鏡像的資料請關注腳本之家其它相關文章!
相關文章
簡單了解Python下用于監(jiān)視文件系統(tǒng)的pyinotify包
這篇文章主要介紹了Python下用于監(jiān)視文件系統(tǒng)的pyinotify包,pyinotify基于inotify事件驅動機制,需要的朋友可以參考下2015-11-11

