詳解spring集成mina實(shí)現(xiàn)服務(wù)端主動(dòng)推送(包含心跳檢測(cè))
本文介紹了spring集成mina實(shí)現(xiàn)服務(wù)端主動(dòng)推送(包含心跳檢測(cè)),分享給大家,具體如下:
服務(wù)端
1.常規(guī)的spring工程集成mina時(shí),pom.xml中需要加入如下配置:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-beans</artifactId>
<version>2.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.13</version>
<type>bundle</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-spring</artifactId>
<version>1.1.7</version>
</dependency>
注意此處mina-core的配置寫法。如果工程中引入上述依賴之后報(bào)錯(cuò):Missing artifact xxx bundle,則需要在pom.xml的plugins之間加入如下插件配置:
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
2.Filter1:編解碼器,實(shí)現(xiàn)ProtocolCodecFactory解碼工廠
package com.he.server;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineDecoder;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
public class MyCodeFactory implements ProtocolCodecFactory {
private final TextLineEncoder encoder;
private final TextLineDecoder decoder;
public MyCodeFactory() {
this(Charset.forName("utf-8"));
}
public MyCodeFactory(Charset charset) {
encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
}
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return decoder;
}
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return encoder;
}
public int getEncoderMaxLineLength() {
return encoder.getMaxLineLength();
}
public void setEncoderMaxLineLength(int maxLineLength) {
encoder.setMaxLineLength(maxLineLength);
}
public int getDecoderMaxLineLength() {
return decoder.getMaxLineLength();
}
public void setDecoderMaxLineLength(int maxLineLength) {
decoder.setMaxLineLength(maxLineLength);
}
}
此處使用了mina自帶的TextLineEncoder編解碼器,此解碼器支持使用固定長(zhǎng)度或者固定分隔符來區(qū)分上下兩條消息。如果要使用自定義協(xié)議,則需要自己編寫解碼器。要使用websocket,也需要重新編寫解碼器,關(guān)于mina結(jié)合websocket,jira上有一個(gè)開源項(xiàng)目https://issues.apache.org/jira/browse/DIRMINA-907,專門為mina編寫了支持websocket的編解碼器,親測(cè)可用。。。此部分不是本文重點(diǎn),略。
3.Filter2:心跳工廠,加入心跳檢測(cè)功能需要實(shí)現(xiàn)KeepAliveMessageFactory:
package com.he.server;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{
private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class);
/** 心跳包內(nèi)容 */
private static final String HEARTBEATREQUEST = "1111";
private static final String HEARTBEATRESPONSE = "1112";
public Object getRequest(IoSession session) {
LOG.warn("請(qǐng)求預(yù)設(shè)信息: " + HEARTBEATREQUEST);
return HEARTBEATREQUEST;
}
public Object getResponse(IoSession session, Object request) {
LOG.warn("響應(yīng)預(yù)設(shè)信息: " + HEARTBEATRESPONSE);
/** 返回預(yù)設(shè)語句 */
return HEARTBEATRESPONSE;
}
public boolean isRequest(IoSession session, Object message) {
LOG.warn("請(qǐng)求心跳包信息: " + message);
if (message.equals(HEARTBEATREQUEST))
return true;
return false;
}
public boolean isResponse(IoSession session, Object message) {
LOG.warn("響應(yīng)心跳包信息: " + message);
if(message.equals(HEARTBEATRESPONSE))
return true;
return false;
}
}
此處我設(shè)置服務(wù)端發(fā)送的心跳包是1111,客戶端應(yīng)該返回1112.
4.實(shí)現(xiàn)必不可少的IoHandlerAdapter,得到監(jiān)聽事件處理權(quán):
package com.he.server;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class MyHandler extends IoHandlerAdapter {
//private final int IDLE = 3000;//(單位s)
private final Logger LOG = Logger.getLogger(MyHandler.class);
// public static Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());
public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
session.closeOnFlush();
LOG.warn("session occured exception, so close it." + cause.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String str = message.toString();
LOG.warn("客戶端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "連接成功!");
session.setAttribute("type", message);
String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress();
session.setAttribute("ip", remoteAddress);
LOG.warn("服務(wù)器收到的消息是:" + str);
session.write("welcome by he");
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
LOG.warn("messageSent:" + message);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
LOG.warn("remote client [" + session.getRemoteAddress().toString() + "] connected.");
// my
Long time = System.currentTimeMillis();
session.setAttribute("id", time);
sessionsConcurrentHashMap.put(time, session);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
LOG.warn("sessionClosed.");
session.closeOnFlush();
// my
sessionsConcurrentHashMap.remove(session.getAttribute("id"));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
LOG.warn("session idle, so disconnecting......");
session.closeOnFlush();
LOG.warn("disconnected.");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
LOG.warn("sessionOpened.");
//
//session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE);
}
}
此處有幾點(diǎn)說明:
第一點(diǎn):網(wǎng)上教程會(huì)在此處(sessionOpened()方法中)設(shè)置IDLE,IDLE表示session經(jīng)過多久判定為空閑的時(shí)間,單位s,上述代碼中已經(jīng)注釋掉了,因?yàn)楹竺嬖趕pring配置中加入心跳檢測(cè)部分時(shí)會(huì)進(jìn)行IDLE的配置,已經(jīng)不需要在此處進(jìn)行配置了,而且如果在心跳配置部分和此處都對(duì)BOTH_IDLE模式設(shè)置了空閑時(shí)間,親測(cè)發(fā)現(xiàn)此處配置不生效。
第二點(diǎn):關(guān)于存放session的容器,建議使用
public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>();
而不是用已經(jīng)注釋掉的Collections.synchronizedSet類型的set或者map,至于原因,java5中新增了ConcurrentMap接口和它的一個(gè)實(shí)現(xiàn)類ConcurrentHashMap,可以保證線程的足夠安全。詳細(xì)的知識(shí)你應(yīng)該搜索SynchronizedMap和ConcurrentHashMap的區(qū)別,學(xué)習(xí)更加多的并發(fā)安全知識(shí)。
上述代碼中,每次在收到客戶端的消息時(shí),我會(huì)返回一段文本:welcome by he。
有了map,主動(dòng)推送就不是問題了,想推給誰,在map中找到誰就可以了。
5.完成spring的配置工作
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="customEditors">
<map>
<entry key="java.net.SocketAddress"
value="org.apache.mina.integration.beans.InetSocketAddressEditor">
</entry>
</map>
</property>
</bean>
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"
init-method="bind" destroy-method="unbind">
<!--端口號(hào)-->
<property name="defaultLocalAddress" value=":8888" />
<!--綁定自己實(shí)現(xiàn)的handler-->
<property name="handler" ref="serverHandler" />
<!--聲明過濾器的集合-->
<property name="filterChainBuilder" ref="filterChainBuilder" />
<property name="reuseAddress" value="true" />
</bean>
<bean id="filterChainBuilder"
class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
<property name="filters">
<map>
<!--mina自帶的線程池filter-->
<entry key="executor" value-ref="executorFilter" />
<entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" />
<!--自己實(shí)現(xiàn)的編解碼器filter-->
<entry key="codecFilter" value-ref="codecFilter" />
<!--日志的filter-->
<entry key="loggingFilter" value-ref="loggingFilter" />
<!--心跳filter-->
<entry key="keepAliveFilter" value-ref="keepAliveFilter" />
</map>
</property>
</bean>
<!-- executorFilter多線程處理 -->
<bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" />
<bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter">
<constructor-arg value="remoteAddress" />
</bean>
<!--日志-->
<bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" />
<!--編解碼-->
<bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
<constructor-arg>
<!--構(gòu)造函數(shù)的參數(shù)傳入自己實(shí)現(xiàn)的對(duì)象-->
<bean class="com.he.server.MyCodeFactory"></bean>
</constructor-arg>
</bean>
<!--心跳檢測(cè)filter-->
<bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter">
<!--構(gòu)造函數(shù)的第一個(gè)參數(shù)傳入自己實(shí)現(xiàn)的工廠-->
<constructor-arg>
<bean class="com.he.server.MyKeepAliveMessageFactory"></bean>
</constructor-arg>
<!--第二個(gè)參數(shù)需要的是IdleStatus對(duì)象,value值設(shè)置為讀寫空閑-->
<constructor-arg type = "org.apache.mina.core.session.IdleStatus" value="BOTH_IDLE" >
</constructor-arg>
<!--心跳頻率,不設(shè)置則默認(rèn)60s -->
<property name="requestInterval" value="5" />
<!--心跳超時(shí)時(shí)間,不設(shè)置則默認(rèn)30s -->
<property name="requestTimeout" value="10" />
<!--不設(shè)置默認(rèn)false-->
<property name="forwardEvent" value="true" />
</bean>
<!--自己實(shí)現(xiàn)的handler-->
<bean id="serverHandler" class="com.he.server.MyHandler" />
</beans>
好了,xml中已經(jīng)寫了足夠多的注釋了。說明一下關(guān)于心跳檢測(cè)中的最后一個(gè)屬性:forwardEvent,默認(rèn)false,比如在心跳頻率為5s時(shí),實(shí)際上每5s會(huì)觸發(fā)一次KeepAliveFilter中的session_idle事件,該事件中開始發(fā)送心跳包。當(dāng)此參數(shù)設(shè)置為false時(shí),對(duì)于session_idle事件不再傳遞給其他filter,如果設(shè)置為true,則會(huì)傳遞給其他filter,例如handler中的session_idle事件,此時(shí)也會(huì)被觸發(fā)。另外IdleStatus一共有三個(gè)值,點(diǎn)擊進(jìn)源碼就能看到。
6.寫main方法啟動(dòng)服務(wù)端
package com.he.server;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MainTest {
public static void main(String[] args) {
ClassPathXmlApplicationContext ct = new ClassPathXmlApplicationContext("applicationContext.xml");
}
}
run之后,端口就已經(jīng)開始監(jiān)聽了。此處,如果是web工程,使用tomcat之類的容器,只要在web.xml中配置了
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/classes/applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
則容器在啟動(dòng)時(shí)就會(huì)加載spring的配置文件,端口的監(jiān)聽就開始了,這樣就不需要main方法來啟動(dòng)。
客戶端,本文采用兩種方式來實(shí)現(xiàn)客戶端
方式一:用mina結(jié)構(gòu)來實(shí)現(xiàn)客戶端,引入mina相關(guān)jar包即可,Android也可以使用
1.先實(shí)現(xiàn)IoHandlerAdater得到監(jiān)聽事件,類似于服務(wù)端:
package com.he.client.minaclient;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ClientHandler extends IoHandlerAdapter{
private final Logger LOG = Logger.getLogger(ClientHandler.class);
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
LOG.warn("客戶端收到消息:" + message);
if (message.toString().equals("1111")) {
//收到心跳包
LOG.warn("收到心跳包");
session.write("1112");
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
super.messageSent(session, message);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionClosed(session);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionCreated(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// TODO Auto-generated method stub
super.sessionIdle(session, status);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
super.sessionOpened(session);
}
}
2.寫main方法啟動(dòng)客戶端,連接服務(wù)端:
package com.he.client.minaclient;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class ClientTest {
public static void main(String[] args) throws InterruptedException {
//創(chuàng)建客戶端連接器.
NioSocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); //設(shè)置編碼過濾器
connector.setHandler(new ClientHandler());//設(shè)置事件處理器
ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 8888));//建立連接
cf.awaitUninterruptibly();//等待連接創(chuàng)建完成
cf.getSession().write("hello,測(cè)試!");//發(fā)送消息,中英文都有
//cf.getSession().closeOnFlush();
//cf.getSession().getCloseFuture().awaitUninterruptibly();//等待連接斷開
//connector.dispose();
}
}
過程也是一樣的,加各種filter,綁定handler。上述代碼運(yùn)行之后向服務(wù)器發(fā)送了:“hello,測(cè)試”,并且收到返回值:welcome by he。然后每隔5s,就會(huì)收到服務(wù)端的心跳包:1111。在handler的messageReceived中,確認(rèn)收到心跳包之后返回1112,實(shí)現(xiàn)心跳應(yīng)答。以上過程,每5s重復(fù)一次。
main方法中最后三行注釋掉的代碼如果打開,客戶端在發(fā)送完消息之后會(huì)主動(dòng)斷開。
方式二:客戶端不借助于mina,換用java的普通socket來實(shí)現(xiàn),這樣就可以換成其他任何語言:
package com.he.client;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
/**
*@function:java的簡(jiǎn)單socket連接,長(zhǎng)連接,嘗試連續(xù)從服務(wù)器獲取消息
*@parameter:
*@return:
*@date:2016-6-22 下午03:43:18
*@author:he
*@notice:
*/
public class SocketTestTwo {
public static final String IP_ADDR = "127.0.0.1";// 服務(wù)器地址
public static final int PORT = 8888;// 服務(wù)器端口號(hào)
static String text = null;
public static void main(String[] args) throws IOException {
System.out.println("客戶端啟動(dòng)...");
Socket socket = null;
socket = new Socket(IP_ADDR, PORT);
PrintWriter os = new PrintWriter(socket.getOutputStream());
os.println("al");
os.println("two");
os.flush();
while (true) {
try {
// 創(chuàng)建一個(gè)流套接字并將其連接到指定主機(jī)上的指定端口號(hào)
DataInputStream input = new DataInputStream(socket.getInputStream());
// 讀取服務(wù)器端數(shù)據(jù)
byte[] buffer;
buffer = new byte[input.available()];
if (buffer.length != 0) {
System.out.println("length=" + buffer.length);
// 讀取緩沖區(qū)
input.read(buffer);
// 轉(zhuǎn)換字符串
String three = new String(buffer);
System.out.println("內(nèi)容=" + three);
if (three.equals("1111\n")) {
System.out.println("發(fā)送返回心跳包");
os = new PrintWriter(socket.getOutputStream());
os.println("1112");
os.flush();
}
}
} catch (Exception e) {
System.out.println("客戶端異常:" + e.getMessage());
os.close();
}
}
}
}
以上代碼運(yùn)行效果和前一種方式完全一樣。
但是注意此種方法和使用mina結(jié)構(gòu)的客戶端中有一處不同:對(duì)于心跳包的判斷。本教程中服務(wù)端選用了mina自帶的編解碼器,通過換行符來區(qū)分上下兩條消息,也就是每一條消息后面會(huì)帶上一個(gè)換行符,所以在使用java普通的socket來連接時(shí),判斷心跳包不再是判斷是否為“1111”,而是“1111\n”。對(duì)比mina結(jié)構(gòu)的客戶端中并不需要加上換行符是因?yàn)榭蛻舳酥薪壎讼嗤木幗獯a器。
程序運(yùn)行結(jié)果截圖:
服務(wù)端:
客戶端:

