欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ?Stream插件使用案例代碼

 更新時(shí)間:2024年04月17日 11:49:49   作者:Doker技術(shù)人的品牌  
這篇文章主要介紹了RabbitMQ?Stream插件使用案例代碼,2.4版為RabbitMQ流插件引入了對(duì)RabbitMQStream插件Java客戶(hù)端的初始支持,需要的朋友可以參考下

2.4版為RabbitMQ流插件引入了對(duì)RabbitMQStream插件Java客戶(hù)端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

將spring rabbit流依賴(lài)項(xiàng)添加到項(xiàng)目中:

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定隊(duì)列類(lèi)型,正常地配置隊(duì)列。例如:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,這僅在您還使用non-stream 組件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)時(shí)才有效,因?yàn)樵诖蜷_(kāi)AMQP連接時(shí)會(huì)觸發(fā)管理員來(lái)聲明定義的bean。如果您的應(yīng)用程序僅使用流組件,或者您希望使用高級(jí)流配置功能,則應(yīng)改為配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {
	CompletableFuture<Boolean> send(Message message);
	CompletableFuture<Boolean> convertAndSend(Object message);
	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
	MessageBuilder messageBuilder();
	MessageConverter messageConverter();
	StreamMessageConverter streamMessageConverter();
	@Override
	void close() throws AmqpException;
}

RabbitStreamTemplate實(shí)現(xiàn)具有以下構(gòu)造函數(shù)和屬性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于將對(duì)象轉(zhuǎn)換為Spring AMQP消息。

StreamMessageConverter用于將Spring AMQP消息轉(zhuǎn)換為本機(jī)流消息。

您也可以直接發(fā)送本機(jī)流消息;使用messageBuilder()方法提供對(duì)生產(chǎn)者的消息生成器的訪問(wèn)。

ProducerCustomizer提供了一種機(jī)制,用于在生成生產(chǎn)者之前對(duì)其進(jìn)行自定義。

 二、Receiving Messages

異步消息接收由StreamListenerContainer(以及使用@RabbitListener時(shí)的StreamRabbitListerContainerFactory)提供。

偵聽(tīng)器容器需要一個(gè)Environment以及一個(gè)流名稱(chēng)。

您可以使用經(jīng)典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {
	void onStreamMessage(Message message, Context context);
}

有關(guān)支持的屬性的信息,請(qǐng)參閱消息偵聽(tīng)器容器配置。

與模板類(lèi)似,容器具有ConsumerCustomizer屬性。

有關(guān)自定義環(huán)境和使用者的信息,請(qǐng)參閱Java客戶(hù)端文檔。

使用@RabbitListener時(shí),配置StreamRabbitListerContainerFactory;此時(shí),大多數(shù)@RabbitListener屬性(并發(fā)等)將被忽略。僅支持id、隊(duì)列、autoStartup和containerFactory。此外,隊(duì)列只能包含一個(gè)流名稱(chēng)。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}
@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}
@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5版將adviceChain屬性添加到StreamListenerContainer(及其工廠)。還提供了一個(gè)新的工廠bean來(lái)創(chuàng)建一個(gè)無(wú)狀態(tài)重試攔截器,該攔截器帶有一個(gè)可選的StreamMessageRecoverer,用于在使用原始流消息時(shí)使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}

四、Super Streams

超級(jí)流是分區(qū)流的抽象概念,通過(guò)將多個(gè)流隊(duì)列綁定到具有參數(shù)x-Super-Stream:true的交換來(lái)實(shí)現(xiàn)。

1、調(diào)配

為了方便起見(jiàn),可以通過(guò)定義類(lèi)型為SuperStream的單個(gè)bean來(lái)提供超級(jí)流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin檢測(cè)到這個(gè)bean,并將聲明交換(my.super.stream)和3個(gè)隊(duì)列(分區(qū))-my.super-stream-n,其中n是0,1,2,綁定的路由密鑰等于n。

如果您還希望通過(guò)AMQP向exchange 發(fā)布,您可以提供自定義路由密鑰:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

key 的數(shù)量必須等于分區(qū)的數(shù)量。

