C++利用libcurl庫實現(xiàn)多線程文件下載
可以平時在linux上當作自己的一個小小的下載器
linux環(huán)境準備
1.在項目目錄下新建:
- build 目錄: 用來編譯
- src目錄:用來編寫源碼(本例比較簡單,就沒建了)
2.在build目錄下新建CMakeLists.txt,寫入以下內(nèi)容
cmake_minimum_required(VERSION 3.3) project(ThreadDownLoad) set(CMAKE_CXX_STANDARD 14) # 這里要寫../main.cpp,因為需要在構(gòu)建目錄下make,可執(zhí)行文件要填相對構(gòu)建目錄的路徑 add_executable(ThreadDownLoad ../main.cpp)
3.Cmake生成Makefile等文件
Cmake . -B ./build
4.完成main.cpp,進行make
注意可執(zhí)行文件設(shè)置,因為需要在構(gòu)建目錄下make,可執(zhí)行文件要填相對構(gòu)建目錄的路徑
多線程文件下載
實現(xiàn)功能:
網(wǎng)絡(luò)文件下載:libcurl庫,設(shè)置文件寫入函數(shù),以及下載進度回調(diào)函數(shù)
多文件下載:先獲取服務(wù)器文件大小,將文件分段平均分配給子線程下載
慎用mmap,雖然它能較少磁盤IO,但是它只能進行整個文件的映射,可能會造成內(nèi)存不足
斷點續(xù)傳:注冊信號(如中斷信號),信號發(fā)生時,將子線程的下載range存入文件
文件下載方式
mmap內(nèi)存映射
- 先向服務(wù)器發(fā)送請求,獲取文件大小fileLen,創(chuàng)建文件,將其指針移動到fileLen,寫入數(shù)據(jù)1
- 類似迅雷下載預(yù)先分配大小),將其mmap映射到內(nèi)存,然后再發(fā)請求進行下載
fwrite直接寫文件
這里選擇第二種
mmap實現(xiàn)內(nèi)存映射原理:
- 調(diào)用mmap函數(shù)時,系統(tǒng)會為這個文件在進程的地址空間中分配一塊虛擬內(nèi)存區(qū)域,這個區(qū)域的大小通常是文件大小的整數(shù)倍。
- 系統(tǒng)會將文件的內(nèi)容讀入內(nèi)存中,并將這個虛擬內(nèi)存區(qū)域與文件建立映射關(guān)系,使得進程可以通過對這個虛擬內(nèi)存區(qū)域的訪問來訪問文件的內(nèi)容。
- 進程對這個虛擬內(nèi)存區(qū)域的訪問會被轉(zhuǎn)換為對文件的讀寫操作,這些操作會被操作系統(tǒng)自動同步到磁盤上,從而保證文件內(nèi)容的一致性。
- 當進程不再需要這個虛擬內(nèi)存區(qū)域時,可以調(diào)用munmap函數(shù)來釋放這個區(qū)域,這個區(qū)域的內(nèi)容也會被自動同步到磁盤上。
mmap實現(xiàn)進程貢獻原理:(mmap不是在進程內(nèi)分配虛擬內(nèi)存嗎,怎么實現(xiàn)進程的共享內(nèi)存呢?)
- 使用共享文件:進程A和進程B都打開同一個文件,并使用mmap將這個文件映射到自己的虛擬內(nèi)存中。這樣,進程A和進程B就可以通過對這個虛擬內(nèi)存區(qū)域的訪問來實現(xiàn)共享內(nèi)存。當一個進程修改了這個虛擬內(nèi)存區(qū)域的內(nèi)容時,操作系統(tǒng)會自動同步到磁盤上,從而保證數(shù)據(jù)的一致性。
- 使用匿名映射:進程A和進程B都使用mmap函數(shù)創(chuàng)建一個匿名映射區(qū)域,并將這個區(qū)域映射到自己的虛擬內(nèi)存中。這樣,進程A和進程B就可以通過對這個虛擬內(nèi)存區(qū)域的訪問來實現(xiàn)共享內(nèi)存。當一個進程修改了這個虛擬內(nèi)存區(qū)域的內(nèi)容時,操作系統(tǒng)會自動同步到物理內(nèi)存中,從而保證數(shù)據(jù)的一致性。
加鎖控制:
- 無論是使用共享文件還是匿名映射,進程間共享內(nèi)存的原理都是一樣的,即通過將同一個物理內(nèi)存區(qū)域映射到多個進程的虛擬內(nèi)存中來實現(xiàn)共享。 =》 核心原理
- 由于多個進程可以同時訪問這個物理內(nèi)存區(qū)域,因此需要使用同步機制來保證數(shù)據(jù)的一致性。常用的同步機制包括信號量、互斥鎖、讀寫鎖等。
多線程下載
首先,開啟10個(根據(jù)文件大小和機器性能確定)線程,然后為每個子線程創(chuàng)建一個fileInfo實例
,構(gòu)成一個fileInfo指針數(shù)組傳給每個線程(方便子線程打印下載進度)
struct fileInfo { const char *url; // 服務(wù)器下載路徑 char *fileptr; // 共享內(nèi)存指針 int offset; // start int end; // end pthread_t thid; double download; // 當前線程下載量 double totalDownload; // 所有線程總下載量 FILE *recordfile; // 文件句柄 };
開啟線程下載:
int download(const char *url, const char *filename) { // 1. 先向服務(wù)器發(fā)送請求,獲取文件大小,創(chuàng)建本地文件(迅雷下載預(yù)先分配大?。?,然后再發(fā)一次請求進行下載 // 文件下載兩種思路:fwrite直接寫文件;預(yù)先創(chuàng)建文件通過mmap內(nèi)存映射;這里選第二種 long fileLength = getDownloadFileLength(url); printf("downloadFileLength: %ld\n", fileLength); // write int fd = open(filename, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); // if (fd == -1) { return -1; } // 移動文件指針,使得預(yù)創(chuàng)建的文件和服務(wù)器的文件一樣大(在最后寫一個1) if (-1 == lseek(fd, fileLength-1, SEEK_SET)) { perror("lseek"); close(fd); return -1; } if (1 != write(fd, "", 1)) { perror("write"); close(fd); return -1; } // 2. 將文件映射到內(nèi)存 char *fileptr = (char *)mmap(NULL, fileLength, PROT_READ| PROT_WRITE, MAP_SHARED, fd, 0); if (fileptr == MAP_FAILED) { perror("mmap"); close(fd); return -1; } FILE *fp = fopen("a.txt", "r"); // thread arg int i = 0; long partSize = fileLength / THREAD_NUM; struct fileInfo *info[THREAD_NUM+1] = {NULL}; // 3.創(chuàng)建多個線程,每個線程一個文件下載器 for (i = 0;i <= THREAD_NUM;i ++) { info[i] = (struct fileInfo*)malloc(sizeof(struct fileInfo)); memset(info[i], 0, sizeof(struct fileInfo)); // 多個線程將文件分塊下載 info[i]->offset = i * partSize; if (i < THREAD_NUM) { info[i]->end = (i+1) * partSize - 1; } else { info[i]->end = fileLength - 1; } info[i]->fileptr = fileptr; // 內(nèi)存映射(多線程共享內(nèi)存) info[i]->url = url; info[i]->download = 0; info[i]->recordfile = fp; } pInfoTable = info; //pthread_t thid[THREAD_NUM+1] = {0}; for (i = 0;i <= THREAD_NUM;i ++) { pthread_create(&(info[i]->thid), NULL, worker, info[i]); // 將要傳給worker的文件信息封裝成一個結(jié)構(gòu)體 usleep(1); } // 等到子線程都退出,子線程才退出 for (i = 0;i <= THREAD_NUM;i ++) { pthread_join(info[i]->thid, NULL); } // 釋放資源 for (i = 0;i <= THREAD_NUM;i ++) { free(info[i]); } if (fp) fclose(fp); munmap(fileptr, fileLength); close(fd); return 0; }
子線程利用curl庫下載
子線程實現(xiàn)邏輯: 文件寫入回調(diào)與進度回調(diào)
void *worker(void *arg) { struct fileInfo *info = (struct fileInfo*)arg; char range[64] = {0}; // mutex_lock if (info->recordfile) { // 用于斷點續(xù)傳,從文件中讀取range fscanf(info->recordfile, "%d-%d", &info->offset, &info->end); } // mutex_unlock if (info->offset > info->end) return NULL; snprintf(range, 64, "%d-%d", info->offset, info->end); CURL *curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, info->url); // url curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunc); // 文件下載函數(shù),libcurl在寫入時執(zhí)行 curl_easy_setopt(curl, CURLOPT_WRITEDATA, info); // 寫入回調(diào)傳參 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); // progress =》 沒有下載進度為0 curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressFunc); // 回調(diào)函數(shù):libcurl發(fā)起請求時調(diào)用 curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, info); // 進度回調(diào)函數(shù)傳參 curl_easy_setopt(curl, CURLOPT_RANGE, range); // 控制從服務(wù)器文件的下載范圍 // http range CURLcode res = curl_easy_perform(curl); if (res != CURLE_OK) { printf("res %d\n", res); } curl_easy_cleanup(curl); return NULL; }
文件寫入:將服務(wù)器傳回的n塊文件寫入共享內(nèi)存,這里不需要加鎖,每個進程寫入的共享內(nèi)存地址不一樣
// 寫入回調(diào):ptr 參數(shù)是接收到的數(shù)據(jù)的指針;size 參數(shù)是每個數(shù)據(jù)塊的大?。籱emb 參數(shù)是數(shù)據(jù)塊的數(shù)量 size_t writeFunc(void *ptr, size_t size, size_t memb, void *userdata) { struct fileInfo *info = (struct fileInfo *)userdata; printf("writeFunc\n"); memcpy(info->fileptr + info->offset, ptr, size * memb); info->offset += size * memb; return size * memb; }
進度回調(diào):子進程打印文件的當前下載進度
// 進度回調(diào) int progressFunc(void *userdata, double totalDownload, double nowDownload, double totalUpload, double nowUpload) { printf("progressFunc\n"); int percent = 0; static int print = 1; struct fileInfo *info = (struct fileInfo*)userdata; info->download = nowDownload; info->totalDownload = totalDownload; // save if (totalDownload > 0) { int i = 0; double allDownload = 0; double total = 0; for (i = 0;i <= THREAD_NUM;i ++) { allDownload += pInfoTable[i]->download; total += pInfoTable[i]->totalDownload; } percent = (int)(allDownload / total * 100); } if (percent == print) { printf("threadid: %ld, percent: %d%%\n", info->thid, percent); print += 1; } return 0; }
斷點續(xù)傳
注冊一個信號(如中斷信號),當信號發(fā)生時,在文件中記錄每個線程的下一次下載range,下一次子線程下載時,下載文件中的range部分即可
// 當收到中斷信號后,記錄每個線程的下載進度 void signal_handler(int signum) { printf("signum: %d\n", signum); int fd = open("a.txt", O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); if (fd == -1) { exit(1); } int i = 0; for (i = 0;i <= THREAD_NUM;i ++) { char range[64] = {0}; snprintf(range, 64, "%d-%d\r\n", pInfoTable[i]->offset, pInfoTable[i]->end); write(fd, range, strlen(range)); } close(fd); exit(1); }
源碼
// gcc -o multi_download multi_download.c -lcurl #include <stdio.h> #include <unistd.h> #include <curl/curl.h> #include <fcntl.h> // 需要在linux上使用 #include <sys/mman.h> #include <string.h> #include <stdlib.h> #include <pthread.h> #include <signal.h> // fork(); struct fileInfo { const char *url; // 服務(wù)器下載路徑 char *fileptr; // 共享內(nèi)存指針 int offset; // start int end; // end pthread_t thid; double download; // 當前線程下載量 double totalDownload; // 所有線程總下載量 FILE *recordfile; // 文件句柄 }; #define THREAD_NUM 10 struct fileInfo **pInfoTable; // 二級指針=>指針數(shù)組 double downloadFileLength = 0; // 寫入回調(diào):ptr 參數(shù)是接收到的數(shù)據(jù)的指針;size 參數(shù)是每個數(shù)據(jù)塊的大?。籱emb 參數(shù)是數(shù)據(jù)塊的數(shù)量 size_t writeFunc(void *ptr, size_t size, size_t memb, void *userdata) { struct fileInfo *info = (struct fileInfo *)userdata; printf("writeFunc\n"); memcpy(info->fileptr + info->offset, ptr, size * memb); info->offset += size * memb; return size * memb; } // 進度回調(diào) int progressFunc(void *userdata, double totalDownload, double nowDownload, double totalUpload, double nowUpload) { printf("progressFunc\n"); int percent = 0; static int print = 1; struct fileInfo *info = (struct fileInfo*)userdata; info->download = nowDownload; info->totalDownload = totalDownload; // save if (totalDownload > 0) { int i = 0; double allDownload = 0; double total = 0; for (i = 0;i <= THREAD_NUM;i ++) { allDownload += pInfoTable[i]->download; total += pInfoTable[i]->totalDownload; } percent = (int)(allDownload / total * 100); } if (percent == print) { printf("threadid: %ld, percent: %d%%\n", info->thid, percent); print += 1; } return 0; } // double getDownloadFileLength(const char *url) { CURL *curl = curl_easy_init(); printf("url: %s\n", url); curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_USERAGENT, "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"); curl_easy_setopt(curl, CURLOPT_HEADER, 1); curl_easy_setopt(curl, CURLOPT_NOBODY, 1); CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { printf("downloadFileLength success\n"); curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &downloadFileLength); } else { printf("downloadFileLength error\n"); downloadFileLength = -1; } curl_easy_cleanup(curl); return downloadFileLength; } int recordnum = 0; void *worker(void *arg) { struct fileInfo *info = (struct fileInfo*)arg; char range[64] = {0}; // mutex_lock if (info->recordfile) { // 用于斷點續(xù)傳,從文件中讀取range fscanf(info->recordfile, "%d-%d", &info->offset, &info->end); } // mutex_unlock if (info->offset > info->end) return NULL; snprintf(range, 64, "%d-%d", info->offset, info->end); CURL *curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, info->url); // url curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunc); // 文件下載函數(shù),libcurl在寫入時執(zhí)行 curl_easy_setopt(curl, CURLOPT_WRITEDATA, info); // 寫入回調(diào)傳參 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); // progress =》 沒有下載進度為0 curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressFunc); // 回調(diào)函數(shù):libcurl發(fā)起請求時調(diào)用 curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, info); // 進度回調(diào)函數(shù)傳參 curl_easy_setopt(curl, CURLOPT_RANGE, range); // 控制從服務(wù)器文件的下載范圍 // http range CURLcode res = curl_easy_perform(curl); if (res != CURLE_OK) { printf("res %d\n", res); } curl_easy_cleanup(curl); return NULL; } // https://releases.ubuntu.com/22.04/ubuntu-22.04.2-live-server-amd64.iso.zsync // ubuntu.zsync.backup int download(const char *url, const char *filename) { // 1. 先向服務(wù)器發(fā)送請求,獲取文件大小,創(chuàng)建本地文件(迅雷下載預(yù)先分配大?。?,然后再發(fā)一次請求進行下載 // 文件下載兩種思路:fwrite直接寫文件;預(yù)先創(chuàng)建文件通過mmap內(nèi)存映射;這里選第二種 long fileLength = getDownloadFileLength(url); printf("downloadFileLength: %ld\n", fileLength); // write int fd = open(filename, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); // if (fd == -1) { return -1; } // 移動文件指針,使得預(yù)創(chuàng)建的文件和服務(wù)器的文件一樣大(在最后寫一個1) if (-1 == lseek(fd, fileLength-1, SEEK_SET)) { perror("lseek"); close(fd); return -1; } if (1 != write(fd, "", 1)) { perror("write"); close(fd); return -1; } // 2. 將文件映射到內(nèi)存 char *fileptr = (char *)mmap(NULL, fileLength, PROT_READ| PROT_WRITE, MAP_SHARED, fd, 0); if (fileptr == MAP_FAILED) { perror("mmap"); close(fd); return -1; } FILE *fp = fopen("a.txt", "r"); // thread arg int i = 0; long partSize = fileLength / THREAD_NUM; struct fileInfo *info[THREAD_NUM+1] = {NULL}; // 3.創(chuàng)建多個線程,每個線程一個文件下載器 for (i = 0;i <= THREAD_NUM;i ++) { info[i] = (struct fileInfo*)malloc(sizeof(struct fileInfo)); memset(info[i], 0, sizeof(struct fileInfo)); // 多個線程將文件分塊下載 info[i]->offset = i * partSize; if (i < THREAD_NUM) { info[i]->end = (i+1) * partSize - 1; } else { info[i]->end = fileLength - 1; } info[i]->fileptr = fileptr; // 內(nèi)存映射(多線程共享內(nèi)存) info[i]->url = url; info[i]->download = 0; info[i]->recordfile = fp; } pInfoTable = info; //pthread_t thid[THREAD_NUM+1] = {0}; for (i = 0;i <= THREAD_NUM;i ++) { pthread_create(&(info[i]->thid), NULL, worker, info[i]); // 將要傳給worker的文件信息封裝成一個結(jié)構(gòu)體 usleep(1); } // 等到子線程都退出,子線程才退出 for (i = 0;i <= THREAD_NUM;i ++) { pthread_join(info[i]->thid, NULL); } // 釋放資源 for (i = 0;i <= THREAD_NUM;i ++) { free(info[i]); } if (fp) fclose(fp); munmap(fileptr, fileLength); close(fd); return 0; } // 當收到中斷信號后,記錄每個線程的下載進度 void signal_handler(int signum) { printf("signum: %d\n", signum); int fd = open("a.txt", O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); if (fd == -1) { exit(1); } int i = 0; for (i = 0;i <= THREAD_NUM;i ++) { char range[64] = {0}; snprintf(range, 64, "%d-%d\r\n", pInfoTable[i]->offset, pInfoTable[i]->end); write(fd, range, strlen(range)); } close(fd); exit(1); } // #if 1 // 2G: ./ThreadDownLoad https://releases.ubuntu.com/22.04/ubuntu-22.04.3-live-server-amd64.iso ubuntu.zenger // 11M: ./ThreadDownLoad https://releases.ubuntu.com/22.04/ubuntu-22.04.3-desktop-amd64.iso.zsync ubuntu.zenger int main(int argc, const char *argv[]) { if (argc != 3) { printf("arg error\n"); return -1; } if (SIG_ERR == signal(SIGINT, signal_handler)) { perror("signal"); return -1; } return download(argv[1], argv[2]); } #endif
以上就是C++利用libcurl庫實現(xiàn)多線程文件下載的詳細內(nèi)容,更多關(guān)于C++ libcurl文件下載的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C語言實現(xiàn)數(shù)據(jù)結(jié)構(gòu)和雙向鏈表操作
這篇文章主要介紹了C語言實現(xiàn)數(shù)據(jù)結(jié)構(gòu)雙向鏈表操作,需要的朋友可以參考下2017-03-03虛函數(shù)表-C++多態(tài)的實現(xiàn)原理解析
這篇文章主要介紹了虛函數(shù)表-C++多態(tài)的實現(xiàn)原理,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-02-02C++中volatile關(guān)鍵字及常見的誤解總結(jié)
這篇文章主要給大家介紹了關(guān)于C++中volatile關(guān)鍵字及常見的誤解的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2018-05-05詳細分析C++ 數(shù)據(jù)封裝和數(shù)據(jù)抽象
這篇文章主要介紹了C++ 數(shù)據(jù)封裝和數(shù)據(jù)抽象的的相關(guān)資料,文中代碼非常詳細,幫助大家更好的理解和學習,感興趣的朋友可以了解下2020-06-06vc6.0中c語言控制臺程序中的定時技術(shù)(定時器)
這篇文章主要介紹了vc6.0中c語言控制臺程序中的定時技術(shù)(定時器),需要的朋友可以參考下2014-04-04