欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ獲取指定消息的實現(xiàn)方法(源碼)

 更新時間:2020年08月16日 10:14:01   作者:貓毛·波拿巴  
這篇文章主要給大家介紹了關(guān)于RocketMQ獲取指定消息的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用RocketMQ具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧

概要

消息查詢是什么?

消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息

RocketMQ如果有多個節(jié)點如何查詢?

問題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個節(jié)點,即便是同一Topic的數(shù)據(jù),也未必都在一個broker上??蛻舳嗽趺粗罃?shù)據(jù)該去哪個節(jié)點上查?

猜想1:逐個訪問broker節(jié)點查詢數(shù)據(jù)

猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容

實際:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

2.客戶端實現(xiàn)會從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。

問題:CommitLog文件有多個,只有偏移量估計不能確定在哪個文件吧?

實際:單個Broker節(jié)點內(nèi)offset是全局唯一的,不是每個CommitLog文件的偏移量都是從0開始的。單個節(jié)點內(nèi)所有CommitLog文件共用一套偏移量,每個文件的文件名為其第一個消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。

源碼閱讀

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

這個了解下就可以了

public class MessageId {
 private SocketAddress address;
 private long offset;

 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }

 //get-set
}

//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

 MessageId messageId = null;
 try {
  //從msgId字符串中解析出address和offset
  //address = ip:port
  //offset為消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的區(qū)別
 //如果msgId總長度超過32字符,則為ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);

 return new MessageId(address, offset);
}

2.長連接客戶端RPC實現(xiàn)

要發(fā)請求首先得先建立連接,這里方法可以看到創(chuàng)建連接相關(guān)的操作。值得注意的是,第一次訪問的時候可能連接還沒建立,建立連接需要消耗一段時間。代碼中對這個時間也做了判斷,如果連接建立完成后,發(fā)現(xiàn)已經(jīng)超時,則不再發(fā)出請求。目的應(yīng)該是盡可能減少請求線程的阻塞時間。

//from NettyRemotingClient.java
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
 throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
 long beginStartTime = System.currentTimeMillis();
 //這里會先檢查有無該地址的通道,有則返回,無則創(chuàng)建
 final Channel channel = this.getAndCreateChannel(addr);
 if (channel != null && channel.isActive()) {
  try {
   //前置鉤子
   doBeforeRpcHooks(addr, request); 
   //判斷通道建立完成時是否已到達(dá)超時時間,如果超時直接拋出異常。不發(fā)請求
   long costTime = System.currentTimeMillis() - beginStartTime;
   if (timeoutMillis < costTime) {
    throw new RemotingTimeoutException("invokeSync call timeout");
   }
   //同步調(diào)用
   RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
   //后置鉤子
   doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子
   return response;
  } catch (RemotingSendRequestException e) {
   log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
   this.closeChannel(addr, channel);
   throw e;
  } catch (RemotingTimeoutException e) {
   if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
    this.closeChannel(addr, channel);
    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
   }
   log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
   throw e;
  }
 } else {
  this.closeChannel(addr, channel);
  throw new RemotingConnectException(addr);
 }
}

下一步看看它的同步調(diào)用做了什么處理。注意到它會構(gòu)建一個Future對象加入待響應(yīng)池,發(fā)出請求報文后就掛起線程,然后等待喚醒(waitResponse內(nèi)部使用CountDownLatch等待)。

//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
 final long timeoutMillis)
 throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
 //請求id
 final int opaque = request.getOpaque();

 try {
  //請求存根
  final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  //加入待響應(yīng)的請求池
  this.responseTable.put(opaque, responseFuture);
  final SocketAddress addr = channel.remoteAddress();
  //將請求發(fā)出,成功發(fā)出時更新狀態(tài)
  channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture f) throws Exception {
    if (f.isSuccess()) { //若成功發(fā)出,更新請求狀態(tài)為“已發(fā)出”
     responseFuture.setSendRequestOK(true);
     return;
    } else {
     responseFuture.setSendRequestOK(false);
    }

    //若發(fā)出失敗,則從池中移除(沒用了,釋放資源)
    responseTable.remove(opaque);
    responseFuture.setCause(f.cause());
    //putResponse的時候會喚醒等待的線程
    responseFuture.putResponse(null);
    log.warn("send a request command to channel <" + addr + "> failed.");
   }
  });

  //只等待一段時間,不會一直等下去
  //若正常響應(yīng),則收到響應(yīng)后,此線程會被喚醒,繼續(xù)執(zhí)行下去
  //若超時,則到達(dá)該時間后線程蘇醒,繼續(xù)執(zhí)行
  RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  if (null == responseCommand) {
   if (responseFuture.isSendRequestOK()) {
    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
     responseFuture.getCause());
   } else {
    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
   }
  }

  return responseCommand;
 } finally {
  //正常響應(yīng)完成時,將future釋放(正常邏輯)
  //超時時,將future釋放。這個請求已經(jīng)作廢了,后面如果再收到響應(yīng),就可以直接丟棄了(由于找不到相關(guān)的響應(yīng)鉤子,就不處理了)
  this.responseTable.remove(opaque);
 }
}

