spring-cloud-stream結(jié)合kafka使用詳解
1.pom文件導(dǎo)入依賴
<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
2.application.yml文件配置
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中間件服務(wù)器地址
bindings:
xxx_output: // 通道名稱
destination: xxx // 消息發(fā)往的目的地,對(duì)應(yīng)topic 在發(fā)送消息的配置里面,group是不用配置的
// 如果我們需要傳輸json的信息,那么在發(fā)送消息端需要設(shè)置content-type為json(其實(shí)可以不寫,默認(rèn)content-type就是json)
xxx_input:
destination: xxx // 消息發(fā)往的目的地,對(duì)應(yīng)topic
group: xxx // 對(duì)應(yīng)kafka的group
3.創(chuàng)建消息發(fā)送者
@EnableBinding(Source.class) // @EnableBinding 是綁定通道的,Soure.class是spring 提供的,表示這是一個(gè)可綁定的發(fā)布通道
@Service
public class MqService {
@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
private MessageChannel oesWorkbenchChannel;
/**
* 發(fā)送一條kafka消息
*/
public boolean sendLifeData(Object object) {
return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
}
}
// 發(fā)布通道
public interface Source {
@Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
MessageChannel oesWorkbenchLifeDataOutput(); // 發(fā)布通道用MessageChannel
}
4.創(chuàng)建消息監(jiān)聽者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@StreamListener(KafkaConstants.xxx_input) // 監(jiān)聽接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}
接下來就可以愉快的發(fā)送監(jiān)聽消息了
到此這篇關(guān)于spring-cloud-stream結(jié)合kafka使用詳解的文章就介紹到這了,更多相關(guān)spring-cloud-stream整合kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 代理模式及動(dòng)態(tài)代理機(jī)制深入分析
這篇文章主要介紹了java 代理模式及動(dòng)態(tài)代理機(jī)制深入分析的相關(guān)資料, 代理是一種常用的設(shè)計(jì)模式,其目的就是為其他對(duì)象提供一個(gè)代理以控制對(duì)某個(gè)對(duì)象的訪問,需要的朋友可以參考下2017-03-03
基于RabbitMQ的簡(jiǎn)單應(yīng)用(詳解)
下面小編就為大家分享一篇基于RabbitMQ的簡(jiǎn)單應(yīng)用(詳解),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2017-11-11
SpringBoot集成Spring Data JPA及讀寫分離
這篇文章主要介紹了SpringBoot集成Spring Data JPA及讀寫分離的相關(guān)知識(shí),需要的朋友可以參考下2017-04-04
淺談mybatis中的#和$的區(qū)別 以及防止sql注入的方法
下面小編就為大家?guī)硪黄獪\談mybatis中的#和$的區(qū)別 以及防止sql注入的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-10-10
java中BeanUtils.copyProperties的用法(超詳細(xì))
本文介紹了BeanUtils.copyProperties()方法的使用,包括其功能、用法、注意事項(xiàng)和示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08
基于SpringBoot和Vue實(shí)現(xiàn)分片上傳系統(tǒng)
最近想做一個(gè)關(guān)于文件上傳的個(gè)人小網(wǎng)盤,一開始嘗試使用了OSS的方案,但是該方案對(duì)于大文件來說并不友好,所以開始嘗試分片上傳方案的探索,接下來小編給大家詳細(xì)的介紹一下如何基于SpringBoot和Vue實(shí)現(xiàn)分片上傳系統(tǒng),需要的朋友可以參考下2023-12-12

