SpringCloud中的Stream服務(wù)間消息傳遞詳解
一、Stream 的介紹
Stream 就是在消息隊列的基礎(chǔ)上,對其進行封裝,可以是我們更方便的去使用。
Spring Cloud Stream應(yīng)用由第三方的中間件組成。應(yīng)用間的通信通過輸入通道(input channel)和輸出通道(output channel)完成。這些通道是有Spring Cloud Stream 注入的。而通道與外部的代理(可以理解為上文所說的數(shù)據(jù)中心)的連接又是通過Binder實現(xiàn)的。
二、Stream 的快速入門
2.1 編輯消費者
2.1.1 導(dǎo)入相關(guān)依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2.1.2 編寫配置文件
spring: rabbitmq: host: 192.168.31.138 port: 5672 username: test password: test virtual-host: /test
2.1.3 聲明 channel(通道)
通過 @Input() 注解來指定所要聲明的通道。
public interface StreamClient { @Input("myMessage") SubscribableChannel input(); }
被 @Input 和@Output 注解的方法。其中 @Input 注解的方法返回的是 SubscribableChannel ,@Output 注解的方法返回的是 MessageChannel 。
聲明通道(channel)的方法就是使用 @Input 和 @Output 注解方法。你想要多少通道就注解多少方法。
默認情況下,通道的名稱就是注解的方法的名稱,如果需要自己指定,只需要給這兩個注解傳遞 String 類型的參數(shù)即可。
使用@Input或者@Output注解聲明了通道(channel)的接口。Spring Cloud Stream會自動實現(xiàn)這些接口。
2.1.4 創(chuàng)建和綁定 channel(通道)
使用 @EnableBinding 就能創(chuàng)建和綁定通道(channel)。
@SpringBootApplication @EnableEurekaClient @EnableBinding(StreamClient.class) public class SearchApplication { public static void main(String[] args) { SpringApplication.run(SearchApplication.class,args); } }
@EnableBinding 注解接收的參數(shù)就是使用 @Input 或者 @Output 注解聲明了通道(channel)的接口。
2.1.5 消費消息
@StreamListener 接收的參數(shù)是要處理的通道(channel)的名,所注解的方法就是處理從通道獲取到的數(shù)據(jù)的方法。方法的參數(shù)就是獲取到的數(shù)據(jù)。
@Component public class StreamReceiver { @StreamListener("myMessage") public void msg(Object msg){ System.out.println("接收到消息:"+msg); } }
2.2 編輯生產(chǎn)者
2.2.1 導(dǎo)入相關(guān)依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2.2.2 編寫配置文件
spring: rabbitmq: host: 192.168.31.138 port: 5672 username: test password: test virtual-host: /test
2.2.3 聲明 channel(通道)
public interface StreamClient { @Output("myMessage") MessageChannel output(); }
2.2.4 創(chuàng)建和綁定
@SpringBootApplication ....... @EnableBinding(StreamClient.class) public class CustomerApplication { public static void main(String[] args) { SpringApplication.run(CustomerApplication.class,args); } ........ }
2.2.5 生產(chǎn)消息
@RestController public class MessageController { @Autowired private StreamClient streamClient; @GetMapping("/send") public String send(){ streamClient.output().send(MessageBuilder.withPayload("Hello stream!!!!!").build()); return "消息發(fā)送成功!"; } }
2.3 測試
三、Stream 重復(fù)消費消息
避免一個消息被多個消費者消費,只需要將多個消費者指定為一個消費者組即可。
**消費組:**直觀的理解就是一群消費者一起處理消息(每個發(fā)送到消費組的數(shù)據(jù),僅由消費組中的一個消費者處理)。
spring: cloud: stream: bindings: myMessage: #指定channel group: customer #指定消費者組
四、Stream 消費者的手動 ACK
4.1 編寫配置文件
spring: cloud: stream: rabbit: bindings: myMessage: #指定 channel name consumer: acknowledgeMode: MANUAL # 指定規(guī)則默認 AUTO
4.2 修改消費消息的方法
消息是帶有 Header 的,類似 Http 的 headler,我們可以通過 @Header 來獲取指定的 Header。
@Component public class StreamReceiver { @StreamListener("myMessage") public void msg(Object msg, @Header(name = AmqpHeaders.CHANNEL) Channel channel, @Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException { System.out.println("接收到消息:"+msg); channel.basicAck(deliveryTag,false); } }
到此這篇關(guān)于SpringCloud中的Stream服務(wù)間消息傳遞詳解的文章就介紹到這了,更多相關(guān)SpringCloud的Stream內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringCloudStream原理和深入使用小結(jié)
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動消息確認問題
- SpringCloudStream中的消息分區(qū)數(shù)詳解
- 關(guān)于SpringCloudStream配置問題
- Spring Cloud Stream 高級特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實現(xiàn)分布式事務(wù)的實踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
Java的ConcurrentLinkedQueue源碼分析
這篇文章主要介紹了Java的ConcurrentLinkedQueue源碼分析,ConcurrentLinkedQueue 是一個基于鏈接節(jié)點的無界線程安全的隊列,當(dāng)我們添加一個元素的時候,它會添加到隊列的尾部,當(dāng)我們獲取一個元素時,它會返回隊列頭部的元素,需要的朋友可以參考下2023-12-12