欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ producer同步發(fā)送單向發(fā)送源碼解析

 更新時間:2023年03月17日 11:49:56   作者:hsfxuebao  
這篇文章主要為大家介紹了RocketMQ producer同步發(fā)送單向發(fā)送源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式

RocketMQ生產(chǎn)者發(fā)送消息分為三種模式,分別是同步發(fā)送,異步發(fā)送和單向發(fā)送。

  • 單向發(fā)送,這個就是發(fā)送之后不用接收結(jié)果的,就是你發(fā)出去一個消息,然后就返回了,就算有結(jié)果返回也不會接收了,這是站在消息生產(chǎn)者的角度;
  • 同步發(fā)送的話,就是發(fā)出去一個消息,這個線程要等著它返回消息發(fā)送結(jié)果,然后你這個線程再根據(jù)這個消息發(fā)送結(jié)果再做一些業(yè)務(wù)操作等等;
  • 異步發(fā)送,這個就是在你發(fā)送消息之前要給一個callback,發(fā)送的時候,你這個線程就不用等著,該干什么就干什么,然后發(fā)送結(jié)果回來的時候,是由其他線程調(diào)用你這個callback來處理的,你可以把這個callback看作是一個回調(diào)函數(shù),回調(diào)方法,這個方法里面的業(yè)務(wù)邏輯就是你對這個消息發(fā)送結(jié)果的處理。注意,本文介紹的消息發(fā)送只是普通的消息發(fā)送,那種事務(wù)類型的消息,我們以后會有介紹。

1. 同步發(fā)送

producer同步發(fā)送消息的示例在org.apache.rocketmq.example.simple.Producer類中,代碼如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 創(chuàng)建 DefaultMQProducer 對象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 啟動 producer
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 3. 發(fā)送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}

我們可以看到這個代碼,你是同步消息你是需要在你自己的業(yè)務(wù)線程里面接收這個sendResult的,然后在做一些業(yè)務(wù)處理,比如我這里就是打印了一下這個sendResult。

接下來我們看下它是怎樣發(fā)送的,這里是調(diào)用了這個producer的send方法。

@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // topic 和消息長度 校驗
    Validators.checkMessage(msg, this);
    msg.setTopic(withNamespace(msg.getTopic()));
    // todo
    return this.defaultMQProducerImpl.send(msg);
}

我們可以看到,這個 DefaultMQProducer 將這個消息給了defaultMQProducerImpl 這個實現(xiàn)的send方法來處理了。

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 默認(rèn)超時時間3s
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

defaultMQProducerImpl的send方法,加了個超時時間 ,然后有調(diào)用它的重載方法send(msg,timeout)

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // todo 同步模式
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這個send(msg,timeout)又調(diào)用了sendDefaultImpl 方法,然后他這里加了個通信模式是同步,CommunicationMode.SYNC。

1.1 DefaultMQProducerImpl#sendDefaultImpl

sendDefaultImpl 方法就比較長了了我們分成幾部分來介紹:

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 判斷狀態(tài)是否是running
    this.makeSureStateOK();
    // 檢查消息合法性
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 隨機的invokeID
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // todo 獲取topic信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    ...
}

這一小段代碼其實就是做了一些準(zhǔn)備檢查工作,注意第二行的個檢查消息合法性,它要檢查你topic,消息長度的,你不能發(fā)空消息,消息長度也不能太長,默認(rèn)是不超過4m,接下來這些就是記錄一下時間了,再看最后一行,就是根據(jù)你這個消息發(fā)送的topic,然后獲取topic 發(fā)送消息的這么一個信息,這里面就有這topic 有幾個MessageQueue,然后每個MessageQueue對應(yīng)在哪個broker上面,broker 的地址又是啥的,它這個方法會先從本地的一個緩存中獲取下,沒有的話就從nameserv更新下這個本地緩存,再找找,要是再找不到,它就認(rèn)為你沒有這個topic了,然后就去nameserv上面拉取一個默認(rèn)topic的一些配置信息給你用(這個其實就是在新建一個topic)。 接著這個方法往下看,接著就是判斷 這個TopicPublishInfo 是否存在了,如果不存在的話就拋出異常了,沒有后續(xù)了就,如果存在的話:

