SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收
1、SpringBoot webSocket
SpringBoot 使用的websocket 協(xié)議,不是標準的websocket協(xié)議,使用的是名稱叫做STOMP的協(xié)議。
1.1 STOMP協(xié)議說明
STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息協(xié)議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設(shè)計的簡單文本協(xié)議。
它提供了一個可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進行交互,類似于OpenWire(一種二進制協(xié)議)。
由于其設(shè)計簡單,很容易開發(fā)客戶端,因此在多種語言和多種平臺上得到廣泛應(yīng)用。其中最流行的STOMP消息代理是Apache ActiveMQ。
1.2 搭建
本人使用的是Inject idea 搭建的springBoot websocket,并未采用熟悉的gradle,而是采用了maven方式搭建。
項目結(jié)構(gòu)如下
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.drawthink</groupId>
<artifactId>websocketdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>webSocketdemo</name>
<description>webSocketDemo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Application:
package com.drawthink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WebSocketdemoApplication {
public static void main(String[] args) {
SpringApplication.run(WebSocketdemoApplication.class, args);
}
}
WebSocketConfig
package com.drawthink.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
/**
* Created by lincoln on 16-10-25
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
//允許使用socketJs方式訪問,訪問點為hello,允許跨域
stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//訂閱Broker名稱
registry.enableSimpleBroker("/topic","/user");
//全局使用的訂閱前綴(客戶端訂閱路徑上會體現(xiàn)出來)
registry.setApplicationDestinationPrefixes("/app/");
//點對點使用的訂閱前綴(客戶端訂閱路徑上會體現(xiàn)出來),不設(shè)置的話,默認也是/user/
//registry.setUserDestinationPrefix("/user/");
}
}
WebSocketController
package com.drawthink.websocket.controller;
import com.drawthink.message.ClientMessage;
import com.drawthink.message.ServerMessage;
import com.drawthink.message.ToUserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
/**
* Created by lincoln on 16-10-25
*/
@Controller
public class WebSocketController {
@MessageMapping("/welcome")
//SendTo 發(fā)送至 Broker 下的指定訂閱路徑
@SendTo("/topic/getResponse")
public ServerMessage say(ClientMessage clientMessage){
//方法用于廣播測試
System.out.println("clientMessage.getName() = " + clientMessage.getName());
return new ServerMessage("Welcome , "+clientMessage.getName()+" !");
}
//注入SimpMessagingTemplate 用于點對點消息發(fā)送
@Autowired
private SimpMessagingTemplate messagingTemplate;
@MessageMapping("/cheat")
// 發(fā)送的訂閱路徑為/user/{userId}/message
// /user/路徑是默認的一個,如果想要改變,必須在config 中setUserDestinationPrefix
public void cheatTo(ToUserMessage toUserMessage){
//方法用于點對點測試
System.out.println("toUserMessage.getMessage() = " + toUserMessage.getMessage());
System.out.println("toUserMessage.getUserId() = " + toUserMessage.getUserId()); messagingTemplate.convertAndSendToUser(toUserMessage.getUserId(),"/message",toUserMessage.getMessage());
}
}
Vo
package com.drawthink.message;
/**
* Created by lincoln on 16-10-25
*/
public class ClientMessage {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.drawthink.message;
/**
* Created by lincoln on 16-10-25
*/
public class ServerMessage {
private String responseMessage;
public ServerMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}
package com.drawthink.message;
/**
* Created by lincoln on 16-10-25
*/
public class ToUserMessage {
private String userId;
private String message;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
Android 客戶端
STOMP協(xié)議在Android系統(tǒng)中沒有默認實現(xiàn),必須自行去實現(xiàn)。不過好消息是,開源大神們已經(jīng)完成了Android上使用STOMP協(xié)議的實現(xiàn),所以我們只需要使用就好了。
地址:StompProtocolAndroid_jb51.rar
搭建
build.gradle(app)
apply plugin: 'com.android.application'
android {
compileSdkVersion 24
buildToolsVersion "24.0.3"
defaultConfig {
applicationId "com.drawthink.websocket"
minSdkVersion 16
targetSdkVersion 24
versionCode 1
versionName "1.0"
testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
}
}
}
dependencies {
compile fileTree(include: ['*.jar'], dir: 'libs')
androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
exclude group: 'com.android.support', module: 'support-annotations'
})
compile 'com.android.support:appcompat-v7:24.2.1'
testCompile 'junit:junit:4.12'
//依賴STOMP協(xié)議的Android實現(xiàn)
compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1'
//StompProtocolAndroid 依賴于webSocket的標準實現(xiàn)
compile 'org.java-websocket:Java-WebSocket:1.3.0'
}
接收廣播實例:
package com.drawthink.websocket;
import android.content.Intent;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import android.widget.Toast;
import org.java_websocket.WebSocket;
import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;
import static android.content.ContentValues.TAG;
public class MainActivity extends AppCompatActivity {
private TextView serverMessage;
private Button start;
private Button stop;
private Button send;
private EditText editText;
private StompClient mStompClient;
private Button cheat;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
bindView();
start.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
//創(chuàng)建client 實例
createStompClient();
//訂閱消息
registerStompTopic();
}
});
send.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
mStompClient.send("/app/welcome","{\"name\":\""+editText.getText()+"\"}")
.subscribe(new Subscriber<Void>() {
@Override
public void onCompleted() {
toast("發(fā)送成功");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
toast("發(fā)送錯誤");
}
@Override
public void onNext(Void aVoid) {
}
});
}
});
stop.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
mStompClient.disconnect();
}
});
cheat.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startActivity(new Intent(MainActivity.this,CheatActivity.class));
if(mStompClient != null) {
mStompClient.disconnect();
}
finish();
}
});
}
private void showMessage(final StompMessage stompMessage) {
runOnUiThread(new Runnable() {
@Override
public void run() {
serverMessage.setText("stomp command is --->"+stompMessage.getStompCommand() +" body is --->"+stompMessage.getPayload());
}
});
}
//創(chuàng)建client 實例
private void createStompClient() {
mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
mStompClient.connect();
Toast.makeText(MainActivity.this,"開始連接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();
mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
@Override
public void call(LifecycleEvent lifecycleEvent) {
switch (lifecycleEvent.getType()) {
case OPENED:
Log.d(TAG, "Stomp connection opened");
toast("連接已開啟");
break;
case ERROR:
Log.e(TAG, "Stomp Error", lifecycleEvent.getException());
toast("連接出錯");
break;
case CLOSED:
Log.d(TAG, "Stomp connection closed");
toast("連接關(guān)閉");
break;
}
}
});
}
//訂閱消息
private void registerStompTopic() {
mStompClient.topic("/topic/getResponse").subscribe(new Action1<StompMessage>() {
@Override
public void call(StompMessage stompMessage) {
Log.e(TAG, "call: " +stompMessage.getPayload() );
showMessage(stompMessage);
}
});
}
private void toast(final String message) {
runOnUiThread(new Runnable() {
@Override
public void run() {
Toast.makeText(MainActivity.this,message,Toast.LENGTH_SHORT).show();
}
});
}
private void bindView() {
serverMessage = (TextView) findViewById(R.id.serverMessage);
start = (Button) findViewById(R.id.start);
stop = (Button) findViewById(R.id.stop);
send = (Button) findViewById(R.id.send);
editText = (EditText) findViewById(R.id.clientMessage);
cheat = (Button) findViewById(R.id.cheat);
}
}
點對點
package com.drawthink.websocket;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.LinearLayout;
import android.widget.TextView;
import android.widget.Toast;
import org.java_websocket.WebSocket;
import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;
import static android.content.ContentValues.TAG;
public class CheatActivity extends AppCompatActivity {
private EditText cheat;
private Button send;
private LinearLayout message;
private StompClient mStompClient;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_cheat);
bindView();
createStompClient();
registerStompTopic();
send.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
// 向/app/cheat發(fā)送Json數(shù)據(jù)
mStompClient.send("/app/cheat","{\"userId\":\"lincoln\",\"message\":\""+cheat.getText()+"\"}")
.subscribe(new Subscriber<Void>() {
@Override
public void onCompleted() {
toast("發(fā)送成功");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
toast("發(fā)送錯誤");
}
@Override
public void onNext(Void aVoid) {
}
});
}
});
}
private void bindView() {
cheat = (EditText) findViewById(R.id.cheat);
send = (Button) findViewById(R.id.send);
message = (LinearLayout) findViewById(R.id.message);
}
private void createStompClient() {
mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
mStompClient.connect();
Toast.makeText(CheatActivity.this,"開始連接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();
mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
@Override
public void call(LifecycleEvent lifecycleEvent) {
switch (lifecycleEvent.getType()) {
case OPENED:
Log.d(TAG, "Stomp connection opened");
toast("連接已開啟");
break;
case ERROR:
Log.e(TAG, "Stomp Error", lifecycleEvent.getException());
toast("連接出錯");
break;
case CLOSED:
Log.d(TAG, "Stomp connection closed");
toast("連接關(guān)閉");
break;
}
}
});
}
// 接收/user/xiaoli/message路徑發(fā)布的消息
private void registerStompTopic() {
mStompClient.topic("/user/xiaoli/message").subscribe(new Action1<StompMessage>() {
@Override
public void call(StompMessage stompMessage) {
Log.e(TAG, "call: " +stompMessage.getPayload() );
showMessage(stompMessage);
}
});
}
private void showMessage(final StompMessage stompMessage) {
runOnUiThread(new Runnable() {
@Override
public void run() {
TextView text = new TextView(CheatActivity.this);
text.setLayoutParams(new LinearLayout.LayoutParams(LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.WRAP_CONTENT));
text.setText(System.currentTimeMillis() +" body is --->"+stompMessage.getPayload());
message.addView(text);
}
});
}
private void toast(final String message) {
runOnUiThread(new Runnable() {
@Override
public void run() {
Toast.makeText(CheatActivity.this,message,Toast.LENGTH_SHORT).show();
}
});
}
}
代碼比較亂,說明一下。
1、STOMP 使用的時候,關(guān)鍵是發(fā)布訂閱的關(guān)系,使用過消息隊列,例如rabbitMQ的應(yīng)該很容易理解。
服務(wù)器端 WebSocketConfig.Java文件控制的就是訂閱發(fā)布的路徑關(guān)系。
2、websocket的路徑說明,本例中連接的是ws://192.168.0.46:8080/hello/websocket路徑,/hello是在WebSocketConfig的stompEndpointRegistry.addEndpoint(“/hello”).setAllowedOrigins(““).withSockJS();*確定的, 如果有多個endpoint,這個地方的路徑也會隨之變化。
3、發(fā)布路徑
發(fā)布信息的路徑是由WebSocketConfig中的 setApplicationDestinationPrefixes(“/app/”); 和 Controller 中@MessageMapping(“/welcome”) 組合確定的。
例如發(fā)廣播消息,路徑為/app/welcome
例如發(fā)點對點消息,路徑為/app/cheat
4、消息訂閱路徑
訂閱broker源自WebSocketConfig中的registry.enableSimpleBroker(“/topic”,”/user”);此處開放了兩個broker,具體的訂閱服務(wù)路徑給基于Controller中的 @SendTo(“/topic/getResponse”)或SimpMessagingTemplate中給定。(注:此處,服務(wù)器和客戶端須約定訂閱路徑)
5、關(guān)于心跳
訂閱發(fā)布模型的心跳很簡單,客戶端向一個指定的心跳路徑發(fā)送心跳,服務(wù)器處理,服務(wù)器使用指定的訂閱路徑向客戶端發(fā)心跳,即可。因為沒有Socket,只需要記錄是否聯(lián)通的狀態(tài)即可,重連客戶端做一下就好了。
本人菜鳥,肯定有些地方?jīng)]有搞清楚,如果有誤,請大神斧正。
代碼下載地址:blogRepository_jb51.rar
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
vscode搭建java開發(fā)環(huán)境的實現(xiàn)步驟
本文主要介紹了vscode搭建java開發(fā)環(huán)境,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
SpringCloud網(wǎng)關(guān)(Zuul)如何給多個微服務(wù)之間傳遞共享參數(shù)
這篇文章主要介紹了SpringCloud網(wǎng)關(guān)(Zuul)如何給多個微服務(wù)之間傳遞共享參數(shù),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
mybatis通過TypeHandler?list轉(zhuǎn)換string類型轉(zhuǎn)換方式
這篇文章主要介紹了mybatis通過TypeHandler?list轉(zhuǎn)換string類型轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
使用JAVA實現(xiàn)高并發(fā)無鎖數(shù)據(jù)庫操作步驟分享
一個在線2k的游戲,每秒鐘并發(fā)都嚇死人。傳統(tǒng)的hibernate直接插庫基本上是不可行的。我就一步步推導(dǎo)出一個無鎖的數(shù)據(jù)庫操作,詳情看下文2013-11-11

