SpringBoot實(shí)現(xiàn)kafka多源配置的示例代碼
背景
實(shí)際開發(fā)中,不同的topic可能來自不同的集群,所以就需要配置不同的kafka數(shù)據(jù)源,基于springboot自動(dòng)配置的思想,最終通過配置文件的配置,自動(dòng)生成生產(chǎn)者及消費(fèi)者的配置。
核心配置
自動(dòng)化配置類
import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {
public MyKafkaAutoConfiguration() {
}
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
beanFactory.getBean(CustomKafkaDataSourceRegister.class);
}
}
注冊(cè)生產(chǎn)者、消費(fèi)者核心bean到spring
public void afterPropertiesSet() {
Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
if (factories != null && !factories.isEmpty()) {
factories.forEach((factoryName, consumerConfig) -> {
KafkaProperties.Listener listener = consumerConfig.getListener();
Integer concurrency = consumerConfig.getConcurrency();
// 創(chuàng)建監(jiān)聽容器工廠
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
// 注冊(cè)到容器
if (!beanFactory.containsBean(factoryName)) {
beanFactory.registerSingleton(factoryName, containerFactory);
}
});
}
Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
if (!ObjectUtils.isEmpty(templates)) {
templates.forEach((templateName, producerConfig) -> {
//registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
//注冊(cè)spring bean的兩種方式
registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
});
}
}
配置spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.example.kafka.MyKafkaAutoConfiguration
yml配置
spring:
kafka:
multiple:
consumer:
factories:
test-factory:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
bootstrap-servers: 192.168.56.112:9092
group-id: group_a
concurrency: 25
fetch-min-size: 1048576
fetch-max-wait: 3000
listener:
type: batch
properties:
spring-json-trusted-packages: '*'
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
producer:
templates:
test-template:
bootstrap-servers: 192.168.56.112:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
使用


到此這篇關(guān)于SpringBoot實(shí)現(xiàn)kafka多源配置的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot kafka多源配置內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis返回單個(gè)實(shí)體或者返回List的實(shí)現(xiàn)
這篇文章主要介紹了Mybatis返回單個(gè)實(shí)體或者返回List的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07
Java多線程實(shí)戰(zhàn)之交叉打印的兩種方法
今天小編就為大家分享一篇關(guān)于Java多線程實(shí)戰(zhàn)之交叉打印的兩種方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-02-02
java實(shí)現(xiàn)登錄之后抓取數(shù)據(jù)
這篇文章給大家分享了用JAVA實(shí)現(xiàn)在登陸以后抓取網(wǎng)站的數(shù)據(jù)的相關(guān)知識(shí),有興趣的朋友可以測(cè)試參考下。2018-07-07
SpringBoot如何根據(jù)目錄路徑生成接口的url路徑
這篇文章主要介紹了SpringBoot如何根據(jù)目錄路徑生成接口的url路徑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
Spring Boot應(yīng)用開發(fā)初探與實(shí)例講解
這篇文章主要介紹了Spring Boot應(yīng)用開發(fā)初探與實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
springboot項(xiàng)目docker分層構(gòu)建的配置方式
在使用dockerfile構(gòu)建springboot項(xiàng)目時(shí),速度較慢,用時(shí)比較長(zhǎng),為了加快構(gòu)建docker鏡像的速度,采用分層構(gòu)建的方式,這篇文章主要介紹了springboot項(xiàng)目docker分層構(gòu)建,需要的朋友可以參考下2024-03-03