...
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    // 重試次數(shù) 區(qū)分同步、其他
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    // 存放發(fā)送過的broker name
    String[] brokersSent = new String[timesTotal];
    // 重試發(fā)送
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        // todo 選擇message queue
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                // todo 進行發(fā)送
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                endTimestamp = System.currentTimeMillis();
                // todo isolation 參數(shù)為false(看一下異常情況)
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                switch (communicationMode) {
                    case ASYNC:
                        return null;
                    case ONEWAY:
                        return null;
                    case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                continue;
                            }
                        }
                        return sendResult;
                    default:
                        break;
                }
            } catch (RemotingException e) {
                endTimestamp = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                log.warn(msg.toString());
            ...

其實下面還有許多處理異常的操作沒有放上,不過不影響我們的主流程,先是判斷你這個通信模式,如果是同步的話,默認(rèn)重試次數(shù)就是2 ,然后加上本身這次請求,也就是最查請求3次。這個for循環(huán)就是失敗重試的代碼,再看下代碼selectOneMessageQueue這個就是選擇一個MesssageQueue的方法了,這個是比較重要的,這里我們先不說,你可以把它理解為 我們的負載均衡。接著往下走,就是判斷一下時間了,計算一下剩下的時間, 如果這一堆前面的內(nèi)容耗時很長,然后已經(jīng)超了之前設(shè)置的默認(rèn)超時時間,這個時候就會超時了,然后將這個calltimeout設(shè)置成true了。

1.2 DefaultMQProducerImpl#sendKernelImpl

接著就是進行發(fā)送了調(diào)用sendKernelImpl 方法:

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // 根據(jù)MessageQueue獲取Broker的網(wǎng)絡(luò)地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    ...

這個sendKernelImpl 也是有點長,然后我們一部分一部分的看下,這就是根據(jù)MessageQueue里面的broker name 獲取一下broker addr,他這個broker addr 選的是master的,比如說我們 broker使用的是 master/slave 高可用架構(gòu),這個時候只會選擇那個master,畢竟是往里面寫消息,然后只能用master,等到介紹消息消費者的時候,消息消費者是可以向slave node 獲取消息消費的,前提是 master 負載比較大,然后消息消費者下次獲取消費的消息已經(jīng)在slave里面了,然后消息消費者獲取到消息之后,它里面有個字段是告訴你下次可以去xxx 地址的broker 拉取消息,這個我們介紹到消息消費者的時候再說。

接著回來,如果沒有獲取到這個broker 地址的話,就是去nameserv上更新下本地緩存,然后再獲取下。接著再往下就是再次判斷一下這個broker addr 了,如果還沒有就拋出異常,如果有的話 就執(zhí)行下面的代碼了:

...
SendMessageContext context = null;
if (brokerAddr != null) {
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    byte[] prevBody = msg.getBody();
    try {
        //for MessageBatch,ID has been set in the generating process
        // 給消息設(shè)置全局唯一id, 對于MessageBatch在生成過程中已設(shè)置了id
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
        boolean topicWithNamespace = false;
        if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
            msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
            topicWithNamespace = true;
        }
        int sysFlag = 0;
        // 消息體是否壓縮
        boolean msgBodyCompressed = false;
        // 壓縮消息 內(nèi)容部分超了4k就會壓縮
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            msgBodyCompressed = true;
        }
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
            sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
        // 判斷有沒有hook
        if (hasCheckForbiddenHook()) {
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
            checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
            checkForbiddenContext.setCommunicationMode(communicationMode);
            checkForbiddenContext.setBrokerAddr(brokerAddr);
            checkForbiddenContext.setMessage(msg);
            checkForbiddenContext.setMq(mq);
            checkForbiddenContext.setUnitMode(this.isUnitMode());
            // 執(zhí)行Forbidden 鉤子
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
        ...

第一句,這個其實就是進行一個vip通道地址的轉(zhuǎn)換,這個比較有意思,如果你這個支持vip channel的話,它會把broker addr 里面的端口改變一下,這個所謂的vip channel ,其實就是與它的另一個端口建立連接,這個端口就是當(dāng)前端口-2 ;

接著,如果這個消息不是批量消息的話,我們就給這個消息設(shè)置一個唯一的消息id,再往下就是 sysflag的處理了,這個sysflag里面記錄了好幾個屬性值,使用二進制來處理的,比如說消息是否壓縮了(這個壓縮,就是你消息內(nèi)容超過了默認(rèn)的4k之后,就會進行壓縮,這個壓縮的閾值你是可以配置的),是否是個事務(wù)消息等等。 接下來就是執(zhí)行hook了,這個hook就是forbidenHook ,其實就是對消息進行過濾。

...
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}
// 封裝消息頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 設(shè)置group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// topic
requestHeader.setTopic(msg.getTopic());
// 設(shè)置默認(rèn)topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 設(shè)置默認(rèn)topic的隊列數(shù)量 默認(rèn)4個
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 隊列id
requestHeader.setQueueId(mq.getQueueId());
// 消息系統(tǒng)標(biāo)記
requestHeader.setSysFlag(sysFlag);
// 消息發(fā)送時間
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息標(biāo)記(RocketMQ對消息標(biāo)記不做任何處理,供應(yīng)用程序使用)
requestHeader.setFlag(msg.getFlag());
// 設(shè)置擴展屬性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批量
requestHeader.setBatch(msg instanceof MessageBatch);
// 判斷消息是否是 %RETRY% 開頭
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
...

在往下就是執(zhí)行一下發(fā)送消息之前的hook,再往下就是封裝發(fā)送消息請求頭,然后這個請求頭里面就涵蓋了很多的參數(shù),比如說topic,MessageQueue 隊列Id, 出生日期,flag等等。再往下就是消息發(fā)送了

...
SendResult sendResult = null;
// 同步 異步  單向
switch (communicationMode) {
    // 異步
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        // 判斷超時時間
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
        // 單向
    case ONEWAY:
        // 同步
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        // 判判是否超時
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // todo 交給 mq api去發(fā)送消息
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
// 是否注冊了消息發(fā)送鉤子函數(shù)
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}
...

因為本小節(jié)主要是介紹下這個同步發(fā)送消息,然后我們就主要介紹下這個sync的代碼邏輯: 首先是判斷超時,然后交給 MQClientAPI層去處理,然后返回sendResult。

1.3 MQClientAPIImpl#sendMessage

我們這里接著看下MQClientAPIImpl里面的sendMessage 實現(xiàn):

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
    boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
    if (isReply) {
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
        }
    } else {
        // sendSmartMsg默認(rèn)開啟,也算一種優(yōu)化吧 批量消息
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            // 普通消息
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
    }
    // 設(shè)置消息體
    request.setBody(msg.getBody());
    switch (communicationMode) {
        case ONEWAY:
            // todo
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時時間
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            // 判斷超時時間
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            // todo 同步發(fā)送
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }
    return null;
}

