欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot項(xiàng)目配置多個(gè)kafka的示例代碼

 更新時(shí)間:2023年04月24日 14:44:41   作者:CccccDi  
這篇文章主要介紹了springboot項(xiàng)目配置多個(gè)kafka,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

1.spring-kafka

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>

2.配置文件相關(guān)信息

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并發(fā)消費(fèi)的線程數(shù) (通常與partition數(shù)量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
        
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false

3.kafka配置類

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;


    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;

    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;

    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;

    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        String bootstrapServers = bootstrapServer;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }




    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        String bootstrapServers = bootstrapServerPic;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

4.消費(fèi)主題消息

@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            String jsonString = message.value();
            if (StringUtils.isNoneBlank(jsonString)) {
                log.info("消費(fèi):{}",jsonString);
                //TODO ....
            }
        } catch (Exception e) {
            log.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            if (StringUtils.isNoneBlank(message.value())) {
                  //TODO ....
            }
        } catch (Exception e) {
            logger.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

到此這篇關(guān)于springboot項(xiàng)目配置多個(gè)kafka的文章就介紹到這了,更多相關(guān)springboot配置多個(gè)kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例

    Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例

    這篇文章主要為大家介紹了Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • Jenkins發(fā)送測試報(bào)告郵件過程詳解

    Jenkins發(fā)送測試報(bào)告郵件過程詳解

    這篇文章主要介紹了Jenkins發(fā)送測試報(bào)告郵件過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 基于OpenCv與JVM實(shí)現(xiàn)加載保存圖像功能(JAVA?圖像處理)

    基于OpenCv與JVM實(shí)現(xiàn)加載保存圖像功能(JAVA?圖像處理)

    openCv有一個(gè)名imread的簡單函數(shù),用于從文件中讀取圖像,本文給大家介紹JAVA?圖像處理基于OpenCv與JVM實(shí)現(xiàn)加載保存圖像功能,感興趣的朋友一起看看吧
    2022-01-01
  • 查找jdk安裝路徑并且切換多版本jdk的詳細(xì)步驟

    查找jdk安裝路徑并且切換多版本jdk的詳細(xì)步驟

    在日常的工作學(xué)習(xí)中可能需要用到不同版本的jdk,下面這篇文章主要給大家介紹了關(guān)于查找jdk安裝路徑并且切換多版本jdk的詳細(xì)步驟,文中介紹的非常詳細(xì),需要的朋友可以參考下
    2024-01-01
  • java實(shí)現(xiàn)秒表功能

    java實(shí)現(xiàn)秒表功能

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)秒表功能,利用javax.swing.Timer類設(shè)計(jì)實(shí)現(xiàn)秒表應(yīng)用程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • Java面試synchronized偏向鎖后hashcode存址

    Java面試synchronized偏向鎖后hashcode存址

    這篇文章主要為大家介紹了Java面試中synchronized偏向鎖后hashcode存址詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Java集合之LinkedHashSet類詳解

    Java集合之LinkedHashSet類詳解

    這篇文章主要介紹了Java集合之LinkedHashSet類詳解,LinkedHashSet 是 Java 中的一個(gè)集合類,它是 HashSet 的子類,并實(shí)現(xiàn)了 Set 接口,與 HashSet 不同的是,LinkedHashSet 保留了元素插入的順序,并且具有 HashSet 的快速查找特性,需要的朋友可以參考下
    2023-09-09
  • MyBatis-Plus雪花算法實(shí)現(xiàn)源碼解讀

    MyBatis-Plus雪花算法實(shí)現(xiàn)源碼解讀

    雪花算法是一種用于生成唯一標(biāo)識(shí)符(ID)的分布式算法,雪花算法的設(shè)計(jì)目標(biāo)是在分布式系統(tǒng)中生成全局唯一的ID,同時(shí)保證ID的有序性和趨勢遞增,這篇文章主要介紹了MyBatis-Plus雪花算法實(shí)現(xiàn)源碼解析,需要的朋友可以參考下
    2023-12-12
  • JAVA中數(shù)組從小到大排序的2種方法實(shí)例

    JAVA中數(shù)組從小到大排序的2種方法實(shí)例

    JAVA中在運(yùn)用數(shù)組進(jìn)行排序功能時(shí)一般有多種解決方案,下面這篇文章主要給大家介紹了關(guān)于JAVA中數(shù)組從小到大排序的2種方法,文中都給出了詳細(xì)的實(shí)例代碼,需要的朋友可以參考下
    2023-03-03
  • 深入了解Spring中最常用的11個(gè)擴(kuò)展點(diǎn)

    深入了解Spring中最常用的11個(gè)擴(kuò)展點(diǎn)

    我們一說到spring,可能第一個(gè)想到的是?IOC(控制反轉(zhuǎn))?和?AOP(面向切面編程)。除此之外,我們?cè)谑褂胹pring的過程中,有沒有發(fā)現(xiàn)它的擴(kuò)展能力非常強(qiáng)。今天就來跟大家一起聊聊,在Spring中最常用的11個(gè)擴(kuò)展點(diǎn)
    2022-09-09

最新評(píng)論