欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java rocketmq--消息的產(chǎn)生(普通消息)

 更新時(shí)間:2019年06月20日 17:04:44   作者:有愛jj  
這篇文章主要介紹了java rocketmq--消息的產(chǎn)生(普通消息),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下

前言

與消息發(fā)送緊密相關(guān)的幾行代碼:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那這幾行代碼執(zhí)行時(shí),背后都做了什么?

一. 首先是DefaultMQProducer.start

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

調(diào)用了默認(rèn)生成消息的實(shí)現(xiàn)類 -- DefaultMQProducerImpl

調(diào)用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會(huì)初始化得到MQClientInstance實(shí)例對象,MQClientInstance實(shí)例對象調(diào)用它自己的start方法會(huì) ,啟動(dòng)一些服務(wù),如拉去消息服務(wù)PullMessageService.Start()、啟動(dòng)負(fù)載平衡服務(wù)RebalanceService.Start(),比如網(wǎng)絡(luò)通信服務(wù)MQClientAPIImpl.Start()

另外,還會(huì)執(zhí)行與生產(chǎn)消息相關(guān)的信息,如注冊produceGroup、new一個(gè)TopicPublishInfo對象并以默認(rèn)TopicKey為鍵值,構(gòu)成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,獲取的MQClientInstance實(shí)例對象會(huì)調(diào)用sendHeartbeatToAllBroker()方法,不斷向broker發(fā)送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

上圖中的三個(gè)部分中涉及的內(nèi)容:

1.1 初始化MQClientInstance

一個(gè)客戶端只能產(chǎn)生一個(gè)MQClientInstance實(shí)例對象,產(chǎn)生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動(dòng)一些服務(wù),源碼如下:

public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

1.2 注冊producer

該過程會(huì)將這個(gè)當(dāng)前producer對象注冊到MQClientInstance實(shí)例對象的的producerTable中。一個(gè)jvm(一個(gè)客戶端)中一個(gè)producerGroup只能有一個(gè)實(shí)例,MQClientInstance操作producerTable大概有如下幾個(gè)方法:

  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown

注:

根據(jù)不同的clientId,MQClientManager將給出不同的MQClientInstance;

根據(jù)不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定義:

public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一個(gè)以topic為key的Map型數(shù)據(jù)結(jié)構(gòu),DefaultMQProducerImpl.start()時(shí)會(huì)默認(rèn)創(chuàng)建一個(gè)key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 發(fā)送心跳包

MQClientInstance向broker發(fā)送心跳包時(shí),調(diào)用sendHeartbeatToAllBroker( ),以及從MQClientInstance實(shí)例對象的brokerAddrTable中拿到所有broker地址,向這些broker發(fā)送心跳包。

sendHeartbeatToAllBroker會(huì)涉及到prepareHeartbeatData()方法,該方法會(huì)生成heartbeatData數(shù)據(jù),發(fā)送心跳包時(shí),heartbeatData作為心跳包的body。與producer相關(guān)的部分代碼如下:

// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}

二、. SendResult sendResult = producer.send(msg)

首先會(huì)調(diào)用DefaultMQProducer.send(msg) ,繼而調(diào)用sendDefaultImpl:

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

sendDefaultImpl做了啥?

2.1. 獲取topicPublishInfo

根據(jù)msg的topic從topicPublishInfoTable獲取對應(yīng)的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 選擇消息發(fā)送的隊(duì)列

普通消息:默認(rèn)方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息,默認(rèn)采用長輪詢的方式選擇隊(duì)列 。

它的機(jī)制如下:正常情況下,順序選擇queue進(jìn)行發(fā)送;如果某一個(gè)節(jié)點(diǎn)發(fā)生了超時(shí),則下次選擇queue時(shí),跳過相同的broker。不同的隊(duì)列選擇策略形成了生產(chǎn)消息的幾種模式,如順序消息,事務(wù)消息。

順序消息:將一組需要有序消費(fèi)的消息發(fā)往同一個(gè)broker的同一個(gè)隊(duì)列上即可實(shí)現(xiàn)順序消息,假設(shè)相同訂單號的支付,退款需要放到同一個(gè)隊(duì)列,那么就可以在send的時(shí)候,自己實(shí)現(xiàn)MessageQueueSelector,根據(jù)參數(shù)arg字段來選擇queue。

private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事務(wù)消息:只有在消息發(fā)送成功,并且本地操作執(zhí)行成功時(shí),才發(fā)送提交事務(wù)消息,做事務(wù)提交,消息發(fā)送失敗,直接發(fā)送回滾消息,進(jìn)行回滾,具體如何實(shí)現(xiàn)后面會(huì)單獨(dú)成文分析。

2.3 封裝消息體通信包,發(fā)送數(shù)據(jù)包

首先,根據(jù)獲取的MessageQueue中的getBrokerName,調(diào)用findBrokerAddressInPublish得到該消息存放對應(yīng)的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知獲取的broker均為master(id=0)

然后, 將與該消息相關(guān)信息打包成RemotingCommand數(shù)據(jù)包,其RequestCode.SEND_MESSAGE

