詳解spring集成mina實現(xiàn)服務端主動推送(包含心跳檢測)
本文介紹了spring集成mina實現(xiàn)服務端主動推送(包含心跳檢測),分享給大家,具體如下:
服務端
1.常規(guī)的spring工程集成mina時,pom.xml中需要加入如下配置:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-beans</artifactId>
<version>2.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.13</version>
<type>bundle</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-spring</artifactId>
<version>1.1.7</version>
</dependency>
注意此處mina-core的配置寫法。如果工程中引入上述依賴之后報錯:Missing artifact xxx bundle,則需要在pom.xml的plugins之間加入如下插件配置:
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
2.Filter1:編解碼器,實現(xiàn)ProtocolCodecFactory解碼工廠
package com.he.server;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineDecoder;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
public class MyCodeFactory implements ProtocolCodecFactory {
private final TextLineEncoder encoder;
private final TextLineDecoder decoder;
public MyCodeFactory() {
this(Charset.forName("utf-8"));
}
public MyCodeFactory(Charset charset) {
encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
}
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return decoder;
}
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return encoder;
}
public int getEncoderMaxLineLength() {
return encoder.getMaxLineLength();
}
public void setEncoderMaxLineLength(int maxLineLength) {
encoder.setMaxLineLength(maxLineLength);
}
public int getDecoderMaxLineLength() {
return decoder.getMaxLineLength();
}
public void setDecoderMaxLineLength(int maxLineLength) {
decoder.setMaxLineLength(maxLineLength);
}
}
此處使用了mina自帶的TextLineEncoder編解碼器,此解碼器支持使用固定長度或者固定分隔符來區(qū)分上下兩條消息。如果要使用自定義協(xié)議,則需要自己編寫解碼器。要使用websocket,也需要重新編寫解碼器,關于mina結(jié)合websocket,jira上有一個開源項目https://issues.apache.org/jira/browse/DIRMINA-907,專門為mina編寫了支持websocket的編解碼器,親測可用。。。此部分不是本文重點,略。
3.Filter2:心跳工廠,加入心跳檢測功能需要實現(xiàn)KeepAliveMessageFactory:
package com.he.server;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{
private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class);
/** 心跳包內(nèi)容 */
private static final String HEARTBEATREQUEST = "1111";
private static final String HEARTBEATRESPONSE = "1112";
public Object getRequest(IoSession session) {
LOG.warn("請求預設信息: " + HEARTBEATREQUEST);
return HEARTBEATREQUEST;
}
public Object getResponse(IoSession session, Object request) {
LOG.warn("響應預設信息: " + HEARTBEATRESPONSE);
/** 返回預設語句 */
return HEARTBEATRESPONSE;
}
public boolean isRequest(IoSession session, Object message) {
LOG.warn("請求心跳包信息: " + message);
if (message.equals(HEARTBEATREQUEST))
return true;
return false;
}
public boolean isResponse(IoSession session, Object message) {
LOG.warn("響應心跳包信息: " + message);
if(message.equals(HEARTBEATRESPONSE))
return true;
return false;
}
}
此處我設置服務端發(fā)送的心跳包是1111,客戶端應該返回1112.
4.實現(xiàn)必不可少的IoHandlerAdapter,得到監(jiān)聽事件處理權:
package com.he.server;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class MyHandler extends IoHandlerAdapter {
//private final int IDLE = 3000;//(單位s)
private final Logger LOG = Logger.getLogger(MyHandler.class);
// public static Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());
public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
session.closeOnFlush();
LOG.warn("session occured exception, so close it." + cause.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String str = message.toString();
LOG.warn("客戶端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "連接成功!");
session.setAttribute("type", message);
String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();
session.setAttribute("ip", remoteAddress);
LOG.warn("服務器收到的消息是:" + str);
session.write("welcome by he");
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
LOG.warn("messageSent:" + message);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
LOG.warn("remote client [" + session.getRemoteAddress().toString() + "] connected.");
// my
Long time = System.currentTimeMillis();
session.setAttribute("id", time);
sessionsConcurrentHashMap.put(time, session);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
LOG.warn("sessionClosed.");
session.closeOnFlush();
// my
sessionsConcurrentHashMap.remove(session.getAttribute("id"));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
LOG.warn("session idle, so disconnecting......");
session.closeOnFlush();
LOG.warn("disconnected.");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
LOG.warn("sessionOpened.");
//
//session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE);
}
}
此處有幾點說明:
第一點:網(wǎng)上教程會在此處(sessionOpened()方法中)設置IDLE,IDLE表示session經(jīng)過多久判定為空閑的時間,單位s,上述代碼中已經(jīng)注釋掉了,因為后面在spring配置中加入心跳檢測部分時會進行IDLE的配置,已經(jīng)不需要在此處進行配置了,而且如果在心跳配置部分和此處都對BOTH_IDLE模式設置了空閑時間,親測發(fā)現(xiàn)此處配置不生效。
第二點:關于存放session的容器,建議使用
public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();
而不是用已經(jīng)注釋掉的Collections.synchronizedSet類型的set或者map,至于原因,java5中新增了ConcurrentMap接口和它的一個實現(xiàn)類ConcurrentHashMap,可以保證線程的足夠安全。詳細的知識你應該搜索SynchronizedMap和ConcurrentHashMap的區(qū)別,學習更加多的并發(fā)安全知識。
上述代碼中,每次在收到客戶端的消息時,我會返回一段文本:welcome by he。
有了map,主動推送就不是問題了,想推給誰,在map中找到誰就可以了。
5.完成spring的配置工作
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="customEditors">
<map>
<entry key="java.net.SocketAddress"
value="org.apache.mina.integration.beans.InetSocketAddressEditor">
</entry>
</map>
</property>
</bean>
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"
init-method="bind" destroy-method="unbind">
<!--端口號-->
<property name="defaultLocalAddress" value=":8888" />
<!--綁定自己實現(xiàn)的handler-->
<property name="handler" ref="serverHandler" />
<!--聲明過濾器的集合-->
<property name="filterChainBuilder" ref="filterChainBuilder" />
<property name="reuseAddress" value="true" />
</bean>
<bean id="filterChainBuilder"
class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
<property name="filters">
<map>
<!--mina自帶的線程池filter-->
<entry key="executor" value-ref="executorFilter" />
<entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" />
<!--自己實現(xiàn)的編解碼器filter-->
<entry key="codecFilter" value-ref="codecFilter" />
<!--日志的filter-->
<entry key="loggingFilter" value-ref="loggingFilter" />
<!--心跳filter-->
<entry key="keepAliveFilter" value-ref="keepAliveFilter" />
</map>
</property>
</bean>
<!-- executorFilter多線程處理 -->
<bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" />
<bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter">
<constructor-arg value="remoteAddress" />
</bean>
<!--日志-->
<bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" />
<!--編解碼-->
<bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
<constructor-arg>
<!--構造函數(shù)的參數(shù)傳入自己實現(xiàn)的對象-->
<bean class="com.he.server.MyCodeFactory"></bean>
</constructor-arg>
</bean>
<!--心跳檢測filter-->
<bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter">
<!--構造函數(shù)的第一個參數(shù)傳入自己實現(xiàn)的工廠-->
<constructor-arg>
<bean class="com.he.server.MyKeepAliveMessageFactory"></bean>
</constructor-arg>
<!--第二個參數(shù)需要的是IdleStatus對象,value值設置為讀寫空閑-->
<constructor-arg type = "org.apache.mina.core.session.IdleStatus" value="BOTH_IDLE" >
</constructor-arg>
<!--心跳頻率,不設置則默認60s -->
<property name="requestInterval" value="5" />
<!--心跳超時時間,不設置則默認30s -->
<property name="requestTimeout" value="10" />
<!--不設置默認false-->
<property name="forwardEvent" value="true" />
</bean>
<!--自己實現(xiàn)的handler-->
<bean id="serverHandler" class="com.he.server.MyHandler" />
</beans>
好了,xml中已經(jīng)寫了足夠多的注釋了。說明一下關于心跳檢測中的最后一個屬性:forwardEvent,默認false,比如在心跳頻率為5s時,實際上每5s會觸發(fā)一次KeepAliveFilter中的session_idle事件,該事件中開始發(fā)送心跳包。當此參數(shù)設置為false時,對于session_idle事件不再傳遞給其他filter,如果設置為true,則會傳遞給其他filter,例如handler中的session_idle事件,此時也會被觸發(fā)。另外IdleStatus一共有三個值,點擊進源碼就能看到。
6.寫main方法啟動服務端
package com.he.server;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MainTest {
public static void main(String[] args) {
ClassPathXmlApplicationContext ct = new ClassPathXmlApplicationContext("applicationContext.xml");
}
}
run之后,端口就已經(jīng)開始監(jiān)聽了。此處,如果是web工程,使用tomcat之類的容器,只要在web.xml中配置了
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/classes/applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
則容器在啟動時就會加載spring的配置文件,端口的監(jiān)聽就開始了,這樣就不需要main方法來啟動。
客戶端,本文采用兩種方式來實現(xiàn)客戶端
方式一:用mina結(jié)構來實現(xiàn)客戶端,引入mina相關jar包即可,Android也可以使用
1.先實現(xiàn)IoHandlerAdater得到監(jiān)聽事件,類似于服務端:
package com.he.client.minaclient;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ClientHandler extends IoHandlerAdapter{
private final Logger LOG = Logger.getLogger(ClientHandler.class);
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
LOG.warn("客戶端收到消息:" + message);
if (message.toString().equals("1111")) {
//收到心跳包
LOG.warn("收到心跳包");
session.write("1112");
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
super.messageSent(session, message);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionClosed(session);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionCreated(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// TODO Auto-generated method stub
super.sessionIdle(session, status);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionOpened(session);
}
}
2.寫main方法啟動客戶端,連接服務端:
package com.he.client.minaclient;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class ClientTest {
public static void main(String[] args) throws InterruptedException {
//創(chuàng)建客戶端連接器.
NioSocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); //設置編碼過濾器
connector.setHandler(new ClientHandler());//設置事件處理器
ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 8888));//建立連接
cf.awaitUninterruptibly();//等待連接創(chuàng)建完成
cf.getSession().write("hello,測試!");//發(fā)送消息,中英文都有
//cf.getSession().closeOnFlush();
//cf.getSession().getCloseFuture().awaitUninterruptibly();//等待連接斷開
//connector.dispose();
}
}
過程也是一樣的,加各種filter,綁定handler。上述代碼運行之后向服務器發(fā)送了:“hello,測試”,并且收到返回值:welcome by he。然后每隔5s,就會收到服務端的心跳包:1111。在handler的messageReceived中,確認收到心跳包之后返回1112,實現(xiàn)心跳應答。以上過程,每5s重復一次。
main方法中最后三行注釋掉的代碼如果打開,客戶端在發(fā)送完消息之后會主動斷開。
方式二:客戶端不借助于mina,換用java的普通socket來實現(xiàn),這樣就可以換成其他任何語言:
package com.he.client;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
/**
*@function:java的簡單socket連接,長連接,嘗試連續(xù)從服務器獲取消息
*@parameter:
*@return:
*@date:2016-6-22 下午03:43:18
*@author:he
*@notice:
*/
public class SocketTestTwo {
public static final String IP_ADDR = "127.0.0.1";// 服務器地址
public static final int PORT = 8888;// 服務器端口號
static String text = null;
public static void main(String[] args) throws IOException {
System.out.println("客戶端啟動...");
Socket socket = null;
socket = new Socket(IP_ADDR, PORT);
PrintWriter os = new PrintWriter(socket.getOutputStream());
os.println("al");
os.println("two");
os.flush();
while (true) {
try {
// 創(chuàng)建一個流套接字并將其連接到指定主機上的指定端口號
DataInputStream input = new DataInputStream(socket.getInputStream());
// 讀取服務器端數(shù)據(jù)
byte[] buffer;
buffer = new byte[input.available()];
if (buffer.length != 0) {
System.out.println("length=" + buffer.length);
// 讀取緩沖區(qū)
input.read(buffer);
// 轉(zhuǎn)換字符串
String three = new String(buffer);
System.out.println("內(nèi)容=" + three);
if (three.equals("1111\n")) {
System.out.println("發(fā)送返回心跳包");
os = new PrintWriter(socket.getOutputStream());
os.println("1112");
os.flush();
}
}
} catch (Exception e) {
System.out.println("客戶端異常:" + e.getMessage());
os.close();
}
}
}
}
以上代碼運行效果和前一種方式完全一樣。
但是注意此種方法和使用mina結(jié)構的客戶端中有一處不同:對于心跳包的判斷。本教程中服務端選用了mina自帶的編解碼器,通過換行符來區(qū)分上下兩條消息,也就是每一條消息后面會帶上一個換行符,所以在使用java普通的socket來連接時,判斷心跳包不再是判斷是否為“1111”,而是“1111\n”。對比mina結(jié)構的客戶端中并不需要加上換行符是因為客戶端中綁定了相同的編解碼器。
程序運行結(jié)果截圖:
服務端:
客戶端:

