欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解

 更新時(shí)間:2018年11月05日 10:32:14   作者:banq  
這篇文章主要介紹了在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

第1步:生成我們的項(xiàng)目: Spring Initializr 來生成我們的項(xiàng)目。我們的項(xiàng)目將提供Spring MVC / Web支持和Apache Kafka支持。

第2步:發(fā)布/讀取Kafka主題中的消息:

<b>public</b> <b>class</b> User {
  <b>private</b> String name;
  <b>private</b> <b>int</b> age;
  <b>public</b> User(String name, <b>int</b> age) {
    <b>this</b>.name = name;
    <b>this</b>.age = age;
  }
}

第3步:通過application.yml配置文件配置Kafka:

我們需要?jiǎng)?chuàng)建配置文件。我們需要以某種方式配置我們的Kafka生產(chǎn)者和消費(fèi)者,以便能夠發(fā)布和讀取與主題相關(guān)的消息。相比建立一個(gè)使用@Configuration標(biāo)注的Java類,我們可以直接使用配置文件application.properties或application.yml。Spring Boot讓我們避免像過去一樣編寫的所有樣板代碼,同時(shí)為我們提供了更加智能的配置應(yīng)用程序的方法,如下所示:

server: port: 9000
spring:
  kafka:
   consumer:
    bootstrap: localhost:9092
    group-id: group_id
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
    bootstrap: localhost:9092
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

第4步:創(chuàng)建一個(gè)生產(chǎn)者,創(chuàng)建生產(chǎn)者會(huì)將我們的消息寫入該主題。

<b>public</b> <b>class</b> Producer {
  <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);
  <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>;
  @Autowired
  <b>private</b> KafkaTemplate<String, String> kafkaTemplate;
  <b>public</b> <b>void</b> sendMessage(String message) {
    logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message));
    <b>this</b>.kafkaTemplate.send(TOPIC, message);
  }
}
</font>

自動(dòng)連接autowireKafkaTemplate ,使用它將消息發(fā)布到主題 - 這就是消息的生產(chǎn)者!

第5步:創(chuàng)建一個(gè)消費(fèi)者,消費(fèi)者是負(fù)責(zé)根據(jù)您自己的業(yè)務(wù)邏輯的需求閱讀處理消息的消息的服務(wù)。要進(jìn)行設(shè)置,請(qǐng)輸入以下內(nèi)容:

@Service
<b>public</b> <b>class</b> Consumer {

  <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);

  @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>)
  <b>public</b> <b>void</b> consume(String message) throws IOException {
    logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message));
  }
}
</font>

在這里,我們告訴我們的方法void consume(String message)訂閱用戶的主題,并將每條消息發(fā)送到應(yīng)用程序日志。在您的實(shí)際應(yīng)用程序中,您可以按照業(yè)務(wù)需要的方式處理消息。

第6步:創(chuàng)建REST控制器,們已經(jīng)擁有了能夠消費(fèi)Kafka消息所需的全部內(nèi)容。

為了充分展示我們創(chuàng)建的所有內(nèi)容的工作原理,我們需要?jiǎng)?chuàng)建一個(gè)具有單一端點(diǎn)的控制器。消息將發(fā)布到此端點(diǎn),然后由我們的生產(chǎn)者處理。然后,我們的消費(fèi)者將通過登錄到控制臺(tái)來捕獲并處理它。

@RestController
@RequestMapping(value = <font>"/kafka"</font><font>)
<b>public</b> <b>class</b> KafkaController {

  <b>private</b> <b>final</b> Producer producer;

  @Autowired
  KafkaController(Producer producer) {
    <b>this</b>.producer = producer;
  }

  @PostMapping(value = </font><font>"/publish"</font><font>)
  <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) {
    <b>this</b>.producer.sendMessage(message);
  }
}
</font>

讓我們使用cURL將消息發(fā)送給Kafka:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