根據(jù)獲取的broke地址,將數(shù)據(jù)包到對應(yīng)的broker,默認(rèn)是發(fā)送超時(shí)時(shí)間為3s。

封裝消息請求包的包頭:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

發(fā)送消息包(普通消息默認(rèn)為同步方式):

SendResult sendResult = null;
switch (communicationMode) {
   case SYNC:
  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  brokerAddr,
  mq.getBrokerName(),
   msg,
  requestHeader,
   timeout,
  communicationMode,
  context,
  this);
break;

處理來自broker端的響應(yīng)數(shù)據(jù)包:

private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

broker端處理request數(shù)據(jù)包后會(huì)將消息存儲(chǔ)到commitLog,具體過程后續(xù)分析。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 使用Spring事件監(jiān)聽機(jī)制實(shí)現(xiàn)跨模塊調(diào)用的步驟詳解

    使用Spring事件監(jiān)聽機(jī)制實(shí)現(xiàn)跨模塊調(diào)用的步驟詳解

    Spring 事件監(jiān)聽機(jī)制是 Spring 框架中用于在應(yīng)用程序的不同組件之間進(jìn)行通信的一種機(jī)制,Spring 事件監(jiān)聽機(jī)制基于觀察者設(shè)計(jì)模式,使得應(yīng)用程序的各個(gè)部分可以解耦,提高模塊化和可維護(hù)性,本文給大家介紹了使用Spring事件監(jiān)聽機(jī)制實(shí)現(xiàn)跨模塊調(diào)用,需要的朋友可以參考下
    2024-06-06
  • springboot jpaRepository為何一定要對Entity序列化

    springboot jpaRepository為何一定要對Entity序列化

    這篇文章主要介紹了springboot jpaRepository為何一定要對Entity序列化,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot通過參數(shù)注解自動(dòng)獲取當(dāng)前用戶信息的方法

    SpringBoot通過參數(shù)注解自動(dòng)獲取當(dāng)前用戶信息的方法

    這篇文章主要介紹了SpringBoot通過參數(shù)注解自動(dòng)獲取當(dāng)前用戶信息的方法,文中使用HandlerMethodArgumentResolver 類來實(shí)現(xiàn)這個(gè)功能,并通過代碼示例講解的非常詳細(xì),需要的朋友可以參考下
    2024-03-03
  • Java?@Validated遇到的大坑與處理

    Java?@Validated遇到的大坑與處理

    這篇文章主要介紹了Java?@Validated遇到的大坑與處理方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • ZooKeeper集群操作及集群Master選舉搭建啟動(dòng)

    ZooKeeper集群操作及集群Master選舉搭建啟動(dòng)

    這篇文章主要為大家介紹了ZooKeeper集群操作及集群Master選舉搭的建啟動(dòng)詳解,<BR>有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • springboot]logback日志框架配置教程

    springboot]logback日志框架配置教程

    這篇文章主要介紹了springboot]logback日志框架配置,logback既可以通過application配置文件進(jìn)行日志的配置,又可以通過logback-spring.xml進(jìn)行日志的配置,本文給大家介紹的非常詳細(xì),需要的朋友參考下吧
    2022-04-04
  • Java中List集合去除重復(fù)數(shù)據(jù)的方法匯總

    Java中List集合去除重復(fù)數(shù)據(jù)的方法匯總

    這篇文章主要給大家介紹了關(guān)于Java中List集合去除重復(fù)數(shù)據(jù)的方法,文中通過圖文介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • POI導(dǎo)出之Excel實(shí)現(xiàn)單元格的背景色填充問題

    POI導(dǎo)出之Excel實(shí)現(xiàn)單元格的背景色填充問題

    這篇文章主要介紹了POI導(dǎo)出之Excel實(shí)現(xiàn)單元格的背景色填充問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 全面分析Java方法的使用與遞歸

    全面分析Java方法的使用與遞歸

    在java中,方法就是用來完成解決某件事情或?qū)崿F(xiàn)某個(gè)功能的辦法;程序調(diào)用自身的編程技巧稱為遞歸( recursion)。遞歸做為一種算法在程序設(shè)計(jì)語言中廣泛應(yīng)用。但是如果沒終止條件會(huì)造成死循環(huán),所以遞歸代碼里要有結(jié)束自調(diào)自的條件,本篇接下來講解一下方法與遞歸
    2022-04-04
  • 一文搞懂Java克隆及深拷貝與淺拷貝的區(qū)別

    一文搞懂Java克隆及深拷貝與淺拷貝的區(qū)別

    在編程中,通常通過實(shí)現(xiàn)Cloneable接口和重寫clone方法來實(shí)現(xiàn)對象的克隆,然而,需要注意的是克隆操作可能存在深拷貝和淺拷貝的區(qū)別,在使用時(shí)需要根據(jù)實(shí)際需求選擇合適的克隆方式,本文就給大家詳細(xì)講講什么是克隆以及深拷貝與淺拷貝的區(qū)別,需要的朋友可以參考下
    2023-08-08

最新評論