欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

聊聊Spring?Boot如何配置多個(gè)Kafka數(shù)據(jù)源

 更新時(shí)間:2023年10月20日 09:35:50   作者:Mr?Tang  
這篇文章主要介紹了Spring?Boot配置多個(gè)Kafka數(shù)據(jù)源的相關(guān)知識,包括生產(chǎn)者、消費(fèi)者配置,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧

一、配置文件

application.properties配置文件如下

#kafka多數(shù)據(jù)源配置
#kafka數(shù)據(jù)源一,日志審計(jì)推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka數(shù)據(jù)源二,動(dòng)環(huán)數(shù)據(jù)消費(fèi)
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true

二、pom依賴

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

三、生產(chǎn)者、消費(fèi)者配置

1.第一個(gè)kakfa

package com.gstanzer.convert.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaOneConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.producer.retries}")
    private String retries;
    @Value("${spring.kafka.one.producer.properties.max.block.ms}")
    private String maxBlockMs;
    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

2.第二個(gè)kakfa

package com.gstanzer.convert.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaTwoConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.producer.retries}")
    private String retries;
    @Value("${spring.kafka.two.producer.properties.max.block.ms}")
    private String maxBlockMs;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

四.生產(chǎn)者

@Controller
public class TestController {
    @Autowired
    private KafkaTemplate kafkaOneTemplate;
    @Autowired
    private KafkaTemplate kafkaTwoTemplate;
    @RequestMapping("/send")
    @ResponseBody
    public String send() {
        final String TOPIC = "TOPIC_1";
        kafkaOneTemplate.send(TOPIC, "kafka one");
        kafkaTwoTemplate.send(TOPIC, "kafka two");
        return "success";
    }
}

五.消費(fèi)者

@Component
public class KafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
    final String TOPIC = "TOPIC_1";
    // containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同
    @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
    public void listenerOne(ConsumerRecord<?, ?> record) {
        LOGGER.info(" kafka one 接收到消息:{}", record.value());
    }
    @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
    public void listenerTwo(ConsumerRecord<?, ?> record) {
        LOGGER.info(" kafka two 接收到消息:{}", record.value());
    }
}

備注:

生產(chǎn)者消費(fèi)者代碼參考鏈接,開發(fā)同學(xué)需要以實(shí)際情況按要求自己變更下代碼即可:

Spring Boot 集成多個(gè) Kafka_springboot集成多個(gè)kafka

到此這篇關(guān)于Spring Boot配置多個(gè)Kafka數(shù)據(jù)源的文章就介紹到這了,更多相關(guān)Spring Boot配置Kafka數(shù)據(jù)源內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 手把手教你SpringBoot過濾器N種注冊方式

    手把手教你SpringBoot過濾器N種注冊方式

    這篇文章主要介紹了手把手教你SpringBoot過濾器N種注冊方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • 通過Java壓縮JavaScript代碼實(shí)例分享

    通過Java壓縮JavaScript代碼實(shí)例分享

    這篇文章主要介紹了通過Java壓縮JavaScript代碼實(shí)例分享,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-12-12
  • Java求字符串長度的方法舉例

    Java求字符串長度的方法舉例

    這篇文章主要給大家介紹了關(guān)于Java求字符串長度的相關(guān)資料,Java中的字符串是一種常見的數(shù)據(jù)類型,用于表示文本數(shù)據(jù),文中給出了詳細(xì)的代碼實(shí)例,需要的朋友可以參考下
    2023-10-10
  • Java使用ThreadLocal實(shí)現(xiàn)當(dāng)前登錄信息的存取功能

    Java使用ThreadLocal實(shí)現(xiàn)當(dāng)前登錄信息的存取功能

    ThreadLocal和其他并發(fā)工具一樣,也是用于解決多線程并發(fā)訪問,下這篇文章主要給大家介紹了關(guān)于Java使用ThreadLocal實(shí)現(xiàn)當(dāng)前登錄信息的存取功能,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • IDEA 插件 mapper和xml互相跳轉(zhuǎn)操作

    IDEA 插件 mapper和xml互相跳轉(zhuǎn)操作

    這篇文章主要介紹了IDEA 插件 mapper和xml互相跳轉(zhuǎn)操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • 5個(gè)并發(fā)處理技巧代碼示例

    5個(gè)并發(fā)處理技巧代碼示例

    這篇文章主要介紹了5個(gè)并發(fā)處理技巧代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-10-10
  • 使用java實(shí)現(xiàn)各種數(shù)據(jù)統(tǒng)計(jì)圖(柱形圖,餅圖,折線圖)

    使用java實(shí)現(xiàn)各種數(shù)據(jù)統(tǒng)計(jì)圖(柱形圖,餅圖,折線圖)

    用Jfree實(shí)現(xiàn)條形柱狀圖表,java代碼實(shí)現(xiàn)??山?jīng)常用于報(bào)表的制作,代碼自動(dòng)生成后可以自由查看??梢宰杂膳渲脠D表的各個(gè)屬性,用來達(dá)到自己的要求和目的。本文給大家介紹使用java實(shí)現(xiàn)各種數(shù)據(jù)統(tǒng)計(jì)圖(柱形圖,餅圖,折線圖),需要的朋友可以參考下
    2015-10-10
  • 詳解idea maven nexus 常見命令配置

    詳解idea maven nexus 常見命令配置

    這篇文章主要介紹了idea maven nexus 常見命令配置的相關(guān)知識,通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • Java那點(diǎn)兒事之Map集合不為人知的秘密有哪些

    Java那點(diǎn)兒事之Map集合不為人知的秘密有哪些

    Map用于保存具有映射關(guān)系的數(shù)據(jù),Map集合里保存著兩組值,一組用于保存Map的key,另一組保存著Map的value,和查字典類似,通過key找到對應(yīng)的value,通過頁數(shù)找到對應(yīng)的信息。用學(xué)生類來說,key相當(dāng)于學(xué)號,value對應(yīng)name,age,sex等信息。用這種對應(yīng)關(guān)系方便查找
    2021-10-10
  • Java?Timer使用講解

    Java?Timer使用講解

    Timer是一種工具,線程用其安排以后在后臺線程中執(zhí)行的任務(wù)??砂才湃蝿?wù)執(zhí)行一次,或者定期重復(fù)執(zhí)行,這篇文章主要介紹了Java?Timer使用講解,需要的朋友可以參考下
    2022-11-11

最新評論