RocketMQ集群消費(fèi)與廣播消費(fèi)模式
一、概述
RocketMQ主要提供了兩種消費(fèi)模式:集群消費(fèi)以及廣播消費(fèi)。我們只需要在定義消費(fèi)者的時(shí)候通過setMessageModel(MessageModel.XXX)方法就可以指定是集群還是廣播式消費(fèi),默認(rèn)是集群消費(fèi)模式,即每個(gè)Consumer Group中的Consumer均攤所有的消息。下面我們通過簡(jiǎn)單的示例演示一下。
二、集群消費(fèi)
一個(gè) Consumer Group 中的 Consumer 實(shí)例平均分?jǐn)傁M(fèi)消息。
使用方法:setMessageModel(MessageModel.CLUSTERING)
定義生產(chǎn)者
public class MQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 創(chuàng)建DefaultMQProducer類并設(shè)定生產(chǎn)者名稱 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開 mqProducer.setNamesrvAddr("10.0.91.71:9876"); // 消息最大長(zhǎng)度 默認(rèn)4M mqProducer.setMaxMessageSize(4096); // 發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000 mqProducer.setSendMsgTimeout(3000); // 發(fā)送消息失敗重試次數(shù),默認(rèn)2 mqProducer.setRetryTimesWhenSendAsyncFailed(2); // 啟動(dòng)消息生產(chǎn)者 mqProducer.start(); // 循環(huán)十次,發(fā)送十條消息 for (int i = 1; i <= 10; i++) { String msg = "hello, 這是第" + i + "條同步消息"; // 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容 Message message = new Message("CLUSTERING_TOPIC", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發(fā)送同步消息到一個(gè)Broker,可以通過sendResult返回消息是否成功送達(dá) SendResult sendResult = mqProducer.send(message); System.out.println(sendResult); } // 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例 mqProducer.shutdown(); } }
SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC57E00000, offsetMsgId=0A005B4700002A9F00000000000010A9, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=3], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC57EF0001, offsetMsgId=0A005B4700002A9F0000000000001174, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC57F60002, offsetMsgId=0A005B4700002A9F000000000000123F, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=1], queueOffset=6] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC57FB0003, offsetMsgId=0A005B4700002A9F000000000000130A, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=2], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC58050004, offsetMsgId=0A005B4700002A9F00000000000013D5, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=3], queueOffset=6] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC58110005, offsetMsgId=0A005B4700002A9F00000000000014A0, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=6] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC581D0006, offsetMsgId=0A005B4700002A9F000000000000156B, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=1], queueOffset=7] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC58290007, offsetMsgId=0A005B4700002A9F0000000000001636, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=2], queueOffset=6] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC582E0008, offsetMsgId=0A005B4700002A9F0000000000001701, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=3], queueOffset=7] SendResult [sendStatus=SEND_OK, msgId=AC6E005625A818B4AAC211CC58340009, offsetMsgId=0A005B4700002A9F00000000000017CC, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=7]
定義消費(fèi)者A
public class MQConsumerA { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開 mqPushConsumer.setNamesrvAddr("10.0.91.71:9876"); // 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) // 如果不是第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 消費(fèi)者最小線程量 mqPushConsumer.setConsumeThreadMin(5); // 消費(fèi)者最大線程量 mqPushConsumer.setConsumeThreadMax(10); // 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來過濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("CLUSTERING_TOPIC", "*"); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消費(fèi)者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 mqPushConsumer.start(); } }
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=6, sysFlag=0, bornTimestamp=1646362604534, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602259, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F000000000000123F, commitLogOffset=4671, bodyCRC=540691780, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1646362604548, UNIQ_KEY=AC6E005625A818B4AAC211CC57F60002, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=5, sysFlag=0, bornTimestamp=1646362604527, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602253, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001174, commitLogOffset=4468, bodyCRC=240311509, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646362604549, UNIQ_KEY=AC6E005625A818B4AAC211CC57EF0001, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 50, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第2條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=6, sysFlag=0, bornTimestamp=1646362604561, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602293, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F00000000000014A0, commitLogOffset=5280, bodyCRC=1516474450, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1646362604579, UNIQ_KEY=AC6E005625A818B4AAC211CC58110005, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 54, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第6條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=7, sysFlag=0, bornTimestamp=1646362604573, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602303, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F000000000000156B, commitLogOffset=5483, bodyCRC=1946878403, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1646362604586, UNIQ_KEY=AC6E005625A818B4AAC211CC581D0006, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 55, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第7條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=204, queueOffset=7, sysFlag=0, bornTimestamp=1646362604596, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602322, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F00000000000017CC, commitLogOffset=6092, bodyCRC=2041898758, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1646362604616, UNIQ_KEY=AC6E005625A818B4AAC211CC58340009, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, 48, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第10條同步消息
定義消費(fèi)者B
public class MQConsumerB { public static void main(String[] args) throws MQClientException { // 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開 mqPushConsumer.setNamesrvAddr("10.0.91.71:9876"); // 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) // 如果不是第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // 消費(fèi)者最小線程量 mqPushConsumer.setConsumeThreadMin(5); // 消費(fèi)者最大線程量 mqPushConsumer.setConsumeThreadMax(10); // 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來過濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用* mqPushConsumer.subscribe("CLUSTERING_TOPIC", "*"); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消費(fèi)者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 mqPushConsumer.start(); } }
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=5, sysFlag=0, bornTimestamp=1646362604513, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602244, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F00000000000010A9, commitLogOffset=4265, bodyCRC=664430631, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646362604525, UNIQ_KEY=AC6E005625A818B4AAC211CC57E00000, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=5, sysFlag=0, bornTimestamp=1646362604539, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602266, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F000000000000130A, commitLogOffset=4874, bodyCRC=1573106993, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1646362604547, UNIQ_KEY=AC6E005625A818B4AAC211CC57FB0003, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 52, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第4條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=6, sysFlag=0, bornTimestamp=1646362604549, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602276, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F00000000000013D5, commitLogOffset=5077, bodyCRC=1940595872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1646362604558, UNIQ_KEY=AC6E005625A818B4AAC211CC58050004, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 53, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第5條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=6, sysFlag=0, bornTimestamp=1646362604585, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602311, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001636, commitLogOffset=5686, bodyCRC=2061592313, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=7, CONSUME_START_TIME=1646362604592, UNIQ_KEY=AC6E005625A818B4AAC211CC58290007, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 56, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第8條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=7, sysFlag=0, bornTimestamp=1646362604590, bornHost=/10.0.90.139:51996, storeTimestamp=1646362602317, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001701, commitLogOffset=5889, bodyCRC=1418327912, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1646362604598, UNIQ_KEY=AC6E005625A818B4AAC211CC582E0008, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 57, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第9條同步消息
可以看到, 生產(chǎn)者發(fā)送了10條消息,ConsumerA與ConsumerB屬于同一個(gè)消費(fèi)者組,集群消費(fèi)模式下每個(gè)消費(fèi)者攤分消費(fèi)所有消息。注意,兩個(gè)消費(fèi)者的ConsumerGroup組名需要一致,才算是同一個(gè)消費(fèi)者組。
簡(jiǎn)單總結(jié)一下:
1、在Rocket集群消費(fèi)模式下,(訂閱)同一個(gè)主題(Topic)下的消息,對(duì)于不同的消費(fèi)者組是一種“廣播形式”,即每個(gè)消費(fèi)者組的都會(huì)消費(fèi)消息。
2、在Rocket集群消費(fèi)模式下,(訂閱)同一個(gè)主題(Topic)下的消息,對(duì)于相同的消費(fèi)者組的消費(fèi)者而言是一種集群模式,即同一個(gè)消費(fèi)者組內(nèi)的所有消費(fèi)者均分消息并消費(fèi)。
三、廣播消費(fèi)
一條消息被多個(gè) Consumer 消費(fèi),即使這些 Consumer 屬于同一個(gè) Consumer Group,消息也會(huì)被 Consumer Group 中的每個(gè) Consumer 都消費(fèi)一次,廣播消費(fèi)中的 Consumer Group 概念可以認(rèn)為在消息劃分方面無意 義。
使用方法:setMessageModel(MessageModel.BROADCASTING)
我們將前面的消費(fèi)者定義中的消息模式設(shè)置為BROADCASTING即可,重新啟動(dòng)后觀察控制臺(tái)輸出。
生產(chǎn)者
SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48F970000, offsetMsgId=0A005B4700002A9F0000000000001898, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=8] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48FAE0001, offsetMsgId=0A005B4700002A9F0000000000001963, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=1], queueOffset=8] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48FBB0002, offsetMsgId=0A005B4700002A9F0000000000001A2E, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=2], queueOffset=7] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48FCB0003, offsetMsgId=0A005B4700002A9F0000000000001AF9, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=3], queueOffset=8] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48FDB0004, offsetMsgId=0A005B4700002A9F0000000000001BC4, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=9] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D48FF10005, offsetMsgId=0A005B4700002A9F0000000000001C8F, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=1], queueOffset=9] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D4900C0006, offsetMsgId=0A005B4700002A9F0000000000001D5A, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=2], queueOffset=8] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D4901A0007, offsetMsgId=0A005B4700002A9F0000000000001E25, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=3], queueOffset=9] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D4902A0008, offsetMsgId=0A005B4700002A9F0000000000001EF0, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=0], queueOffset=10] SendResult [sendStatus=SEND_OK, msgId=AC6E0056231818B4AAC211D490360009, offsetMsgId=0A005B4700002A9F0000000000001FBB, messageQueue=MessageQueue [topic=CLUSTERING_TOPIC, brokerName=broker-a, queueId=1], queueOffset=10]
消費(fèi)者A
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143064, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139802, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001898, commitLogOffset=6296, bodyCRC=664430631, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143093, UNIQ_KEY=AC6E0056231818B4AAC211D48F970000, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143086, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139825, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001963, commitLogOffset=6499, bodyCRC=240311509, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143125, UNIQ_KEY=AC6E0056231818B4AAC211D48FAE0001, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 50, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第2條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=7, sysFlag=0, bornTimestamp=1646363143099, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139841, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001A2E, commitLogOffset=6702, bodyCRC=540691780, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1646363143144, UNIQ_KEY=AC6E0056231818B4AAC211D48FBB0002, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143115, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139857, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001AF9, commitLogOffset=6905, bodyCRC=1573106993, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143158, UNIQ_KEY=AC6E0056231818B4AAC211D48FCB0003, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 52, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第4條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143131, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139870, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001BC4, commitLogOffset=7108, bodyCRC=1940595872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143165, UNIQ_KEY=AC6E0056231818B4AAC211D48FDB0004, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 53, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第5條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143153, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139897, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001C8F, commitLogOffset=7311, bodyCRC=1516474450, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143188, UNIQ_KEY=AC6E0056231818B4AAC211D48FF10005, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 54, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第6條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143180, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139922, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001D5A, commitLogOffset=7514, bodyCRC=1946878403, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143205, UNIQ_KEY=AC6E0056231818B4AAC211D4900C0006, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 55, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第7條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143194, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139933, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001E25, commitLogOffset=7717, bodyCRC=2061592313, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143238, UNIQ_KEY=AC6E0056231818B4AAC211D4901A0007, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 56, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第8條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=204, queueOffset=10, sysFlag=0, bornTimestamp=1646363143222, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139956, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001FBB, commitLogOffset=8123, bodyCRC=2041898758, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1646363143240, UNIQ_KEY=AC6E0056231818B4AAC211D490360009, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, 48, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第10條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=10, sysFlag=0, bornTimestamp=1646363143210, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139947, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001EF0, commitLogOffset=7920, bodyCRC=1418327912, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1646363143240, UNIQ_KEY=AC6E0056231818B4AAC211D4902A0008, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 57, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第9條同步消息
消費(fèi)者B
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143064, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139802, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001898, commitLogOffset=6296, bodyCRC=664430631, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143110, UNIQ_KEY=AC6E0056231818B4AAC211D48F970000, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第1條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=7, sysFlag=0, bornTimestamp=1646363143099, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139841, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001A2E, commitLogOffset=6702, bodyCRC=540691780, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1646363143140, UNIQ_KEY=AC6E0056231818B4AAC211D48FBB0002, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 51, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第3條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143086, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139825, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001963, commitLogOffset=6499, bodyCRC=240311509, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143141, UNIQ_KEY=AC6E0056231818B4AAC211D48FAE0001, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 50, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第2條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143115, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139857, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001AF9, commitLogOffset=6905, bodyCRC=1573106993, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143163, UNIQ_KEY=AC6E0056231818B4AAC211D48FCB0003, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 52, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第4條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143131, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139870, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001BC4, commitLogOffset=7108, bodyCRC=1940595872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143169, UNIQ_KEY=AC6E0056231818B4AAC211D48FDB0004, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 53, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第5條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143153, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139897, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001C8F, commitLogOffset=7311, bodyCRC=1516474450, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143187, UNIQ_KEY=AC6E0056231818B4AAC211D48FF10005, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 54, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第6條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=203, queueOffset=8, sysFlag=0, bornTimestamp=1646363143180, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139922, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001D5A, commitLogOffset=7514, bodyCRC=1946878403, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1646363143208, UNIQ_KEY=AC6E0056231818B4AAC211D4900C0006, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 55, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第7條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=203, queueOffset=9, sysFlag=0, bornTimestamp=1646363143194, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139933, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001E25, commitLogOffset=7717, bodyCRC=2061592313, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1646363143233, UNIQ_KEY=AC6E0056231818B4AAC211D4901A0007, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 56, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第8條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=203, queueOffset=10, sysFlag=0, bornTimestamp=1646363143210, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139947, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001EF0, commitLogOffset=7920, bodyCRC=1418327912, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1646363143234, UNIQ_KEY=AC6E0056231818B4AAC211D4902A0008, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 57, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第9條同步消息
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=204, queueOffset=10, sysFlag=0, bornTimestamp=1646363143222, bornHost=/10.0.90.139:52985, storeTimestamp=1646363139956, storeHost=/10.0.91.71:10911, msgId=0A005B4700002A9F0000000000001FBB, commitLogOffset=8123, bodyCRC=2041898758, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='CLUSTERING_TOPIC', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=11, CONSUME_START_TIME=1646363143236, UNIQ_KEY=AC6E0056231818B4AAC211D490360009, CLUSTER=DefaultCluster}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -25, -84, -84, 49, 48, -26, -99, -95, -27, -112, -116, -26, -83, -91, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息內(nèi)容為:hello, 這是第10條同步消息
可以看到, 生產(chǎn)者發(fā)送了10條消息,ConsumerA與ConsumerB屬于同一個(gè)消費(fèi)者組,廣播模式下每個(gè)消費(fèi)者都會(huì)全量消費(fèi)所有消息。
到此這篇關(guān)于RocketMQ集群消費(fèi)與廣播消費(fèi)模式的文章就介紹到這了,更多相關(guān)RocketMQ集群消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合日志功能(slf4j+logback)詳解(最新推薦)
Spring使用commons-logging作為內(nèi)部日志,但底層日志實(shí)現(xiàn)是開放的,可對(duì)接其他日志框架,這篇文章主要介紹了SpringBoot整合日志功能(slf4j+logback)詳解,需要的朋友可以參考下2024-08-08ExpressionUtil工具類的應(yīng)用實(shí)例
這篇文章主要給大家介紹了關(guān)于ExpressionUtil工具類的應(yīng)用實(shí)例,常用的工具類有很多,這是其中一個(gè),了解基本的API可以幫助我們更好的開發(fā),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-04-04Java測(cè)試框架Mockito的簡(jiǎn)明教程
這篇文章主要介紹了Java測(cè)試框架Mockito的簡(jiǎn)明教程,Mock 測(cè)試是單元測(cè)試的重要方法之一。本文介紹了基于 Java 語言的 Mock 測(cè)試框架 – Mockito 的使用。,需要的朋友可以參考下2019-06-06Spring AOP如何自定義注解實(shí)現(xiàn)審計(jì)或日志記錄(完整代碼)
這篇文章主要介紹了Spring AOP如何自定義注解實(shí)現(xiàn)審計(jì)或日志記錄(完整代碼),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12Java API方式調(diào)用Kafka各種協(xié)議的方法
本篇文章主要介紹了Java API方式調(diào)用Kafka各種協(xié)議的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09IntellJ IDEA JAVA代碼任務(wù)標(biāo)記實(shí)例解析
這篇文章主要介紹了IntellJ IDEA JAVA代碼任務(wù)標(biāo)記實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07JAVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例
下面小編就為大家?guī)硪黄狫AVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起 小編過來看看吧2016-11-11