微服務(wù)架構(gòu)設(shè)計(jì)RocketMQ基礎(chǔ)及環(huán)境整合
概述&選型
消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開(kāi)發(fā)效率和系統(tǒng)穩(wěn)定性。主要用于三種典型場(chǎng)景:應(yīng)用解耦、流量消峰、消息分發(fā)。
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優(yōu)勢(shì)特性有:
- 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
- 支持結(jié)合rocketmq的多個(gè)系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提)
- 支持18個(gè)級(jí)別的延遲消息(rabbitmq和kafka不支持)
- 支持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā)(kafka不支持,rabbitmq需要手動(dòng)確認(rèn))
- 支持consumer端tag過(guò)濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
- 支持重復(fù)消費(fèi)(rabbitmq不支持,kafka支持)
本文主要介紹RocketMQ的單機(jī)安裝、雙機(jī)主從高可用安裝配置、運(yùn)維管理平臺(tái)搭建、與SpringBoot整合幾個(gè)知識(shí)點(diǎn),具備相關(guān)知識(shí)技能的同學(xué)請(qǐng)直接拉到最后點(diǎn)個(gè) “在看” 即可。
文章開(kāi)始之前需要先準(zhǔn)備好JDK1.8或以上的服務(wù)器環(huán)境以及從rocketmq官網(wǎng)下載好二進(jìn)制安裝包,下載地址http://rocketmq.apache.org/dowloading/releases/
單機(jī)安裝配置
工欲善其事必先利其器,要想深入了解RocketMQ得先把環(huán)境安裝好,咱們先開(kāi)始單機(jī)版RocketMQ的安裝!
解壓安裝
unzip rocketmq-all-4.7.0-bin-release.zip
啟動(dòng) Name Server
> nohup sh bin/mqnamesrv &
查看 Name Server啟動(dòng)日志
> tail -f ~/logs/rocketmqlogs/namesrv.log
啟動(dòng) Broker Server
> nohup sh bin/mqbroker -n localhost:9876 &
查看 Broker Server 啟動(dòng)日志
> tail -f ~/logs/rocketmqlogs/broker.log
單機(jī)情況下安裝使用RocketMQ很簡(jiǎn)單,只需要分別啟動(dòng)NameServer和Broker Server即可!
關(guān)閉RockerMQ需要使用下面的命令:
# 先關(guān)閉Broker Server> sh bin/mqshutdown broker
# 再關(guān)閉NameServer> sh bin/mqshutdown namesrv
雙機(jī)主從高可用搭建
為了消除單機(jī)故障,增加可靠性或增大吞吐量,可以在多臺(tái)服務(wù)器上部署多個(gè)NameServer和Broker,并為每個(gè)Broker部署一個(gè)或多個(gè)Slave。本節(jié)將說(shuō)明使用兩臺(tái)機(jī)器,搭建雙主、雙從、無(wú)單點(diǎn)故障的高可用RocketMQ集群。假設(shè)現(xiàn)在有兩臺(tái)服務(wù)器,IP地址分別為:192.168.100.43和192.168.100.44,部署架構(gòu)如下:
啟動(dòng)多個(gè)NameServer 和 Broker
首先需要在兩臺(tái)服務(wù)器上分別啟動(dòng)NameServer(nohup sh bin/mqnamesrv &),這樣我們就得到了一個(gè)無(wú)單點(diǎn)的NameServer服務(wù),服務(wù)地址為192.168.100.43:9876和192.168.100.44:9876。
然后在兩臺(tái)服務(wù)器中RocketMQ的conf目錄分別建立兩個(gè)文件 broker-master.properties,broker-slave.properties,下面是不同服務(wù)器的配置說(shuō)明:
192.168.100.43 機(jī)器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-a
192.168.100.43 機(jī)器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-b
192.168.100.44 機(jī)器上的broker-master.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-b
192.168.100.44 機(jī)器上的broker-slave.properties文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-a
然后分別使用如下命令啟動(dòng)兩臺(tái)服務(wù)器的主節(jié)點(diǎn)和從節(jié)點(diǎn)
nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
這樣一個(gè)高可用的RockerMQ集群就搭建好了,我們登陸可視化運(yùn)維管理界面查看集群狀態(tài),集群正常啟動(dòng)。
重要參數(shù)說(shuō)明
本節(jié)主要是對(duì)Broker的配置文件中用到的參數(shù)進(jìn)行說(shuō)明
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
指定NameServer的地址,可以是多個(gè)。
brokerClusterName = DefaultCluster
Cluster地址,如果集群數(shù)量比較多,可以分成多個(gè)Cluster,每個(gè)Cluster供一個(gè)業(yè)務(wù)群使用。
brokerName = broker-a
Broker的名稱(chēng),Master 和Slave 通過(guò)使用相同的 Broker 名稱(chēng)來(lái)表明相互關(guān)系,以說(shuō)明某個(gè)Slave 是哪個(gè)Master 的 Slave。
brokerId = 1
一個(gè)Master可以有多個(gè)Slave,0表示Master,大于0的表示不同Slave的ID。
fileReservedTime = 48
在磁盤(pán)上保存消息的時(shí)長(zhǎng),單位是小時(shí),自動(dòng)刪除超時(shí)的消息。
deleteWhen = 04
與 fileReservedTime 參數(shù)對(duì)應(yīng),表明在幾點(diǎn)做消息刪除動(dòng)作,默認(rèn)是凌晨4點(diǎn)。
brokerRole = SYNC_MASTER
brokerRole的可選參數(shù)有SYNC_MASTER,ASYNC_MASTER,SLAVE三種。SYNC 和ASYNC 表示MASTER 和SLAVE 之間同步消息的機(jī)制,SYNC的意思是當(dāng)Slave 和 Master 的消息同步完成后再返回發(fā)送成功的狀態(tài)。
flushDiskType = ASYNC_FLUSH
flushDiskType 表示刷盤(pán)策略,可選值有ASYNC_FLUSH 和 SYNC_FLUSH兩種,分別代表同步刷盤(pán)和異步刷盤(pán)。同步情況下,消息只有真正寫(xiě)入磁盤(pán)才返回成功狀態(tài);異步情況下,消息寫(xiě)入page_cache后就返回成功狀態(tài)。
listenPort = 11011
Broker監(jiān)聽(tīng)的端口,一臺(tái)服務(wù)器啟動(dòng)多個(gè)Broker,需要設(shè)置不同的監(jiān)聽(tīng)端口避免端口沖突。
storePathRootDir = /app/rocketmq/store-a
存儲(chǔ)消息以及配置信息的根目錄。
可視化管理平臺(tái)
RocketMQ可以使用rocketmq-externals
作為運(yùn)維管理平臺(tái),Github地址https://github.com/apache/rocketmq-externals,我們需要將源碼下載下來(lái)后再進(jìn)行手動(dòng)編譯,過(guò)程如下:
下載
從github(https://github.com/apache/rocketmq-externals) 下載RocketMQ可視化管理工具 rocketmq-externals
的源碼;
打包
下載完成后切換進(jìn)rocketmq-console目錄,使用maven命令對(duì)其打包 mvn clean package -Dmaven.test.skip=true
打包完成后生成可執(zhí)行文件rocketmq-console-ng-1.0.1.jar
運(yùn)行
使用 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876 命令啟動(dòng)
這里注意需要設(shè)置兩個(gè)參數(shù):
--server.port
為運(yùn)行的這個(gè)web應(yīng)用的端口,如果不設(shè)置的話默認(rèn)為8080;
--rocketmq.config.namesrvAddr
為RocketMQ命名服務(wù)地址,若NameServer為集群則使用英文 ; 分割
訪問(wèn)
瀏覽器訪問(wèn) xxx.xxx.xxx.xxx:8080 進(jìn)入控制臺(tái)界面,效果如下
SpringBoot整合RocketMQ
在SpringBoot中整合RocketMQ主要用到 rocketmq-spring-boot-starter 組件,下面是詳細(xì)整合過(guò)程。
引入組件rocketmq-spring-boot-starter 依賴(lài)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
修改application.yml,添加RocketMQ相關(guān)配置
rocketmq: name-server: 192.168.100.43:9876;192.168.100.44:9876 producer: group: test-group send-message-timeout: 3000
如果是集群,多個(gè)name-server使用英文 ; 分割。
編寫(xiě)消息生產(chǎn)者 MessageProduce
/** * Description: * rocketMQ消息發(fā)送方法 * @author javadaily */ @Component public class MessageProduce { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發(fā)送消息 * @param topic 主題 * @param message 消息體 */ public void sendMessage(String topic,String message){ this.rocketMQTemplate.convertAndSend(topic,message); } }
使用RocketMQTemplate發(fā)送消息
編寫(xiě)消息消費(fèi)者 MessageConsumer
@Slf4j @Component @RocketMQMessageListener( topic = "test-topic", consumerGroup = "test-group", selectorExpression = "*" ) public class MessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("received message is {}", message); } }
消費(fèi)者只需要繼承RocketMQListener類(lèi)即可,主要關(guān)注實(shí)現(xiàn)類(lèi)上的 @RocketMQMessageListener
注解,配置的 topic
和 consumerGroup
需要跟消息生產(chǎn)者的配置保持一致。
編寫(xiě)單元測(cè)試發(fā)送消息
@RunWith(SpringRunner.class) @SpringBootTest public class MessageProduceTest { @Autowired private MessageProduce messageProduce; @Test public void testSendMessage() { messageProduce.sendMessage("test-topic","Hello,JAVA日知錄"); } }
測(cè)試
先啟動(dòng)springboot應(yīng)用,再執(zhí)行測(cè)試用例。
以上就是微服務(wù)架構(gòu)設(shè)計(jì)入門(mén)RocketMQ基礎(chǔ)及環(huán)境整合的詳細(xì)內(nèi)容,更多關(guān)于微服務(wù)架構(gòu)設(shè)計(jì)RocketMQ環(huán)境整合的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中的對(duì)象和對(duì)象引用實(shí)例淺析
這篇文章主要介紹了Java中的對(duì)象和對(duì)象引用,實(shí)例分析了對(duì)象與對(duì)象引用的概念與相關(guān)使用技巧,需要的朋友可以參考下2015-05-05java實(shí)現(xiàn)微信點(diǎn)餐申請(qǐng)微信退款
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)微信點(diǎn)餐申請(qǐng)微信退款,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-09-09java中 String和StringBuffer的區(qū)別實(shí)例詳解
這篇文章主要介紹了java中 String和StringBuffer的區(qū)別實(shí)例詳解的相關(guān)資料,一個(gè)小的例子,來(lái)測(cè)試String和StringBuffer在時(shí)間和空間使用上的差別,需要的朋友可以參考下2017-04-04Java動(dòng)態(tài)獲取實(shí)現(xiàn)某個(gè)接口下所有的實(shí)現(xiàn)類(lèi)對(duì)象集合
今天小編就為大家分享一篇關(guān)于Java動(dòng)態(tài)獲取實(shí)現(xiàn)某個(gè)接口下所有的實(shí)現(xiàn)類(lèi)對(duì)象集合,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12Java中檢查值是否存在于數(shù)組中的4種詳細(xì)方法
這篇文章主要給大家介紹了關(guān)于Java中檢查值是否存在于數(shù)組中的4種詳細(xì)方法,相信大家在操作Java的時(shí)候經(jīng)常會(huì)要檢查一個(gè)數(shù)組(無(wú)序)是否包含一個(gè)特定的值,需要的朋友可以參考下2023-08-08