Java實(shí)現(xiàn)Socket服務(wù)端與客戶端雙向通信功能
原理
Socket通信是計(jì)算機(jī)網(wǎng)絡(luò)中常用的一種通信機(jī)制,它是基于TCP/IP協(xié)議實(shí)現(xiàn)的,提供了兩個應(yīng)用程序之間通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換的能力。Socket本質(zhì)上是一種抽象概念,為網(wǎng)絡(luò)服務(wù)提供了一組API接口。
- Socket通信模型
Socket通信模型通常包括客戶端和服務(wù)器端兩部分。 服務(wù)器端:負(fù)責(zé)在特定的端口監(jiān)聽來自客戶端的連接請求,當(dāng)一個請求到達(dá)時,服務(wù)器會與客戶端建立連接,并為客戶端提供相應(yīng)的服務(wù)。 客戶端:主動向服務(wù)器的特定IP地址和端口發(fā)起連接請求,連接成功后,客戶端可以通過建立的連接向服務(wù)器發(fā)送請求并接收響應(yīng)。
- Socket通信過程
Socket通信過程一般包括以下幾個步驟:
- 服務(wù)器監(jiān)聽:
服務(wù)器通過socket()函數(shù)創(chuàng)建一個Socket,并通過bind()函數(shù)將其綁定到一個IP地址和端口上。然后,服務(wù)器調(diào)用listen()函數(shù)開始監(jiān)聽該端口上的連接請求。
- 客戶端請求連接:
客戶端也通過socket()函數(shù)創(chuàng)建一個Socket,然后調(diào)用connect()函數(shù)嘗試與服務(wù)器的指定IP地址和端口建立連接。
- 服務(wù)器接受連接:
服務(wù)器在接收到客戶端的連接請求后,通過accept()函數(shù)接受這個連接。如果成功,accept()函數(shù)會返回一個新的Socket(通常稱為“子Socket”),用于與該客戶端進(jìn)行通信。
數(shù)據(jù)傳輸:連接建立成功后,客戶端和服務(wù)器就可以通過新建立的Socket進(jìn)行數(shù)據(jù)傳輸了。數(shù)據(jù)傳輸可以是單向的也可以是雙向的。應(yīng)用程序可以使用send(), write(), recv(), read()等函數(shù)進(jìn)行數(shù)據(jù)發(fā)送和接收操作。
- 斷開連接:
當(dāng)通信結(jié)束后,客戶端和服務(wù)器都可以調(diào)用close()函數(shù)來關(guān)閉自己持有的Socket,從而斷開兩者之間的連接。 TCP vs UDP 在實(shí)際使用中,基于Socket的通信方式主要有兩種:基于TCP和基于UDP。 TCP Socket:提供可靠、面向連接、基于字節(jié)流的通信方式。適用對數(shù)據(jù)完整性和順序有要求的應(yīng)用場景。 UDP Socket:提供無連接、不保證可靠性、基于消息(數(shù)據(jù)報(bào))的通信方式。適用于對實(shí)時性要求高、容忍部分?jǐn)?shù)據(jù)丟失或亂序的應(yīng)用場景。
代碼實(shí)現(xiàn)
服務(wù)端
服務(wù)端主體邏輯:和每個接入的客戶端都會使用獨(dú)立線程建立起長連接,二者之間使用心跳保持聯(lián)系,使用clientSockets 存儲了每個客戶端的信息便于和客戶端建立起聯(lián)系。
package com.example.demo2.server.socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.*; /** * @author kim */ @Component public class TcpServer implements DisposableBean { private static final Logger logger = LoggerFactory.getLogger(TcpServer.class); private final SocketServerConfig config; private ServerSocket serverSocket; private ExecutorService executorService; private volatile boolean running = true; // 存儲客戶端連接 private final Map<String, Socket> clientSockets = new ConcurrentHashMap<>(); public TcpServer(SocketServerConfig config) { this.config = config; } @PostConstruct public void start() throws IOException { executorService = Executors.newFixedThreadPool(config.getMaxThreads()); serverSocket = new ServerSocket(config.getPort()); logger.info("平臺socket服務(wù)已啟動, 監(jiān)聽端口為 {}", config.getPort()); new Thread(this::acceptConnections).start(); } private void acceptConnections() { while (running) { try { Socket clientSocket = serverSocket.accept(); String clientAddress = clientSocket.getInetAddress().getHostAddress(); clientSockets.put(clientAddress, clientSocket); executorService.execute(new ClientHandler(clientSocket, clientAddress)); } catch (IOException e) { if (running) { logger.error("Connection accept error", e); } } } } // 用于發(fā)送消息到特定客戶端 public void sendMessageToClient(String clientAddress, String message) throws IOException { Socket socket = clientSockets.get(clientAddress); if (socket != null && !socket.isClosed()) { PrintWriter out = new PrintWriter(socket.getOutputStream(), true); out.println(message); logger.info("Sent message to {}: {}", clientAddress, message); } else { logger.warn("Client {} is not connected or socket is closed", clientAddress); } } @Override public void destroy() throws Exception { running = false; executorService.shutdown(); for (Socket socket : clientSockets.values()) { if (!socket.isClosed()) { socket.close(); } } if (serverSocket != null && !serverSocket.isClosed()) { serverSocket.close(); } logger.info("TCP Server stopped"); } private class ClientHandler implements Runnable { private final Socket clientSocket; private final String clientAddress; ClientHandler(Socket socket, String address) { this.clientSocket = socket; this.clientAddress = address; } @Override public void run() { try (BufferedReader in = new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter( clientSocket.getOutputStream(), true)) { logger.info("Client connected: {}", clientAddress); String input; while ((input = in.readLine()) != null) { logger.debug("Received: {}", input); out.println(input); logger.info("Client connected: {}", clientAddress); } } catch (IOException e) { logger.warn("Client connection closed: {}", e.getMessage()); } finally { try { clientSockets.remove(clientAddress); clientSocket.close(); } catch (IOException e) { logger.error("Error closing socket", e); } } } } }
配置類
package com.example.demo2.server.socket; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "socket.server") @Data public class SocketServerConfig { private int port; private int maxThreads; // Getters and Setters }
配置文件
server: port: 8080 socket: server: port: 8088 maxThreads: 50
向客戶端發(fā)送測試信息
@GetMapping("/send") public String sendMessage(String clientAddress) throws IOException { tcpServer.sendMessageToClient("192.168.3.8","77777777777"); return "success"; }
服務(wù)端發(fā)送日志
客戶端
客戶端主體邏輯,使用自己設(shè)計(jì)的心跳機(jī)制,監(jiān)聽服務(wù)端狀態(tài),如果服務(wù)端斷開連接,則客戶端會嘗試重新建立聯(lián)系。
package com.example.demo1.socketclient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @Service public class TcpClientService implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(TcpClientService.class); private final SocketClientConfig config; private Socket socket; private PrintWriter out; private BufferedReader in; private final AtomicBoolean running = new AtomicBoolean(true); private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final MessageListener messageListener; @Autowired public TcpClientService(SocketClientConfig config, MessageListener messageListener) { this.config = config; this.messageListener = messageListener; } @Override public void run(ApplicationArguments args) throws Exception { initializeConnection(); } @Override public void destroy() throws Exception { running.set(false); closeResources(); executor.shutdown(); } private synchronized void initializeConnection() { new Thread(() -> { while (running.get()) { try { socket = new Socket(); socket.setKeepAlive(true); socket.setSoTimeout(config.getHeartbeatTimeout()); socket.connect(new InetSocketAddress(config.getHost(), config.getPort()), config.getTimeout()); out = new PrintWriter(socket.getOutputStream(), true); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); logger.info("Connected to server {}:{}", config.getHost(), config.getPort()); executor.execute(this::listenToServer); startHeartbeat(); while (!socket.isClosed() && socket.isConnected()) { Thread.sleep(1000); } } catch (Exception e) { logger.warn("Connection error: {}", e.getMessage()); } finally { closeResources(); if (running.get()) { logger.info("Attempting to reconnect in 5 seconds..."); sleepSafely(5000); } } } }, "tcp-client-connector").start(); } private void listenToServer() { try { String response; while (running.get() && !socket.isClosed()) { try { response = in.readLine(); if (response == null) { logger.warn("Server closed connection"); break; // 終止循環(huán),表示連接已關(guān)閉 } logger.debug("Received server message: {}", response); messageListener.onMessage(response); } catch (SocketTimeoutException e) { logger.debug("Socket read timeout"); } catch (IOException e) { if (!socket.isClosed()) { logger.warn("Connection lost: {}", e.getMessage()); break; // 終止循環(huán),表示連接已中斷 } } } } finally { closeResources(); // 確保資源關(guān)閉 } } private void startHeartbeat() { new Thread(() -> { while (running.get() && !socket.isClosed()) { try { sendMessageInternal("HEARTBEAT"); sleepSafely(config.getHeartbeatInterval()); } catch (Exception e) { logger.warn("Heartbeat failed: {}", e.getMessage()); break; } } }, "heartbeat-thread").start(); } public synchronized void sendMessage(String message) throws IOException { if (socket == null || !socket.isConnected()) { throw new IOException("Not connected to server"); } out.println(message); logger.debug("Sent message: {}", message); } private synchronized void sendMessageInternal(String message) { try { if (socket != null && socket.isConnected()) { out.println(message); } } catch (Exception e) { logger.warn("Failed to send heartbeat"); } } private synchronized void closeResources() { try { if (out != null) out.close(); if (in != null) in.close(); if (socket != null) socket.close(); } catch (IOException e) { logger.warn("Error closing resources: {}", e.getMessage()); } } private void sleepSafely(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public interface MessageListener { void onMessage(String message); } }
消息監(jiān)聽:監(jiān)聽服務(wù)發(fā)送的消息
package com.example.demo1.socketclient; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component public class ServerMessageHandler implements TcpClientService.MessageListener { private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class); @Override public void onMessage(String message) { if(StringUtils.isNotEmpty(message)){ if (!message.contains("HEARTBEAT")){ //處理其他邏輯 System.out.println("接收服務(wù)端消息成功:"+message); }else{ //心跳消息 System.out.println(message); } } } }
配置類
package com.example.demo1.socketclient; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "socket.client") @Data public class SocketClientConfig { private String host; private int port; private int timeout; private int heartbeatInterval; private int heartbeatTimeout; // Getters and Setters }
發(fā)送測試方法
@GetMapping("/send") public ResponseEntity<String> sendMessage() { try { tcpClient.sendMessage("客戶端發(fā)送信息"); return ResponseEntity.ok("Message sent"); } catch (IOException e) { return ResponseEntity.status(503).body("Server unavailable"); } }
配置文件
socket: client: host: 192.168.3.8 #服務(wù)端ip地址 port: 8088 #監(jiān)聽端口 timeout: 5000 heartbeat-interval: 3000 # 心跳間隔30秒 heartbeat-timeout: 6000 # 心跳超時60秒 server: port: 8082
客戶端發(fā)送信息后,服務(wù)端會接收到信息。
總結(jié)
以上就是java接入socket通信服務(wù)端與客戶端的全部代碼,二者實(shí)現(xiàn)了互相通信,具體的業(yè)務(wù)場景則需要小伙伴們在此基礎(chǔ)上額外的設(shè)計(jì)邏輯了。
以上就是Java實(shí)現(xiàn)Socket服務(wù)端與客戶端雙向通信功能的詳細(xì)內(nèi)容,更多關(guān)于Java Socket服務(wù)端與客戶端通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Springboot搭建OAuth2.0 Server的方法示例
這篇文章主要介紹了使用Springboot搭建OAuth2.0 Server的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08spring cloud consul使用ip注冊服務(wù)的方法示例
這篇文章主要介紹了spring cloud consul使用ip注冊服務(wù)的方法示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03springboot+thymeleaf+shiro標(biāo)簽的實(shí)例
這篇文章主要介紹了springboot+thymeleaf+shiro標(biāo)簽的實(shí)例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01java構(gòu)造函數(shù)示例(構(gòu)造方法)
這篇文章主要介紹了java構(gòu)造函數(shù)示例(構(gòu)造方法),需要的朋友可以參考下2014-03-03Springboot的ThreadPoolTaskScheduler線程池輕松搞定15分鐘不操作自動取消訂單
這篇文章主要介紹了Springboot的ThreadPoolTaskScheduler線程池輕松搞定15分鐘不操作自動取消訂單,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2025-01-01SpringBoot使用Swagger生成多模塊的API文檔
這篇文章將以?Spring?Boot?多模塊項(xiàng)目為例,為大家詳細(xì)介紹一下如何使用?Swagger?生成多模塊的?API?文檔,感興趣的小伙伴可以了解一下2025-02-02Maven統(tǒng)一版本管理的實(shí)現(xiàn)
在使用Maven多模塊結(jié)構(gòu)工程時,配置版本是一個比較頭疼的事,本文主要介紹了Maven統(tǒng)一版本管理的實(shí)現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-03-03