這里先生成一個RemotingCommand 這么個實體對象,然后RequestCode就是SEND_MESSAGE,其實這里判斷了一下sendSmartMsg 這個參數(shù),把requestHeader優(yōu)化了一下,然后換成了requestHeaderV2,其實這個requestHeaderV2 內(nèi)容跟requestHeader一樣,但是變量名是單個字母的,然后序列化,反序列化,傳輸內(nèi)容都有所優(yōu)化,其實他這個序列化使用是json形式的,然后想想就知道有些哪些好處了, 唯一的缺點就是可讀性差點,但是這個玩意是對用戶透明的,用戶不需要關(guān)心。

接著就是判斷通信類型,然后發(fā)送消息了,這里是同步發(fā)送,先是判斷一下超時時間,接著就是調(diào)用sendMessageSync 進行同步發(fā)送了,我們接著來看下這個sendMessageSync 方法實現(xiàn)。

1.4 MQClientAPIImpl#sendMessageSync

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // todo 同步調(diào)用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    // 處理響應(yīng)
    return this.processSendResponse(brokerName, msg, response,addr);
}

這里就調(diào)用到了client 模塊(這個client其實就是直接操作netty了)來處理了,然后返回響應(yīng),調(diào)用processSendResponse 方法來處理響應(yīng)。

1.5 NettyRemotingClient#invokeSync

我們再來看下client的 invokeSync 方法:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 開始時間
    long beginStartTime = System.currentTimeMillis();
    // todo 輪詢獲取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 執(zhí)行開始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判斷超時 之前有獲取鏈接的操作,可能會出現(xiàn)超時的情況
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 進行同步執(zhí)行,獲取響應(yīng)
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 執(zhí)行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 遠程發(fā)送請求異常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 關(guān)閉channel
            this.closeChannel(addr, channel);
            throw e;
            // 超時異常
        } catch (RemotingTimeoutException e) {
            // 如果超時 就關(guān)閉cahnnel話,就關(guān)閉channel 默認(rèn)是不關(guān)閉的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里有兩個點需要關(guān)注下,首先是根據(jù)broker addr 這個地址獲取一下對應(yīng)的channel ,如果不存在的話就創(chuàng)建一下這個連接, 稍微看下這塊的代碼:

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    // 如果地址不存在,就返回namesrv 的channel
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }
    // 創(chuàng)建channel
    return this.createChannel(addr);
}

