欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ producer同步發(fā)送單向發(fā)送源碼解析

 更新時(shí)間:2023年03月17日 11:49:56   作者:hsfxuebao  
這篇文章主要為大家介紹了RocketMQ producer同步發(fā)送單向發(fā)送源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式,分別是同步發(fā)送,異步發(fā)送和單向發(fā)送。

  • 單向發(fā)送,這個(gè)就是發(fā)送之后不用接收結(jié)果的,就是你發(fā)出去一個(gè)消息,然后就返回了,就算有結(jié)果返回也不會(huì)接收了,這是站在消息生產(chǎn)者的角度;
  • 同步發(fā)送的話(huà),就是發(fā)出去一個(gè)消息,這個(gè)線(xiàn)程要等著它返回消息發(fā)送結(jié)果,然后你這個(gè)線(xiàn)程再根據(jù)這個(gè)消息發(fā)送結(jié)果再做一些業(yè)務(wù)操作等等;
  • 異步發(fā)送,這個(gè)就是在你發(fā)送消息之前要給一個(gè)callback,發(fā)送的時(shí)候,你這個(gè)線(xiàn)程就不用等著,該干什么就干什么,然后發(fā)送結(jié)果回來(lái)的時(shí)候,是由其他線(xiàn)程調(diào)用你這個(gè)callback來(lái)處理的,你可以把這個(gè)callback看作是一個(gè)回調(diào)函數(shù),回調(diào)方法,這個(gè)方法里面的業(yè)務(wù)邏輯就是你對(duì)這個(gè)消息發(fā)送結(jié)果的處理。注意,本文介紹的消息發(fā)送只是普通的消息發(fā)送,那種事務(wù)類(lèi)型的消息,我們以后會(huì)有介紹。

1. 同步發(fā)送

producer同步發(fā)送消息的示例在org.apache.rocketmq.example.simple.Producer類(lèi)中,代碼如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 創(chuàng)建 DefaultMQProducer 對(duì)象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 啟動(dòng) producer
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 3. 發(fā)送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}

我們可以看到這個(gè)代碼,你是同步消息你是需要在你自己的業(yè)務(wù)線(xiàn)程里面接收這個(gè)sendResult的,然后在做一些業(yè)務(wù)處理,比如我這里就是打印了一下這個(gè)sendResult。

接下來(lái)我們看下它是怎樣發(fā)送的,這里是調(diào)用了這個(gè)producer的send方法。

@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // topic 和消息長(zhǎng)度 校驗(yàn)
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // todo
    return this.defaultMQProducerImpl.send(msg);
}

我們可以看到,這個(gè) DefaultMQProducer 將這個(gè)消息給了defaultMQProducerImpl 這個(gè)實(shí)現(xiàn)的send方法來(lái)處理了。

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 默認(rèn)超時(shí)時(shí)間3s
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

defaultMQProducerImpl的send方法,加了個(gè)超時(shí)時(shí)間 ,然后有調(diào)用它的重載方法send(msg,timeout)

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 同步模式
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這個(gè)send(msg,timeout)又調(diào)用了sendDefaultImpl 方法,然后他這里加了個(gè)通信模式是同步,CommunicationMode.SYNC。

1.1 DefaultMQProducerImpl#sendDefaultImpl

sendDefaultImpl 方法就比較長(zhǎng)了了我們分成幾部分來(lái)介紹:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 判斷狀態(tài)是否是running
    this.makeSureStateOK();
    // 檢查消息合法性
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 隨機(jī)的invokeID
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // todo 獲取topic信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    ...
}

