RocketMQ獲取指定消息的實現(xiàn)方法(源碼)
概要
消息查詢是什么?
消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息
RocketMQ如果有多個節(jié)點如何查詢?
問題:RocketMQ分布式結構中,數(shù)據(jù)分散在各個節(jié)點,即便是同一Topic的數(shù)據(jù),也未必都在一個broker上。客戶端怎么知道數(shù)據(jù)該去哪個節(jié)點上查?
猜想1:逐個訪問broker節(jié)點查詢數(shù)據(jù)
猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲的位置,只要向該中心查詢即可得到消息具體位置,進而取得消息內容
實際:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。
2.客戶端實現(xiàn)會從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。
問題:CommitLog文件有多個,只有偏移量估計不能確定在哪個文件吧?
實際:單個Broker節(jié)點內offset是全局唯一的,不是每個CommitLog文件的偏移量都是從0開始的。單個節(jié)點內所有CommitLog文件共用一套偏移量,每個文件的文件名為其第一個消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。
源碼閱讀
0.使用方式
MessageExt msg = consumer.viewMessage(msgId);
1.消息ID解析
這個了解下就可以了
public class MessageId {
private SocketAddress address;
private long offset;
public MessageId(SocketAddress address, long offset) {
this.address = address;
this.offset = offset;
}
//get-set
}
//from MQAdminImpl.java
public MessageExt viewMessage(
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MessageId messageId = null;
try {
//從msgId字符串中解析出address和offset
//address = ip:port
//offset為消息在CommitLog文件中的偏移量
messageId = MessageDecoder.decodeMessageId(msgId);
} catch (Exception e) {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
}
return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
messageId.getOffset(), timeoutMillis);
}
//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
SocketAddress address;
long offset;
//ipv4和ipv6的區(qū)別
//如果msgId總長度超過32字符,則為ipv6
int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
ByteBuffer bb = ByteBuffer.wrap(port);
int portInt = bb.getInt(0);
address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
bb = ByteBuffer.wrap(data);
offset = bb.getLong(0);
return new MessageId(address, offset);
}
2.長連接客戶端RPC實現(xiàn)
要發(fā)請求首先得先建立連接,這里方法可以看到創(chuàng)建連接相關的操作。值得注意的是,第一次訪問的時候可能連接還沒建立,建立連接需要消耗一段時間。代碼中對這個時間也做了判斷,如果連接建立完成后,發(fā)現(xiàn)已經(jīng)超時,則不再發(fā)出請求。目的應該是盡可能減少請求線程的阻塞時間。
//from NettyRemotingClient.java
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
//這里會先檢查有無該地址的通道,有則返回,無則創(chuàng)建
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
//前置鉤子
doBeforeRpcHooks(addr, request);
//判斷通道建立完成時是否已到達超時時間,如果超時直接拋出異常。不發(fā)請求
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
//同步調用
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//后置鉤子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
下一步看看它的同步調用做了什么處理。注意到它會構建一個Future對象加入待響應池,發(fā)出請求報文后就掛起線程,然后等待喚醒(waitResponse內部使用CountDownLatch等待)。
//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
//請求id
final int opaque = request.getOpaque();
try {
//請求存根
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//加入待響應的請求池
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
//將請求發(fā)出,成功發(fā)出時更新狀態(tài)
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) { //若成功發(fā)出,更新請求狀態(tài)為“已發(fā)出”
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
//若發(fā)出失敗,則從池中移除(沒用了,釋放資源)
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
//putResponse的時候會喚醒等待的線程
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
//只等待一段時間,不會一直等下去
//若正常響應,則收到響應后,此線程會被喚醒,繼續(xù)執(zhí)行下去
//若超時,則到達該時間后線程蘇醒,繼續(xù)執(zhí)行
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
//正常響應完成時,將future釋放(正常邏輯)
//超時時,將future釋放。這個請求已經(jīng)作廢了,后面如果再收到響應,就可以直接丟棄了(由于找不到相關的響應鉤子,就不處理了)
this.responseTable.remove(opaque);
}
}
好,我們再來看看收到報文的時候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個任務提交給其他線程處理,該線程處理完畢后會將結果寫入到Future對象中,寫入時如果有線程在等待該結果,則喚醒這些線程。這里也差不多,只不過執(zhí)行線程在服務端,服務執(zhí)行完畢后會將結果通過長連接發(fā)送給客戶端,客戶端收到后根據(jù)報文中的ID信息從待響應池中找到Future對象,然后就是類似的處理了。
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
//底層解碼完畢得到RemotingCommand的報文
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
//判斷類型
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
//取得消息id
final int opaque = cmd.getOpaque();
//從待響應池中取得對應請求
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
//將響應值注入到ResponseFuture對象中,等待線程可從這個對象獲取結果
responseFuture.setResponseCommand(cmd);
//請求已處理完畢,釋放該請求
responseTable.remove(opaque);
//如果有回調函數(shù)的話則回調(由當前線程處理)
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
//沒有的話,則喚醒等待線程(由等待線程做處理)
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
總結一下,客戶端的處理時序大概是這樣的:

結構大概是這樣的:

3.服務端的處理
//todo 服務端待補充CommitLog文件映射相關內容
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
//from NettyRemotingAbscract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND: //服務端走這里
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
//from NettyRemotingAbscract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//查看有無該請求code相關的處理器
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
//如果沒有,則使用默認處理器(可能沒有默認處理器)
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) { //不為null,則由本類將響應值寫會給請求方
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else { //為null,意味著processor內部已經(jīng)將響應處理了,這里無需再處理。
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
//from QueryMessageProcesor.java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.QUERY_MESSAGE:
return this.queryMessage(ctx, request);
case RequestCode.VIEW_MESSAGE_BY_ID: //通過msgId查詢消息
return this.viewMessageById(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ViewMessageRequestHeader requestHeader =
(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
//getMessagetStore得到當前映射到內存中的CommitLog文件,然后根據(jù)偏移量取得數(shù)據(jù)
final SelectMappedBufferResult selectMappedBufferResult =
this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
if (selectMappedBufferResult != null) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
//將響應通過socket寫回給客戶端
try {
//response對象的數(shù)據(jù)作為header
//消息內容作為body
FileRegion fileRegion =
new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("Transfer one message from page cache failed, ", future.cause());
}
}
});
} catch (Throwable e) {
log.error("", e);
selectMappedBufferResult.release();
}
return null; //如果有值,則直接寫回給請求方。這里返回null是不需要由外層處理響應。
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
}
return response;
}
總結
到此這篇關于RocketMQ獲取指定消息的文章就介紹到這了,更多相關RocketMQ獲取指定消息內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Springboot教程之如何設置springboot熱重啟
這篇文章主要介紹了Springboot教程之如何設置springboot熱重啟,本文通過實例圖文相結合給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07
解決springboot+activemq啟動報注解錯誤的問題
這篇文章主要介紹了解決springboot+activemq啟動報注解錯誤的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
SpringMVC利用dropzone組件實現(xiàn)圖片上傳
這篇文章主要介紹了SpringMVC利用dropzone組件實現(xiàn)圖片上傳,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-02-02

