Java實(shí)現(xiàn)Socket服務(wù)端與客戶端雙向通信功能
原理
Socket通信是計(jì)算機(jī)網(wǎng)絡(luò)中常用的一種通信機(jī)制,它是基于TCP/IP協(xié)議實(shí)現(xiàn)的,提供了兩個(gè)應(yīng)用程序之間通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換的能力。Socket本質(zhì)上是一種抽象概念,為網(wǎng)絡(luò)服務(wù)提供了一組API接口。
- Socket通信模型
Socket通信模型通常包括客戶端和服務(wù)器端兩部分。 服務(wù)器端:負(fù)責(zé)在特定的端口監(jiān)聽來自客戶端的連接請(qǐng)求,當(dāng)一個(gè)請(qǐng)求到達(dá)時(shí),服務(wù)器會(huì)與客戶端建立連接,并為客戶端提供相應(yīng)的服務(wù)。 客戶端:主動(dòng)向服務(wù)器的特定IP地址和端口發(fā)起連接請(qǐng)求,連接成功后,客戶端可以通過建立的連接向服務(wù)器發(fā)送請(qǐng)求并接收響應(yīng)。
- Socket通信過程
Socket通信過程一般包括以下幾個(gè)步驟:
- 服務(wù)器監(jiān)聽:
服務(wù)器通過socket()函數(shù)創(chuàng)建一個(gè)Socket,并通過bind()函數(shù)將其綁定到一個(gè)IP地址和端口上。然后,服務(wù)器調(diào)用listen()函數(shù)開始監(jiān)聽該端口上的連接請(qǐng)求。
- 客戶端請(qǐng)求連接:
客戶端也通過socket()函數(shù)創(chuàng)建一個(gè)Socket,然后調(diào)用connect()函數(shù)嘗試與服務(wù)器的指定IP地址和端口建立連接。
- 服務(wù)器接受連接:
服務(wù)器在接收到客戶端的連接請(qǐng)求后,通過accept()函數(shù)接受這個(gè)連接。如果成功,accept()函數(shù)會(huì)返回一個(gè)新的Socket(通常稱為“子Socket”),用于與該客戶端進(jìn)行通信。
數(shù)據(jù)傳輸:連接建立成功后,客戶端和服務(wù)器就可以通過新建立的Socket進(jìn)行數(shù)據(jù)傳輸了。數(shù)據(jù)傳輸可以是單向的也可以是雙向的。應(yīng)用程序可以使用send(), write(), recv(), read()等函數(shù)進(jìn)行數(shù)據(jù)發(fā)送和接收操作。
- 斷開連接:
當(dāng)通信結(jié)束后,客戶端和服務(wù)器都可以調(diào)用close()函數(shù)來關(guān)閉自己持有的Socket,從而斷開兩者之間的連接。 TCP vs UDP 在實(shí)際使用中,基于Socket的通信方式主要有兩種:基于TCP和基于UDP。 TCP Socket:提供可靠、面向連接、基于字節(jié)流的通信方式。適用對(duì)數(shù)據(jù)完整性和順序有要求的應(yīng)用場景。 UDP Socket:提供無連接、不保證可靠性、基于消息(數(shù)據(jù)報(bào))的通信方式。適用于對(duì)實(shí)時(shí)性要求高、容忍部分?jǐn)?shù)據(jù)丟失或亂序的應(yīng)用場景。
代碼實(shí)現(xiàn)
服務(wù)端
服務(wù)端主體邏輯:和每個(gè)接入的客戶端都會(huì)使用獨(dú)立線程建立起長連接,二者之間使用心跳保持聯(lián)系,使用clientSockets 存儲(chǔ)了每個(gè)客戶端的信息便于和客戶端建立起聯(lián)系。
package com.example.demo2.server.socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author kim
*/
@Component
public class TcpServer implements DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);
private final SocketServerConfig config;
private ServerSocket serverSocket;
private ExecutorService executorService;
private volatile boolean running = true;
// 存儲(chǔ)客戶端連接
private final Map<String, Socket> clientSockets = new ConcurrentHashMap<>();
public TcpServer(SocketServerConfig config) {
this.config = config;
}
@PostConstruct
public void start() throws IOException {
executorService = Executors.newFixedThreadPool(config.getMaxThreads());
serverSocket = new ServerSocket(config.getPort());
logger.info("平臺(tái)socket服務(wù)已啟動(dòng), 監(jiān)聽端口為 {}", config.getPort());
new Thread(this::acceptConnections).start();
}
private void acceptConnections() {
while (running) {
try {
Socket clientSocket = serverSocket.accept();
String clientAddress = clientSocket.getInetAddress().getHostAddress();
clientSockets.put(clientAddress, clientSocket);
executorService.execute(new ClientHandler(clientSocket, clientAddress));
} catch (IOException e) {
if (running) {
logger.error("Connection accept error", e);
}
}
}
}
// 用于發(fā)送消息到特定客戶端
public void sendMessageToClient(String clientAddress, String message) throws IOException {
Socket socket = clientSockets.get(clientAddress);
if (socket != null && !socket.isClosed()) {
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
out.println(message);
logger.info("Sent message to {}: {}", clientAddress, message);
} else {
logger.warn("Client {} is not connected or socket is closed", clientAddress);
}
}
@Override
public void destroy() throws Exception {
running = false;
executorService.shutdown();
for (Socket socket : clientSockets.values()) {
if (!socket.isClosed()) {
socket.close();
}
}
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
logger.info("TCP Server stopped");
}
private class ClientHandler implements Runnable {
private final Socket clientSocket;
private final String clientAddress;
ClientHandler(Socket socket, String address) {
this.clientSocket = socket;
this.clientAddress = address;
}
@Override
public void run() {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(
clientSocket.getOutputStream(), true)) {
logger.info("Client connected: {}", clientAddress);
String input;
while ((input = in.readLine()) != null) {
logger.debug("Received: {}", input);
out.println(input);
logger.info("Client connected: {}", clientAddress);
}
} catch (IOException e) {
logger.warn("Client connection closed: {}", e.getMessage());
} finally {
try {
clientSockets.remove(clientAddress);
clientSocket.close();
} catch (IOException e) {
logger.error("Error closing socket", e);
}
}
}
}
}
配置類
package com.example.demo2.server.socket;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "socket.server")
@Data
public class SocketServerConfig {
private int port;
private int maxThreads;
// Getters and Setters
}
配置文件
server:
port: 8080
socket:
server:
port: 8088
maxThreads: 50
向客戶端發(fā)送測試信息
@GetMapping("/send")
public String sendMessage(String clientAddress) throws IOException {
tcpServer.sendMessageToClient("192.168.3.8","77777777777");
return "success";
}