這一小段代碼其實(shí)就是做了一些準(zhǔn)備檢查工作,注意第二行的個(gè)檢查消息合法性,它要檢查你topic,消息長(zhǎng)度的,你不能發(fā)空消息,消息長(zhǎng)度也不能太長(zhǎng),默認(rèn)是不超過(guò)4m,接下來(lái)這些就是記錄一下時(shí)間了,再看最后一行,就是根據(jù)你這個(gè)消息發(fā)送的topic,然后獲取topic 發(fā)送消息的這么一個(gè)信息,這里面就有這topic 有幾個(gè)MessageQueue,然后每個(gè)MessageQueue對(duì)應(yīng)在哪個(gè)broker上面,broker 的地址又是啥的,它這個(gè)方法會(huì)先從本地的一個(gè)緩存中獲取下,沒(méi)有的話(huà)就從nameserv更新下這個(gè)本地緩存,再找找,要是再找不到,它就認(rèn)為你沒(méi)有這個(gè)topic了,然后就去nameserv上面拉取一個(gè)默認(rèn)topic的一些配置信息給你用(這個(gè)其實(shí)就是在新建一個(gè)topic)。 接著這個(gè)方法往下看,接著就是判斷 這個(gè)TopicPublishInfo 是否存在了,如果不存在的話(huà)就拋出異常了,沒(méi)有后續(xù)了就,如果存在的話(huà):

