欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java集成kafka實(shí)例代碼

 更新時(shí)間:2024年12月30日 14:15:59   作者:沉墨的夜  
文章介紹了如何在Java項(xiàng)目中集成Apache Kafka以實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi),通過添加Maven依賴、配置生產(chǎn)者和消費(fèi)者、使用SpringBoot簡化集成以及控制消費(fèi)者的啟動(dòng)和停止,可以實(shí)現(xiàn)高效的消息處理

java集成kafka

要在 Java 項(xiàng)目中集成 Apache Kafka 以實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi),步驟如下:

1. 引入 Maven 依賴

在您的 pom.xml 文件中添加以下依賴,以包含 Kafka 客戶端庫:

<dependencies>
    <!-- Kafka Clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- 如果使用 Spring Boot,可添加以下依賴 -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.7.0</version>
    </dependency>
</dependencies>

2. 配置 Kafka 生產(chǎn)者

首先,設(shè)置生產(chǎn)者的配置屬性:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置屬性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 創(chuàng)建生產(chǎn)者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);
            producer.send(record);
        }

        // 關(guān)閉生產(chǎn)者
        producer.close();
    }
}

3. 配置 Kafka 消費(fèi)者

接下來,設(shè)置消費(fèi)者的配置屬性,并訂閱主題以消費(fèi)消息:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置屬性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 創(chuàng)建消費(fèi)者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 訂閱主題
        consumer.subscribe(Collections.singletonList("your_topic"));

        // 持續(xù)消費(fèi)消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n",
                            record.key(), record.value(), record.offset());
                });
            }
        } finally {
            // 關(guān)閉消費(fèi)者
            consumer.close();
        }
    }
}

4. 使用 Spring Boot 集成 Kafka

如果您使用 Spring Boot,可以通過配置 KafkaTemplate(用于生產(chǎn)消息)和使用 @KafkaListener 注解(用于消費(fèi)消息)來簡化 Kafka 的集成。

生產(chǎn)者配置:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

使用 KafkaTemplate 發(fā)送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}

消費(fèi)者配置:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

使用 @KafkaListener 消費(fèi)消息:

在 Spring Boot 中,@KafkaListener 注解用于監(jiān)聽指定的 Kafka 主題,并在收到消息時(shí)觸發(fā)相應(yīng)的方法。

以下是一個(gè)基本示例:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "your_topic", groupId = "your_group_id")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // 在此處添加處理邏輯
    }
}

 

在上述代碼中:

  • topics:指定要監(jiān)聽的 Kafka 主題。
  • groupId:指定消費(fèi)者組 ID。

listen 方法:當(dāng)有新消息發(fā)布到指定主題時(shí),該方法會(huì)被調(diào)用,message 參數(shù)包含消息的內(nèi)容。

批量消費(fèi)消息

如果希望一次處理多條消息,可以啟用批量監(jiān)聽。

首先,需要配置一個(gè)支持批量消費(fèi)的 KafkaListenerContainerFactory

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // 啟用批量監(jiān)聽
        return factory;
    }
}

然后,在消費(fèi)者服務(wù)中使用 @KafkaListener 注解,并指定使用上述配置的工廠:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class KafkaBatchConsumerService {

    @KafkaListener(
        topics = "your_topic",
        groupId = "your_group_id",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void listen(List<String> messages) {
        System.out.println("Received batch messages: " + messages);
        // 在此處添加批量處理邏輯
    }
}

在上述代碼中:

  • containerFactory:指定使用支持批量消費(fèi)的工廠。

listen 方法的參數(shù)類型為 List<String>,用于接收一批消息。

控制消費(fèi)者的啟動(dòng)和停止

在某些情況下,可能需要在運(yùn)行時(shí)控制 Kafka 消費(fèi)者的啟動(dòng)和停止。

可以通過 KafkaListenerEndpointRegistry 來實(shí)現(xiàn):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class KafkaListenerManager {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // 啟動(dòng)監(jiān)聽器
    public void startListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && !listenerContainer.isRunning()) {
            listenerContainer.start();
        }
    }

    // 停止監(jiān)聽器
    public void stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && listenerContainer.isRunning()) {
            listenerContainer.stop();
        }
    }
}