服務(wù)端發(fā)送日志

客戶端
客戶端主體邏輯,使用自己設(shè)計(jì)的心跳機(jī)制,監(jiān)聽服務(wù)端狀態(tài),如果服務(wù)端斷開連接,則客戶端會(huì)嘗試重新建立聯(lián)系。
package com.example.demo1.socketclient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
public class TcpClientService implements ApplicationRunner, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(TcpClientService.class);
private final SocketClientConfig config;
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private final AtomicBoolean running = new AtomicBoolean(true);
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final MessageListener messageListener;
@Autowired
public TcpClientService(SocketClientConfig config, MessageListener messageListener) {
this.config = config;
this.messageListener = messageListener;
}
@Override
public void run(ApplicationArguments args) throws Exception {
initializeConnection();
}
@Override
public void destroy() throws Exception {
running.set(false);
closeResources();
executor.shutdown();
}
private synchronized void initializeConnection() {
new Thread(() -> {
while (running.get()) {
try {
socket = new Socket();
socket.setKeepAlive(true);
socket.setSoTimeout(config.getHeartbeatTimeout());
socket.connect(new InetSocketAddress(config.getHost(), config.getPort()), config.getTimeout());
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
logger.info("Connected to server {}:{}", config.getHost(), config.getPort());
executor.execute(this::listenToServer);
startHeartbeat();
while (!socket.isClosed() && socket.isConnected()) {
Thread.sleep(1000);
}
} catch (Exception e) {
logger.warn("Connection error: {}", e.getMessage());
} finally {
closeResources();
if (running.get()) {
logger.info("Attempting to reconnect in 5 seconds...");
sleepSafely(5000);
}
}
}
}, "tcp-client-connector").start();
}
private void listenToServer() {
try {
String response;
while (running.get() && !socket.isClosed()) {
try {
response = in.readLine();
if (response == null) {
logger.warn("Server closed connection");
break; // 終止循環(huán),表示連接已關(guān)閉
}
logger.debug("Received server message: {}", response);
messageListener.onMessage(response);
} catch (SocketTimeoutException e) {
logger.debug("Socket read timeout");
} catch (IOException e) {
if (!socket.isClosed()) {
logger.warn("Connection lost: {}", e.getMessage());
break; // 終止循環(huán),表示連接已中斷
}
}
}
} finally {
closeResources(); // 確保資源關(guān)閉
}
}
private void startHeartbeat() {
new Thread(() -> {
while (running.get() && !socket.isClosed()) {
try {
sendMessageInternal("HEARTBEAT");
sleepSafely(config.getHeartbeatInterval());
} catch (Exception e) {
logger.warn("Heartbeat failed: {}", e.getMessage());
break;
}
}
}, "heartbeat-thread").start();
}
public synchronized void sendMessage(String message) throws IOException {
if (socket == null || !socket.isConnected()) {
throw new IOException("Not connected to server");
}
out.println(message);
logger.debug("Sent message: {}", message);
}
private synchronized void sendMessageInternal(String message) {
try {
if (socket != null && socket.isConnected()) {
out.println(message);
}
} catch (Exception e) {
logger.warn("Failed to send heartbeat");
}
}
private synchronized void closeResources() {
try {
if (out != null) out.close();
if (in != null) in.close();
if (socket != null) socket.close();
} catch (IOException e) {
logger.warn("Error closing resources: {}", e.getMessage());
}
}
private void sleepSafely(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public interface MessageListener {
void onMessage(String message);
}
}
消息監(jiān)聽:監(jiān)聽服務(wù)發(fā)送的消息
package com.example.demo1.socketclient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class ServerMessageHandler implements TcpClientService.MessageListener {
private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
@Override
public void onMessage(String message) {
if(StringUtils.isNotEmpty(message)){
if (!message.contains("HEARTBEAT")){
//處理其他邏輯
System.out.println("接收服務(wù)端消息成功:"+message);
}else{
//心跳消息
System.out.println(message);
}
}
}
}
配置類
package com.example.demo1.socketclient;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "socket.client")
@Data
public class SocketClientConfig {
private String host;
private int port;
private int timeout;
private int heartbeatInterval;
private int heartbeatTimeout;
// Getters and Setters
}
發(fā)送測試方法
@GetMapping("/send")
public ResponseEntity<String> sendMessage() {
try {
tcpClient.sendMessage("客戶端發(fā)送信息");
return ResponseEntity.ok("Message sent");
} catch (IOException e) {
return ResponseEntity.status(503).body("Server unavailable");
}
}
配置文件
socket:
client:
host: 192.168.3.8 #服務(wù)端ip地址
port: 8088 #監(jiān)聽端口
timeout: 5000
heartbeat-interval: 3000 # 心跳間隔30秒
heartbeat-timeout: 6000 # 心跳超時(shí)60秒
server:
port: 8082
客戶端發(fā)送信息后,服務(wù)端會(huì)接收到信息。