...
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    // 重試次數(shù) 區(qū)分同步、其他
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    // 存放發(fā)送過(guò)的broker name
    String[] brokersSent = new String[timesTotal];
    // 重試發(fā)送
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        // todo 選擇message queue
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                // todo 進(jìn)行發(fā)送
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                endTimestamp = System.currentTimeMillis();
                // todo isolation 參數(shù)為false(看一下異常情況)
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
            } catch (RemotingException e) {
                endTimestamp = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                log.warn(msg.toString());
            ...

其實(shí)下面還有許多處理異常的操作沒(méi)有放上,不過(guò)不影響我們的主流程,先是判斷你這個(gè)通信模式,如果是同步的話(huà),默認(rèn)重試次數(shù)就是2 ,然后加上本身這次請(qǐng)求,也就是最查請(qǐng)求3次。這個(gè)for循環(huán)就是失敗重試的代碼,再看下代碼selectOneMessageQueue這個(gè)就是選擇一個(gè)MesssageQueue的方法了,這個(gè)是比較重要的,這里我們先不說(shuō),你可以把它理解為 我們的負(fù)載均衡。接著往下走,就是判斷一下時(shí)間了,計(jì)算一下剩下的時(shí)間, 如果這一堆前面的內(nèi)容耗時(shí)很長(zhǎng),然后已經(jīng)超了之前設(shè)置的默認(rèn)超時(shí)時(shí)間,這個(gè)時(shí)候就會(huì)超時(shí)了,然后將這個(gè)calltimeout設(shè)置成true了。

1.2 DefaultMQProducerImpl#sendKernelImpl

接著就是進(jìn)行發(fā)送了調(diào)用sendKernelImpl 方法:

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 根據(jù)MessageQueue獲取Broker的網(wǎng)絡(luò)地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    ...

這個(gè)sendKernelImpl 也是有點(diǎn)長(zhǎng),然后我們一部分一部分的看下,這就是根據(jù)MessageQueue里面的broker name 獲取一下broker addr,他這個(gè)broker addr 選的是master的,比如說(shuō)我們 broker使用的是 master/slave 高可用架構(gòu),這個(gè)時(shí)候只會(huì)選擇那個(gè)master,畢竟是往里面寫(xiě)消息,然后只能用master,等到介紹消息消費(fèi)者的時(shí)候,消息消費(fèi)者是可以向slave node 獲取消息消費(fèi)的,前提是 master 負(fù)載比較大,然后消息消費(fèi)者下次獲取消費(fèi)的消息已經(jīng)在slave里面了,然后消息消費(fèi)者獲取到消息之后,它里面有個(gè)字段是告訴你下次可以去xxx 地址的broker 拉取消息,這個(gè)我們介紹到消息消費(fèi)者的時(shí)候再說(shuō)。

接著回來(lái),如果沒(méi)有獲取到這個(gè)broker 地址的話(huà),就是去nameserv上更新下本地緩存,然后再獲取下。接著再往下就是再次判斷一下這個(gè)broker addr 了,如果還沒(méi)有就拋出異常,如果有的話(huà) 就執(zhí)行下面的代碼了:

...
SendMessageContext context = null;
if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
        //for MessageBatch,ID has been set in the generating process
        // 給消息設(shè)置全局唯一id, 對(duì)于MessageBatch在生成過(guò)程中已設(shè)置了id
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
        boolean topicWithNamespace = false;
        if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
            msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
            topicWithNamespace = true;
        }
        int sysFlag = 0;
        // 消息體是否壓縮
        boolean msgBodyCompressed = false;
        // 壓縮消息 內(nèi)容部分超了4k就會(huì)壓縮
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            msgBodyCompressed = true;
        }
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
            sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
        // 判斷有沒(méi)有hook
        if (hasCheckForbiddenHook()) {
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
            checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
            checkForbiddenContext.setCommunicationMode(communicationMode);
            checkForbiddenContext.setBrokerAddr(brokerAddr);
            checkForbiddenContext.setMessage(msg);
            checkForbiddenContext.setMq(mq);
            checkForbiddenContext.setUnitMode(this.isUnitMode());
            // 執(zhí)行Forbidden 鉤子
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
        ...

第一句,這個(gè)其實(shí)就是進(jìn)行一個(gè)vip通道地址的轉(zhuǎn)換,這個(gè)比較有意思,如果你這個(gè)支持vip channel的話(huà),它會(huì)把broker addr 里面的端口改變一下,這個(gè)所謂的vip channel ,其實(shí)就是與它的另一個(gè)端口建立連接,這個(gè)端口就是當(dāng)前端口-2 ;

接著,如果這個(gè)消息不是批量消息的話(huà),我們就給這個(gè)消息設(shè)置一個(gè)唯一的消息id,再往下就是 sysflag的處理了,這個(gè)sysflag里面記錄了好幾個(gè)屬性值,使用二進(jìn)制來(lái)處理的,比如說(shuō)消息是否壓縮了(這個(gè)壓縮,就是你消息內(nèi)容超過(guò)了默認(rèn)的4k之后,就會(huì)進(jìn)行壓縮,這個(gè)壓縮的閾值你是可以配置的),是否是個(gè)事務(wù)消息等等。 接下來(lái)就是執(zhí)行hook了,這個(gè)hook就是forbidenHook ,其實(shí)就是對(duì)消息進(jìn)行過(guò)濾。

...
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
// 封裝消息頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 設(shè)置group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// topic
requestHeader.setTopic(msg.getTopic());
// 設(shè)置默認(rèn)topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 設(shè)置默認(rèn)topic的隊(duì)列數(shù)量 默認(rèn)4個(gè)
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 隊(duì)列id
requestHeader.setQueueId(mq.getQueueId());
// 消息系統(tǒng)標(biāo)記
requestHeader.setSysFlag(sysFlag);
// 消息發(fā)送時(shí)間
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息標(biāo)記(RocketMQ對(duì)消息標(biāo)記不做任何處理,供應(yīng)用程序使用)
requestHeader.setFlag(msg.getFlag());
// 設(shè)置擴(kuò)展屬性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批量
requestHeader.setBatch(msg instanceof MessageBatch);
// 判斷消息是否是 %RETRY% 開(kāi)頭
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
...

在往下就是執(zhí)行一下發(fā)送消息之前的hook,再往下就是封裝發(fā)送消息請(qǐng)求頭,然后這個(gè)請(qǐng)求頭里面就涵蓋了很多的參數(shù),比如說(shuō)topic,MessageQueue 隊(duì)列Id, 出生日期,flag等等。再往下就是消息發(fā)送了

...
SendResult sendResult = null;
// 同步 異步  單向
switch (communicationMode) {
    // 異步
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        // 判斷超時(shí)時(shí)間
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
        // 單向
    case ONEWAY:
        // 同步
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        // 判判是否超時(shí)
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo 交給 mq api去發(fā)送消息
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
// 是否注冊(cè)了消息發(fā)送鉤子函數(shù)
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}
...

因?yàn)楸拘」?jié)主要是介紹下這個(gè)同步發(fā)送消息,然后我們就主要介紹下這個(gè)sync的代碼邏輯: 首先是判斷超時(shí),然后交給 MQClientAPI層去處理,然后返回sendResult。

1.3 MQClientAPIImpl#sendMessage

我們這里接著看下MQClientAPIImpl里面的sendMessage 實(shí)現(xiàn):

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // sendSmartMsg默認(rèn)開(kāi)啟,也算一種優(yōu)化吧 批量消息
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            // 普通消息
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    // 設(shè)置消息體
    request.setBody(msg.getBody());
    switch (communicationMode) {
        case ONEWAY:
            // todo
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí)時(shí)間
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí)時(shí)間
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo 同步發(fā)送
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
    return null;
}

