Java實(shí)現(xiàn)多對(duì)多網(wǎng)絡(luò)通訊的流程
基本流程
客戶(hù)端發(fā)送信息(指定目標(biāo)客戶(hù)端)至固定的一個(gè)服務(wù)端,服務(wù)端接收信息進(jìn)行處理后發(fā)送至相應(yīng)的客戶(hù)端

通訊核心類(lèi)
Socket類(lèi)與流相輔相成,完成通訊。在accept方法返回了一個(gè)Socket對(duì)象后,獲取socket的輸入輸出流,就可以接收信息或發(fā)送信息了,以一對(duì)一為例:
服務(wù)端 :
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @ClassName Server
* @Description 服務(wù)端
* @Author issac
* @Date 2021/4/13 17:26
*/
public class Server {
public static void main(String[] args) throws IOException {
// 創(chuàng)建服務(wù)端套接字并指定端口
ServerSocket server = new ServerSocket(88);
// 接收創(chuàng)建建立,返回連接創(chuàng)建好后服務(wù)器的socket對(duì)象
Socket socket = server.accept();
InputStreamReader reader = new InputStreamReader(socket.getInputStream());
BufferedReader bufferedReader = new BufferedReader(reader);
// 獲取請(qǐng)求
String request = bufferedReader.readLine();
System.out.println("client say:" + request);
// 寫(xiě)到輸出流傳遞給客戶(hù)端
PrintWriter writer = new PrintWriter(socket.getOutputStream());
String line = "hello too";
writer.println(line);
writer.flush();
// 關(guān)閉處理流的工具、socket套接字、服務(wù)套接字
writer.close();
bufferedReader.close();
socket.close();
server.close();
}
}
客戶(hù)端 :
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @ClassName Client
* @Description 客戶(hù)端
* @Author issac
* @Date 2021/4/13 17:26
*/
public class Client {
public static void main(String[] args) throws IOException {
// 創(chuàng)建socket連接,指明其地址和端口
Socket socket = new Socket("127.0.0.1", 88);
// 獲取套接字的輸出流,輸出hello
PrintWriter writer = new PrintWriter(socket.getOutputStream());
String readLine = "Hello";
writer.println(readLine);
writer.flush();
// 從套接字的輸入流中獲取信息
InputStreamReader reader = new InputStreamReader(socket.getInputStream());
BufferedReader bufferedReader = new BufferedReader(reader);
String respond = bufferedReader.readLine();
System.out.println("server say:" + respond);
bufferedReader.close();
writer.close();
socket.close();
}
}
運(yùn)行結(jié)果:

