基于SpringBoot實現(xiàn)多線程多主機TCP通信
下面我將介紹如何使用 Spring Boot 實現(xiàn)多線程、多主機的 TCP 通信,包括發(fā)送數(shù)據和接收應答并解析。
1. 項目結構
src/main/java/com/example/tcpdemo/
├── config
│ └── TcpClientConfig.java
├── controller
│ └── TcpController.java
├── handler
│ ├── TcpClientHandler.java
│ └── TcpResponseHandler.java
├── model
│ └── TcpHost.java
├── service
│ ├── TcpClientService.java
│ └── impl
│ └── TcpClientServiceImpl.java
└── TcpDemoApplication.java
2. 核心代碼實現(xiàn)
2.1 配置類
// TcpClientConfig.java
@Configuration
public class TcpClientConfig {
@Value("${tcp.client.thread-pool-size:10}")
private int threadPoolSize;
@Bean
public ExecutorService tcpClientExecutor() {
return Executors.newFixedThreadPool(threadPoolSize);
}
}
2.2 TCP主機模型
// TcpHost.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TcpHost {
private String host;
private int port;
private String name; // 主機標識名稱
}
2.3 TCP客戶端處理器
// TcpClientHandler.java
@Component
public class TcpClientHandler {
@Autowired
private ExecutorService tcpClientExecutor;
@Autowired
private TcpResponseHandler responseHandler;
public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
hosts.forEach(host -> {
tcpClientExecutor.execute(() -> {
try (Socket socket = new Socket(host.getHost(), host.getPort());
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
// 發(fā)送數(shù)據
out.println(message);
System.out.println("Sent to " + host.getName() + ": " + message);
// 接收響應
String response = in.readLine();
responseHandler.handleResponse(host, response);
} catch (IOException e) {
System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
}
});
});
}
}
2.4 響應處理器
// TcpResponseHandler.java
@Component
public class TcpResponseHandler {
public void handleResponse(TcpHost host, String response) {
// 這里實現(xiàn)你的響應解析邏輯
System.out.println("Received from " + host.getName() + ": " + response);
// 示例解析邏輯
if (response != null) {
// 假設響應格式為 "status|data"
String[] parts = response.split("\|");
if (parts.length == 2) {
String status = parts[0];
String data = parts[1];
System.out.println("Parsed response - Status: " + status + ", Data: " + data);
}
}
}
}
2.5 TCP服務接口
// TcpClientService.java
public interface TcpClientService {
void sendToHosts(List<TcpHost> hosts, String message);
}
2.6 TCP服務實現(xiàn)
// TcpClientServiceImpl.java
@Service
public class TcpClientServiceImpl implements TcpClientService {
@Autowired
private TcpClientHandler tcpClientHandler;
@Override
public void sendToHosts(List<TcpHost> hosts, String message) {
tcpClientHandler.sendToMultipleHosts(hosts, message);
}
}
2.7 控制器
// TcpController.java
@RestController
@RequestMapping("/api/tcp")
public class TcpController {
@Autowired
private TcpClientService tcpClientService;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody Map<String, Object> request) {
List<Map<String, Object>> hostsInfo = (List<Map<String, Object>>) request.get("hosts");
String message = (String) request.get("message");
List<TcpHost> hosts = hostsInfo.stream()
.map(info -> new TcpHost(
(String) info.get("host"),
(Integer) info.get("port"),
(String) info.get("name")))
.collect(Collectors.toList());
tcpClientService.sendToHosts(hosts, message);
return ResponseEntity.ok("Messages sent successfully");
}
}
3. 應用配置
在 application.properties或 application.yml中添加配置:
# TCP客戶端線程池大小 tcp.client.thread-pool-size=20 # 服務器端口 server.port=8080
4. 使用示例
4.1 啟動Spring Boot應用
// TcpDemoApplication.java
@SpringBootApplication
public class TcpDemoApplication {
public static void main(String[] args) {
SpringApplication.run(TcpDemoApplication.class, args);
}
}
4.2 發(fā)送請求
使用Postman或curl發(fā)送POST請求:
curl -X POST http://localhost:8080/api/tcp/send \
-H "Content-Type: application/json" \
-d '{
"hosts": [
{"host": "127.0.0.1", "port": 12345, "name": "Server1"},
{"host": "127.0.0.1", "port": 12346, "name": "Server2"}
],
"message": "Hello TCP Server"
}'
5. 高級功能擴展
5.1 連接池管理
// 可以擴展為使用連接池管理TCP連接
@Component
public class TcpConnectionPool {
private Map<String, Socket> connectionPool = new ConcurrentHashMap<>();
public Socket getConnection(TcpHost host) throws IOException {
String key = host.getHost() + ":" + host.getPort();
if (!connectionPool.containsKey(key) || connectionPool.get(key).isClosed()) {
connectionPool.put(key, new Socket(host.getHost(), host.getPort()));
}
return connectionPool.get(key);
}
public void closeAll() {
connectionPool.values().forEach(socket -> {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
});
connectionPool.clear();
}
}
5.2 超時設置
// 在TcpClientHandler中添加超時設置
public void sendToMultipleHosts(List<TcpHost> hosts, String message) {
hosts.forEach(host -> {
tcpClientExecutor.execute(() -> {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(host.getHost(), host.getPort()), 5000); // 5秒連接超時
socket.setSoTimeout(10000); // 10秒讀取超時
try (PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
out.println(message);
System.out.println("Sent to " + host.getName() + ": " + message);
String response = in.readLine();
responseHandler.handleResponse(host, response);
}
} catch (SocketTimeoutException e) {
System.err.println("Timeout when communicating with " + host.getName());
} catch (IOException e) {
System.err.println("Error communicating with " + host.getName() + ": " + e.getMessage());
}
});
});
}
5.3 自定義協(xié)議解析
// 擴展TcpResponseHandler實現(xiàn)更復雜的協(xié)議解析
public void handleResponse(TcpHost host, String response) {
try {
// 示例:解析JSON格式響應
JSONObject jsonResponse = new JSONObject(response);
String status = jsonResponse.getString("status");
String data = jsonResponse.getString("data");
long timestamp = jsonResponse.getLong("timestamp");
System.out.printf("Response from %s - Status: %s, Data: %s, Time: %tF %<tT%n",
host.getName(), status, data, new Date(timestamp));
} catch (JSONException e) {
System.err.println("Invalid response format from " + host.getName());
}
}
6. 注意事項
- ??線程安全??:確保在多線程環(huán)境下共享資源的線程安全
- ??資源釋放??:正確關閉Socket、流等資源
- ??異常處理??:合理處理各種網絡異常
- ??性能優(yōu)化??:根據實際需求調整線程池大小
- ??日志記錄??:添加詳細的日志記錄以便排查問題
這個實現(xiàn)提供了基本的TCP多線程通信框架,你可以根據實際需求進行擴展和優(yōu)化。
到此這篇關于基于SpringBoot實現(xiàn)多線程多主機TCP通信的文章就介紹到這了,更多相關SpringBoot TCP通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解Java8?CompletableFuture的并行處理用法
Java8中有一個工具非常有用,那就是CompletableFuture,本章主要講解CompletableFuture的并行處理用法,感興趣的小伙伴可以了解一下2022-04-04
SpringRetry重試機制之@Retryable注解與重試策略詳解
本文將詳細介紹SpringRetry的重試機制,特別是@Retryable注解的使用及各種重試策略的配置,幫助開發(fā)者構建更加健壯的應用程序,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-04-04
java多線程編程之使用thread類創(chuàng)建線程
在Java中創(chuàng)建線程有兩種方法:使用Thread類和使用Runnable接口。在使用Runnable接口時需要建立一個Thread實例2014-01-01