在上述代碼中:

  • startListener 方法用于啟動(dòng)指定的監(jiān)聽器。
  • stopListener 方法用于停止指定的監(jiān)聽器。
  • listenerId 對應(yīng)于 @KafkaListener 注解中的 id 屬性。

通過這種方式,可以在應(yīng)用運(yùn)行時(shí)根據(jù)需要?jiǎng)討B(tài)地控制 Kafka 消費(fèi)者的行為。

通過上述配置和代碼示例,可以在 Spring Boot 項(xiàng)目中有效地集成 Kafka,實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)功能。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法

    Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法

    今天小編就為大家分享一篇Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-06-06
  • Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)二維數(shù)組與稀疏數(shù)組轉(zhuǎn)換詳解

    Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)二維數(shù)組與稀疏數(shù)組轉(zhuǎn)換詳解

    稀疏數(shù)組是用于優(yōu)化,壓縮具有以下特點(diǎn)的二維數(shù)組:當(dāng)二維數(shù)組中的元素大部分相同,有意義的數(shù)據(jù)元素較少時(shí),可以使用稀疏數(shù)組進(jìn)行簡化,節(jié)省存儲(chǔ)空間
    2021-10-10
  • Java項(xiàng)目有中多個(gè)線程如何查找死鎖

    Java項(xiàng)目有中多個(gè)線程如何查找死鎖

    這篇文章主要介紹了Java項(xiàng)目有中多個(gè)線程如何查找死鎖,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄

    JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄

    這篇文章主要介紹了JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • springboot內(nèi)嵌Tomcat安全漏洞修復(fù)方式

    springboot內(nèi)嵌Tomcat安全漏洞修復(fù)方式

    針對CVE-2020-1938漏洞,建議升級Tomcat至安全版本以避免受影響,影響版本包括:Apache Tomcat 9.x小于9.0.31、Apache Tomcat 8.x小于8.5.51、Apache Tomcat 7.x小于7.0.100及Apache Tomcat 6.x,
    2024-10-10
  • Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之二叉樹

    Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之二叉樹

    今天給大家?guī)淼氖顷P(guān)于Java數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識(shí),文章圍繞著Java二叉樹展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • Java并發(fā)編程之詳解CyclicBarrier線程同步

    Java并發(fā)編程之詳解CyclicBarrier線程同步

    在之前的文章中已經(jīng)為大家介紹了java并發(fā)編程的工具:BlockingQueue接口,ArrayBlockingQueue,DelayQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,BlockingDeque接口,ConcurrentHashMap,CountDownLatch,本文為系列文章第十篇,需要的朋友可以參考下
    2021-06-06
  • idea自帶database連接mysql失敗問題的解決辦法

    idea自帶database連接mysql失敗問題的解決辦法

    在IDEA?帶的數(shù)據(jù)庫連接?具中,可以連接MySQL數(shù)據(jù)庫,但是有的時(shí)候連接出現(xiàn)錯(cuò)誤,連接不上數(shù)據(jù)庫,下面這篇文章主要給大家介紹了關(guān)于idea自帶database連接mysql失敗問題的解決辦法,需要的朋友可以參考下
    2023-06-06
  • java銀行管理系統(tǒng)源碼

    java銀行管理系統(tǒng)源碼

    這篇文章主要為大家詳細(xì)介紹了java銀行管理系統(tǒng)源碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • 基于MyBatis的簡單使用(推薦)

    基于MyBatis的簡單使用(推薦)

    下面小編就為大家?guī)硪黄贛yBatis的簡單使用(推薦)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-10-10

最新評論