SpringBoot整合rockerMQ消息隊列詳解
Springboot整合RockerMQ
1、maven依賴
<dependencies>
<!-- springboot-web組件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
2、yml配置文件
rocketmq:
###連接地址nameServer
name-server: www.kaicostudy.com:9876;
producer:
group: kaico_producer
server:
port: 8088
3、生產(chǎn)者
@RequestMapping("/sendMsg")
public String sendMsg() {
OrderEntity orderEntity = new OrderEntity("123456","騰訊視頻會員");
SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);
System.out.println("返回發(fā)送消息狀態(tài):" + kaicoTopic);
return "success";
}
4、消費者
@Service
@RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag1", consumerGroup = "kaico_consumer", messageModel = MessageModel.CLUSTERING)
public class OrdeConsumer2 implements RocketMQListener<OrderEntity> {
@Override
public void onMessage(OrderEntity o) {
System.out.println("kaico_consumer2消費者接收對象:" + o.toString());
}
}
使用總結(jié)
消費模式
集群消費
當 consumer 使用集群消費時,每條消息只會被 consumer 集群內(nèi)的任意一個 consumer 實例消費一次。
同時記住一點,使用集群消費的時候,consumer 的消費進度是存儲在 broker 上,consumer 自身是不存儲消費進度的。消息進度存儲在 broker 上的好處在于,當你 consumer 集群是擴大或者縮小時,由于消費進度統(tǒng)一在broker上,消息重復(fù)的概率會被大大降低了。
注意: 在集群消費模式下,并不能保證每一次消息失敗重投都投遞到同一個 consumer 實例。
注解配置:messageModel = MessageModel.CLUSTERING
廣播消費
當 consumer 使用廣播消費時,每條消息都會被 consumer 集群內(nèi)所有的 consumer 實例消費一次,也就是說每條消息至少被每一個 consumer 實例消費一次。
與集群消費不同的是,consumer 的消費進度是存儲在各個 consumer 實例上,這就容易造成消息重復(fù)。還有很重要的一點,對于廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關(guān)注消費失敗的情況。
雖然廣播消費能保證集群內(nèi)每個 consumer 實例都能消費消息,但是消費進度的維護、不具備消息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用集群消費,因為集群消費不僅擁有消費進度存儲的可靠性,還具有消息重投的機制。而且,我們通過集群消費也可以達到廣播消費的效果。
注解配置:messageModel = MessageModel.BROADCASTING
生產(chǎn)者組和消費者組
生產(chǎn)者組
一個生產(chǎn)者組,代表著一群topic相同的Producer。即一個生產(chǎn)者組是同一類Producer的組合。
如果Producer是TransactionMQProducer,則發(fā)送的是事務(wù)消息。如果節(jié)點1發(fā)送完消息后,消息存儲到broker的Half Message Queue中,還未存儲到目標topic的queue中時,此時節(jié)點1崩潰,則可以通過同一Group下的節(jié)點2進行二階段提交,或回溯。
使用時,一個節(jié)點下,一個topic會對應(yīng)一個producer
消費者組
一個消費者組,代表著一群topic相同,tag相同(即邏輯相同)的Consumer。通過一個消費者組,則可容易的進行負載均衡以及容錯
使用時,一個節(jié)點下,一個topic加一個tag可以對應(yīng)一個consumer。一個消費者組就是橫向上多個節(jié)點的相同consumer為一個消費組。
首先分析一下producer。習(xí)慣上我們不會創(chuàng)建多個訂閱了相同topic的Producer實例,因為一個Producer實例發(fā)送消息時是通過ExecutorService線程池去異步執(zhí)行的,不會阻塞完全夠用,如果創(chuàng)建了多個相同topic的Producer則會影響性能。而Consumer則不同。消息會在一topic下會細分多個tag,需要針對tag需要針對不同的tag創(chuàng)建多個消費者實例。
注意:多個不同的消費者組訂閱同一個topic、tag,如果設(shè)定的是集群消費模式,每一個消費者組中都會有一個消費者來消費。也就是說不同的消費者組訂閱同一個topic相互之間是沒有影響的。
生產(chǎn)者投遞消息的三種方式
同步: 發(fā)送消息后需等待結(jié)果,消息的可靠性高發(fā)送速度慢;
SendResult kaicoTopic = rocketMQTemplate.syncSend("kaicoTopic"+":"+"tag1", orderEntity);
異步: 消息發(fā)送后,回調(diào)通知結(jié)果,消息發(fā)送速度快,消息可靠性低;
//異步發(fā)送
rocketMQTemplate.asyncSend("kaicoTopic" + ":" + "tag1", orderEntity, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("異步發(fā)送消息成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("異步發(fā)送消息失敗");
}
});單向(oneway):消息發(fā)送后,不關(guān)心結(jié)果,發(fā)送速度最快,消息可靠性最差,適用于在大量日志數(shù)據(jù)和用戶行為數(shù)據(jù)等場景發(fā)送數(shù)據(jù)。
//單向(oneway)發(fā)送
rocketMQTemplate.sendOneWay("kaicoTopic"+":"+"tag1", orderEntity);如何保證消息不丟失
主要三個步驟
1、生產(chǎn)者保證消息發(fā)送成功
采用同步發(fā)送消息的方式,發(fā)送消息后有返回結(jié)果,保證消息發(fā)送成功。(代碼見上面)
返回四個狀態(tài)
- SEND_OK:消息發(fā)送成功。需要注意的是,消息發(fā)送到 broker 后,還有兩個操作:消息刷盤和消息同步到 slave 節(jié)點,默認這兩個操作都是異步的,只有把這兩個操作都改為同步,SEND_OK 這個狀態(tài)才能真正表示發(fā)送成功。
- FLUSH_DISK_TIMEOUT:消息發(fā)送成功但是消息刷盤超時。
- FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功但是消息同步到 slave 節(jié)點時超時。
- SLAVE_NOT_AVAILABLE:消息發(fā)送成功但是 broker 的 slave 節(jié)點不可用。
2、rocketMQ將消息持久化,保證宕機后消息不會丟失。持久化策略(刷盤策略)
- 異步刷盤:默認。消息寫入 CommitLog 時,并不會直接寫入磁盤,而是先寫入 PageCache 緩存后返回成功,然后用后臺線程異步把消息刷入磁盤。異步刷盤提高了消息吞吐量,但是可能會有消息丟失的情況,比如斷點導(dǎo)致機器停機,PageCache 中沒來得及刷盤的消息就會丟失。
- 同步刷盤:消息寫入內(nèi)存后,立刻請求刷盤線程進行刷盤,如果消息未在約定的時間內(nèi)(默認 5 s)刷盤成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到這個響應(yīng)后,可以進行重試。同步刷盤策略保證了消息的可靠性,同時降低了吞吐量,增加了延遲。要開啟同步刷盤,需要增加下面配置:
flushDiskType=SYNC_FLUSH
3、Broker 多副本和高可用
Broker 為了保證高可用,采用一主多從的方式部署。
消息發(fā)送到 master 節(jié)點后,slave 節(jié)點會從 master 拉取消息保持跟 master 的一致。這個過程默認是異步的,即 master 收到消息后,不等 slave 節(jié)點復(fù)制消息就直接給 Producer 返回成功。
這樣會有一個問題,如果 slave 節(jié)點還沒有完成消息復(fù)制,這時 master 宕機了,進行主備切換后就會有消息丟失。為了避免這個問題,可以采用 slave 節(jié)點同步復(fù)制消息,即等 slave 節(jié)點復(fù)制消息成功后再給 Producer 返回發(fā)送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER
改為同步復(fù)制后,消息復(fù)制流程如下:
- slave 初始化后,跟 master 建立連接并向 master 發(fā)送自己的 offset;
- master 收到 slave 發(fā)送的 offset 后,將 offset 后面的消息批量發(fā)送給 slave;
- slave 把收到的消息寫入 commitLog 文件,并給 master 發(fā)送新的 offset;
- master 收到新的 offset 后,如果 offset >= producer 發(fā)送消息后的 offset,給 Producer 返回 SEND_OK。
4、消費者保證消息消費成功
消費者消費消息后,如果 Consumer 消費成功,返回 CONSUME_SUCCESS,提交 offset 并從 Broker 拉取下一批消息。
@Service
public class NoSpringBootOrderConsumer {
private DefaultMQPushConsumer defaultMQPushConsumer;
@Value("${rocketmq.name-server}")
private String namesrvAddr;
protected String consumerGroup;
protected String topic;
protected String topicTag;
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setTopicTag(String topicTag) {
this.topicTag = topicTag;
}
public static String encoding = System.getProperty("file.encoding");
/*
* @Author ex_fengkai
* @Description //TODO 初始化數(shù)據(jù)(消費者組名稱、topic、topic的tag、nameServer的信息)
* @Date 2020/11/9 14:36
* @Param []
* @return void
**/
private void initParam() {
this.consumerGroup = "kaico_consumer3";
this.topic = "kaicoTopic";
this.topicTag = "tag1";
this.setNamesrvAddr(namesrvAddr);
}
@PostConstruct
private void init() throws InterruptedException, MQClientException {
initParam();
// ConsumerGroupName需要由應(yīng)用來保證唯一,用于把多個Consumer組織到一起,提高并發(fā)處理能力
defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); //設(shè)置nameServer服務(wù)器
defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
defaultMQPushConsumer.setVipChannelEnabled(false);
// 設(shè)置Consumer第一次啟動是從隊列頭部開始消費
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 訂閱指定Topic下的topicTag
System.out.println("consumerGroup:" + consumerGroup + " topic:" + topic + " ,topicTag:" + topicTag);
defaultMQPushConsumer.subscribe(topic, topicTag);
// 設(shè)置為集群消費
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 通過匿名消息監(jiān)聽處理消息消費
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
// 默認msgs里只有一條消息,可以通過設(shè)置consumeMessageBatchMaxSize參數(shù)來批量接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals(topic) && msg.getTags() != null && msg.getTags().equals(topicTag)) {
// 執(zhí)行topic下對應(yīng)tag的消費邏輯
try {
onMessage(new String(msg.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
System.out.println("系統(tǒng)不支持消息編碼格式:" + encoding);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (Exception e) {
System.out.println("消息處理異常");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
System.out.println("consumerGroup:" + consumerGroup + " MsgId:" + msg.getMsgId() + " was done!");
}
// 如果沒有return success ,consumer會重新消費該消息,直到return success
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Consumer對象在使用之前必須要調(diào)用start初始化,初始化一次即可
defaultMQPushConsumer.start();
System.out.println("consumerGroup:" + consumerGroup + " namesrvAddr:" + namesrvAddr + " start success!");
}
@PreDestroy
public void destroy() {
defaultMQPushConsumer.shutdown();
}
private void onMessage(String s) {
System.out.println(consumerGroup + "用spring的方式的消費者消費:" + s);
}
}Consumer 重試
Consumer 消費失敗,這里有 3 種情況:
- 返回 RECONSUME_LATER
- 返回 null
- 拋出異常
Broker 收到這個響應(yīng)后,會把這條消息放入重試隊列,重新發(fā)送給 Consumer。
注意:Broker 默認最多重試 16 次,如果重試 16 次都失敗,就把這條消息放入死信隊列,Consumer 可以訂閱死信隊列進行消費。重試只有在集群模式(MessageModel.CLUSTERING)下生效,在廣播模式(MessageModel.BROADCASTING)下是不生效的。Consumer 端一定要做好冪等處理。
順序消息
- 生產(chǎn)者投遞消息根據(jù)key投遞到同一個隊列中存放
- 消費者應(yīng)該訂閱到同一個隊列實現(xiàn)消費
- 最終應(yīng)該使用同一個線程去消費消息(不能夠?qū)崿F(xiàn)多線程消費。)
生產(chǎn)者代碼
//發(fā)送順序消息
@RequestMapping("/sendMsg1")
public String sendMsg1() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Long orderId = System.currentTimeMillis();
String insertSql = getSqlMsg("insert", orderId);
String updateSql = getSqlMsg("update", orderId);
String deleteSql = getSqlMsg("delete", orderId);
Message insertMsg = new Message("kaicoTopic", "tag6", insertSql.getBytes());
Message updateMsg = new Message("kaicoTopic", "tag6", updateSql.getBytes());
Message deleteMsg = new Message("kaicoTopic", "tag6", deleteSql.getBytes());
DefaultMQProducer producer = rocketMQTemplate.getProducer();
rocketMQTemplate.getProducer().send(insertMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
// 該消息存放到隊列0中
return mqs.get(0);
}
}, orderId);
rocketMQTemplate.getProducer().send(updateMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
// 該消息存放到隊列0中
return mqs.get(0);
}
}, orderId);
rocketMQTemplate.getProducer().send(deleteMsg
, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg,
Object arg) {
// 該消息存放到隊列0中
return mqs.get(0);
}
}, orderId);
return orderId + "";
}消費者代碼
@Service
@RocketMQMessageListener(topic = "kaicoTopic", selectorExpression ="tag6", consumerGroup = "kaico_consumer1",
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, consumeThreadMax = 1)
public class OrdeConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msg) {
System.out.println(Thread.currentThread().getName() + "-kaico_consumer1消費者接收對象:隊列" + msg.getQueueId()
+ "=消息:" + new String(msg.getBody()));
}
}分布式事務(wù)