基本上就是這樣!在不到10個(gè)步驟中,您了解了將Apache Kafka添加到Spring Boot項(xiàng)目是多么容易。如果您遵循本指南,您現(xiàn)在知道如何將Kafka集成到Spring Boot項(xiàng)目中,并且您已準(zhǔn)備好使用這個(gè)超級(jí)工具!

總結(jié)

以上所述是小編給大家介紹的在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!

相關(guān)文章

  • 詳談spring中bean注入無效和new創(chuàng)建對(duì)象的區(qū)別

    詳談spring中bean注入無效和new創(chuàng)建對(duì)象的區(qū)別

    這篇文章主要介紹了spring中bean注入無效和new創(chuàng)建對(duì)象的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • Mybatis-plus如何更新Null字段詳解

    Mybatis-plus如何更新Null字段詳解

    MyBatis-plus在進(jìn)行更新操作時(shí)不會(huì)更新傳入實(shí)體中為null或默認(rèn)值屬性字段,只更新不為null的值、非默認(rèn)值的屬性字段,這篇文章主要給大家介紹了關(guān)于Mybatis-plus如何更新Null字段的相關(guān)資料,需要的朋友可以參考下
    2023-07-07
  • SpringBoot中的文件上傳和異常處理詳解

    SpringBoot中的文件上傳和異常處理詳解

    這篇文章主要介紹了SpringBoot中的文件上傳和異常處理詳解,對(duì)于機(jī)器客戶端,它將生成JSON響應(yīng),其中包含錯(cuò)誤,HTTP狀態(tài)和異常消息的詳細(xì)信息,對(duì)于瀏覽器客戶端,響應(yīng)一個(gè)"whitelabel"錯(cuò)誤視圖,以HTML格式呈現(xiàn)相同的數(shù)據(jù),需要的朋友可以參考下
    2023-09-09
  • Spark使用IDEA編寫wordcount的示例演示

    Spark使用IDEA編寫wordcount的示例演示

    這篇文章主要介紹了Spark使用IDEA編寫wordcount的示例演示,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-07-07
  • 解決Mybatis中mapper.xml文件update,delete及insert返回值問題

    解決Mybatis中mapper.xml文件update,delete及insert返回值問題

    這篇文章主要介紹了解決Mybatis中mapper.xml文件update,delete及insert返回值問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • Seata?環(huán)境搭建部署過程

    Seata?環(huán)境搭建部署過程

    Seata是一個(gè)分布式事務(wù),seata服務(wù)端也是一個(gè)微服務(wù),需要和其他微服務(wù)一樣需要注冊(cè)中心和配置中心,這篇文章主要介紹了Seata?環(huán)境搭建,需要的朋友可以參考下
    2022-10-10
  • 聊聊mybatis sql的括號(hào)問題

    聊聊mybatis sql的括號(hào)問題

    這篇文章主要介紹了mybatis sql的括號(hào)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動(dòng)jar包下載和使用方法

    Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動(dòng)jar包下載和使用方法

    這篇文章主要給大家介紹了關(guān)于Java連接mysql數(shù)據(jù)庫以及mysql驅(qū)動(dòng)jar包下載和使用方法,MySQL是一款常用的關(guān)系型數(shù)據(jù)庫,它的JDBC驅(qū)動(dòng)程序使得我們可以通過Java程序連接MySQL數(shù)據(jù)庫進(jìn)行數(shù)據(jù)操作,需要的朋友可以參考下
    2023-11-11
  • Java?@Validated遇到的大坑與處理

    Java?@Validated遇到的大坑與處理

    這篇文章主要介紹了Java?@Validated遇到的大坑與處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • springboot使用redisRepository和redistemplate操作redis的過程解析

    springboot使用redisRepository和redistemplate操作redis的過程解析

    本文給大家介紹springboot整合redis/分別用redisRepository和redistemplate操作redis,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2022-05-05

最新評(píng)論