紅色的打印是mina自帶的打印信息,黑色的是本工程中使用的log4j打印,所以你們的工程應(yīng)該配置有如下log4j的配置文件才能看到一樣的打印結(jié)果:
log4j.rootLogger=WARN,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.Threshold=WARN
log4j.appender.stdout.layout.ConversionPattern = [%-5p] [%d{yyyy-MM-dd HH\:mm\:ss,SSS}] [%x] %c %l - %m%n
應(yīng)大家需求,工程代碼終于抽空放到github了! https://github.com/smile326/minaSpring
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
使用idea遠(yuǎn)程調(diào)試jar包的配置過程
這篇文章主要介紹了使用idea遠(yuǎn)程調(diào)試jar包的配置過程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09
使用 Spring Boot 內(nèi)嵌容器 Undertow創(chuàng)建服務(wù)器的方法
Undertow是一個(gè)非常輕量并高性能的web server,它來自 JBoss。支持blocking和non-blocking兩種NIO API。接下來通過本文給大家介紹使用Spring Boot 內(nèi)嵌容器 Undertow創(chuàng)建服務(wù)器的方法,感興趣的朋友一起看看吧2017-11-11
詳解Springboot+React項(xiàng)目跨域訪問問題
這篇文章主要介紹了詳解Springboot+React項(xiàng)目跨域訪問問題,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-11-11
BeanUtils.copyProperties復(fù)制對(duì)象結(jié)果為空的原因分析
這篇文章主要介紹了BeanUtils.copyProperties復(fù)制對(duì)象結(jié)果為空的原因分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06
SpringBoot中實(shí)現(xiàn)@Scheduled動(dòng)態(tài)定時(shí)任務(wù)
SpringBoot中的@Scheduled注解為定時(shí)任務(wù)提供了一種很簡(jiǎn)單的實(shí)現(xiàn),本文主要介紹了SpringBoot中實(shí)現(xiàn)@Scheduled動(dòng)態(tài)定時(shí)任務(wù),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01