實現(xiàn)思路
- 生產(chǎn)者(發(fā)送方)投遞事務(wù)消息到Broker中,設(shè)置該消息為半消息 不可以被消費;
- 開始執(zhí)行我們的本地事務(wù),將本地事務(wù)執(zhí)行的結(jié)果(回滾或者提交)發(fā)送給Broker
- Broker獲取回滾或者提交,如果是回滾的情況則刪除該消息、如果是提交的話,該消息就可以被消費者消費;
- Broker如果沒有及時的獲取發(fā)送方本地事務(wù)結(jié)果的話,會主動查詢本地事務(wù)結(jié)果。
1、生產(chǎn)者發(fā)送事務(wù)消息sendMessageInTransaction
public String saveOrder() {
// 提前生成我們的訂單id
String orderId = System.currentTimeMillis() + "";
/**
* 1.提前生成我們的半消息
* 2.半消息發(fā)送成功之后,在執(zhí)行我們的本地事務(wù)
*/
OrderEntity orderEntity = createOrder(orderId);
String msg = JSONObject.toJSONString(orderEntity);
MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(msg);
stringMessageBuilder.setHeader("msg", msg);
Message message = stringMessageBuilder.build();
// 該消息不允許被消費者消費,生產(chǎn)者的事務(wù)邏輯代碼在生產(chǎn)者事務(wù)監(jiān)聽類中executeLocalTransaction方法中執(zhí)行。
rocketMQTemplate.sendMessageInTransaction("kaicoProducer",
"orderTopic", message, null);
return orderId;
}2、事務(wù)監(jiān)聽類
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "kaicoProducer") //這個mayiktProducer生產(chǎn)者的事務(wù)管理
public class SyncProducerListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TransationalUtils transationalUtils;
/**
* 執(zhí)行我們訂單的事務(wù)
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
//拿到消息
Object object = headers.get("msg");
if (object == null) {
return null;
}
String orderMsg = (String) object;
OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
TransactionStatus begin = null;
try {
begin = transationalUtils.begin();
int result = orderMapper.addOrder(orderEntity);
transationalUtils.commit(begin);
if (result <= 0) {
return RocketMQLocalTransactionState.ROLLBACK;
}
// 告訴我們的Broke可以消費者該消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
if (begin != null) {
transationalUtils.rollback(begin);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//add.Order
return null;
}
/**
* 提供給我們的Broker定時檢查
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
Object object = headers.get("msg");
if (object == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
String orderMsg = (String) object;
OrderEntity orderEntity = JSONObject.parseObject(orderMsg, OrderEntity.class);
String orderId = orderEntity.getOrderId();
// 直接查詢我們的數(shù)據(jù)庫
OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
if (orderDbEntity == null) {
//不確認,繼續(xù)重試
return RocketMQLocalTransactionState.UNKNOWN;
}
//提交事務(wù)
return RocketMQLocalTransactionState.COMMIT;
}
}3、消費者消費消息
@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "kaicoTopic")
public class OrdeConsumer implements RocketMQListener<String> {
@Autowired
private DispatchMapper dispatchMapper;
@Override
public void onMessage(String msg) {
OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
String orderId = orderEntity.getOrderId();
// 模擬userid為=123456
DispatchEntity dispatchEntity = new DispatchEntity(orderId, 123456L);
dispatchMapper.insertDistribute(dispatchEntity);
}
}
到此這篇關(guān)于SpringBoot整合rockerMQ消息隊列詳解的文章就介紹到這了,更多相關(guān)SpringBoot整合rockerMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java的BigDecimal在math包中提供的API類場景使用詳解
這篇文章主要介紹了Java的BigDecimal在math包中提供的API類場景使用詳解,BigDecimal,用來對超過16位有效位的數(shù)進行精確的運算,雙精度浮點型變量double可以處理16位有效數(shù),在實際應(yīng)用中,需要對更大或者更小的數(shù)進行運算和處理,需要的朋友可以參考下2023-12-12
SpringBoot3實現(xiàn)統(tǒng)一結(jié)果封裝的示例代碼
Spring Boot進行統(tǒng)一結(jié)果封裝的主要目的是提高開發(fā)效率、降低代碼重復(fù)率,并且提供一致的API響應(yīng)格式,從而簡化前后端交互和錯誤處理,所以本文給大家介紹了SpringBoot3實現(xiàn)統(tǒng)一結(jié)果封裝的方法,需要的朋友可以參考下2024-03-03