這里先生成一個(gè)RemotingCommand 這么個(gè)實(shí)體對(duì)象,然后RequestCode就是SEND_MESSAGE,其實(shí)這里判斷了一下sendSmartMsg 這個(gè)參數(shù),把requestHeader優(yōu)化了一下,然后換成了requestHeaderV2,其實(shí)這個(gè)requestHeaderV2 內(nèi)容跟requestHeader一樣,但是變量名是單個(gè)字母的,然后序列化,反序列化,傳輸內(nèi)容都有所優(yōu)化,其實(shí)他這個(gè)序列化使用是json形式的,然后想想就知道有些哪些好處了, 唯一的缺點(diǎn)就是可讀性差點(diǎn),但是這個(gè)玩意是對(duì)用戶(hù)透明的,用戶(hù)不需要關(guān)心。

接著就是判斷通信類(lèi)型,然后發(fā)送消息了,這里是同步發(fā)送,先是判斷一下超時(shí)時(shí)間,接著就是調(diào)用sendMessageSync 進(jìn)行同步發(fā)送了,我們接著來(lái)看下這個(gè)sendMessageSync 方法實(shí)現(xiàn)。

1.4 MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // todo 同步調(diào)用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    // 處理響應(yīng)
    return this.processSendResponse(brokerName, msg, response,addr);
}

這里就調(diào)用到了client 模塊(這個(gè)client其實(shí)就是直接操作netty了)來(lái)處理了,然后返回響應(yīng),調(diào)用processSendResponse 方法來(lái)處理響應(yīng)。

1.5 NettyRemotingClient#invokeSync

我們?cè)賮?lái)看下client的 invokeSync 方法:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 開(kāi)始時(shí)間
    long beginStartTime = System.currentTimeMillis();
    // todo 輪詢(xún)獲取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 執(zhí)行開(kāi)始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判斷超時(shí) 之前有獲取鏈接的操作,可能會(huì)出現(xiàn)超時(shí)的情況
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 進(jìn)行同步執(zhí)行,獲取響應(yīng)
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 執(zhí)行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 遠(yuǎn)程發(fā)送請(qǐng)求異常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 關(guān)閉channel
            this.closeChannel(addr, channel);
            throw e;
            // 超時(shí)異常
        } catch (RemotingTimeoutException e) {
            // 如果超時(shí) 就關(guān)閉cahnnel話(huà),就關(guān)閉channel 默認(rèn)是不關(guān)閉的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里有兩個(gè)點(diǎn)需要關(guān)注下,首先是根據(jù)broker addr 這個(gè)地址獲取一下對(duì)應(yīng)的channel ,如果不存在的話(huà)就創(chuàng)建一下這個(gè)連接, 稍微看下這塊的代碼:

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // 如果地址不存在,就返回namesrv 的channel
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }
    // 創(chuàng)建channel
    return this.createChannel(addr);
}

如果你這個(gè)addr是空的話(huà),這個(gè)就是默認(rèn)找nameserv的addr ,然后找對(duì)應(yīng)channel就可以了,如果不是null ,然后它會(huì)去這個(gè)channelTable 這個(gè)map中去找,如果沒(méi)有的話(huà)就創(chuàng)建一個(gè)對(duì)應(yīng)的channel

接著回到這個(gè)invokeSync 方法中,獲得channel之后,就是執(zhí)行一下rpcHook了,這東西就是你在創(chuàng)建MQProducer的時(shí)候設(shè)置的,在調(diào)用前執(zhí)行一次,調(diào)用后執(zhí)行一次,其實(shí)你就可以通過(guò)這個(gè)hook來(lái)實(shí)現(xiàn)很多功能,監(jiān)控的功能比較多些。接著就是調(diào)用了invokeSyncImpl 這個(gè)實(shí)現(xiàn)方法來(lái)發(fā)送消息了,這個(gè)方法是它的一個(gè)父類(lèi)里面的:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // 獲取 請(qǐng)求id
    final int opaque = request.getOpaque();
    try {
        // 創(chuàng)建ResponseFuture
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 放入responseTable 表中
        this.responseTable.put(opaque, responseFuture);
        // 獲取遠(yuǎn)程地址
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // 成功
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                // 失敗
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 移除response中的緩存
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            // 成功了還是null  還是超時(shí)
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                // 沒(méi)發(fā)出去,就排除異常
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        // 返回響應(yīng)結(jié)果
        return responseCommand;
    } finally {
        // 移除
        this.responseTable.remove(opaque);
    }
}

