解決rocketmq-client查詢手動發(fā)送消息異常問題
rocketmq-client查詢手動發(fā)送消息異常
今天處理rocketmq的后臺的一些問題
下面這個問題是你也用網(wǎng)上4.x的rocketmq版本的監(jiān)控后臺,才會出現(xiàn)的
MQClientException: CODE: 208 DESC: query message by id finished, but no message.
這個是我們用rocketmq-client手動發(fā)送消息的時候,再去查看消息詳情的時候遇到的問題,會報錯,消息找不到
這是由于查詢出來的mq消息,這些消息的msgId,不是真的msgId,而是UniqueKey
為什么會這樣呢,我們來看一段源碼
public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) { List<MessageExt> msgExts = new ArrayList<MessageExt>(); while (byteBuffer.hasRemaining()) { MessageExt msgExt = clientDecode(byteBuffer, readBody); if (null != msgExt) { msgExts.add(msgExt); } else { break; } } return msgExts; }
1.以上是解析從broker獲取到的消息
public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) { return decode(byteBuffer, readBody, true, true); }
2.注意第四個參數(shù)是true
它代表是否是client端,就是監(jiān)控端
public static MessageExt decode( java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) { try { MessageExt msgExt; //1.重點看這里 if (isClient) { msgExt = new MessageClientExt(); } else { msgExt = new MessageExt(); } //此處省略 ........... ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH); String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset()); msgExt.setMsgId(msgId); //2.重點看這里 if (isClient) { ((MessageClientExt) msgExt).setOffsetMsgId(msgId); } return msgExt; } catch (Exception e) { byteBuffer.position(byteBuffer.limit()); } return null; }
3.MessageClientExt主要是這個對象惹的禍
老的版本mq就是一個 MessageExt
我們再來看看這個對象為什么惹禍了
public class MessageClientExt extends MessageExt { public String getOffsetMsgId() { return super.getMsgId(); } public void setOffsetMsgId(String offsetMsgId) { super.setMsgId(offsetMsgId); } /** * 沒錯就是這里,默認拿的是屬性里的UNIQ_KEY **/ @Override public String getMsgId() { String uniqID = MessageClientIDSetter.getUniqID(this); if (uniqID == null) { return this.getOffsetMsgId(); } else { return uniqID; } } public void setMsgId(String msgId) { //DO NOTHING //MessageClientIDSetter.setUniqID(this); } }
所以知道了問題,就好解決了
我是在顯示類里面做相應的處理,貼下我的處理方式,我是在 MessageView.class里處理
public static MessageView fromMessageExt(MessageExt messageExt) { MessageView messageView = new MessageView(); BeanUtils.copyProperties(messageExt, messageView); if (messageExt.getBody() != null) { messageView.setMessageBody(new String(messageExt.getBody(), Charsets.UTF_8)); } //主要是這里判斷下,是否是這個類,是就把原來的msgId拿出來 if(messageExt instanceof MessageClientExt){ MessageClientExt ext = (MessageClientExt) messageExt; messageView.setMsgId(ext.getOffsetMsgId()); } return messageView; }
總結(jié)
好了,這里就總結(jié)下這個問題
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Java多線程編程基石ThreadPoolExecutor示例詳解
這篇文章主要為大家介紹了Java多線程編程基石ThreadPoolExecutor示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04