紅色的打印是mina自帶的打印信息,黑色的是本工程中使用的log4j打印,所以你們的工程應該配置有如下log4j的配置文件才能看到一樣的打印結(jié)果:
log4j.rootLogger=WARN,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.Threshold=WARN
log4j.appender.stdout.layout.ConversionPattern = [%-5p] [%d{yyyy-MM-dd HH\:mm\:ss,SSS}] [%x] %c %l - %m%n
應大家需求,工程代碼終于抽空放到github了! https://github.com/smile326/minaSpring
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
使用 Spring Boot 內(nèi)嵌容器 Undertow創(chuàng)建服務器的方法
Undertow是一個非常輕量并高性能的web server,它來自 JBoss。支持blocking和non-blocking兩種NIO API。接下來通過本文給大家介紹使用Spring Boot 內(nèi)嵌容器 Undertow創(chuàng)建服務器的方法,感興趣的朋友一起看看吧2017-11-11
BeanUtils.copyProperties復制對象結(jié)果為空的原因分析
這篇文章主要介紹了BeanUtils.copyProperties復制對象結(jié)果為空的原因分析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06
SpringBoot中實現(xiàn)@Scheduled動態(tài)定時任務
SpringBoot中的@Scheduled注解為定時任務提供了一種很簡單的實現(xiàn),本文主要介紹了SpringBoot中實現(xiàn)@Scheduled動態(tài)定時任務,具有一定的參考價值,感興趣的可以了解一下2024-01-01