如果你這個addr是空的話,這個就是默認(rèn)找nameserv的addr ,然后找對應(yīng)channel就可以了,如果不是null ,然后它會去這個channelTable 這個map中去找,如果沒有的話就創(chuàng)建一個對應(yīng)的channel

接著回到這個invokeSync 方法中,獲得channel之后,就是執(zhí)行一下rpcHook了,這東西就是你在創(chuàng)建MQProducer的時候設(shè)置的,在調(diào)用前執(zhí)行一次,調(diào)用后執(zhí)行一次,其實你就可以通過這個hook來實現(xiàn)很多功能,監(jiān)控的功能比較多些。接著就是調(diào)用了invokeSyncImpl 這個實現(xiàn)方法來發(fā)送消息了,這個方法是它的一個父類里面的:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // 獲取 請求id
    final int opaque = request.getOpaque();
    try {
        // 創(chuàng)建ResponseFuture
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // 放入responseTable 表中
        this.responseTable.put(opaque, responseFuture);
        // 獲取遠程地址
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // 成功
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                // 失敗
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                // 移除response中的緩存
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            // 成功了還是null  還是超時
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                // 沒發(fā)出去,就排除異常
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        // 返回響應(yīng)結(jié)果
        return responseCommand;
    } finally {
        // 移除
        this.responseTable.remove(opaque);
    }
}

這個方法其實就是最終往 channel里面寫內(nèi)容的方法了,我們來看下,先是為這次request創(chuàng)建一個id 吧,這個id主要用來返回響應(yīng)的時候用的。

接著創(chuàng)建一個ResposeFuture ,這個東西異步,同步都可以用,這個一會介紹一下它的原理,接著就是將這個id 與這個 ResposeFuture 關(guān)聯(lián)起來放到這個 responseTable 里面的, 接著就是往channel里面發(fā)送消息了,這里它添加一個listener ,這listener的執(zhí)行時機就是發(fā)送出去的時候,最后就是等待這個響應(yīng)了。

我們來解釋下這個ResposeFuture 原理, 當(dāng)執(zhí)行了responseFuture.waitResponse(timeoutMillis); 這行代碼,當(dāng)前線程就會wait ,然后被阻塞,然后等著響應(yīng)回來的時候,netty處理響應(yīng)的線程會從響應(yīng)里面獲取一下這個opaque這個id,就是請求之前在request生成的,broker 在響應(yīng)的時候會會把這個id 放回到response 中, 然后會根據(jù)這個opaque 從responseTable中找到這個 ResposeFuture ,然后把響應(yīng)設(shè)置到這個里面,最后喚醒一下wait在這個對象里面的線程就可以了,這樣你這個業(yè)務(wù)線程就得到了這個RemotingResponse 了。 好了,到這我們就解釋清楚了,然后我們看下他這個代碼是怎樣實現(xiàn)的:

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // 獲取對應(yīng)id 的responseFuture
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // 設(shè)置
        responseFuture.setResponseCommand(cmd);
        // 從響應(yīng)表中移除
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            // todo 執(zhí)行回調(diào)
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

不過它這個ResposeFuture 是使用CountDownLatch 來實現(xiàn)這個wait與喚醒的。我們來具體看下這個 waitResponse方法與這個putResponse方法:

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}

2. 單向發(fā)送

單向發(fā)送其實這塊跟同步發(fā)送的流程差不多,我們來看下它的生產(chǎn)者代碼是怎樣寫的: org.apache.rocketmq.example.openmessaging.SimpleProducer:

public static void main(String[] args) {
    final MessagingAccessPoint messagingAccessPoint =
        OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
    final Producer producer = messagingAccessPoint.createProducer();
    messagingAccessPoint.startup();
    System.out.printf("MessagingAccessPoint startup OK%n");
    producer.startup();
    System.out.printf("Producer startup OK%n");
    ...
    {
        producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        System.out.printf("Send oneway message OK%n");
    }
   ...
}

可以看到我們最后發(fā)送的時候調(diào)用的是sendOneway方法,這個方法是沒有返回值的。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