這個(gè)方法其實(shí)就是最終往 channel里面寫(xiě)內(nèi)容的方法了,我們來(lái)看下,先是為這次request創(chuàng)建一個(gè)id 吧,這個(gè)id主要用來(lái)返回響應(yīng)的時(shí)候用的。

接著創(chuàng)建一個(gè)ResposeFuture ,這個(gè)東西異步,同步都可以用,這個(gè)一會(huì)介紹一下它的原理,接著就是將這個(gè)id 與這個(gè) ResposeFuture 關(guān)聯(lián)起來(lái)放到這個(gè) responseTable 里面的, 接著就是往channel里面發(fā)送消息了,這里它添加一個(gè)listener ,這listener的執(zhí)行時(shí)機(jī)就是發(fā)送出去的時(shí)候,最后就是等待這個(gè)響應(yīng)了。

我們來(lái)解釋下這個(gè)ResposeFuture 原理, 當(dāng)執(zhí)行了responseFuture.waitResponse(timeoutMillis); 這行代碼,當(dāng)前線(xiàn)程就會(huì)wait ,然后被阻塞,然后等著響應(yīng)回來(lái)的時(shí)候,netty處理響應(yīng)的線(xiàn)程會(huì)從響應(yīng)里面獲取一下這個(gè)opaque這個(gè)id,就是請(qǐng)求之前在request生成的,broker 在響應(yīng)的時(shí)候會(huì)會(huì)把這個(gè)id 放回到response 中, 然后會(huì)根據(jù)這個(gè)opaque 從responseTable中找到這個(gè) ResposeFuture ,然后把響應(yīng)設(shè)置到這個(gè)里面,最后喚醒一下wait在這個(gè)對(duì)象里面的線(xiàn)程就可以了,這樣你這個(gè)業(yè)務(wù)線(xiàn)程就得到了這個(gè)RemotingResponse 了。 好了,到這我們就解釋清楚了,然后我們看下他這個(gè)代碼是怎樣實(shí)現(xiàn)的:

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // 獲取對(duì)應(yīng)id 的responseFuture
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // 設(shè)置
        responseFuture.setResponseCommand(cmd);
        // 從響應(yīng)表中移除
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            // todo 執(zhí)行回調(diào)
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

不過(guò)它這個(gè)ResposeFuture 是使用CountDownLatch 來(lái)實(shí)現(xiàn)這個(gè)wait與喚醒的。我們來(lái)具體看下這個(gè) waitResponse方法與這個(gè)putResponse方法:

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}

2. 單向發(fā)送

單向發(fā)送其實(shí)這塊跟同步發(fā)送的流程差不多,我們來(lái)看下它的生產(chǎn)者代碼是怎樣寫(xiě)的: org.apache.rocketmq.example.openmessaging.SimpleProducer:

public static void main(String[] args) {
    final MessagingAccessPoint messagingAccessPoint =
        OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
    final Producer producer = messagingAccessPoint.createProducer();
    messagingAccessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");
    producer.startup();
    System.out.printf("Producer startup OK%n");
    ...
    {
        producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        System.out.printf("Send oneway message OK%n");
    }
   ...
}

可以看到我們最后發(fā)送的時(shí)候調(diào)用的是sendOneway方法,這個(gè)方法是沒(méi)有返回值的。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

2.1 DefaultMQProducerImpl#sendOneway

這里就是調(diào)用了defaultMQProducerImpl的 sendOneway方法

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