好,我們再來看看收到報文的時候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個任務(wù)提交給其他線程處理,該線程處理完畢后會將結(jié)果寫入到Future對象中,寫入時如果有線程在等待該結(jié)果,則喚醒這些線程。這里也差不多,只不過執(zhí)行線程在服務(wù)端,服務(wù)執(zhí)行完畢后會將結(jié)果通過長連接發(fā)送給客戶端,客戶端收到后根據(jù)報文中的ID信息從待響應(yīng)池中找到Future對象,然后就是類似的處理了。

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

 //底層解碼完畢得到RemotingCommand的報文
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  processMessageReceived(ctx, msg);
 }
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
 final RemotingCommand cmd = msg;
 if (cmd != null) {
  //判斷類型
  switch (cmd.getType()) {
   case REQUEST_COMMAND:
    processRequestCommand(ctx, cmd);
    break;
   case RESPONSE_COMMAND:
    processResponseCommand(ctx, cmd);
    break;
   default:
    break;
  }
 }
}

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
 //取得消息id
 final int opaque = cmd.getOpaque();
 //從待響應(yīng)池中取得對應(yīng)請求
 final ResponseFuture responseFuture = responseTable.get(opaque);
 if (responseFuture != null) {
  //將響應(yīng)值注入到ResponseFuture對象中,等待線程可從這個對象獲取結(jié)果
  responseFuture.setResponseCommand(cmd);
  //請求已處理完畢,釋放該請求
  responseTable.remove(opaque);

  //如果有回調(diào)函數(shù)的話則回調(diào)(由當(dāng)前線程處理)
  if (responseFuture.getInvokeCallback() != null) {
   executeInvokeCallback(responseFuture);
  } else {
   //沒有的話,則喚醒等待線程(由等待線程做處理)
   responseFuture.putResponse(cmd);
   responseFuture.release();
  }
 } else {
  log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
  log.warn(cmd.toString());
 }
}

總結(jié)一下,客戶端的處理時序大概是這樣的:

結(jié)構(gòu)大概是這樣的:

3.服務(wù)端的處理

//todo 服務(wù)端待補(bǔ)充CommitLog文件映射相關(guān)內(nèi)容

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    processMessageReceived(ctx, msg);
  }
}

//from NettyRemotingAbscract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  final RemotingCommand cmd = msg;
  if (cmd != null) {
    switch (cmd.getType()) {
      case REQUEST_COMMAND: //服務(wù)端走這里
        processRequestCommand(ctx, cmd);
        break;
      case RESPONSE_COMMAND:
        processResponseCommand(ctx, cmd);
        break;
      default:
        break;
    }
  }
}

//from NettyRemotingAbscract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
  //查看有無該請求code相關(guān)的處理器
  final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
  //如果沒有,則使用默認(rèn)處理器(可能沒有默認(rèn)處理器)
  final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
  final int opaque = cmd.getOpaque();

  if (pair != null) {
    Runnable run = new Runnable() {
      @Override
      public void run() {
        try {
          doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
          final RemotingResponseCallback callback = new RemotingResponseCallback() {
            @Override
            public void callback(RemotingCommand response) {
              doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
              if (!cmd.isOnewayRPC()) {
                if (response != null) { //不為null,則由本類將響應(yīng)值寫會給請求方
                  response.setOpaque(opaque);
                  response.markResponseType();
                  try {
                    ctx.writeAndFlush(response);
                  } catch (Throwable e) {
                    log.error("process request over, but response failed", e);
                    log.error(cmd.toString());
                    log.error(response.toString());
                  }
                } else { //為null,意味著processor內(nèi)部已經(jīng)將響應(yīng)處理了,這里無需再處理。
                }
              }
            }
          };
          if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器
            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
            processor.asyncProcessRequest(ctx, cmd, callback);
          } else { 
            NettyRequestProcessor processor = pair.getObject1();
            RemotingCommand response = processor.processRequest(ctx, cmd);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
            callback.callback(response);
          }
        } catch (Throwable e) {
          log.error("process request exception", e);
          log.error(cmd.toString());

          if (!cmd.isOnewayRPC()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
              RemotingHelper.exceptionSimpleDesc(e));
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
          }
        }
      }
    };

    if (pair.getObject1().rejectRequest()) {
      final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
      response.setOpaque(opaque);
      ctx.writeAndFlush(response);
      return;
    }

    try {
      final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
      pair.getObject2().submit(requestTask);
    } catch (RejectedExecutionException e) {
      if ((System.currentTimeMillis() % 10000) == 0) {
        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
          + ", too many requests and system thread pool busy, RejectedExecutionException "
          + pair.getObject2().toString()
          + " request code: " + cmd.getCode());
      }

      if (!cmd.isOnewayRPC()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
          "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
      }
    }
  } else {
    String error = " request type " + cmd.getCode() + " not supported";
    final RemotingCommand response =
      RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
  }
}