2.1 DefaultMQProducerImpl#sendOneway

這里就是調(diào)用了defaultMQProducerImpl的 sendOneway方法

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.sendOneway(msg);
}

這里需要注意的是它也是調(diào)用了sendDefaultImpl 方法,然后通信方式是oneway 。這里我們就不細說了,可以看下同步方法解析這個方法的說明,這里唯一要提一點是單向發(fā)送是沒有這個重試的,然后就發(fā)送一次。下面的流程都是一樣的,然后就到了這個MQClientAPIImpl 的 sendMessage 方法

...
switch (communicationMode) {
    case ONEWAY:
        // todo
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    ...

然后他這個是又調(diào)用了NettyRemotingClient 的 invokeOneway 方法:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null &amp;&amp; channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

這里也是根據(jù)broker addr 獲取channel, 如果沒有的話,也是創(chuàng)建一個,接著就是執(zhí)行這個rpc調(diào)用前的hook ,注意這里沒有調(diào)用后的一個hook,因為我們并不知道它是什么情況。 接著又調(diào)用了invokeOnewayImpl 方法:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    // 請求體, 標(biāo)記是一個單向調(diào)用
    request.markOnewayRPC();
    // 獲取憑證
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 釋放信號量
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            // 釋放信號量
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

這里使用了semaphore進行限流,然后默認(rèn)的話是同時支持65535 個請求發(fā)送的,這個semaphore 限流只有單向發(fā)送與這個異步發(fā)送會有,接著就會將這個request寫入channel中,然后add了一個listener ,這個listener執(zhí)行時機就是消息發(fā)送出去了,這個時候就會釋放 信號量。

到這我們這個單向發(fā)送就解析完成了。

參考文章

RocketMQ4.8注釋github地址

RocketMQ源碼分析專欄

以上就是RocketMQ producer同步發(fā)送單向發(fā)送源碼解析的詳細內(nèi)容,更多關(guān)于RocketMQ producer的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 基于JPA實體類監(jiān)聽器@EntityListeners注解的使用實例

    基于JPA實體類監(jiān)聽器@EntityListeners注解的使用實例

    這篇文章主要介紹了JPA實體類監(jiān)聽器@EntityListeners注解的使用實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java實戰(zhàn)之貪吃蛇小游戲(源碼+注釋)

    Java實戰(zhàn)之貪吃蛇小游戲(源碼+注釋)

    這篇文章主要介紹了Java實戰(zhàn)之貪吃蛇小游戲(源碼+注釋),文中有非常詳細的代碼示例,對正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-04-04
  • Mybatis如何從數(shù)據(jù)庫中獲取數(shù)據(jù)存為List類型(存為model)

    Mybatis如何從數(shù)據(jù)庫中獲取數(shù)據(jù)存為List類型(存為model)

    這篇文章主要介紹了Mybatis如何從數(shù)據(jù)庫中獲取數(shù)據(jù)存為List類型(存為model),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • java為什么不建議用equals判斷對象相等

    java為什么不建議用equals判斷對象相等

    本文主要介紹了java為什么不建議用equals判斷對象相等,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • java中List移除元素的四種方式

    java中List移除元素的四種方式

    本文主要介紹了java中List移除元素的四種方式,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-08-08
  • SpringBoot之如何正確、安全的關(guān)閉服務(wù)

    SpringBoot之如何正確、安全的關(guān)閉服務(wù)

    這篇文章主要介紹了SpringBoot之如何正確、安全的關(guān)閉服務(wù)問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Spring Boot Admin 的使用詳解

    Spring Boot Admin 的使用詳解

    這篇文章主要介紹了Spring Boot Admin 的使用詳解,Spring Boot Admin 用于監(jiān)控基于 Spring Boot 的應(yīng)用,有興趣的可以了解一下
    2017-09-09
  • Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn)

    Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn)

    這篇文章主要介紹了Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • Java中父類Object的常用方法總結(jié)

    Java中父類Object的常用方法總結(jié)

    這篇文章給大家介紹了Java中父類Object的三個常用方法,對大家學(xué)習(xí)或使用Java具有一定的參考借鑒價值,有需要的朋友們下面來一起看看吧。
    2016-09-09
  • 通過volatile驗證線程之間的可見性

    通過volatile驗證線程之間的可見性

    這篇文章主要介紹了通過volatile驗證線程之間的可見性,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-10-10

最新評論