總結(jié)
以上就是java接入socket通信服務(wù)端與客戶端的全部代碼,二者實(shí)現(xiàn)了互相通信,具體的業(yè)務(wù)場景則需要小伙伴們?cè)诖嘶A(chǔ)上額外的設(shè)計(jì)邏輯了。
以上就是Java實(shí)現(xiàn)Socket服務(wù)端與客戶端雙向通信功能的詳細(xì)內(nèi)容,更多關(guān)于Java Socket服務(wù)端與客戶端通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Springboot搭建OAuth2.0 Server的方法示例
這篇文章主要介紹了使用Springboot搭建OAuth2.0 Server的方法示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08
spring cloud consul使用ip注冊(cè)服務(wù)的方法示例
這篇文章主要介紹了spring cloud consul使用ip注冊(cè)服務(wù)的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03
springboot+thymeleaf+shiro標(biāo)簽的實(shí)例
這篇文章主要介紹了springboot+thymeleaf+shiro標(biāo)簽的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
java構(gòu)造函數(shù)示例(構(gòu)造方法)
這篇文章主要介紹了java構(gòu)造函數(shù)示例(構(gòu)造方法),需要的朋友可以參考下2014-03-03
Springboot的ThreadPoolTaskScheduler線程池輕松搞定15分鐘不操作自動(dòng)取消訂單
這篇文章主要介紹了Springboot的ThreadPoolTaskScheduler線程池輕松搞定15分鐘不操作自動(dòng)取消訂單,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2025-01-01
SpringBoot使用Swagger生成多模塊的API文檔
這篇文章將以?Spring?Boot?多模塊項(xiàng)目為例,為大家詳細(xì)介紹一下如何使用?Swagger?生成多模塊的?API?文檔,感興趣的小伙伴可以了解一下2025-02-02
Maven統(tǒng)一版本管理的實(shí)現(xiàn)
在使用Maven多模塊結(jié)構(gòu)工程時(shí),配置版本是一個(gè)比較頭疼的事,本文主要介紹了Maven統(tǒng)一版本管理的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03

