學(xué)會(huì)Pulsar Consumer的使用方式
1、使用前準(zhǔn)備
引入依賴:
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.6.1</version> </dependency>
2、PulsarClient
在嘗試使用Producer和Consumer前,我們先講一下Pulsar客戶端,因?yàn)椴还苁荘roducer還是Consumer,都是依靠PulsarClient來創(chuàng)建的:
/** * Pulsar工具類 * @author winfun **/ public class PulsarUtils { /** * 根據(jù)serviceUrl創(chuàng)建PulsarClient * @param serviceUrl 服務(wù)地址 * @return 客戶端 * @throws PulsarClientException 異常 */ public static PulsarClient createPulsarClient(String serviceUrl) throws PulsarClientException { return PulsarClient.builder() .serviceUrl(serviceUrl) .build(); } }
我們這里簡(jiǎn)單使用,只借用ServiceUrl創(chuàng)建客戶端,其實(shí)還有很多比較重要的參數(shù),下面稍微列舉一下:
- ioThreads:Set the number of threads to be used for handling connections to brokers (default: 1 thread)
- listenerThreads:Set the number of threads to be used for message listeners (default: 1 thread). 一條線程默認(rèn)只為一個(gè)消費(fèi)者服務(wù)
- enableTcpNoDelay:No-delay features make sure packets are sent out on the network as soon as possible
- …
3、Producer
Producer這里我們也先簡(jiǎn)單使用,只負(fù)責(zé)往指定Topic發(fā)送消息,其他功能不用,例如異步發(fā)送、延時(shí)發(fā)送等
/** * 初次使用Pulsar生產(chǎn)者,無任何封裝 * @author winfun **/ public class FirstProducerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .build(); ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic") .blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS); Producer<String> producer = productBuilder.create(); for (int i = 0; i < 100; i++) { producer.send("hello"+i);; } producer.close(); } }
4、Consumer
下面我們將比較詳細(xì)地介紹消費(fèi)者的使用方式,因?yàn)檫@里能拓展的東西稍微多一點(diǎn),下面開始使用旅程。
4.1 第一次使用:
我們利用PulsarClient創(chuàng)建Consumer;接著在死循環(huán)中利用Consumer#receive方法接收消息然后進(jìn)行消費(fèi)。
/** * 初次使用Pulsar消費(fèi)者,無任何封裝 * @author winfun **/ @Slf4j public class FirstConsumerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .build(); /** * The subscribe method will auto subscribe the consumer to the specified topic and subscription. * One way to make the consumer listen on the topic is to set up a while loop. * In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed. * If the processing logic fails, you can use negative acknowledgement to redeliver the message later. */ Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); // 死循環(huán)接收 while (true){ Message<String> message = consumer.receive(); String msgContent = message.getValue(); log.info("接收到消息: {}",msgContent); consumer.acknowledge(message); } } }
4.2 第二次使用:
上面我們可以看到,我們是利用死循環(huán)來保證及時(shí)消費(fèi),但是這樣會(huì)導(dǎo)致主線程;所以下面我們可以使用Pulsar提供的MessageListener,即監(jiān)聽器,當(dāng)消息來了,會(huì)回調(diào)監(jiān)聽器指定的方法,從而避免阻塞主線程。
/** * 使用MessageListener,避免死循環(huán)代碼&阻塞主線程 * @author winfun **/ @Slf4j public class SecondConsumerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); /** * If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener. * */ Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Exclusive) .messageListener((MessageListener<String>) (consumer1, msg) -> { /** * 當(dāng)接收到一個(gè)新的消息,就會(huì)回調(diào) MessageListener的receive方法。 * 消息將會(huì)保證按順序投放到單個(gè)消費(fèi)者的同一個(gè)線程,因此可以保證順序消費(fèi) * 除非應(yīng)用程序或broker崩潰,否則只會(huì)為每條消息調(diào)用此方法一次 * 應(yīng)用程序負(fù)責(zé)調(diào)用消費(fèi)者的確認(rèn)方法來確認(rèn)消息已經(jīng)被消費(fèi) * 應(yīng)用程序負(fù)責(zé)處理消費(fèi)消息時(shí)可能出現(xiàn)的異常 */ log.info("接收到消息:{}",msg.getValue()); try { consumer1.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } }).subscribe(); } }
4.3 第三次使用:
上面利用監(jiān)聽器來解決死循環(huán)代碼和阻塞主線程問題;但是我們可以發(fā)現(xiàn),每次消費(fèi)都是單線程,當(dāng)一個(gè)消息消費(fèi)完才能進(jìn)行下一個(gè)消息的消費(fèi),這樣會(huì)導(dǎo)致消費(fèi)效率非常的低;
如果如果追求高吞吐量,不在乎消息消費(fèi)的順序性,那么我們可以接入線程池;一有消息來就丟進(jìn)線程池中,這樣不但可以支持異步消費(fèi),還能保證消費(fèi)的效率非常的高。
/** * MessageListener 內(nèi)使用線程池進(jìn)行異步消費(fèi) * @author winfun **/ @Slf4j public class ThirdConsumerDemo { public static void main(String[] args) throws PulsarClientException { Executor executor = new ThreadPoolExecutor( 10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100) ); PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); /** * If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener. * */ Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Exclusive) .messageListener((MessageListener<String>) (consumer1, msg) -> { /** * MessageListener還是保證了接收的順序性 * 但是利用線程池進(jìn)行異步消費(fèi)后不能保證消費(fèi)順序性 */ executor.execute(() -> handleMsg(consumer1, msg)); }).subscribe(); } /** * 線程池異步處理 * @param consumer 消費(fèi)者 * @param msg 消息 */ public static void handleMsg(Consumer consumer, Message msg){ ThreadUtil.sleep(RandomUtil.randomInt(3),TimeUnit.SECONDS); log.info("接收到消息:{}",msg.getValue()); try { consumer.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } }
4.4 第四次使用:
我們可以發(fā)現(xiàn),在上面的三個(gè)例子中,如果在調(diào)用Consumer#acknowledge方法前,因?yàn)榇a問題導(dǎo)致拋異常了,我們是沒有做處理的,那么會(huì)導(dǎo)致消費(fèi)者會(huì)一直重試沒有被確認(rèn)的消息。
那么我們此時(shí)需要接入Pulsar提供的死信隊(duì)列:當(dāng)Consumer消費(fèi)消息時(shí)拋異常,并達(dá)到一定的重試次數(shù),則將消息丟入死信隊(duì)列;但需要注意的是,單獨(dú)使用死信隊(duì)列,Consumer的訂閱類型需要是 Shared/Key_Shared;否則不會(huì)生效。
/** * 超過最大重試次數(shù),進(jìn)入死信隊(duì)列 * @author: winfun **/ @Slf4j public class FourthConsumerDemo { public static void main(String[] args) throws PulsarClientException { /** * 如果指定了死信隊(duì)列策略,但是沒指定死信隊(duì)列 * 死信隊(duì)列:String.format("%s-%s-DLQ", topic, this.subscription) * 這里的this.subscription為上面指定的 subscriptionName。 * * 一般在生產(chǎn)環(huán)境,會(huì)將pulsar的自動(dòng)創(chuàng)建topic功能給關(guān)閉掉,所以上線前,記得先提工單創(chuàng)建指定的死信隊(duì)列。 * * 重點(diǎn)信息: * 如果是單單使用死信隊(duì)列,subscriptionType為 Shared/Key_Shared,否則死信隊(duì)列不生效。 */ PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-topic") .subscriptionName("my-subscription") .receiverQueueSize(100) .ackTimeout(1, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Key_Shared) .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS) .deadLetterPolicy(DeadLetterPolicy.builder() //可以指定最大重試次數(shù),最大重試三次后,進(jìn)入到死信隊(duì)列 .maxRedeliverCount(3) //可以指定死信隊(duì)列 .deadLetterTopic("winfun/study/test-topic-dlq3") .build()) .messageListener((MessageListener<String>) (consumer1, msg) -> { log.info("接收到隊(duì)列「{}」消息:{}",msg.getTopicName(),msg.getValue()); if (msg.getValue().equals("hello3")) { throw new RuntimeException("hello3消息消費(fèi)失敗!"); }else { try { consumer1.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } }).subscribe(); } }
4.5 第五次使用:
死信隊(duì)列一般是不做消費(fèi)的,我們會(huì)關(guān)注死信隊(duì)列的情況,從而作出下一步的動(dòng)作。
而且,一般做消息重試,我們不希望在原Topic中做重試,這樣會(huì)影響原有消息的消費(fèi)進(jìn)度。
那么我們可以同時(shí)使用重試隊(duì)列和死信隊(duì)列。
當(dāng)代碼拋出異常時(shí),我們可以捕獲住,然后調(diào)用Consumer#reconsumeLater方法,將消息丟入重試隊(duì)列;當(dāng)消息重試指定次數(shù)后還無法正常完成消費(fèi),即會(huì)將消息丟入死信隊(duì)列。
/** * 重試隊(duì)列 * @author winfun **/ @Slf4j public class FifthConsumerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); /** * 注意點(diǎn): * 1、使用死信策略,但是沒有指定重試topic和死信topic名稱 * 死信隊(duì)列:String.format("%s-%s-DLQ", topic, this.subscription) * 重試隊(duì)列:String.format("%s-%s-RETRY", topic, this.subscription) * 這里的this.subscription為上面指定的 subscriptionName。 * * 2、是否限制訂閱類型 * 同時(shí)開啟重試隊(duì)列和死信隊(duì)列,不限制subscriptionType只能為Shared/Key_Shared; * 如果只是單獨(dú)使用死信隊(duì)列,需要限制subscriptionType為Shared * * 3、重試原理 * 如果使用重試隊(duì)列,需要保證 enableRetry 是開啟的,否則調(diào)用 reconsumeLater 方法時(shí)會(huì)拋異常:org.apache.pulsar.client.api.PulsarClientException: reconsumeLater method not support! * 如果配置了重試隊(duì)列,consumer會(huì)同時(shí)監(jiān)聽原topic和重試topic,consumer的實(shí)現(xiàn)類對(duì)應(yīng)是:MultiTopicsConsumerImpl * 如果消費(fèi)消息時(shí)調(diào)用了 reconsumeLater 方法,會(huì)將此消息丟進(jìn)重試topic * 如果在重試topic重試maxRedeliverCount次后都無法正確ack消息,即將消息丟到死信隊(duì)列。 * 死信隊(duì)列需要另起Consumer進(jìn)行監(jiān)聽消費(fèi)。 * * 4、直接拋異常 * 如果我們不是業(yè)務(wù)層面上調(diào)用 reconsumeLater 方法來進(jìn)行重試,而是代碼層面拋異常了,如果subscriptionType不為Shared/Key_Shared,消息無法進(jìn)入重試隊(duì)列和死信隊(duì)列,是當(dāng)前消費(fèi)者無限在原topic進(jìn)行消費(fèi)。 * 而如果如果subscriptionType為Shared/Key_Shared,則消息進(jìn)行maxRedeliverCount次消費(fèi)后,會(huì)直接進(jìn)入到死信隊(duì)列,此時(shí)不會(huì)用到重試隊(duì)列。 * 因此,重試隊(duì)列是僅僅針對(duì) reconsumeLater 方法的,而不針對(duì)異常的重試。 */ Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-retry-topic") .subscriptionName("my-subscription") .receiverQueueSize(100) .ackTimeout(1, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Exclusive) .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS) .enableRetry(true) .deadLetterPolicy(DeadLetterPolicy.builder() //可以指定最大重試次數(shù),最大重試三次后,進(jìn)入到死信隊(duì)列 .maxRedeliverCount(3) .retryLetterTopic("winfun/study/test-retry-topic-retry") //可以指定死信隊(duì)列 .deadLetterTopic("winfun/study/test-retry-topic-dlq") .build()) .messageListener((MessageListener<String>) (consumer1, msg) -> { log.info("接收到隊(duì)列「{}」消息:{}",msg.getTopicName(),msg.getValue()); if (msg.getValue().equals("hello3")) { try { consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS); } catch (PulsarClientException e) { e.printStackTrace(); } //throw new RuntimeException("hello3消息消費(fèi)失??!"); }else { try { consumer1.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } }).subscribe(); } }
重試機(jī)制源碼分析
關(guān)于重試機(jī)制,其實(shí)是比較有意思的,下面我們會(huì)簡(jiǎn)單分析一下源碼。
1.判斷是否開啟重試機(jī)制,如果沒有開啟重試機(jī)制,則直接拋異常
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException { // 如果沒開啟重試機(jī)制,直接拋異常 if (!this.conf.isRetryEnable()) { throw new PulsarClientException("reconsumeLater method not support!"); } else { try { // 當(dāng)然了,reconsumeLaterAsync里面也會(huì)判斷是否開啟重試機(jī)制 this.reconsumeLaterAsync(message, delayTime, unit).get(); } catch (Exception var7) { Throwable t = var7.getCause(); if (t instanceof PulsarClientException) { throw (PulsarClientException)t; } else { throw new PulsarClientException(t); } } } }
還有我們可以發(fā)現(xiàn),pulsar很多方法是支持同步和異步的,而同步就是直接調(diào)用異步方法,再后調(diào)用get()方法進(jìn)行同步阻塞等待即可。
2.調(diào)用 reconsumeLaterAsunc 方法,接著調(diào)用 get() 進(jìn)行同步阻塞等待結(jié)果
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) { if (!this.conf.isRetryEnable()) { return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!")); } else { try { return this.doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit); } catch (NullPointerException var6) { return FutureUtil.failedFuture(new InvalidMessageException(var6.getMessage())); } } }
3.調(diào)用 doReconsumeLater 方法
我們知道,在 Pulsar 的 Consumer 中,可以支持多 Topic 監(jiān)聽,而如果我們加入了重試機(jī)制,默認(rèn)是同個(gè) Consumer 同時(shí)監(jiān)聽原隊(duì)列和重試隊(duì)列,所以 Consumer 接口的實(shí)現(xiàn)此時(shí)為 MultiTopicsConsumerImpl,而不是 ComsumerImpl。
那我們看看 MultiConsumerImpl 的 doReconsumeLater 是如何進(jìn)行重新消費(fèi)的:
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) { MessageId messageId = message.getMessageId(); Preconditions.checkArgument(messageId instanceof TopicMessageIdImpl); TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)messageId; if (this.getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } else { MessageId innerId; if (ackType == AckType.Cumulative) { Consumer individualConsumer = (Consumer)this.consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { innerId = topicMessageId.getInnerMessageId(); return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit); } else { return FutureUtil.failedFuture(new NotConnectedException()); } } else { ConsumerImpl<T> consumer = (ConsumerImpl)this.consumers.get(topicMessageId.getTopicPartitionName()); innerId = topicMessageId.getInnerMessageId(); return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit).thenRun(() -> { this.unAckedMessageTracker.remove(topicMessageId); }); } } }
- 首先判斷客戶端是否為準(zhǔn)備狀態(tài)
- 接著判斷 AckType 是累計(jì)的還是單獨(dú)的,如果是累計(jì)的話,subscriptionType 一定要是 exclusive
- 不管是累計(jì)還是單獨(dú)的,最后都是調(diào)用 ConsumerImpl 的 doReconsumerLater 方法
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) { MessageId messageId = message.getMessageId(); if (messageId instanceof TopicMessageIdImpl) { messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId(); } Preconditions.checkArgument(messageId instanceof MessageIdImpl); if (this.getState() != State.Ready && this.getState() != State.Connecting) { this.stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + this.getState()); if (AckType.Individual.equals(ackType)) { this.onAcknowledge(messageId, exception); } else if (AckType.Cumulative.equals(ackType)) { this.onAcknowledgeCumulative(messageId, exception); } return FutureUtil.failedFuture(exception); } else { if (delayTime < 0L) { delayTime = 0L; } // 如果 retryLetterProducer 為null,則嘗試創(chuàng)建 if (this.retryLetterProducer == null) { try { this.createProducerLock.writeLock().lock(); if (this.retryLetterProducer == null) { this.retryLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).blockIfQueueFull(false).create(); } } catch (Exception var28) { log.error("Create retry letter producer exception with topic: {}", this.deadLetterPolicy.getRetryLetterTopic(), var28); } finally { this.createProducerLock.writeLock().unlock(); } } // 如果 retryLetterProcuder 不為空,則嘗試將消息丟進(jìn)重試隊(duì)列中 if (this.retryLetterProducer != null) { try { MessageImpl<T> retryMessage = null; String originMessageIdStr = null; String originTopicNameStr = null; if (message instanceof TopicMessageImpl) { retryMessage = (MessageImpl)((TopicMessageImpl)message).getMessage(); originMessageIdStr = ((TopicMessageIdImpl)message.getMessageId()).getInnerMessageId().toString(); originTopicNameStr = ((TopicMessageIdImpl)message.getMessageId()).getTopicName(); } else if (message instanceof MessageImpl) { retryMessage = (MessageImpl)message; originMessageIdStr = ((MessageImpl)message).getMessageId().toString(); originTopicNameStr = ((MessageImpl)message).getTopicName(); } SortedMap<String, String> propertiesMap = new TreeMap(); int reconsumetimes = 1; if (message.getProperties() != null) { propertiesMap.putAll(message.getProperties()); } // 如果包含 RECONSUMETIMES,則最遞增 if (propertiesMap.containsKey("RECONSUMETIMES")) { reconsumetimes = Integer.valueOf((String)propertiesMap.get("RECONSUMETIMES")); ++reconsumetimes; // 否則先加入「原始隊(duì)列」和「原始messageId」信息 } else { propertiesMap.put("REAL_TOPIC", originTopicNameStr); propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr); } // 加入重試次數(shù)信息 propertiesMap.put("RECONSUMETIMES", String.valueOf(reconsumetimes)); // 加入延時(shí)時(shí)間信息 propertiesMap.put("DELAY_TIME", String.valueOf(unit.toMillis(delayTime))); TypedMessageBuilder typedMessageBuilderNew; // 判斷是否超過最大重試次數(shù),如果還未超過,則重新投放到重試隊(duì)列 if (reconsumetimes <= this.deadLetterPolicy.getMaxRedeliverCount()) { typedMessageBuilderNew = this.retryLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap); if (delayTime > 0L) { typedMessageBuilderNew.deliverAfter(delayTime, unit); } if (message.hasKey()) { typedMessageBuilderNew.key(message.getKey()); } // 發(fā)送延時(shí)消息 typedMessageBuilderNew.send(); // 確認(rèn)當(dāng)前消息 return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null); } // 先忽略 this.processPossibleToDLQ((MessageIdImpl)messageId); // 判斷 deadLetterProducer 是否為null,如果為null,嘗試創(chuàng)建 if (this.deadLetterProducer == null) { try { if (this.deadLetterProducer == null) { this.createProducerLock.writeLock().lock(); this.deadLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getDeadLetterTopic()).blockIfQueueFull(false).create(); } } catch (Exception var25) { log.error("Create dead letter producer exception with topic: {}", this.deadLetterPolicy.getDeadLetterTopic(), var25); } finally { this.createProducerLock.writeLock().unlock(); } } // 如果 deadLetterProducer 不為null if (this.deadLetterProducer != null) { // 加入「原始隊(duì)列」信息 propertiesMap.put("REAL_TOPIC", originTopicNameStr); // 加入「原始MessageId」信息 propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr); typedMessageBuilderNew = this.deadLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap); // 將消息內(nèi)容發(fā)往死信隊(duì)列中 typedMessageBuilderNew.send(); // 確認(rèn)當(dāng)前消息 return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null); } } catch (Exception var27) { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", new Object[]{this.deadLetterProducer.getTopic(), messageId, var27}); Set<MessageId> messageIds = new HashSet(); messageIds.add(messageId); this.unAckedMessageTracker.remove(messageId); this.redeliverUnacknowledgedMessages(messageIds); } } return CompletableFuture.completedFuture((Object)null); } }
分析了一波,我們可以看到和上面代碼的注釋描述的基本一致。
4.6 第六次使用
上面我們提到,當(dāng)Consumer指定了重試隊(duì)列,Consumer會(huì)同時(shí)監(jiān)聽原Topic和重試Topic,那么如果我們想多個(gè)Consumer消費(fèi)重試Topic時(shí),需要將Consumer的訂閱類型指定為 Shared/Key_Shared,讓重試隊(duì)列支持多Consumer監(jiān)聽消費(fèi),提升重試隊(duì)列的消費(fèi)效率。
/** * 重試隊(duì)列-Shared * @author winfun **/ @Slf4j public class SixthConsumerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); /** * 因?yàn)槿绻付酥卦嚥呗?,Consumer會(huì)同時(shí)監(jiān)聽「原隊(duì)列」和「重試隊(duì)列」 * 即如果我們想「重試隊(duì)列」可以讓多個(gè) Consumer 監(jiān)聽,從而提高消費(fèi)能力,那么 Consumer 需指定為 Shared 模式。 */ Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-retry-topic") .subscriptionName("my-subscription") .receiverQueueSize(100) .ackTimeout(1, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Shared) .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS) .enableRetry(true) .deadLetterPolicy(DeadLetterPolicy.builder() //可以指定最大重試次數(shù),最大重試三次后,進(jìn)入到死信隊(duì)列 .maxRedeliverCount(3) .retryLetterTopic("winfun/study/test-retry-topic-retry") //可以指定死信隊(duì)列 .deadLetterTopic("winfun/study/test-retry-topic-dlq") .build()) .messageListener((MessageListener<String>) (consumer1, msg) -> { log.info("接收到隊(duì)列「{}」消息:{}",msg.getTopicName(),msg.getValue()); if (msg.getValue().contains("1") || msg.getValue().contains("2") || msg.getValue().contains("3")) { try { consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS); } catch (PulsarClientException e) { e.printStackTrace(); } //throw new RuntimeException("hello3消息消費(fèi)失??!"); }else { try { consumer1.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } }).subscribe(); } } /** * 監(jiān)聽重試隊(duì)列-Shared訂閱模式 * @author winfun **/ @Slf4j public class RetryConsumerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650"); Consumer<String> deadLetterConsumer = client.newConsumer(Schema.STRING) .topic("winfun/study/test-retry-topic-retry") .subscriptionName("my-subscription2") .receiverQueueSize(100) .ackTimeout(1, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Shared) .messageListener((MessageListener<String>) (consumer1, msg) -> { log.info("接收到隊(duì)列「{}」消息:{}",msg.getTopicName(),msg.getValue()); try { consumer1.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } }).subscribe(); } }
到此,我們已經(jīng)將Consmuer的幾種使用方式都嘗試了一遍,可以說基本包含了常用的操作;但是我們可以發(fā)現(xiàn),如果我們每次新建一個(gè)Consumer都需要寫一堆同樣的代碼,那其實(shí)挺麻煩的,又不好看;并且,現(xiàn)在我們大部分項(xiàng)目都是基于 SpringBoot 來做的,而 SpringBoot 也沒有一個(gè)比較大眾的Starter。
所以接下來的計(jì)劃就是,自己寫一個(gè)編寫一個(gè)關(guān)于Pulsar的SpringBoot Starter,這個(gè)組件不會(huì)特別復(fù)雜,但是會(huì)支持 Producer 和 Cousnmer 的自動(dòng)配置,并且支持 Consumer 上面提到的幾個(gè)點(diǎn):MessageListener 監(jiān)聽、線程池異步并發(fā)消費(fèi)、重試機(jī)制等。
到此這篇關(guān)于學(xué)會(huì)Pulsar Consumer的使用方式的文章就介紹到這了,更多相關(guān)Pulsar Consumer 使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA?code?template配置和參數(shù)方式
這篇文章主要介紹了IDEA?code?template配置和參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教<BR>2024-01-01springBoot中的CORS跨域注解@CrossOrigin詳解
這篇文章主要介紹了springBoot中的CORS跨域注解@CrossOrigin詳解,通常,服務(wù)于?JS?的主機(jī)(例如?example.com)與服務(wù)于數(shù)據(jù)的主機(jī)(例如?api.example.com)是不同的,在這種情況下,CORS?可以實(shí)現(xiàn)跨域通信,需要的朋友可以參考下2023-12-12Java?easyExcel的復(fù)雜表頭多級(jí)表頭導(dǎo)入
最近在項(xiàng)目開發(fā)中遇到的一個(gè)excel復(fù)雜表頭的導(dǎo)入數(shù)據(jù)庫操作,下面這篇文章主要給大家介紹了關(guān)于Java?easyExcel的復(fù)雜表頭多級(jí)表頭導(dǎo)入的相關(guān)資料,需要的朋友可以參考下2022-06-06Java實(shí)現(xiàn)字符數(shù)組全排列的方法
這篇文章主要介紹了Java實(shí)現(xiàn)字符數(shù)組全排列的方法,涉及Java針對(duì)字符數(shù)組的遍歷及排序算法的實(shí)現(xiàn)技巧,需要的朋友可以參考下2015-12-12IDEA配置Tomcat后,控制臺(tái)tomcat?catalina?log出現(xiàn)亂碼問題
本文介紹了如何通過設(shè)置Tomcat和IDEA的編碼格式來解決編碼問題,首先嘗試修改Tomcat的logging.properties文件中的編碼設(shè)置,如果未解決問題,則調(diào)整IDEA的編碼設(shè)置,通過修改vmoptions文件來全局設(shè)置IDEA的編碼格式,作者分享了個(gè)人成功解決問題的方法和步驟,供其他開發(fā)者參考2024-09-09Java8利用Stream實(shí)現(xiàn)列表去重的方法詳解
這篇文章主要為大家介紹了Java利用Stream實(shí)現(xiàn)列表去重的幾種方法詳解,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下2022-04-04springboot處理url中帶斜杠/\字符的參數(shù)報(bào)400問題
這篇文章主要介紹了springboot處理url中帶斜杠/\字符的參數(shù)報(bào)400問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01