2、向超級(jí)流生產(chǎn)消息

你必須向 RabbitStreamTemplate 添加一個(gè) superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

你也可以通過(guò)AMQP發(fā)布,使用 RabbitTemplate。

到此這篇關(guān)于RabbitMQ Stream插件使用詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Stream插件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決

    java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決

    這篇文章主要介紹了java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問(wèn)題總結(jié)及解決的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • 幾種JAVA細(xì)粒度鎖的實(shí)現(xiàn)方式

    幾種JAVA細(xì)粒度鎖的實(shí)現(xiàn)方式

    這篇文章主要為大家詳細(xì)介紹了幾種JAVA細(xì)粒度鎖的實(shí)現(xiàn)方式,感興趣的小伙伴們可以參考一下
    2016-05-05
  • Spring自動(dòng)裝配@Autowired教程

    Spring自動(dòng)裝配@Autowired教程

    @Autowired注解是Spring中非常重要且常見(jiàn)的,接下來(lái)就簡(jiǎn)要的介紹一下它的用法。@Autowired默認(rèn)是通過(guò)set方法,按照類(lèi)型自動(dòng)裝配JavaBean,set方法可省略不寫(xiě),它主要是修飾在成員變量上
    2023-01-01
  • Java中的自定義異常捕獲方式

    Java中的自定義異常捕獲方式

    這篇文章主要介紹了Java中的自定義異常捕獲方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java File類(lèi)常用方法與文件過(guò)濾器詳解

    Java File類(lèi)常用方法與文件過(guò)濾器詳解

    Java File類(lèi)以抽象的方式代表文件名和目錄路徑名。該類(lèi)主要用于文件和目錄的創(chuàng)建、文件的查找和文件的刪除等。File對(duì)象代表磁盤(pán)中實(shí)際存在的文件和目錄。本篇文章我們來(lái)講解File類(lèi)的常用方法與文件過(guò)濾器
    2022-04-04
  • Java并發(fā)編程之死鎖相關(guān)知識(shí)整理

    Java并發(fā)編程之死鎖相關(guān)知識(shí)整理

    前篇文章在講解線程安全的時(shí)候,有提到過(guò)為了保證每個(gè)線程都能正常執(zhí)行共享資源操作,Java引入了鎖機(jī)制,雖然這樣使多線程改善了系統(tǒng)的處理能力,然而也帶來(lái)了新的問(wèn)題,其中之一:死鎖,需要的朋友可以參考下
    2021-06-06
  • springboot 實(shí)現(xiàn)長(zhǎng)鏈接轉(zhuǎn)短鏈接的示例代碼

    springboot 實(shí)現(xiàn)長(zhǎng)鏈接轉(zhuǎn)短鏈接的示例代碼

    短鏈接服務(wù)通過(guò)將長(zhǎng)URL轉(zhuǎn)換成6位短碼,并存儲(chǔ)長(zhǎng)短鏈接對(duì)應(yīng)關(guān)系到數(shù)據(jù)庫(kù)中,用戶(hù)訪問(wèn)短鏈接時(shí),系統(tǒng)通過(guò)查詢(xún)數(shù)據(jù)庫(kù)并重定向到原始URL,實(shí)現(xiàn)快速訪問(wèn),本文就來(lái)介紹一下如何使用,感興趣的可以了解一下
    2024-09-09
  • java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實(shí)現(xiàn)

    java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實(shí)現(xiàn)

    這篇文章主要介紹了java 地心坐標(biāo)系(ECEF)和WGS-84坐標(biāo)系(WGS84)互轉(zhuǎn)的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • java調(diào)用短信貓發(fā)短信示例

    java調(diào)用短信貓發(fā)短信示例

    這篇文章主要介紹了java調(diào)用短信貓發(fā)短信示例,需要的朋友可以參考下
    2014-04-04
  • java Jersey框架初體驗(yàn)

    java Jersey框架初體驗(yàn)

    本篇主要是Jersey體驗(yàn),你將在不做任何編碼的情況下,體驗(yàn)Jersey框架的神氣魅力!本文還假定你在eclipse里安裝了Maven插件
    2016-07-07

最新評(píng)論