這里需要注意的是它也是調(diào)用了sendDefaultImpl 方法,然后通信方式是oneway 。這里我們就不細(xì)說(shuō)了,可以看下同步方法解析這個(gè)方法的說(shuō)明,這里唯一要提一點(diǎn)是單向發(fā)送是沒(méi)有這個(gè)重試的,然后就發(fā)送一次。下面的流程都是一樣的,然后就到了這個(gè)MQClientAPIImpl 的 sendMessage 方法

...
switch (communicationMode) {
    case ONEWAY:
        // todo
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    ...

然后他這個(gè)是又調(diào)用了NettyRemotingClient 的 invokeOneway 方法:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null &amp;&amp; channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里也是根據(jù)broker addr 獲取channel, 如果沒(méi)有的話(huà),也是創(chuàng)建一個(gè),接著就是執(zhí)行這個(gè)rpc調(diào)用前的hook ,注意這里沒(méi)有調(diào)用后的一個(gè)hook,因?yàn)槲覀儾⒉恢浪鞘裁辞闆r。 接著又調(diào)用了invokeOnewayImpl 方法:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // 請(qǐng)求體, 標(biāo)記是一個(gè)單向調(diào)用
    request.markOnewayRPC();
    // 獲取憑證
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 釋放信號(hào)量
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            // 釋放信號(hào)量
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

這里使用了semaphore進(jìn)行限流,然后默認(rèn)的話(huà)是同時(shí)支持65535 個(gè)請(qǐng)求發(fā)送的,這個(gè)semaphore 限流只有單向發(fā)送與這個(gè)異步發(fā)送會(huì)有,接著就會(huì)將這個(gè)request寫(xiě)入channel中,然后add了一個(gè)listener ,這個(gè)listener執(zhí)行時(shí)機(jī)就是消息發(fā)送出去了,這個(gè)時(shí)候就會(huì)釋放 信號(hào)量。

到這我們這個(gè)單向發(fā)送就解析完成了。

參考文章

RocketMQ4.8注釋github地址

RocketMQ源碼分析專(zhuān)欄

以上就是RocketMQ producer同步發(fā)送單向發(fā)送源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ producer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Mybatis如何從數(shù)據(jù)庫(kù)中獲取數(shù)據(jù)存為L(zhǎng)ist類(lèi)型(存為model)

    Mybatis如何從數(shù)據(jù)庫(kù)中獲取數(shù)據(jù)存為L(zhǎng)ist類(lèi)型(存為model)

    這篇文章主要介紹了Mybatis如何從數(shù)據(jù)庫(kù)中獲取數(shù)據(jù)存為L(zhǎng)ist類(lèi)型(存為model),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • java為什么不建議用equals判斷對(duì)象相等

    java為什么不建議用equals判斷對(duì)象相等

    本文主要介紹了java為什么不建議用equals判斷對(duì)象相等,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • java中List移除元素的四種方式

    java中List移除元素的四種方式

    本文主要介紹了java中List移除元素的四種方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-08-08
  • SpringBoot之如何正確、安全的關(guān)閉服務(wù)

    SpringBoot之如何正確、安全的關(guān)閉服務(wù)

    這篇文章主要介紹了SpringBoot之如何正確、安全的關(guān)閉服務(wù)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Spring Boot Admin 的使用詳解

    Spring Boot Admin 的使用詳解

    這篇文章主要介紹了Spring Boot Admin 的使用詳解,Spring Boot Admin 用于監(jiān)控基于 Spring Boot 的應(yīng)用,有興趣的可以了解一下
    2017-09-09
  • Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn)

    Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn)

    這篇文章主要介紹了Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • Java中父類(lèi)Object的常用方法總結(jié)

    Java中父類(lèi)Object的常用方法總結(jié)

    這篇文章給大家介紹了Java中父類(lèi)Object的三個(gè)常用方法,對(duì)大家學(xué)習(xí)或使用Java具有一定的參考借鑒價(jià)值,有需要的朋友們下面來(lái)一起看看吧。
    2016-09-09
  • 通過(guò)volatile驗(yàn)證線(xiàn)程之間的可見(jiàn)性

    通過(guò)volatile驗(yàn)證線(xiàn)程之間的可見(jiàn)性

    這篇文章主要介紹了通過(guò)volatile驗(yàn)證線(xiàn)程之間的可見(jiàn)性,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • 最新評(píng)論