RabbitMQ?Stream插件使用案例代碼
2.4版為RabbitMQ流插件引入了對(duì)RabbitMQStream插件Java客戶(hù)端的初始支持。
- RabbitStreamTemplate
- StreamListener容器
將spring rabbit流依賴(lài)項(xiàng)添加到項(xiàng)目中:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-stream</artifactId> <version>3.1.4</version> </dependency>
您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定隊(duì)列類(lèi)型,正常地配置隊(duì)列。例如:
@Bean Queue stream() { return QueueBuilder.durable("stream.queue1") .stream() .build(); }
然而,這僅在您還使用non-stream 組件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)時(shí)才有效,因?yàn)樵诖蜷_(kāi)AMQP連接時(shí)會(huì)觸發(fā)管理員來(lái)聲明定義的bean。如果您的應(yīng)用程序僅使用流組件,或者您希望使用高級(jí)流配置功能,則應(yīng)改為配置StreamAdmin:
@Bean StreamAdmin streamAdmin(Environment env) { return new StreamAdmin(env, sc -> { sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create(); sc.stream("stream.queue2").create(); }); }
一、Sending Messages
RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。
public interface RabbitStreamOperations extends AutoCloseable { CompletableFuture<Boolean> send(Message message); CompletableFuture<Boolean> convertAndSend(Object message); CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp); CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message); MessageBuilder messageBuilder(); MessageConverter messageConverter(); StreamMessageConverter streamMessageConverter(); @Override void close() throws AmqpException; }
RabbitStreamTemplate實(shí)現(xiàn)具有以下構(gòu)造函數(shù)和屬性:
public RabbitStreamTemplate(Environment environment, String streamName) { } public void setMessageConverter(MessageConverter messageConverter) { } public void setStreamConverter(StreamMessageConverter streamConverter) { } public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) { }
MessageConverter在convertAndSend方法中用于將對(duì)象轉(zhuǎn)換為Spring AMQP消息。
StreamMessageConverter用于將Spring AMQP消息轉(zhuǎn)換為本機(jī)流消息。
您也可以直接發(fā)送本機(jī)流消息;使用messageBuilder()方法提供對(duì)生產(chǎn)者的消息生成器的訪問(wèn)。
ProducerCustomizer提供了一種機(jī)制,用于在生成生產(chǎn)者之前對(duì)其進(jìn)行自定義。
二、Receiving Messages
異步消息接收由StreamListenerContainer(以及使用@RabbitListener時(shí)的StreamRabbitListerContainerFactory)提供。
偵聽(tīng)器容器需要一個(gè)Environment以及一個(gè)流名稱(chēng)。
您可以使用經(jīng)典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:
public interface StreamMessageListener extends MessageListener { void onStreamMessage(Message message, Context context); }
有關(guān)支持的屬性的信息,請(qǐng)參閱消息偵聽(tīng)器容器配置。
與模板類(lèi)似,容器具有ConsumerCustomizer屬性。
有關(guān)自定義環(huán)境和使用者的信息,請(qǐng)參閱Java客戶(hù)端文檔。
使用@RabbitListener時(shí),配置StreamRabbitListerContainerFactory;此時(shí),大多數(shù)@RabbitListener屬性(并發(fā)等)將被忽略。僅支持id、隊(duì)列、autoStartup和containerFactory。此外,隊(duì)列只能包含一個(gè)流名稱(chēng)。
三、Examples
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1"); template.setProducerCustomizer((name, builder) -> builder.name("test")); return template; } @Bean RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) { return new StreamRabbitListenerContainerFactory(env); } @RabbitListener(queues = "test.stream.queue1") void listen(String in) { ... } @Bean RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) { StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env); factory.setNativeListener(true); factory.setConsumerCustomizer((id, builder) -> { builder.name("myConsumer") .offset(OffsetSpecification.first()) .manualTrackingStrategy(); }); return factory; } @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory") void nativeMsg(Message in, Context context) { ... context.storeOffset(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue1") .stream() .build(); } @Bean Queue stream() { return QueueBuilder.durable("test.stream.queue2") .stream() .build(); }
2.4.5版將adviceChain屬性添加到StreamListenerContainer(及其工廠)。還提供了一個(gè)新的工廠bean來(lái)創(chuàng)建一個(gè)無(wú)狀態(tài)重試攔截器,該攔截器帶有一個(gè)可選的StreamMessageRecoverer,用于在使用原始流消息時(shí)使用。
@Bean public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) { StreamRetryOperationsInterceptorFactoryBean rfb = new StreamRetryOperationsInterceptorFactoryBean(); rfb.setRetryOperations(retryTemplate); rfb.setStreamMessageRecoverer((msg, context, throwable) -> { ... }); return rfb; }
四、Super Streams
超級(jí)流是分區(qū)流的抽象概念,通過(guò)將多個(gè)流隊(duì)列綁定到具有參數(shù)x-Super-Stream:true的交換來(lái)實(shí)現(xiàn)。
1、調(diào)配
為了方便起見(jiàn),可以通過(guò)定義類(lèi)型為SuperStream的單個(gè)bean來(lái)提供超級(jí)流。
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3); }
RabbitAdmin檢測(cè)到這個(gè)bean,并將聲明交換(my.super.stream)和3個(gè)隊(duì)列(分區(qū))-my.super-stream-n,其中n是0,1,2,綁定的路由密鑰等于n。
如果您還希望通過(guò)AMQP向exchange 發(fā)布,您可以提供自定義路由密鑰:
@Bean SuperStream superStream() { return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i) .mapToObj(j -> "rk-" + j) .collect(Collectors.toList())); }
key 的數(shù)量必須等于分區(qū)的數(shù)量。
2、向超級(jí)流生產(chǎn)消息
你必須向 RabbitStreamTemplate
添加一個(gè) superStreamRoutingFunction
:
@Bean RabbitStreamTemplate streamTemplate(Environment env) { RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1"); template.setSuperStreamRouting(message -> { // some logic to return a String for the client's hashing algorithm }); return template; }
你也可以通過(guò)AMQP發(fā)布,使用 RabbitTemplate
。
到此這篇關(guān)于RabbitMQ Stream插件使用詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Stream插件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決
這篇文章主要介紹了java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決的相關(guān)資料,需要的朋友可以參考下2017-03-03幾種JAVA細(xì)粒度鎖的實(shí)現(xiàn)方式
這篇文章主要為大家詳細(xì)介紹了幾種JAVA細(xì)粒度鎖的實(shí)現(xiàn)方式,感興趣的小伙伴們可以參考一下2016-05-05Java File類(lèi)常用方法與文件過(guò)濾器詳解
Java File類(lèi)以抽象的方式代表文件名和目錄路徑名。該類(lèi)主要用于文件和目錄的創(chuàng)建、文件的查找和文件的刪除等。File對(duì)象代表磁盤(pán)中實(shí)際存在的文件和目錄。本篇文章我們來(lái)講解File類(lèi)的常用方法與文件過(guò)濾器2022-04-04Java并發(fā)編程之死鎖相關(guān)知識(shí)整理
前篇文章在講解線程安全的時(shí)候,有提到過(guò)為了保證每個(gè)線程都能正常執(zhí)行共享資源操作,Java引入了鎖機(jī)制,雖然這樣使多線程改善了系統(tǒng)的處理能力,然而也帶來(lái)了新的問(wèn)題,其中之一:死鎖,需要的朋友可以參考下2021-06-06springboot 實(shí)現(xiàn)長(zhǎng)鏈接轉(zhuǎn)短鏈接的示例代碼
短鏈接服務(wù)通過(guò)將長(zhǎng)URL轉(zhuǎn)換成6位短碼,并存儲(chǔ)長(zhǎng)短鏈接對(duì)應(yīng)關(guān)系到數(shù)據(jù)庫(kù)中,用戶(hù)訪問(wèn)短鏈接時(shí),系統(tǒng)通過(guò)查詢(xún)數(shù)據(jù)庫(kù)并重定向到原始URL,實(shí)現(xiàn)快速訪問(wèn),本文就來(lái)介紹一下如何使用,感興趣的可以了解一下2024-09-09java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實(shí)現(xiàn)
這篇文章主要介紹了java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09