RabbitMQ?Stream插件使用案例代碼
2.4版為RabbitMQ流插件引入了對RabbitMQStream插件Java客戶端的初始支持。
- RabbitStreamTemplate
- StreamListener容器
將spring rabbit流依賴項添加到項目中:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-stream</artifactId> <version>3.1.4</version> </dependency>
您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定隊列類型,正常地配置隊列。例如:
@Bean Queue stream() { return QueueBuilder.durable("stream.queue1") .stream() .build(); }
然而,這僅在您還使用non-stream 組件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)時才有效,因為在打開AMQP連接時會觸發(fā)管理員來聲明定義的bean。如果您的應(yīng)用程序僅使用流組件,或者您希望使用高級流配置功能,則應(yīng)改為配置StreamAdmin:
@Bean StreamAdmin streamAdmin(Environment env) { return new StreamAdmin(env, sc -> { sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create(); sc.stream("stream.queue2").create(); }); }
一、Sending Messages
RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。
public interface RabbitStreamOperations extends AutoCloseable { CompletableFuture<Boolean> send(Message message); CompletableFuture<Boolean> convertAndSend(Object message); CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp); CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message); MessageBuilder messageBuilder(); MessageConverter messageConverter(); StreamMessageConverter streamMessageConverter(); @Override void close() throws AmqpException; }
RabbitStreamTemplate實現(xiàn)具有以下構(gòu)造函數(shù)和屬性:
public RabbitStreamTemplate(Environment environment, String streamName) { } public void setMessageConverter(MessageConverter messageConverter) { } public void setStreamConverter(StreamMessageConverter streamConverter) { } public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) { }
MessageConverter在convertAndSend方法中用于將對象轉(zhuǎn)換為Spring AMQP消息。
StreamMessageConverter用于將Spring AMQP消息轉(zhuǎn)換為本機流消息。
您也可以直接發(fā)送本機流消息;使用messageBuilder()方法提供對生產(chǎn)者的消息生成器的訪問。
ProducerCustomizer提供了一種機制,用于在生成生產(chǎn)者之前對其進行自定義。
二、Receiving Messages
異步消息接收由StreamListenerContainer(以及使用@RabbitListener時的StreamRabbitListerContainerFactory)提供。
偵聽器容器需要一個Environment以及一個流名稱。
您可以使用經(jīng)典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:
public interface StreamMessageListener extends MessageListener { void onStreamMessage(Message message, Context context); }
有關(guān)支持的屬性的信息,請參閱消息偵聽器容器配置。
與模板類似,容器具有ConsumerCustomizer屬性。
有關(guān)自定義環(huán)境和使用者的信息,請參閱Java客戶端文檔。
使用@RabbitListener時,配置StreamRabbitListerContainerFactory;此時,大多數(shù)@RabbitListener屬性(并發(fā)等)將被忽略。僅支持id、隊列、autoStartup和containerFactory。此外,隊列只能包含一個流名稱。
三、Examples
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1"); template.setProducerCustomizer((name, builder) -> builder.name("test")); return template; } @Bean RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) { return new StreamRabbitListenerContainerFactory(env); } @RabbitListener(queues = "test.stream.queue1") void listen(String in) { ... } @Bean RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) { StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env); factory.setNativeListener(true); factory.setConsumerCustomizer((id, builder) -> { builder.name("myConsumer") .offset(OffsetSpecification.first()) .manualTrackingStrategy(); }); return factory; } @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory") void nativeMsg(Message in, Context context) { ... context.storeOffset(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue1") .stream() .build(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue2") .stream() .build(); }
2.4.5版將adviceChain屬性添加到StreamListenerContainer(及其工廠)。還提供了一個新的工廠bean來創(chuàng)建一個無狀態(tài)重試攔截器,該攔截器帶有一個可選的StreamMessageRecoverer,用于在使用原始流消息時使用。
@Bean public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) { StreamRetryOperationsInterceptorFactoryBean rfb = new StreamRetryOperationsInterceptorFactoryBean(); rfb.setRetryOperations(retryTemplate); rfb.setStreamMessageRecoverer((msg, context, throwable) -> { ... }); return rfb; }
四、Super Streams
超級流是分區(qū)流的抽象概念,通過將多個流隊列綁定到具有參數(shù)x-Super-Stream:true的交換來實現(xiàn)。
1、調(diào)配
為了方便起見,可以通過定義類型為SuperStream的單個bean來提供超級流。
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3); }
RabbitAdmin檢測到這個bean,并將聲明交換(my.super.stream)和3個隊列(分區(qū))-my.super-stream-n,其中n是0,1,2,綁定的路由密鑰等于n。
如果您還希望通過AMQP向exchange 發(fā)布,您可以提供自定義路由密鑰:
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i) .mapToObj(j -> "rk-" + j) .collect(Collectors.toList())); }
key 的數(shù)量必須等于分區(qū)的數(shù)量。
2、向超級流生產(chǎn)消息
你必須向 RabbitStreamTemplate
添加一個 superStreamRoutingFunction
:
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1"); template.setSuperStreamRouting(message -> { // some logic to return a String for the client's hashing algorithm }); return template; }
你也可以通過AMQP發(fā)布,使用 RabbitTemplate
。
到此這篇關(guān)于RabbitMQ Stream插件使用詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Stream插件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問題總結(jié)及解決
這篇文章主要介紹了java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問題總結(jié)及解決的相關(guān)資料,需要的朋友可以參考下2017-03-03springboot 實現(xiàn)長鏈接轉(zhuǎn)短鏈接的示例代碼
短鏈接服務(wù)通過將長URL轉(zhuǎn)換成6位短碼,并存儲長短鏈接對應(yīng)關(guān)系到數(shù)據(jù)庫中,用戶訪問短鏈接時,系統(tǒng)通過查詢數(shù)據(jù)庫并重定向到原始URL,實現(xiàn)快速訪問,本文就來介紹一下如何使用,感興趣的可以了解一下2024-09-09java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實現(xiàn)
這篇文章主要介紹了java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09