在Spring Boot應用程序中使用Apache Kafka的方法步驟詳解
第1步:生成我們的項目: Spring Initializr 來生成我們的項目。我們的項目將提供Spring MVC / Web支持和Apache Kafka支持。
第2步:發(fā)布/讀取Kafka主題中的消息:
<b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age; <b>public</b> User(String name, <b>int</b> age) { <b>this</b>.name = name; <b>this</b>.age = age; } }
第3步:通過application.yml
配置文件配置Kafka:
我們需要創(chuàng)建配置文件。我們需要以某種方式配置我們的Kafka生產(chǎn)者和消費者,以便能夠發(fā)布和讀取與主題相關的消息。相比建立一個使用@Configuration
標注的Java類,我們可以直接使用配置文件application.properties或application.yml。Spring Boot讓我們避免像過去一樣編寫的所有樣板代碼,同時為我們提供了更加智能的配置應用程序的方法,如下所示:
server: port: 9000 spring: kafka: consumer: bootstrap: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
第4步:創(chuàng)建一個生產(chǎn)者,創(chuàng)建生產(chǎn)者會將我們的消息寫入該主題。
<b>public</b> <b>class</b> Producer { <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>; @Autowired <b>private</b> KafkaTemplate<String, String> kafkaTemplate; <b>public</b> <b>void</b> sendMessage(String message) { logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message)); <b>this</b>.kafkaTemplate.send(TOPIC, message); } } </font>
自動連接autowire
到 KafkaTemplate
,使用它將消息發(fā)布到主題 - 這就是消息的生產(chǎn)者!
第5步:創(chuàng)建一個消費者,消費者是負責根據(jù)您自己的業(yè)務邏輯的需求閱讀處理消息的消息的服務。要進行設置,請輸入以下內(nèi)容:
@Service <b>public</b> <b>class</b> Consumer { <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>) <b>public</b> <b>void</b> consume(String message) throws IOException { logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message)); } } </font>
在這里,我們告訴我們的方法void consume(String message)
訂閱用戶的主題,并將每條消息發(fā)送到應用程序日志。在您的實際應用程序中,您可以按照業(yè)務需要的方式處理消息。
第6步:創(chuàng)建REST控制器,們已經(jīng)擁有了能夠消費Kafka消息所需的全部內(nèi)容。
為了充分展示我們創(chuàng)建的所有內(nèi)容的工作原理,我們需要創(chuàng)建一個具有單一端點的控制器。消息將發(fā)布到此端點,然后由我們的生產(chǎn)者處理。然后,我們的消費者將通過登錄到控制臺來捕獲并處理它。
@RestController @RequestMapping(value = <font>"/kafka"</font><font>) <b>public</b> <b>class</b> KafkaController { <b>private</b> <b>final</b> Producer producer; @Autowired KafkaController(Producer producer) { <b>this</b>.producer = producer; } @PostMapping(value = </font><font>"/publish"</font><font>) <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) { <b>this</b>.producer.sendMessage(message); } } </font>
讓我們使用cURL將消息發(fā)送給Kafka:
curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
基本上就是這樣!在不到10個步驟中,您了解了將Apache Kafka添加到Spring Boot項目是多么容易。如果您遵循本指南,您現(xiàn)在知道如何將Kafka集成到Spring Boot項目中,并且您已準備好使用這個超級工具!
總結(jié)
以上所述是小編給大家介紹的在Spring Boot應用程序中使用Apache Kafka的方法步驟詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關文章
詳談spring中bean注入無效和new創(chuàng)建對象的區(qū)別
這篇文章主要介紹了spring中bean注入無效和new創(chuàng)建對象的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02解決Mybatis中mapper.xml文件update,delete及insert返回值問題
這篇文章主要介紹了解決Mybatis中mapper.xml文件update,delete及insert返回值問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動jar包下載和使用方法
這篇文章主要給大家介紹了關于Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動jar包下載和使用方法,MySQL是一款常用的關系型數(shù)據(jù)庫,它的JDBC驅(qū)動程序使得我們可以通過Java程序連接MySQL數(shù)據(jù)庫進行數(shù)據(jù)操作,需要的朋友可以參考下2023-11-11springboot使用redisRepository和redistemplate操作redis的過程解析
本文給大家介紹springboot整合redis/分別用redisRepository和redistemplate操作redis,本文結(jié)合實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2022-05-05