SpringAOP+RabbitMQ+WebSocket實(shí)戰(zhàn)詳解
背景
最近公司的客戶要求,分配給員工的任務(wù)除了有微信通知外,還希望PC端的網(wǎng)頁也能實(shí)時(shí)收到通知。管理員分配任務(wù)是在我們的系統(tǒng)A,而員工接受任務(wù)是在系統(tǒng)B。兩個(gè)系統(tǒng)都是現(xiàn)在已投入使用的系統(tǒng)。
技術(shù)選型
根據(jù)需求我們最終選用SpringAOP+RabbitMQ+WebSocket。
SpringAOP可以讓我們不修改原有代碼,直接將原有service作為切點(diǎn),加入切面。RabbitMQ可以讓A系統(tǒng)和B系統(tǒng)解耦。WebSocket則可以達(dá)到實(shí)時(shí)通知的要求。
SpringAOP
AOP稱為面向切面編程,在程序開發(fā)中主要用來解決一些系統(tǒng)層面上的問題,比如日志,事務(wù),權(quán)限等待。是Spring的核心模塊,底層是通過動(dòng)態(tài)代理來實(shí)現(xiàn)(動(dòng)態(tài)代理將在之后的文章重點(diǎn)介紹)。
基本概念
Aspect(切面):通常是一個(gè)類,里面可以定義切入點(diǎn)和通知。
JointPoint(連接點(diǎn)):程序執(zhí)行過程中明確的點(diǎn),一般是方法的調(diào)用。
Advice(通知):AOP在特定的切入點(diǎn)上執(zhí)行的增強(qiáng)處理,有before,after,afterReturning,afterThrowing,around。
Pointcut(切入點(diǎn)):就是帶有通知的連接點(diǎn),在程序中主要體現(xiàn)為書寫切入點(diǎn)表達(dá)式。
通知類型
Before:在目標(biāo)方法被調(diào)用之前做增強(qiáng)處理。
@Before只需要指定切入點(diǎn)表達(dá)式即可
AfterReturning:在目標(biāo)方法正常完成后做增強(qiáng)。
@AfterReturning除了指定切入點(diǎn)表達(dá)式后,還可以指定一個(gè)返回值形參名returning,代表目標(biāo)方法的返回值
AfterThrowing:主要用來處理程序中未處理的異常。
@AfterThrowing除了指定切入點(diǎn)表達(dá)式后,還可以指定一個(gè)throwing的返回值形參名,可以通過該形參名
來訪問目標(biāo)方法中所拋出的異常對(duì)象
After:在目標(biāo)方法完成之后做增強(qiáng),無論目標(biāo)方法時(shí)候成功完成。
@After可以指定一個(gè)切入點(diǎn)表達(dá)式
Around:環(huán)繞通知,在目標(biāo)方法完成前后做增強(qiáng)處理,環(huán)繞通知是最重要的通知類型,像事務(wù),日志等都是環(huán)繞通知,注意編程中核心是一個(gè)ProceedingJoinPoint。
RabbitMQ
從圖中我們可以看到RabbitMQ主要的結(jié)構(gòu)有:Routing、Binding、Exchange、Queue。
Queue
Queue(隊(duì)列)RabbitMQ的作用是存儲(chǔ)消息,隊(duì)列的特性是先進(jìn)先出。
Exchange
生產(chǎn)者產(chǎn)生的消息并不是直接發(fā)送給消息隊(duì)列Queue的,而是要經(jīng)過Exchange(交換器),由Exchange再將消息路由到一個(gè)或多個(gè)Queue,還會(huì)將不符合路由規(guī)則的消息丟棄。
Routing
用于標(biāo)記或生產(chǎn)者尋找Exchange。
Binding
用于Exchange和Queue做關(guān)聯(lián)。
Exchange Type fanout
fanout類型的Exchange路由規(guī)則非常簡單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。
direct
direct會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。
topic
direct規(guī)則是嚴(yán)格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時(shí)候才將消息傳送給Queue,那么topic這個(gè)規(guī)則就是模糊匹配,可以通過通配符滿足一部分規(guī)則就可以傳送。
headers
headers類型的Exchange不依賴于routing key與binding key的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。
WebSocket
了解websocket必須先知道幾個(gè)常用的web通信技術(shù)及其區(qū)別。
短輪詢
短輪詢的基本思路就是瀏覽器每隔一段時(shí)間向?yàn)g覽器發(fā)送http請(qǐng)求,服務(wù)器端在收到請(qǐng)求后,不論是否有數(shù)據(jù)更新,都直接進(jìn)行響應(yīng)。這種方式實(shí)現(xiàn)的即時(shí)通信,本質(zhì)上還是瀏覽器發(fā)送請(qǐng)求,服務(wù)器接受請(qǐng)求的一個(gè)過程,通過讓客戶端不斷的進(jìn)行請(qǐng)求,使得客戶端能夠模擬實(shí)時(shí)地收到服務(wù)器端的數(shù)據(jù)的變化。
這種方式的優(yōu)點(diǎn)是比較簡單,易于理解,實(shí)現(xiàn)起來也沒有什么技術(shù)難點(diǎn)。缺點(diǎn)是顯而易見的,這種方式由于需要不斷的建立http連接,嚴(yán)重浪費(fèi)了服務(wù)器端和客戶端的資源。尤其是在客戶端,距離來說,如果有數(shù)量級(jí)想對(duì)比較大的人同時(shí)位于基于短輪詢的應(yīng)用中,那么每一個(gè)用戶的客戶端都會(huì)瘋狂的向服務(wù)器端發(fā)送http請(qǐng)求,而且不會(huì)間斷。人數(shù)越多,服務(wù)器端壓力越大,這是很不合理的。
因此短輪詢不適用于那些同時(shí)在線用戶數(shù)量比較大,并且很注重性能的Web應(yīng)用。
長輪詢/ comet
comet指的是,當(dāng)服務(wù)器收到客戶端發(fā)來的請(qǐng)求后,不會(huì)直接進(jìn)行響應(yīng),而是先將這個(gè)請(qǐng)求掛起,然后判斷服務(wù)器端數(shù)據(jù)是否有更新。如果有更新,則進(jìn)行響應(yīng),如果一直沒有數(shù)據(jù),則到達(dá)一定的時(shí)間限制(服務(wù)器端設(shè)置)后關(guān)閉連接。
長輪詢和短輪詢比起來,明顯減少了很多不必要的http請(qǐng)求次數(shù),相比之下節(jié)約了資源。長輪詢的缺點(diǎn)在于,連接掛起也會(huì)導(dǎo)致資源的浪費(fèi)。
SSE
SSE是HTML5新增的功能,全稱為Server-Sent Events。它可以允許服務(wù)推送數(shù)據(jù)到客戶端。SSE在本質(zhì)上就與之前的長輪詢、短輪詢不同,雖然都是基于http協(xié)議的,但是輪詢需要客戶端先發(fā)送請(qǐng)求。而SSE最大的特點(diǎn)就是不需要客戶端發(fā)送請(qǐng)求,可以實(shí)現(xiàn)只要服務(wù)器端數(shù)據(jù)有更新,就可以馬上發(fā)送到客戶端。
SSE的優(yōu)勢(shì)很明顯,它不需要建立或保持大量的客戶端發(fā)往服務(wù)器端的請(qǐng)求,節(jié)約了很多資源,提升應(yīng)用性能。并且SSE的實(shí)現(xiàn)非常簡單,不需要依賴其他插件。
WebSocket
WebSocket是Html5定義的一個(gè)新協(xié)議,與傳統(tǒng)的http協(xié)議不同,該協(xié)議可以實(shí)現(xiàn)服務(wù)器與客戶端之間全雙工通信。簡單來說,首先需要在客戶端和服務(wù)器端建立起一個(gè)連接,這部分需要http。連接一旦建立,客戶端和服務(wù)器端就處于平等的地位,可以相互發(fā)送數(shù)據(jù),不存在請(qǐng)求和響應(yīng)的區(qū)別。
WebSocket的優(yōu)點(diǎn)是實(shí)現(xiàn)了雙向通信,缺點(diǎn)是服務(wù)器端的邏輯非常復(fù)雜。現(xiàn)在針對(duì)不同的后臺(tái)語言有不同的插件可以使用。
四種Web即時(shí)通信技術(shù)比較
從兼容性角度考慮,短輪詢>長輪詢>長連接SSE>WebSocket;
從性能方面考慮,WebSocket>長連接SSE>長輪詢>短輪詢。
實(shí)戰(zhàn)
項(xiàng)目使用SpringBoot搭建。RabbitMQ的安裝這里不講述。
RabbitMQ配置
兩個(gè)系統(tǒng)A、B都需要操作RabbitMQ,其中A生產(chǎn)消息,B消費(fèi)消息。故都需要配置。
1、首先引入RabbitMQ的dependency:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
這個(gè)dependency中包含了RabbitMQ相關(guān)dependency。
2、在項(xiàng)目的配置文件里配置為使用rabbitmq及其參數(shù)。
application-pro.yml
#消息隊(duì)列 message.queue.type: rabbitmq ## rabbit mq properties rabbitmq: host: localhost port: 5672 username: guest password: guest
application.properties
#將要使用的隊(duì)列名 rabbitmq.websocket.msg.queue=websocket_msg_queue
3、創(chuàng)建配置文件。隊(duì)列的創(chuàng)建交給spring。
RabbitMQConfig.java
@Configuration @EnableRabbit public class RabbitMQConfig { @Value("${rabbitmq.host}") private String host; @Value("${rabbitmq.port}") private String port; @Value("${rabbitmq.username}") private String username; @Value("${rabbitmq.password}") private String password; @Value("${rabbitmq.websocket.msg.queue}") private String webSocketMsgQueue; @Bean public ConnectionFactory connectionFactory() throws IOException { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUsername(username); factory.setPassword(password); // factory.setVirtualHost("test"); factory.setHost(host); factory.setPort(Integer.valueOf(port)); factory.setPublisherConfirms(true); //設(shè)置隊(duì)列參數(shù),是否持久化、隊(duì)列TTL、隊(duì)列消息TTL等 factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null); return factory; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必須是prototype類型 public RabbitTemplate rabbitTemplate() throws IOException { return new RabbitTemplate(connectionFactory()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
4、系統(tǒng)B中創(chuàng)建隊(duì)列監(jiān)聽,當(dāng)隊(duì)列有消息時(shí),發(fā)送websocket通知。
RabbitMQListener.java
@Component public class RabbitMQListener { @Autowired private RabbitMQService mqService; /** * WebSocket推送監(jiān)聽器 * @param socketEntity * @param deliveryTag * @param channel */ @RabbitListener(queues = "websocket_msg_queue") public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel); } }
RabbitMQService.java
public class RabbitMQService { @Autowired private MessageWebSocketHandler messageWebSocketHandler; /** * @param socketMsgEntity * @param deliveryTag * @param channel * @throws IOException */ void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException { try { messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds()); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); } } }
WebSocketMsgEntity為MQ中傳送的實(shí)體。
public class WebSocketMsgEntity implements Serializable { public enum OrderType{ repair("維修"), maintain("保養(yǎng)"), measure("計(jì)量"); OrderType(String value){ this.value = value; } String value; public String getValue() { return value; } } //設(shè)備名稱 private String EquName; //設(shè)備編號(hào) private String EquId; //工單類型 private OrderType orderType; //工單單號(hào) private String orderId; //工單狀態(tài) private String orderStatus; //創(chuàng)建時(shí)間 private Date createTime; //消息接收人ID private List<String> toUserIds; public String getEquName() { return EquName; } public void setEquName(String equName) { EquName = equName; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getEquId() { return EquId; } public void setEquId(String equId) { EquId = equId; } public String getOrderStatus() { return orderStatus; } public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; } public OrderType getOrderType() { return orderType; } public void setOrderType(OrderType orderType) { this.orderType = orderType; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public List<String> getToUserIds() { return toUserIds; } public void setToUserIds(List<String> toUserIds) { this.toUserIds = toUserIds; } public String toJsonString(){ return JSON.toJSONString(this); } }
SpringAOP
1、系統(tǒng)A中創(chuàng)建一個(gè)切面類DataInterceptor.java
@Aspect @Component public class DataInterceptor { @Autowired private MessageQueueService queueService; //維修工單切點(diǎn) @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))") private void repairMsg() { } /** * 返回通知,方法執(zhí)行正常返回時(shí)觸發(fā) * * @param joinPoint * @param result */ @AfterReturning(value = "repairMsg()", returning = "result") public void afterReturning(JoinPoint joinPoint, Object result) { //此處可以獲得切點(diǎn)方法名 //String methodName = joinPoint.getSignature().getName(); EquipmentRepair equipmentRepair = (EquipmentRepair) result; WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair); if (webSocketMsgEntity == null) { return; } queueService.send(webSocketMsgEntity); } /** * 生成發(fā)送到MQ的維修消息 * * @param equipmentRepair * @return */ private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) { WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair); return webSocketMsgEntity; } /** * 從任務(wù)中生成消息 * * @param equipmentRepair * @return */ private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) { //業(yè)務(wù)代碼略 } }
2、發(fā)送消息到MQ。這里只貼了發(fā)送的核心代碼
public class RabbitMessageQueue extends AbstractMessageQueue { @Value("${rabbitmq.websocket.msg.queue}") private String webSocketMsgQueue; @Autowired private RabbitTemplate rabbitTemplate; @Override public void send(WebSocketMsgEntity entity) { //沒有指定exchange,則使用默認(rèn)名為“”的exchange,binding名與queue名相同 rabbitTemplate.convertAndSend(webSocketMsgQueue, entity); } }
WebSocket
1、 系統(tǒng)B中引入websocket服務(wù)端dependency
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.3.10.RELEASE</version> </dependency>
2、 配置websocket,添加處理類
WebSocketConfigurer.java
@Configuration @EnableWebSocket public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer { private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class); @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { //配置webSocket路徑 registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*"); //配置webSocket路徑 支持前端使用socketJs registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS(); } @Bean public MessageWebSocketHandler messageWebSocketHandler() { logger.info("......創(chuàng)建MessageWebSocketHandler......"); return new MessageWebSocketHandler(); } }
MessageWebSocketHandler.java 主要用于websocket連接及消息發(fā)送處理。配置中還使用了連接握手時(shí)的處理,主要是取用戶登陸信息,這里不多講述。
public class MessageWebSocketHandler extends TextWebSocketHandler { private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class); private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketSession>> users = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userId = session.getAttributes().get("WEBSOCKET_USERID").toString(); logger.info("......AfterConnectionEstablished......"); logger.info("session.getId:" + session.getId()); logger.info("session.getLocalAddress:" + session.getLocalAddress().toString()); logger.info("userId:" + userId); //websocket連接后記錄連接信息 if (users.keySet().contains(userId)) { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); webSocketSessions.add(session); } else { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>(); webSocketSessions.add(session); users.put(userId, webSocketSessions); } } @Override public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception { removeUserSession(session); if (session.isOpen()) { session.close(); } logger.info("異常出現(xiàn)handleTransportError" + throwable.getMessage()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { removeUserSession(session); logger.info("關(guān)閉afterConnectionClosed" + closeStatus.getReason()); } @Override public boolean supportsPartialMessages() { return false; } /** * 給符合要求的在線用戶發(fā)送消息 * * @param message */ public void sendMessageToUsers(String message, List<String> userIds) throws IOException{ if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) { return; } if (users.isEmpty()) { return; } for (String userId : userIds) { if (!users.keySet().contains(userId)) { continue; } CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); if (webSocketSessions == null) { continue; } for (WebSocketSession webSocketSession : webSocketSessions) { if (webSocketSession.isOpen()) { try { webSocketSession.sendMessage(new TextMessage(message)); } catch (IOException e) { logger.error(" WebSocket server send message ERROR " + e.getMessage()); try { throw e; } catch (IOException e1) { e1.printStackTrace(); } } } } } } /** * websocket清除連接信息 * * @param session */ private void removeUserSession(WebSocketSession session) { String userId = session.getAttributes().get("WEBSOCKET_USERID").toString(); if (users.keySet().contains(userId)) { CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId); webSocketSessions.remove(session); if (webSocketSessions.isEmpty()) { users.remove(userId); } } } }
整個(gè)功能完成后,A系統(tǒng)分配任務(wù)時(shí),系統(tǒng)B登陸用戶收到的消息如圖:
總體流程:
1、對(duì)于系統(tǒng)B,每個(gè)登陸的用戶都會(huì)和服務(wù)器建立websocket長連接。
2、系統(tǒng)A生成任務(wù),AOP做出響應(yīng),將封裝的消息發(fā)送給MQ。
3、系統(tǒng)B中的MQ監(jiān)聽發(fā)現(xiàn)隊(duì)列有消息到達(dá),消費(fèi)消息。
4、系統(tǒng)B通過websocket長連接將消息發(fā)給指定的登陸用戶。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
通過Java實(shí)現(xiàn)文件斷點(diǎn)續(xù)傳功能
用戶上傳大文件,網(wǎng)絡(luò)差點(diǎn)的需要?dú)v時(shí)數(shù)小時(shí),萬一線路中斷,不具備斷點(diǎn)續(xù)傳的服務(wù)器就只能從頭重傳,而斷點(diǎn)續(xù)傳就是,允許用戶從上傳斷線的地方繼續(xù)傳送,這樣大大減少了用戶的煩惱。本文將用Java語言實(shí)現(xiàn)斷點(diǎn)續(xù)傳,需要的可以參考一下2022-05-05Java數(shù)據(jù)脫敏實(shí)現(xiàn)的方法總結(jié)
數(shù)據(jù)脫敏,指的是對(duì)某些敏感信息通過脫敏規(guī)則進(jìn)行數(shù)據(jù)的變形,實(shí)現(xiàn)敏感隱私數(shù)據(jù)的可靠保護(hù),本文主要是對(duì)后端數(shù)據(jù)脫敏實(shí)現(xiàn)的簡單總結(jié),希望對(duì)大家有所幫助2023-07-07Java利用redis zset實(shí)現(xiàn)延時(shí)任務(wù)詳解
zset作為redis的有序集合數(shù)據(jù)結(jié)構(gòu)存在,排序的依據(jù)就是score。本文就將利用zset score這個(gè)排序的這個(gè)特性,來實(shí)現(xiàn)延時(shí)任務(wù),感興趣的可以了解一下2022-08-08java開發(fā)之SQL語句中DATE_FORMAT函數(shù)舉例詳解
要將日期值格式化為特定格式,請(qǐng)使用DATE_FORMAT函數(shù),下面這篇文章主要給大家介紹了關(guān)于java開發(fā)之SQL語句中DATE_FORMAT函數(shù)的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-05-05java書店系統(tǒng)畢業(yè)設(shè)計(jì) 用戶模塊(3)
這篇文章主要介紹了java書店系統(tǒng)畢業(yè)設(shè)計(jì),第三步系統(tǒng)總體設(shè)計(jì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-10-10Java字節(jié)與字符流永久存儲(chǔ)json數(shù)據(jù)
本篇文章給大家詳細(xì)講述了Java字節(jié)與字符流永久存儲(chǔ)json數(shù)據(jù)的方法,以及代碼分享,有興趣的參考學(xué)習(xí)下。2018-02-02SpringBoot2.X Kotlin系列之?dāng)?shù)據(jù)校驗(yàn)和異常處理詳解
這篇文章主要介紹了SpringBoot 2.X Kotlin系列之?dāng)?shù)據(jù)校驗(yàn)和異常處理詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-04-04