Java Spring Boot消息服務(wù)萬字詳解分析
消息服務(wù)概述
為什么要使用消息服務(wù)
在多數(shù)應(yīng)用尤其是分布式系統(tǒng)中,消息服務(wù)是不可或缺的重要部分,它使用起來比較簡單,同時解決了不少難題,例如異步處理、應(yīng)用解耦、流量削峰、分布式事務(wù)管理等,使用消息服務(wù)可以實現(xiàn)一個高性能、高可用、高拓展的系統(tǒng)。下面我們使用實際開發(fā)中的若干場景來分析和說明為什么要使用消息服務(wù),以及使用消息服務(wù)的好處。
異步處理
場景說明:用戶注冊后,系統(tǒng)需要將信息寫入數(shù)據(jù)庫,并發(fā)送注冊郵件和注冊短信通知
在圖8-1中,針對上述注冊業(yè)務(wù)的場景需求,處理方法有3種。
1)串行處理方式:用戶發(fā)送注冊請求后,服務(wù)器會先將注冊信息寫入數(shù)據(jù)庫,依次發(fā)送注冊郵件和短信消息,服務(wù)器只有在消息處理完畢后才會將處理結(jié)果返回客戶端。這種串行處理消息的方式非常耗時,用戶體驗不友好。
2)并行處理方式:用戶發(fā)送注冊請求后,將注冊信息寫入數(shù)據(jù)庫,同時發(fā)送注冊郵件和短信,最后返回給客戶端,這種并行處理的方式在一定程度上提高了后臺業(yè)務(wù)處理的效率,但如果遇到較為耗時的業(yè)務(wù)處理,仍然顯得不夠完善。
3)消息服務(wù)處理方式:可以在業(yè)務(wù)中嵌入消息服務(wù)進行業(yè)務(wù)處理,這種方式先將注冊信息寫入數(shù)據(jù)庫,在極短的時間內(nèi)將注冊信息寫入消息隊列后即可返回響應(yīng)信息。此時前端業(yè)務(wù)不需要理會不相干的后臺業(yè)務(wù)處理,而發(fā)送注冊郵件和短息的業(yè)務(wù)會自動讀取消息隊列中的相關(guān)信息進行后續(xù)業(yè)務(wù)處理。
應(yīng)用解耦
場景說明:用戶下單后,訂單服務(wù)需要通知庫存服務(wù)。
如果使用傳統(tǒng)方式處理訂單業(yè)務(wù),用戶下單后,訂單服務(wù)會直接調(diào)用庫存服務(wù)接口進行庫存更新,這種方式有一個很大的問題是:一旦庫存系統(tǒng)出現(xiàn)異常,訂單服務(wù)會失敗導(dǎo)致訂單丟失。如果使用消息服務(wù)模式,訂單服務(wù)的下訂單消息會快速寫入消息隊列,庫存服務(wù)會監(jiān)聽并讀取到訂單,從而修改庫存。相較于傳統(tǒng)方式,消息服務(wù)模式顯得更加高效、可靠。
流量削峰
場景說明:秒殺活動是流量削峰的一種應(yīng)用場景,由于服務(wù)器處理資源能力有限,因此出現(xiàn)峰值時很容易造成服務(wù)器宕機、用戶無法訪問的情況。為了解決這個問題,通常會采用消息隊列緩沖瞬時高峰流量,對請求進行分層過濾,從而過濾掉一些請求。
針對上述秒殺業(yè)務(wù)的場景需求,如果專門增設(shè)服務(wù)器來應(yīng)對秒殺活動期間的請求瞬時高峰的話,在非秒殺活動期間,這些多余的服務(wù)器和配置顯得有些浪費;如果不進行有效處理的話,秒殺活動瞬時高峰流量請求有可能壓垮服務(wù),因此,在秒殺活動中加入消息服務(wù)是較為理想的解決方案。通過在應(yīng)用前端加入消息服務(wù),先將所有請求寫入到消息隊列,并限定一定的閾值,多余的請求直接返回秒殺失敗,秒殺服務(wù)會根據(jù)秒殺規(guī)則從消息隊列中讀取并處理有限的秒殺請求。
分布式事務(wù)管理
場景說明:在分布式系統(tǒng)中,分布式事務(wù)是開發(fā)中必須要面對的技術(shù)難題,怎樣保證分布式系統(tǒng)的請求業(yè)務(wù)處理的數(shù)據(jù)一致性通常是要重點考慮的問題。針對這種分布式事務(wù)管理的情況,目前較為可靠的處理方式是基于消息隊列的二次提交,在失敗的情況可以進行多次嘗試,或者基于隊列數(shù)據(jù)進行回滾操作。因此,在分布式系統(tǒng)中加入消息服務(wù)是一個既能保證性能不變,又能保證業(yè)務(wù)一致性的方案。
針對上述分布式事務(wù)管理的場景需求,如果使用傳統(tǒng)方式在訂單系統(tǒng)中寫入訂單支付成功信息后,再遠程調(diào)用庫存系統(tǒng)進行庫存更新,一旦庫存系統(tǒng)異常,很有可能導(dǎo)致庫存更新失敗而訂單支付成功的情況,從而導(dǎo)致數(shù)據(jù)不一致。針對這種分布式系統(tǒng)的事務(wù)管理,通常會在分布式系統(tǒng)之間加入消息服務(wù)進行管理。訂單支付成功后,寫入消息表;然后定時掃描消息表消息寫入到消息隊列中,庫存系統(tǒng)會立即讀取消息隊列中的消息進行庫存更新,同時添加消息處理狀態(tài);接著,庫存系統(tǒng)向消息隊列中寫入庫存處理結(jié)果,訂單系統(tǒng)會立即讀取消息隊列中的庫存處理狀態(tài)。接著,庫存系統(tǒng)向消息隊列中寫入庫存處理結(jié)果,訂單系統(tǒng)會立即讀取消息隊列中的庫存處理狀態(tài)。如果庫存服務(wù)處理失敗,訂單服務(wù)還會重復(fù)掃描并發(fā)送消息表中的消息,讓庫存系統(tǒng)進行最終一致性的庫存更新。如果處理成功,訂單服務(wù)直接刪除消息表數(shù)據(jù),并寫入到歷史消息表。
常用消息中間件介紹
消息隊列中間件(簡稱消息中間件)是指利用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。目前開源的消息中間件有很多。
ActiveMQ
ActiveMQ是Apache公司出品的,采用Java語言編寫的、完全基于JMS規(guī)范(Java Message Service)的、面向消息的中間件,它為應(yīng)用程序提供高效、可拓展的、穩(wěn)定的、安全的企業(yè)級消息通信。ActiveMQ豐富的API和多種集群構(gòu)建模式使得它成為業(yè)界老牌的消息中間件,廣泛應(yīng)用于中小型企業(yè)中。相較于后續(xù)出現(xiàn)的RabbitMQ、RocketMQ、Kafka等消息中間件來說,ActiveMQ性能相對較弱,在如今的高并發(fā)、大數(shù)據(jù)處理的場景下顯得力不從心,經(jīng)常會出現(xiàn)一些問題,例如消息延遲、堆積、堵塞等。
RabbitMQ
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議(Advanced Message Queuing Protocol)實現(xiàn)。AMQP是為應(yīng)對大規(guī)模并發(fā)活動而提供統(tǒng)一消息服務(wù)的應(yīng)用層標準高級消息隊列協(xié)議,專門為面向消息的中間件設(shè)計,該協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。正是基于AMQP協(xié)議的各種優(yōu)勢性能,使得RabbitMQ消息中間件在應(yīng)用開發(fā)中越來越受歡迎。
Kafka
Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,它是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),采用Scala和Java語言編寫,提供了快速、可拓展的、分布式的、分區(qū)的和可復(fù)制的日志訂閱服務(wù),其主要特定是追求高吞吐量,適用于產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
RocketMQ
RocketMQ是阿里巴巴公司開源產(chǎn)品,目前也是Apache公司的頂級項目,使用純Java開發(fā),具有高吞吐量、高可用、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點。RocketMQ的思路起源于Kafka,對消息的可靠傳輸以及事務(wù)性做了優(yōu)化,目前在阿里巴巴中被廣泛應(yīng)用于交易、充值、流計算、消息推送、日志流式處理場景,不過維護上稍微麻煩。
在實際項目技術(shù)選型時,在沒有特別要求的場景下,通常會選擇使用RabbitMQ作為消息中間件,如果針對的是大數(shù)據(jù)業(yè)務(wù),推薦使用Kafka或者是RocketMQ作為消息中間件。
RabbitMQ消息中間件
RabbitMQ簡介
RabbitMQ是基于AMQP協(xié)議的輕量級、可靠、可伸縮和可移植的消息代理,Spring使用RabbitMQ通過AMQP協(xié)議進行通信;在Spring Boot中對RabbitMQ進行了集成管理。
在所有的消息服務(wù)中,消息中間件都會作為一個第三方消息代理,接收發(fā)布者發(fā)布的消息,并推送給消息消費者。不同消息中間件內(nèi)部轉(zhuǎn)換消息的細節(jié)不同。
RabbitMQ的消息代理流程中有很多細節(jié)內(nèi)容和內(nèi)部組件,這里不必會組件的具體作用,先對整個流程梳理一遍。
- 消息發(fā)布者(Publisher,簡稱P)向RabbitMQ代理(Broker)指定的虛擬主機服務(wù)器(Virtual Host)發(fā)送消息。
- 虛擬主機服務(wù)器內(nèi)部的交換器(Exchange,簡稱X)接收消息,并將消息傳遞并存儲到與之綁定(Binding)的消息隊列(Queue)中。
- 消息消費者(Consumer,簡稱C)通過一定的網(wǎng)絡(luò)連接(Connection)與消息代理建立連接,同時為了簡化開支,在連接內(nèi)部使用了多路復(fù)用的信道進行消息的最終消費。
RabbitMQ工作模式介紹
RabbitMQ消息中間件針對不同的服務(wù)需求,提供了多種工作模式。
Work queues(工作隊列模式)
在Work queues工作模式中,不需要設(shè)置交換器(RabbitMQ會使用內(nèi)部默認交換器進行消息轉(zhuǎn)換),需要指定唯一的消息隊列進行消息傳遞,并且開源由多個消息消費者。在這種模式下,多個消息消費者通過輪詢的方式依次接收消息隊列中存儲的消息,一旦消息被某個消費者接收,消息隊列會將消息移除,而接收并處理消息的消費者必須在消費完一條消息后再準備接收下一條消息。
從上面的分析可以發(fā)現(xiàn),Work queues工作模式適用于那些較為繁重,并且可以進行拆分處理的業(yè)務(wù),這種情況下可以分派給多個消費者輪流處理業(yè)務(wù)。
Public/Subscribe(發(fā)布訂閱模式)
在Public/Subscribe工作模式中,必須先配置一個fanout類型的交換器,不需要指定對應(yīng)的路由鍵(Routing key),同時會將消息路由到每一個消息隊列上,然后每個消息隊列都可以對相同的消息進行接收存儲,進而由各自消息隊列關(guān)聯(lián)的消費者進行消費。
從上面的分析可以發(fā)現(xiàn),Publish/Subscribe工作模式適用于進行相同業(yè)務(wù)功能處理的場合。例如,用戶注冊成功后,需要同時發(fā)送郵件通知和短信通知,那么郵件服務(wù)消費者和短信服務(wù)消費者需要共同消費"用戶注冊成功"這一條消息。
Routing(路由模式)
在Routing工作模式中,必須先配置一個direct類型的交換器,并指定不同的路由鍵值(Routing key)將對應(yīng)的消息從交換器路由到不同的消息隊列進行存儲,由消費者進行各自消費。
從上面的分析可以發(fā)現(xiàn),Routing工作模式適用于進行不同類型消息分類處理的場合。例如,日志收集處理,用戶可以配置不同的路由鍵值分別對不同級別的日志信息進行分類處理。
Topics(通配符模式)
在Topics工作模式中,必須先配置一個topic類型的交換器,并指定不同的路由鍵值(Routing key)將對應(yīng)的消息從交換器路由到不同的消息隊列進行存儲,然后由消費者進行各自消費。Topics模式與Routing模式的主要在于:Topics模式設(shè)置的路由鍵是包括通配符的,其中,#
匹配多個字符,*
匹配一個字符,然后與其他字符一起使用.
進行連接,從而組成動態(tài)路由鍵,在發(fā)送消息時可以根據(jù)需求設(shè)置不同的路由鍵,從而將消息路由到不同的消息隊列。
通常情況下,Topics工作模式適用于根據(jù)不同需求動態(tài)傳遞處理業(yè)務(wù)的場合。例如一些訂閱客戶只接收郵件消息,一些訂閱客戶只接收短信消息,那么可以根據(jù)客戶需求進行動態(tài)路由匹配,從而將訂閱消息分發(fā)到不同的消息隊列中。
RPC
RPC工作模式與Work queues工作模式主體流程相似,都需要設(shè)置交換器,需要指定唯一的消息隊列進行消息傳遞。RPC模式與Work queues模式的主要不同在于:RPC模式是一個回環(huán)結(jié)構(gòu),主要針對分布式架構(gòu)的消息傳遞。RPC模式與Work queues模式的主要不同在于:RPC模式是一個回環(huán)結(jié)構(gòu),主要針對分布式架構(gòu)的消息傳遞業(yè)務(wù),客戶端Cilent先發(fā)送消息到消息隊列,遠程服務(wù)端Server獲取消息,然后再寫入另一個消息隊列,向原始客戶端Client相應(yīng)消息處理結(jié)果。
RPC工作模式適用于遠程服務(wù)調(diào)用的業(yè)務(wù)處理場合。例如,在分布式架構(gòu)中必須考慮的分布式事務(wù)管理問題。
Headers
Headers工作模式在RabbitMQ所支持的工作模式中是較為少用的一種模式,其主體流程與Routing工作模式有些相似。不過,使用Headers工作模式時,必須設(shè)置一個headers類型的交換器,而不需要設(shè)置路由鍵,取而代之的是在Properties屬性配置中的headers頭信息中使用key/value的形式配置路由規(guī)則。由于Headers工作模式使用較少,官方文檔也滅有詳細說明。
上面的6中工作模式,有些可以嵌套使用,例如,在發(fā)布訂閱模式中加入工作隊列模式。其中Publish/Subscribe、Routing、Topics和RPC模式是開發(fā)中較為常用的工作模式。
RabbitMQ安裝以及整合環(huán)境搭建
安裝RabbitMQ
在使用RabbitMQ之前必須預(yù)先安裝配置,參考RabbitMQ官網(wǎng)說明,RabbitMQ支持多平臺安裝,例如Linux、Windows、MacOS、Docker等。這里,我們?yōu)榱朔奖汩_發(fā)使用Windows環(huán)境為例,介紹RabbitMQ的安裝配置。
下載RabbitMQ
鏈接:https://pan.baidu.com/s/1REAC7btmaR7a-pLKfLGJqA
提取碼:1234
在安裝RabbitMQ之前需要Erlang語言包支持。
安裝RabbitMQ
RabbitMQ安裝包依賴于Erlang語言包的支持,所以需要先安裝Erlang語言包,在安裝RabbitMQ安裝包。RabbitMQ安裝包和Erlang語言包的安裝都非常簡單。(需要注意的是,安裝Erlang語言包,必須以管理員的身份進行安裝)。
在Windos環(huán)境下首先執(zhí)行RabbitMQ的安裝,系統(tǒng)環(huán)境變量中會自動增加一個變量名為ERLANG_HOME的變量配置,它的配置路徑是Erlang選擇安裝的具體路徑,無須手動修改,同時,RabbitMQ服務(wù)也會自動啟動。如果是多次卸載安裝的RabbitMQ,需要保證ERLANG_HOME環(huán)境的配置正確,同時保證RabbitMQ服務(wù)正常啟動。
RabbitMQ可視化效果展示
查看開啟服務(wù)
rabbitmq-plugins.bat list
可以看到managerment服務(wù)沒有開啟
開啟可視化服務(wù)
rabbitmq-plugins enable rabbitmq_management
重啟rabbitmqctl
rabbitmqctl.bat start_app
RabbitMQ默認提供了兩個端口號5672和15672,其中5672作為服務(wù)端口號,15672用作可視化管理端口號。在瀏覽器上訪問http://localhost:15672通過可視化的方式查看RabbitMQ。
首次登錄RabbitMQ可視化管理頁面時需要進行用戶登錄,RabbitMQ安裝過程中默認提供了用戶名和密碼均為guest的用戶,可以使用該賬戶進行登錄。登陸成功后會進入RabbitMQ可視化管理頁面得首頁。
RabbitMQ可視化管理頁面中,顯示出了RabbitMQ的版本、用戶信息等信息,同時頁面還包括Connections、Channeis、Exchanges、Queues、Admin在內(nèi)的管理面板。
Spring Boot整合RabbitMQ環(huán)境搭建
完成RabbitMQ的安裝后,下面我們開始對Spring Boot整合RabbitMQ實現(xiàn)消息服務(wù)需要的整合環(huán)境進行搭建。
1)創(chuàng)建Spring Boot項目。使用Spring Intializr方式創(chuàng)建一個名為chapter08的Spring Boot項目,在Dependercies依賴選擇中選擇Web模塊中Web依賴以及Integertion模塊中的RabbitMQ依賴。
2)編寫配置文件,連接RabbitMQ服務(wù)。打開創(chuàng)建項目時自動生成的application.properties全局配置文件,在該文件中編寫RabbitMQ服務(wù)對應(yīng)的連接配置。
application.properties
#配置RabbitMQ消息中間件連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虛擬主機路徑/,默認可以省略 spring.rabbitmq.virtual-host=/
需要強調(diào)的是,在上述項目全局配置文件application.properties中,編寫了外部RabbitMQ消息中間件的連接配置,這樣在進行整合消息服務(wù)時,使用的都是我們自己安裝配置的RabbitMQ服務(wù)。而在Spring Boot中,也集成了一個內(nèi)部默認的RabbitMQ中間件,如果我們沒有在配置文件中配置外部RabbtiMQ連接,會啟用內(nèi)部的RabbitMQ中間件,這種內(nèi)部RabbitMQ中間件是不推薦使用的。
Spring Boot與RabbitMQ整合實現(xiàn)
Publish/Subscribe(發(fā)布訂閱模式)
Spring Boot整合RabbitMQ中間件實現(xiàn)消息服務(wù),注意圍繞3個部分的工作進行展開:定制中間件、消息發(fā)送者發(fā)送消息、消息消費者接收消息。其中。定制中間件是比較麻煩的工作,且必須預(yù)先定制。下面我們以用戶注冊成功后同時發(fā)送郵件通知和短信通知這一場景為例,分別使用基于API、基于配置類和基于注解這3種方式實現(xiàn)Publish/Subscribe工作模式的整合。
基于API的方式
基于API的方式注意講的是使用Spring框架提供的API管理類AmqpAdmin定制消息發(fā)送組件,并進行消息發(fā)送。這種定制消息發(fā)送組件的方式與RabbitMQ可視化界面上通過對應(yīng)面板進行組件操作的實現(xiàn)基本一樣,都是通過管理員的身份,預(yù)先手動聲明交換器、隊列、路由鍵等,然后組裝消息隊列供應(yīng)用程序調(diào)用,從而實現(xiàn)消息服務(wù)。
1)使用AmqpAdmin定制消息發(fā)送組件
打開chapter08項目的測試類Chapter08ApplicationTests,在該測試類中先引入AmqpAdmin管理類定制Publish/Subscribe工作模式所需的消息組件。
Chapter08ApplicationTests.java
package com.example.chapter08; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter08ApplicationTests { @Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin() { //1.定義fanout類型的交換器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定義兩個默認持久化隊列,分別處理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); //3.將隊列分別與交換器進行綁定 amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); } }
使用Spring框架提供的消息管理組件AmqpAdmin定制了消息組件。其中amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
定義了一個fanout類型的交換器fanout_exchange。amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
定義了兩個消息隊列fanout_queue_email和fanout_queue_sms,分別用來處理郵件信息和短信信息。
amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
將定義的兩個隊列分別與交換器綁定。
執(zhí)行上述單元測試方法amqpAdmin(),驗證RabbitMQ消息組件的定制效果。
執(zhí)行成功后,通過RabbitMQ可視化管理頁面的Exchanges面板查看效果。
通過上述操作可以發(fā)現(xiàn),在管理頁面中提供了消息組件交換器、隊列的定制功能。在程序中使用Spring框架提供的管理員API組件AmqpAdmin定制消息組件和管理頁面上手動定制消息組件的本質(zhì)是一樣的。
2)消息發(fā)送者發(fā)送消息
完成消息組件的定制工作后,創(chuàng)建消息發(fā)送者發(fā)送消息到消息隊列中。發(fā)送消息時,借助一個實體類傳遞消息,需要預(yù)先創(chuàng)建一個實體類對象。
首先,在chapter08項目中創(chuàng)建名為com.example.chapter.domain的包,并在該包下創(chuàng)建一個實體類User。
package com.example.chapter08.domain; public class User { private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
其次,在項目測試類Chapter08ApplicationTests中使用Spring框架提供的RabbitTemplate模板類是實現(xiàn)消息發(fā)送。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void psubPublisher(){ User user=new User(); user.setId(1); user.setUsername("石頭"); rabbitTemplate.convertAndSend("fanout_exchange","",user); }
上述代碼中,先使用@Autowired注解引入了進行消息中間件管理的RabbitTemplate組件對象,然后使用該模板工具類的convertAndSend(String exchange, String routingKey, Object object)
方法進行消息發(fā)布。其中,第一個參數(shù)表示發(fā)送消息的交換器,這個參數(shù)值要與之前定制的交換器名稱一致;第二個參數(shù)表示路由鍵,因為實現(xiàn)的是Public/Subscribe工作模式,所以不需要指定;第3個參數(shù)是發(fā)送的消息內(nèi)容,接收Object類型。
然后,執(zhí)行測試方法。
顯示消息發(fā)送過程中默認使用了SimpleMessageConverter轉(zhuǎn)換器進行消息轉(zhuǎn)換存儲,該轉(zhuǎn)換器只支持字符串或?qū)嶓w對象序列化后的消息。而測試類中發(fā)送的是User實體類對象消息,所以發(fā)生異常。
解決方法
執(zhí)行JDK自帶的Serializable序列化接口定制其他類型的消息轉(zhuǎn)換器。
兩種方法都可行,但是相對于第二種實現(xiàn)方式而言,第一種方式實現(xiàn)后的可視化效果較差,轉(zhuǎn)換后的消息無法辨別,所以一般使用第二種方式。
在chapter08項目中創(chuàng)建名為com.example.chapter08.config的包,并在該包下創(chuàng)建一個RabbitMQ消息配置類RabbitMQConfig。
package com.example.chapter08.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
創(chuàng)建一個RabbitMQ消息配置類RabbitMQConfig,并在該配置類中通過@Bean注解自定義一個Jackson2JsonMessageConverter類型的消息轉(zhuǎn)換器組件,該組件的返回值必須為MessageConverter類型。
再次執(zhí)行psubPublisher()方法,該方法執(zhí)行成功后,查看可視化界面
3)消息消費者接收消息
在chapter08項目中創(chuàng)建名為com.example.chapter08.service的包,并在該包下創(chuàng)建一個針對RabbitMQ消息中間件進行消息接收和處理的業(yè)務(wù)類RabbitMQService。
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,處理郵件業(yè)務(wù) * */ @RabbitListener(queues = "fanout_queue_email") public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("郵件業(yè)務(wù)接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,處理短信業(yè)務(wù) * */ @RabbitListener(queues = "fanout_queue_sms") public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信業(yè)務(wù)接收到消息:" + s); } }
創(chuàng)建一個接受處理RabbitMQ消息的業(yè)務(wù)處理類RabbitMQService,在該類中使用Spring框架提供的@RabbitListener注解監(jiān)聽隊列名稱為fanout_queue_email和fanout_queue_sms的消息,監(jiān)聽這兩個隊列是前面指定發(fā)送并存儲消息的消息隊列。
需要說明的是,使用@RabbitListener注解監(jiān)聽隊列消息后,一旦服務(wù)啟動且監(jiān)聽到指定的隊列有消息存在(目前兩個隊列中各有一條相同的消息),對應(yīng)注解的方法會立即接收并消費隊列中的消息。另外,在接受消息的方法中,參數(shù)類型可以與發(fā)送的消息類型保持一致,或者使用Object類型和Message類型。如果使用消息類型對應(yīng)的參數(shù)接收消息的話,只能夠得到具體的消息體信息;如果使用Object或者Message類型參數(shù)接收消息的話,還可以獲得除了消息體外的參數(shù)信息MessageProperties。
案例中使用的是開發(fā)中常用的@RabbitLIsenter注解監(jiān)聽指定名稱的消息情況,這種方式會在監(jiān)聽到指定隊列存在消息后立即進行消費處理。除此之外,還可以使用RabbitTemplate模板類的receiveAndConvert(String queueName)方法手動消費指定隊列中的消息。
基于配置類的方式
基于配置類的方式主要講的是使用Spring Boot框架提供的@Configuration注解配置類定制消息發(fā)送組件,并進行消息發(fā)送。
RabbitMQConfig.java
package com.example.chapter08.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //自定義消息轉(zhuǎn)化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } //1.定義fanout類型的交換器 @Bean public Exchange fanout_exchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定義兩個不同名稱的消息隊列 @Bean public Queue fanout_queue_email(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanout_queue_sms(){ return new Queue("fanout_queue_sms"); } //3.將兩個不同名稱的消息隊列與交換器進行綁定 @Bean public Binding bindingEmail(){ return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs(); } @Bean public Binding bindingSms(){ return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs(); } }
使用@Bean注解定制了3種類型的Bean組件,這3種組件分別表示交換器、消息隊列和消息隊列與綁定器的綁定。這種基于配置類方式定制的消息組件內(nèi)容和基于API方式定制的消息組件內(nèi)容完全一樣,只不過是實現(xiàn)方式不同而已。
按照消息服務(wù)整合實現(xiàn)步驟,完成消息組件的定制后,還需要編寫消息發(fā)送者和消息消費者,而在基于API的方式中已經(jīng)實現(xiàn)類消息發(fā)送者和消息消費者,并基于配置類方式定制的消息組件名稱和之前測試用的消息發(fā)送和消息組件名稱都是一樣的,所以這里可以直接重復(fù)使用。
重新運行消息發(fā)送者測試方法psubPublisher(),消息消費者可以自動監(jiān)聽并消費消息隊列種存在的消息,效果與基于API的方式測試效果一樣。
基于注解的方式
基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息發(fā)送組件并發(fā)送消息。
打開進行消息接收和處理的業(yè)務(wù)類RabbitMQService,將針對郵件業(yè)務(wù)和短線業(yè)務(wù)處理的消息消費者方式進行注解,使用@RabbitListener注解機器相關(guān)屬性定制消息發(fā)送組件。
RabbitMQService.java
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,處理郵件業(yè)務(wù) * */ //@RabbitListener(queues = "fanout_queue_email") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_email") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("郵件業(yè)務(wù)接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,處理短信業(yè)務(wù) * */ //@RabbitListener(queues = "fanout_queue_sms") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_sms") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信業(yè)務(wù)接收到消息:" + s); } }
至此,在Spring Boot中完成了使用基于API、基于配置類和基于注解3種方式來實現(xiàn)Publish/Subscribe工作模式的整合講解。在這3種實現(xiàn)消息服務(wù)的方式中,基于API的方式相對簡單、直觀,但容易與業(yè)務(wù)代碼產(chǎn)生耦合;基于配置類的方式相對隔離、容易統(tǒng)一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易與業(yè)務(wù)代碼產(chǎn)生耦合。在實際開發(fā)中,使用基于配置類的方式和基于注解的方式定制組件實現(xiàn)消息服務(wù)較為常見,使用基于API的方式偶爾使用。
Routing(路由模式)
使用基于注解的方式定制消息組件和消費者
RabbitMQService
/* * 2.1路由模式消息接收、處理error級別日志信息 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange",type = "direct"), key = "error_routing_key" )) public void routingConsumerError(String message){ System.out.println("接收到error級別日志消息:" + message); } /* * 2.2路由模式消息接收、處理info、error、warning級別日志信息 * */ public void routingConsumerAll(String message){ System.out.println("接收到info、error、warning等級別日志消息:" + message); }
上述代碼中,在消息業(yè)務(wù)處理類RabbitMQService中新增了兩個用來處理Routing路由模式的消息消費者方法,在兩個消費者方式上使用@RabbitListener注解及其相關(guān)屬性定制了路由模式下的消息服務(wù)組件。從示例代碼可以看出,與發(fā)布訂閱模式下的注解相比,Routing路由模式下的交換器類型type屬性為direct,而且還必須指定key屬性(每個消息隊列可以映射多個路由鍵,而在Spring Boot 1.X版本中,@QueueBinding中的key屬性只接收Spring類型而不接收Spring[]類型)。
消息發(fā)送者發(fā)送消息
打開項目測試類Chapter08ApplicationTests,在該測試類中使用RabbitTemplate模板類實現(xiàn)Routing路由模式下的消息發(fā)送。
// 2.Routing工作模式消息發(fā)送端 @Test public void routingPublish(){ rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send error message"); }
在路由工作模式下發(fā)送消息時,必須指定路由鍵參數(shù),該參數(shù)要與消息隊列映射的路由鍵保持一致,否則發(fā)送的消息將會丟失。本次示例中使用的是error_routing_key路由鍵,根據(jù)定制規(guī)則,編寫的兩個消息消費者方式應(yīng)該都可以正常接收并消費該發(fā)送端的消息。
Topics(通配符模式)
使用基于注解的方式定制消息組件和消息消費者
/* * 3.1通配符模式消息接收、進行郵件業(yè)務(wù)訂閱處理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.email.#" )) public void topicConsumerEmail(String message){ System.out.println("接收到郵件訂閱需求處理消息:" + message); } /* * 3.2通配符消息接收、進行短信業(yè)務(wù)訂閱處理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.sms.#" )) public void topicConsumerSms(String message){ System.out.println("接收到短信訂閱需求處理消息:" + message); }
@Test public void topicPublisher(){ rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message"); }
到此這篇關(guān)于Java Spring Boot消息服務(wù)萬字詳解分析的文章就介紹到這了,更多相關(guān)Java Spring Boot消息服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA 多態(tài)操作----父類與子類轉(zhuǎn)換問題實例分析
這篇文章主要介紹了JAVA 多態(tài)操作----父類與子類轉(zhuǎn)換問題,結(jié)合實例形式分析了JAVA 多態(tài)操作中父類與子類轉(zhuǎn)換問題相關(guān)原理、操作技巧與注意事項,需要的朋友可以參考下2020-05-05IDEA配置Tomcat創(chuàng)建web項目的詳細步驟
Tomcat是一個Java?Web應(yīng)用服務(wù)器,實現(xiàn)了多個Java?EE規(guī)范(JSP、Java?Servlet等),這篇文章主要給大家介紹了關(guān)于IDEA配置Tomcat創(chuàng)建web項目的詳細步驟,需要的朋友可以參考下2023-12-12Java使用ant.jar執(zhí)行SQL腳本文件的示例代碼
這篇文章主要介紹了Java使用ant.jar執(zhí)行SQL腳本文件,文中通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2024-02-02Java中對AtomicInteger和int值在多線程下遞增操作的測試
這篇文章主要介紹了Java中對AtomicInteger和int值在多線程下遞增操作的測試,本文得出AtomicInteger操作 與 int操作的效率大致相差在50-80倍上下的結(jié)論,需要的朋友可以參考下2014-09-09SpringBoot如何通過webjars管理靜態(tài)資源文件夾
這篇文章主要介紹了SpringBoot如何通過webjars管理靜態(tài)資源文件夾,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友可以參考下2020-10-10Spring中的@ConfigurationProperties詳解
這篇文章主要介紹了Spring中的@ConfigurationProperties詳解,ConfigurationProperties注解主要用于將外部配置文件配置的屬性填充到這個Spring Bean實例中,需要的朋友可以參考下2023-09-09