//from QueryMessageProcesor.java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  switch (request.getCode()) {
    case RequestCode.QUERY_MESSAGE:
      return this.queryMessage(ctx, request);
    case RequestCode.VIEW_MESSAGE_BY_ID: //通過msgId查詢消息
      return this.viewMessageById(ctx, request);
    default:
      break;
  }

  return null;
}

public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  final ViewMessageRequestHeader requestHeader =
    (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

  response.setOpaque(request.getOpaque());

  //getMessagetStore得到當(dāng)前映射到內(nèi)存中的CommitLog文件,然后根據(jù)偏移量取得數(shù)據(jù)
  final SelectMappedBufferResult selectMappedBufferResult =
    this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
  if (selectMappedBufferResult != null) {
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);

    //將響應(yīng)通過socket寫回給客戶端
    try {
      //response對象的數(shù)據(jù)作為header
      //消息內(nèi)容作為body
      FileRegion fileRegion =
        new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
          selectMappedBufferResult);
      ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          selectMappedBufferResult.release();
          if (!future.isSuccess()) {
            log.error("Transfer one message from page cache failed, ", future.cause());
          }
        }
      });
    } catch (Throwable e) {
      log.error("", e);
      selectMappedBufferResult.release();
    }

    return null; //如果有值,則直接寫回給請求方。這里返回null是不需要由外層處理響應(yīng)。
  } else {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
  }

  return response;
}

總結(jié)

到此這篇關(guān)于RocketMQ獲取指定消息的文章就介紹到這了,更多相關(guān)RocketMQ獲取指定消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • spring security自定義登錄頁面

    spring security自定義登錄頁面

    在項目中我們肯定不能使用Spring自己生成的登錄頁面,而要用我們自己的登錄頁面,下面通過本文給大家分享spring security自定義登錄頁面的實現(xiàn)方法,一起看看吧
    2017-09-09
  • JUC之Semaphore源碼分析

    JUC之Semaphore源碼分析

    這篇文章主要為大家詳細(xì)分析了JUC之Semaphore源碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • java實現(xiàn)Img與PDF相互轉(zhuǎn)換

    java實現(xiàn)Img與PDF相互轉(zhuǎn)換

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)Img與PDF相互轉(zhuǎn)換的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-05-05
  • Springboot教程之如何設(shè)置springboot熱重啟

    Springboot教程之如何設(shè)置springboot熱重啟

    這篇文章主要介紹了Springboot教程之如何設(shè)置springboot熱重啟,本文通過實例圖文相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • mybatis @Intercepts的用法解讀

    mybatis @Intercepts的用法解讀

    這篇文章主要介紹了mybatis @Intercepts的用法解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringBoot多文件分布式上傳功能實現(xiàn)

    SpringBoot多文件分布式上傳功能實現(xiàn)

    本文詳細(xì)介紹了如何在SpringBoot中實現(xiàn)多文件分布式上傳,并用代碼給出了相應(yīng)的實現(xiàn)思路和實現(xiàn)步驟,感興趣的朋友跟隨小編一起看看吧
    2023-06-06
  • ocp開閉原則_動力節(jié)點Java學(xué)院整理

    ocp開閉原則_動力節(jié)點Java學(xué)院整理

    這篇文章主要為大家詳細(xì)介紹了ocp開閉原則的相關(guān)資料,ocp開閉原則指導(dǎo)我們?nèi)绾谓⒁粋€穩(wěn)定的、靈活的系統(tǒng),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • 解決springboot+activemq啟動報注解錯誤的問題

    解決springboot+activemq啟動報注解錯誤的問題

    這篇文章主要介紹了解決springboot+activemq啟動報注解錯誤的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 深入理解Java 類加載全過程

    深入理解Java 類加載全過程

    這篇文章主要介紹了深入理解Java 類加載全過程的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • SpringMVC利用dropzone組件實現(xiàn)圖片上傳

    SpringMVC利用dropzone組件實現(xiàn)圖片上傳

    這篇文章主要介紹了SpringMVC利用dropzone組件實現(xiàn)圖片上傳,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-02-02

最新評論