Linux環(huán)境下實現(xiàn)多進程Socket通信功能
需求分析
我們的目標是實現(xiàn)以下功能:
- 服務(wù)器能夠啟動多個客戶端進程,客戶端數(shù)量可配置
- 每個客戶端在不同的端口上監(jiān)聽
- 服務(wù)器能夠連接到所有客戶端
- 服務(wù)器能夠同時向所有客戶端發(fā)送消息
- 客戶端接收到消息后發(fā)送確認
- 程序能夠優(yōu)雅地處理終止信號
系統(tǒng)架構(gòu)
整個系統(tǒng)由兩個主要組件組成:
客戶端程序(Client):
- 在指定端口上監(jiān)聽連接
- 接收服務(wù)器發(fā)送的消息
- 發(fā)送確認消息給服務(wù)器
服務(wù)器程序(Server):
- 從配置文件讀取設(shè)置
- 啟動多個客戶端進程
- 連接到每個客戶端
- 向所有客戶端發(fā)送消息
- 接收客戶端的確認消息
實現(xiàn)細節(jié)
客戶端實現(xiàn)(client.cpp)
客戶端程序需要創(chuàng)建一個Socket,在指定端口上監(jiān)聽連接,接收服務(wù)器發(fā)送的消息,并發(fā)送確認。
#include <iostream> #include <cstring> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #define BUFFER_SIZE 1024 #define DEFAULT_PORT 8888 bool running = true; void signalHandler(int signum) { std::cout << "Interrupt signal (" << signum << ") received.\n"; running = false; } int main(int argc, char* argv[]) { // 注冊信號處理函數(shù),用于優(yōu)雅關(guān)閉 signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); // 解析命令行參數(shù) int port = DEFAULT_PORT; if (argc > 1) { port = std::stoi(argv[1]); } // 客戶端ID(進程ID) pid_t pid = getpid(); std::cout << "Client started with PID: " << pid << std::endl; // 創(chuàng)建Socket int clientSocket = socket(AF_INET, SOCK_STREAM, 0); if (clientSocket < 0) { std::cerr << "Error creating socket" << std::endl; return 1; } // 設(shè)置服務(wù)器地址 struct sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); serverAddr.sin_addr.s_addr = INADDR_ANY; // 綁定Socket到地址 if (bind(clientSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) { std::cerr << "Error binding socket to port " << port << std::endl; close(clientSocket); return 1; } // 監(jiān)聽連接 if (listen(clientSocket, 5) < 0) { std::cerr << "Error listening on socket" << std::endl; close(clientSocket); return 1; } std::cout << "Client listening on port " << port << std::endl; // 接受服務(wù)器連接 struct sockaddr_in serverConnAddr; socklen_t serverLen = sizeof(serverConnAddr); int serverConnection = accept(clientSocket, (struct sockaddr*)&serverConnAddr, &serverLen); if (serverConnection < 0) { std::cerr << "Error accepting connection" << std::endl; close(clientSocket); return 1; } std::cout << "Connected to server at " << inet_ntoa(serverConnAddr.sin_addr) << ":" << ntohs(serverConnAddr.sin_port) << std::endl; // 接收服務(wù)器消息 char buffer[BUFFER_SIZE]; while (running) { memset(buffer, 0, BUFFER_SIZE); int bytesReceived = recv(serverConnection, buffer, BUFFER_SIZE - 1, 0); if (bytesReceived > 0) { std::cout << "Message received: " << buffer << std::endl; // 發(fā)送確認 std::string ack = "Client " + std::to_string(pid) + " received message"; send(serverConnection, ack.c_str(), ack.length(), 0); } else if (bytesReceived == 0) { std::cout << "Server disconnected" << std::endl; break; } else { if (errno != EINTR) { // 忽略信號中斷 std::cerr << "Error receiving data: " << strerror(errno) << std::endl; break; } } } // 清理資源 close(serverConnection); close(clientSocket); std::cout << "Client terminated" << std::endl; return 0; }
服務(wù)器實現(xiàn)(server.cpp)
服務(wù)器程序需要啟動多個客戶端進程,連接到每個客戶端,并向所有客戶端發(fā)送消息。
#include <iostream> #include <vector> #include <string> #include <cstring> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <thread> #include <chrono> #include <fstream> #include <sstream> #include <signal.h> #include <sys/wait.h> #include <mutex> #include <condition_variable> #define BUFFER_SIZE 1024 #define DEFAULT_BASE_PORT 8888 #define DEFAULT_NUM_CLIENTS 3 #define CONFIG_FILE "server_config.txt" std::mutex mtx; std::condition_variable cv; bool allClientsConnected = false; int connectedClients = 0; int totalClients = 0; std::vector<pid_t> clientPids; std::vector<int> clientSockets; // 從配置文件讀取配置 bool readConfig(int& numClients, int& basePort) { std::ifstream configFile(CONFIG_FILE); if (!configFile.is_open()) { std::cout << "Config file not found, using defaults" << std::endl; return false; } std::string line; while (std::getline(configFile, line)) { std::istringstream iss(line); std::string key; if (std::getline(iss, key, '=')) { std::string value; if (std::getline(iss, value)) { if (key == "NUM_CLIENTS") { numClients = std::stoi(value); } else if (key == "BASE_PORT") { basePort = std::stoi(value); } } } } configFile.close(); return true; } // 寫入默認配置到文件 void writeDefaultConfig() { std::ofstream configFile(CONFIG_FILE); if (configFile.is_open()) { configFile << "NUM_CLIENTS=" << DEFAULT_NUM_CLIENTS << std::endl; configFile << "BASE_PORT=" << DEFAULT_BASE_PORT << std::endl; configFile.close(); std::cout << "Created default configuration file" << std::endl; } else { std::cerr << "Unable to create configuration file" << std::endl; } } // 啟動客戶端進程 pid_t launchClient(int port) { pid_t pid = fork(); if (pid == 0) { // 子進程 std::string portStr = std::to_string(port); execl("./Client", "Client", portStr.c_str(), nullptr); // 如果execl返回,說明出錯了 std::cerr << "Error launching client: " << strerror(errno) << std::endl; exit(1); } else if (pid < 0) { // fork失敗 std::cerr << "Fork failed: " << strerror(errno) << std::endl; return -1; } // 父進程 return pid; } // 連接到客戶端 bool connectToClient(int& clientSocket, int port) { clientSocket = socket(AF_INET, SOCK_STREAM, 0); if (clientSocket < 0) { std::cerr << "Error creating socket for client on port " << port << std::endl; return false; } struct sockaddr_in clientAddr; memset(&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; clientAddr.sin_port = htons(port); clientAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // 嘗試連接,帶重試 int retries = 10; while (retries > 0) { if (connect(clientSocket, (struct sockaddr*)&clientAddr, sizeof(clientAddr)) == 0) { std::cout << "Connected to client on port " << port << std::endl; return true; } std::cout << "Connection attempt failed, retrying in 1 second..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); retries--; } std::cerr << "Failed to connect to client on port " << port << std::endl; close(clientSocket); return false; } // 連接客戶端的線程函數(shù) void clientConnectionThread(int port, int clientIndex) { int clientSocket; if (connectToClient(clientSocket, port)) { std::lock_guard<std::mutex> lock(mtx); clientSockets[clientIndex] = clientSocket; connectedClients++; if (connectedClients == totalClients) { allClientsConnected = true; cv.notify_one(); } } } // 向所有已連接的客戶端發(fā)送消息 void sendToAllClients(const std::string& message) { for (int socket : clientSockets) { if (socket > 0) { send(socket, message.c_str(), message.length(), 0); // 接收確認 char buffer[BUFFER_SIZE]; memset(buffer, 0, BUFFER_SIZE); int bytesReceived = recv(socket, buffer, BUFFER_SIZE - 1, 0); if (bytesReceived > 0) { std::cout << "Acknowledgment: " << buffer << std::endl; } } } } // 清理資源 void cleanup() { // 關(guān)閉所有客戶端Socket for (int socket : clientSockets) { if (socket > 0) { close(socket); } } // 終止所有客戶端進程 for (pid_t pid : clientPids) { if (pid > 0) { kill(pid, SIGTERM); waitpid(pid, nullptr, 0); } } } // 信號處理函數(shù) void signalHandler(int signum) { std::cout << "Interrupt signal (" << signum << ") received.\n"; cleanup(); exit(signum); } int main() { // 注冊信號處理函數(shù) signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); // 讀取配置 int numClients = DEFAULT_NUM_CLIENTS; int basePort = DEFAULT_BASE_PORT; if (!readConfig(numClients, basePort)) { writeDefaultConfig(); } std::cout << "Starting server with " << numClients << " clients, base port: " << basePort << std::endl; // 初始化客戶端向量 totalClients = numClients; clientPids.resize(numClients, -1); clientSockets.resize(numClients, -1); // 啟動客戶端進程 for (int i = 0; i < numClients; i++) { int port = basePort + i; pid_t pid = launchClient(port); if (pid > 0) { clientPids[i] = pid; std::cout << "Launched client " << i + 1 << " with PID " << pid << " on port " << port << std::endl; } else { std::cerr << "Failed to launch client " << i + 1 << std::endl; } } // 給客戶端啟動的時間 std::cout << "Waiting for clients to start..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); // 連接到客戶端 std::vector<std::thread> connectionThreads; for (int i = 0; i < numClients; i++) { if (clientPids[i] > 0) { int port = basePort + i; connectionThreads.push_back(std::thread(clientConnectionThread, port, i)); } } // 等待所有連接建立 { std::unique_lock<std::mutex> lock(mtx); if (!allClientsConnected) { std::cout << "Waiting for all clients to connect..." << std::endl; cv.wait(lock, []{ return allClientsConnected; }); } } // 等待所有連接線程結(jié)束 for (auto& thread : connectionThreads) { thread.join(); } std::cout << "All clients connected. Ready to send messages." << std::endl; // 主循環(huán) std::string message; while (true) { std::cout << "Enter message to send to all clients (or 'exit' to quit): "; std::getline(std::cin, message); if (message == "exit") { break; } std::cout << "Sending message to all clients..." << std::endl; sendToAllClients(message); } // 清理 cleanup(); std::cout << "Server terminated" << std::endl; return 0; }
編譯和運行
為了方便編譯,我們可以創(chuàng)建一個Makefile:
CC = g++ CFLAGS = -std=c++11 -Wall -pthread LDFLAGS = -pthread all: Client Server Client: client.cpp $(CC) $(CFLAGS) -o Client client.cpp $(LDFLAGS) Server: server.cpp $(CC) $(CFLAGS) -o Server server.cpp $(LDFLAGS) clean: rm -f Client Server server_config.txt .PHONY: all clean
編譯和運行的步驟:
# 編譯 make # 運行服務(wù)器 ./Server
代碼解析
端口號傳遞機制
服務(wù)器如何將端口號傳遞給客戶端是本系統(tǒng)的一個關(guān)鍵點。整個過程如下:
- 服務(wù)器為每個客戶端分配一個唯一的端口號(基礎(chǔ)端口號 + 索引)
- 服務(wù)器通過
fork()
創(chuàng)建子進程 - 子進程通過
execl()
執(zhí)行客戶端程序,將端口號作為命令行參數(shù)傳遞 - 客戶端程序解析命令行參數(shù)獲取端口號
- 客戶端使用該端口號創(chuàng)建和綁定Socket
- 服務(wù)器知道每個客戶端的端口號,并使用這些端口號連接到客戶端
信號處理
程序使用信號處理機制來實現(xiàn)優(yōu)雅關(guān)閉:
signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler);
這兩行代碼注冊了信號處理函數(shù),當程序接收到SIGINT(通常是按Ctrl+C)或SIGTERM(通常是系統(tǒng)發(fā)送的終止信號)時,會調(diào)用signalHandler
函數(shù)。在這個函數(shù)中,程序會清理資源并正常退出。
多線程連接
服務(wù)器使用多線程來并行連接到所有客戶端:
std::vector<std::thread> connectionThreads; for (int i = 0; i < numClients; i++) { if (clientPids[i] > 0) { int port = basePort + i; connectionThreads.push_back(std::thread(clientConnectionThread, port, i)); } }
這樣可以同時嘗試連接到所有客戶端,而不是一個接一個地連接,提高了效率。
條件變量同步
服務(wù)器使用條件變量來等待所有客戶端連接完成:
{ std::unique_lock<std::mutex> lock(mtx); if (!allClientsConnected) { std::cout << "Waiting for all clients to connect..." << std::endl; cv.wait(lock, []{ return allClientsConnected; }); } }
當所有客戶端都連接成功后,allClientsConnected
變量會被設(shè)置為true
,條件變量會通知主線程繼續(xù)執(zhí)行。
總結(jié)
本文介紹了如何在Linux環(huán)境下實現(xiàn)一個服務(wù)器程序,該程序能夠啟動多個客戶端進程,并通過Socket與這些客戶端進行通信。主要特點包括:
- 可配置的客戶端數(shù)量
- 動態(tài)端口分配
- 并行連接
- 廣播消息
- 確認機制
- 優(yōu)雅關(guān)閉
這個示例展示了多進程、Socket通信、多線程和同步機制的綜合應(yīng)用,可以作為網(wǎng)絡(luò)編程的參考實現(xiàn)。
進一步改進
這個示例還可以進一步改進,例如:
- 添加錯誤恢復(fù)機制
- 實現(xiàn)客戶端自動重連
- 添加消息隊列
- 實現(xiàn)更復(fù)雜的通信協(xié)議
- 添加安全機制(如TLS加密)
以上就是Linux環(huán)境下實現(xiàn)多進程Socket通信功能的詳細內(nèi)容,更多關(guān)于Linux多進程Socket通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Linux設(shè)置Service服務(wù)開機自啟的教程指南
在 Linux 系統(tǒng)中,確保關(guān)鍵服務(wù)能夠在系統(tǒng)啟動時自動運行是一項非常重要的任務(wù),尤其是在服務(wù)器環(huán)境中,我們希望一些服務(wù)能夠在系統(tǒng)每次啟動后自動啟動,從而確保業(yè)務(wù)的持續(xù)運行,本指南將詳細介紹如何在Linux系統(tǒng)中設(shè)置Service服務(wù)開機自啟,需要的朋友可以參考下2024-10-10Ubuntu18.04 一鍵升級Python所有第三方包 及安裝python包的方法
pip 是 Python 包管理工具,該工具提供了對Python 包的查找、下載、安裝、卸載的功能。這篇文章給大家介紹Ubuntu18.04 一鍵升級Python所有第三方包 ,感興趣的朋友一起看看吧2019-10-10Linux系統(tǒng)中systemd服務(wù)啟動失敗問題排查和解決方法(以ad_auth.service為例)
在 Linux 系統(tǒng)中,systemd 是管理服務(wù)和進程的核心工具,然而,在實際運維中,我們經(jīng)常會遇到服務(wù)啟動失敗的情況,本文將以一個具體的案例——ad_auth.service 啟動失敗為例,詳細介紹如何排查和解決此類問題,需要的朋友可以參考下2025-01-01