需要注意的是accept方法在沒(méi)有連接的時(shí)候會(huì)阻塞,而導(dǎo)致后面的代碼無(wú)法執(zhí)行,在接下來(lái)的多對(duì)多通訊中需要依靠多線(xiàn)程來(lái)解決這個(gè)問(wèn)題。
多對(duì)多代碼實(shí)現(xiàn)
為了方便服務(wù)端和客戶(hù)端對(duì)信息的處理,解析。首先定義一個(gè)消息類(lèi),定義屬性分別為端口的本地地址,發(fā)送的消息內(nèi)容,發(fā)送的目標(biāo)地址。定義靜態(tài)方法:將字符串解析為該類(lèi)實(shí)例,處理消息的收發(fā):
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import com.alibaba.fastjson.JSON;
import java.io.*;
import java.net.Socket;
/**
* 在網(wǎng)絡(luò)中,所有被進(jìn)行通訊的對(duì)象,都需要實(shí)現(xiàn) Serializable 這個(gè)接口
* <p>
* 該類(lèi),主要用于本項(xiàng)目例子中,socket傳輸?shù)膶?duì)象,請(qǐng)勿使用其他或字符串,
* 為了后期更方便修改或者是其他操作
*
* @ClassName SocketMessage
* @Description TODO
* @Author issac
* @Date 2021/4/18 22:02
*/
public class SocketMessage implements Serializable {
/**
* 我自己的名稱(chēng) ip:port
**/
private String key;
/**
* 我的目標(biāo) ip:port
**/
private String to;
/**
* 發(fā)送的內(nèi)容
**/
private String content;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
/**
* 向目標(biāo)客戶(hù)端寫(xiě)出從發(fā)送者獲取到的消息
*/
public static void writeTargetMessage(SocketMessage message, Socket socket) throws IOException {
PrintWriter writer = new PrintWriter(socket.getOutputStream());
// 統(tǒng)一字符串標(biāo)準(zhǔn),以便于服務(wù)端解析
writer.println(JSON.toJSONString(message));
writer.flush();
}
/**
* 將輸入流中接收的字符串解析為SocketMessage對(duì)象
*
* @param is
* @return SocketMessage
* @throws Exception
*/
public static SocketMessage parseSocket(InputStream is) throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String info = reader.readLine();
return parseSocketByStr(info);
}
/**
* 將傳入字符串解析為SocketMessage對(duì)象并返回
*
* @param str
* @return SocketMessage
*/
public static SocketMessage parseSocketByStr(String str) {
SocketMessage socketMessage = null;
try {
socketMessage = JSON.parseObject(str, SocketMessage.class);
} catch (Exception ex) {
throw new RuntimeException("socket之間通訊不能不使用SocketMessage");
}
return socketMessage;
}
@Override
public String toString() {
// 通過(guò) 阿里巴巴 的FastJson 庫(kù),將一個(gè)對(duì)象轉(zhuǎn)換為 字符串 ,統(tǒng)一標(biāo)準(zhǔn),以便于將字符串解析為該類(lèi)
return JSON.toJSONString(this);
}
}
再單獨(dú)定義一個(gè)服務(wù)端的消息處理類(lèi),該類(lèi)用于發(fā)送消息至特定的客戶(hù)端,所以定義兩個(gè)屬性,1.發(fā)送的消息,2.目標(biāo)客戶(hù)端的套接字:
import java.net.Socket;
/**
* @ClassName SocketMessageHandler
* @Description 服務(wù)端針對(duì)客戶(hù)端的消息處理器
* @Author issac
* @Date 2021/4/18 22:34
*/
public class SocketMessageHandler {
SocketMessage sm;
Socket targetSocket;
public SocketMessageHandler(SocketMessage sm,Socket targetSocket) {
this.sm = sm;
this.targetSocket = targetSocket;
}
public void setSm(SocketMessage sm) {
this.sm = sm;
}
/**
* 發(fā)送消息
*/
public void send() {
if (this.sm == null) {
return;
}
try {
System.out.println(sm.getContent());
// 發(fā)送
SocketMessage.writeTargetMessage(sm, this.targetSocket);
} catch ( Exception ex) {
ex.printStackTrace();
}
}
}
接下來(lái)進(jìn)行服務(wù)端的定義,我們的服務(wù)端需要處理多個(gè)客戶(hù)端的消息,所以要定義一個(gè)容器存放客戶(hù)端地址,在此之前我們已經(jīng)定義了處理服務(wù)端消息的SocketMessageHandler類(lèi),因?yàn)槲覀兊淖罱K目的是為了處理信息,所以可以直接將SocketMessageHandler類(lèi)存放至容器。我們用map來(lái)存儲(chǔ),而key就是客戶(hù)端的地址:
import com.issac.task_05.task.msg.SocketMessage;
import com.issac.task_05.task.msg.SocketMessageHandler;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
/**
* n - m: 一個(gè)服務(wù)端,同時(shí)服務(wù)多個(gè)客戶(hù)端
*
* @ClassName SocketServer
* @Description 服務(wù)端
* @Author issac
* @Date 2021/4/18 21:29
*/
public class SocketServer {
// 存放消息處理器
private static final Map<String, SocketMessageHandler> clientContainer = new HashMap<>();
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(8888);
Socket accept;
while (true) {
/* 只有建立新連接時(shí)accept才會(huì)有響應(yīng)而執(zhí)行以下代碼,否則會(huì)阻塞:客戶(hù)端與服務(wù)器連接,并將已連接的客戶(hù)端放入容器 */
accept = ss.accept();
SocketMessage msg = SocketMessage.parseSocket(accept.getInputStream()); // 獲取信息
System.out.println("客戶(hù)端建立連接:" + msg.getKey());
// 建立連接后將客戶(hù)端地址存入容器
clientContainer.put(msg.getKey(), new SocketMessageHandler(msg, accept));
/* 在已經(jīng)建立連接后,沒(méi)有新連接,accept會(huì)處于阻塞狀態(tài),因此我們需要另外開(kāi)辟一個(gè)線(xiàn)程來(lái)處理消息 */
new ServerThread(accept, clientContainer).start();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
在這里需要注意ServerSocket類(lèi)的accept方法,在沒(méi)有新連接的時(shí)候,該方法會(huì)阻塞,而之后的代碼就無(wú)法執(zhí)行了。我們?cè)诳蛻?hù)端與服務(wù)端連接成功之后進(jìn)行消息收發(fā)的時(shí)候是沒(méi)有新連接產(chǎn)生的,此時(shí)的阻塞導(dǎo)致無(wú)法進(jìn)行通訊,于是乎我們需要再開(kāi)辟一個(gè)線(xiàn)程,進(jìn)行消息處理。那么我們定義一個(gè)繼承Thread的消息處理類(lèi),將每次連接成功返回的套接字接收,進(jìn)行信息處理。如此一來(lái),只要有消息的傳遞該線(xiàn)程就可以進(jìn)行獲取:
import com.issac.task_05.task.msg.SocketMessage;
import com.issac.task_05.task.msg.SocketMessageHandler;
import java.io.InputStream;
import java.net.Socket;
import java.util.Map;
/**
* @ClassName ServerThread
* @Description 處理信息
* @Author issac
* @Date 2021/4/21 21:25
*/
public class ServerThread extends Thread{
private Socket socket;
InputStream inputStream;
Map<String, SocketMessageHandler> clientContainer;
public ServerThread(Socket socket,Map<String, SocketMessageHandler> clientContainer){
this.socket = socket;
this.clientContainer = clientContainer;
}
public void run(){
try{
while (true){
// 將輸入流中的數(shù)據(jù)解析為SocketMessage對(duì)象
inputStream = socket.getInputStream();
SocketMessage msg = SocketMessage.parseSocket(inputStream);
System.out.println(msg);
// 在容器中獲取目標(biāo)地址
SocketMessageHandler socketMessageHandler = clientContainer.get(msg.getTo());
// 設(shè)置需要傳輸?shù)男畔?
socketMessageHandler.setSm(msg);
// 傳輸信息
socketMessageHandler.send();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
最后就是客戶(hù)端了,每個(gè)客戶(hù)端所對(duì)應(yīng)的服務(wù)端都相同,在客戶(hù)端寫(xiě)一個(gè)簡(jiǎn)易的菜單,選擇接收或發(fā)送消息即可:
import com.issac.task_05.task.msg.SocketMessage;
import java.net.Socket;
import java.util.Scanner;
/**
* @ClassName Client
* @Description 客戶(hù)端
* @Author issac
* @Date 2021/4/19 21:08
*/
public class Client {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
Socket s = null;
try {
s = new Socket("localhost", 8888);
// 第一次啟動(dòng),創(chuàng)建socket,向服務(wù)器發(fā)送我是誰(shuí)
SocketMessage initMsg = getSocketMsg(s.getLocalSocketAddress().toString(), null, null);
System.out.println("開(kāi)始與服務(wù)器建立連接: " + initMsg.toString());
SocketMessage.writeTargetMessage(initMsg, s);
// 開(kāi)始 循環(huán)等待
while (true) {
System.out.println("===================menu=====================");
System.out.println("1:發(fā)送消息");
System.out.println("2:接收消息");
int choice = scanner.nextInt();
switch (choice){
case 1: // 發(fā)送消息
String target = input("請(qǐng)輸入您要發(fā)給誰(shuí):");
String content = input("請(qǐng)輸入您要發(fā)送的內(nèi)容:");
System.out.println();
SocketMessage afterMsg = getSocketMsg(s.getLocalSocketAddress().toString(), target, content);
SocketMessage.writeTargetMessage(afterMsg, s);
break;
case 2: // 接收并打印消息
showRequiredMsg(s);
break;
default:
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 根據(jù)提示輸入內(nèi)容
**/
public static String input(String tip) {
Scanner input = new Scanner(System.in);
System.out.println(tip);
return input.next();
}
/**
* 將用戶(hù)輸入傳遞的本地地址,目標(biāo)地址與傳遞內(nèi)容轉(zhuǎn)化為SocketMessage對(duì)象
* @param localSocketAddress
* @param to
* @param content
* @return
*/
public static SocketMessage getSocketMsg(String localSocketAddress, String to, String content) {
SocketMessage socketMessage = new SocketMessage();
// to 為null的時(shí)候,說(shuō)明只是對(duì)服務(wù)器的初始
socketMessage.setKey(localSocketAddress.replaceAll("\\/", ""));
socketMessage.setTo(to);
socketMessage.setContent(content);
return socketMessage;
}
/**
* 接收消息并打印
* @param socket
* @throws Exception
*/
public static void showRequiredMsg(Socket socket) throws Exception {
SocketMessage socketMessage = SocketMessage.parseSocket(socket.getInputStream());
String source = socketMessage.getKey();
String content = socketMessage.getContent();
System.out.println("接收到來(lái)自《"+source+"》的信息:"+content+"\n");
}
}
運(yùn)行結(jié)果:


到此這篇關(guān)于Java實(shí)現(xiàn)多對(duì)多網(wǎng)絡(luò)通訊的流程的文章就介紹到這了,更多相關(guān)Java多對(duì)多網(wǎng)絡(luò)通訊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
記一次線(xiàn)上SpringCloud Feign請(qǐng)求服務(wù)超時(shí)異常排查問(wèn)題
這篇文章主要介紹了記一次線(xiàn)上SpringCloud Feign請(qǐng)求服務(wù)超時(shí)異常排查問(wèn)題,本項(xiàng)目與下游項(xiàng)目均注冊(cè)在Eureka上面,對(duì)這個(gè)1秒就超時(shí)感到很迷惑,于是開(kāi)始查閱底層源碼之旅。需要的朋友可以參考下2022-01-01
Spring中WebClient的創(chuàng)建和使用詳解
這篇文章主要介紹了Spring中WebClient的創(chuàng)建和使用詳解,在Spring5中,出現(xiàn)了Reactive響應(yīng)式編程思想,并且為網(wǎng)絡(luò)編程提供相關(guān)響應(yīng)式編程的支持,如提供了WebFlux,它是Spring提供的異步非阻塞的響應(yīng)式的網(wǎng)絡(luò)框架,需要的朋友可以參考下2023-11-11
uploadify java實(shí)現(xiàn)多文件上傳和預(yù)覽
這篇文章主要為大家詳細(xì)介紹了java結(jié)合uploadify實(shí)現(xiàn)多文件上傳和預(yù)覽的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10
Java Jackson之ObjectMapper常用用法總結(jié)
這篇文章主要給大家介紹了關(guān)于Java Jackson之ObjectMapper常用用法的相關(guān)資料,ObjectMapper是一個(gè)Java庫(kù),用于將JSON字符串轉(zhuǎn)換為Java對(duì)象或?qū)ava對(duì)象轉(zhuǎn)換為JSON字符串,需要的朋友可以參考下2024-01-01
SpringBoot整合Lettuce redis過(guò)程解析
這篇文章主要介紹了SpringBoot整合Lettuce redis過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10

