關(guān)于spring boot整合kafka+注解方式
spring boot自動(dòng)配置方式整合
spring boot具有許多自動(dòng)化配置,對(duì)于kafka的自動(dòng)化配置當(dāng)然也包含在內(nèi),基于spring boot自動(dòng)配置方式整合kafka,需要做以下步驟。
引入kafka的pom依賴包
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
在配置文件中配置kafka相關(guān)屬性配置,分別配置生產(chǎn)者和消費(fèi)者的屬性,在程序啟動(dòng)時(shí),spring boot框架會(huì)自動(dòng)讀取這些配置的屬性,創(chuàng)建相關(guān)的生產(chǎn)者、消費(fèi)者等。下面展示一個(gè)簡單的配置。
#kafka默認(rèn)消費(fèi)者配置 spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092 spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest #kafka默認(rèn)生產(chǎn)者配置 spring.kafka.producer.bootstrap-servers=192.168.0.15:9092 spring.kafka.producer.acks=-1 spring.kafka.client-id=kafka-producer spring.kafka.producer.batch-size=5
當(dāng)然,在實(shí)際生產(chǎn)中的配置肯定比上面的配置復(fù)雜,需要一些定制化的操作,那么spring boot的自動(dòng)化配置創(chuàng)建的生產(chǎn)者或者消費(fèi)者都不能滿足我們時(shí),應(yīng)該需要自定義化相關(guān)配置,這個(gè)在后續(xù)舉例,這里先分析自動(dòng)化配置。
在進(jìn)行了如上配置之后,需要生產(chǎn)者時(shí),使用方式為下代碼所示。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {UserSSOApplication.class})
public class UserSSOApplicationTests {
@Resource
//注入kafkatemplete,這個(gè)由spring boot自動(dòng)創(chuàng)建
KafkaTemplate kafkaTemplate;
@Test
public void testKafkaSendMsg() {
//發(fā)送消息
kafkaTemplate.send("test", 0,12,"1222");
}
}
消費(fèi)者的使用注解方式整合,代碼如下。
@Component
@Slf4j
public class KafkaMessageReceiver2 {
//指定監(jiān)聽的topic,當(dāng)前消費(fèi)者組id
@KafkaListener(topics = {"test"}, groupId = "receiver")
public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
log.info(integerStringConsumerRecords.value());
}
}
上面是最簡單的配置,實(shí)現(xiàn)的一個(gè)簡單例子,如果需要更加定制化的配置,可以參考類
org.springframework.boot.autoconfigure.kafka.KafkaProperties這里面包含了大部分需要的kafka配置。針對(duì)配置,在properties文件中添加即可。
spring boot自動(dòng)配置的不足
上面是依賴spring boot自動(dòng)化配置完成的整合方式,實(shí)際上所有的配置實(shí)現(xiàn)都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成??梢钥闯鰝€(gè)是依賴于@Configuration完成bean配置,這種配置方式基本能夠?qū)崿F(xiàn)大部分情況,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。
但是這種方式還有一個(gè)問題,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并沒有涵蓋所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,這就導(dǎo)致某些特殊配置不能依賴spring boot自動(dòng)創(chuàng)建,需要我們手動(dòng)創(chuàng)建Producer和comsumer。
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
public KafkaAutoConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
kafkaProducerFactory);
if (this.messageConverter != null) {
kafkaTemplate.setMessageConverter(this.messageConverter);
}
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
this.properties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer()
.getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
//略略略
}
spring boot下手動(dòng)配置kafka
由于需要對(duì)某些特殊配置進(jìn)行配置,我們可能需要手動(dòng)配置kafka相關(guān)的bean,創(chuàng)建一個(gè)配置類如下,類似于
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,這里創(chuàng)建了對(duì)應(yīng)類型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的對(duì)應(yīng)Bean定義將不起作用。
所有的生產(chǎn)者配置可以參考ProducerConfig類,所有的消費(fèi)者配置可以參考ConsumerConfig類。
/**
* kafka配置,實(shí)際上,在KafkaAutoConfiguration中已經(jīng)有默認(rèn)的根據(jù)配置文件信息創(chuàng)建配置,但是自動(dòng)配置屬性沒有涵蓋所有
* 我們可以自定義創(chuàng)建相關(guān)bean,進(jìn)行如下配置
*
* @author zhoujy
* @date 2018年12月17日
**/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
//構(gòu)造消費(fèi)者屬性map,ConsumerConfig中的可配置屬性比spring boot自動(dòng)配置要多
private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
/**
* 不使用spring boot默認(rèn)方式創(chuàng)建的DefaultKafkaConsumerFactory,重新定義創(chuàng)建方式
* @return
*/
@Bean("consumerFactory")
public DefaultKafkaConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory(consumerProperties());
}
@Bean("listenerContainerFactory")
//個(gè)性化定義消費(fèi)者
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
//指定使用DefaultKafkaConsumerFactory
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//設(shè)置消費(fèi)者ack模式為手動(dòng),看需求設(shè)置
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
//設(shè)置可批量拉取消息消費(fèi),拉取數(shù)量一次3,看需求設(shè)置
factory.setConcurrency(3);
factory.setBatchListener(true);
return factory;
}
/* @Bean
//代碼創(chuàng)建方式topic
public NewTopic batchTopic() {
return new NewTopic("topic.quick.batch", 8, (short) 1);
}*/
//創(chuàng)建生產(chǎn)者配置map,ProducerConfig中的可配置屬性比spring boot自動(dòng)配置要多
private Map<String, Object> producerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "-1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
/**
* 不使用spring boot的KafkaAutoConfiguration默認(rèn)方式創(chuàng)建的DefaultKafkaProducerFactory,重新定義
* @return
*/
@Bean("produceFactory")
public DefaultKafkaProducerFactory produceFactory(){
return new DefaultKafkaProducerFactory(producerProperties());
}
/**
* 不使用spring boot的KafkaAutoConfiguration默認(rèn)方式創(chuàng)建的KafkaTemplate,重新定義
* @param produceFactory
* @return
*/
@Bean
public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
return new KafkaTemplate(produceFactory);
}
}
生產(chǎn)者的使用方式,跟自動(dòng)配置一樣,直接注入KafkaTemplate即可。主要是消費(fèi)者的使用有些不同。
批量消費(fèi)消息
上面的消費(fèi)者配置配置了一個(gè)bean,@Bean(“l(fā)istenerContainerFactory”),這個(gè)bean可以指定為消費(fèi)者,注解方式中是如下的使用方式。
containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作為消費(fèi)者。
- 注意registryReceiver中的參數(shù),ConsumerRecord對(duì)比之前的消費(fèi)者,因?yàn)樵O(shè)置listenerContainerFactory是批量消費(fèi),因此ConsumerRecord是一個(gè)List,如果不是批量消費(fèi)的話,相對(duì)應(yīng)就是一個(gè)對(duì)象。
- 注意第二個(gè)參數(shù)Acknowledgment,這個(gè)參數(shù)只有在設(shè)置消費(fèi)者的ack應(yīng)答模式為AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手動(dòng)ack。
@Component
@Slf4j
public class KafkaMessageReceiver {
/**
* listenerContainerFactory設(shè)置了批量拉取消息,因此參數(shù)是List<ConsumerRecord<Integer, String>>,否則是ConsumerRecord
* @param integerStringConsumerRecords
* @param acknowledgment
*/
@KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
while (it.hasNext()){
ConsumerRecord<Integer, String> consumerRecords = it.next();
//dosome
acknowledgment.acknowledge();
}
}
}
如果不想要批量消費(fèi)消息,那就可以另外定義一個(gè)bean類似于@Bean(“l(fā)istenerContainerFactory”),如下,只要不設(shè)置批量消費(fèi)即可。
@Bean("listenerContainerFactory2")
//個(gè)性化定義消費(fèi)者
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
//指定使用DefaultKafkaConsumerFactory
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//設(shè)置消費(fèi)者ack模式為手動(dòng),看需求設(shè)置
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
spring boot整合kafka報(bào)錯(cuò)
Timeout expired while fetching topic metadata
這種報(bào)錯(cuò)應(yīng)檢查 kafka連接問題,服務(wù)是否啟動(dòng),端口是否正確。
Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time
這種報(bào)錯(cuò)要考慮kafka和spring對(duì)應(yīng)的版本問題,我的springboot 2.1.2在使用kafka_2.12-2.1.1時(shí)出現(xiàn)此問題,將kafka版本換為2.11-1.1.1后,問題解決。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SLF4J報(bào)錯(cuò)解決:No SLF4J providers were found的
這篇文章主要介紹了SLF4J報(bào)錯(cuò)解決:No SLF4J providers were found的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06
SpringBoot管理RabbitMQ中的Channel詳解
這篇文章主要介紹了SpringBoot管理RabbitMQ中的Channel詳解,channel僅存在于connection的上下文中,而不會(huì)單獨(dú)存在,當(dāng)channel關(guān)閉時(shí),其上的所有channel也會(huì)關(guān)閉,需要的朋友可以參考下2023-08-08
springboot整合gateway實(shí)現(xiàn)網(wǎng)關(guān)功能的示例代碼
本文主要介紹了springboot整合gateway實(shí)現(xiàn)網(wǎng)關(guān)功能的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02
Java項(xiàng)目中如何訪問WEB-INF下jsp頁面
這篇文章主要介紹了Java項(xiàng)目中如何訪問WEB-INF下jsp頁面,文章通過示例代碼和圖文解析介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
利用javadoc注釋自動(dòng)生成Swagger注解
由于現(xiàn)在controller方法上面沒有swagger注解,只能拿到接口url地址,無法獲得接口功能描述,所以本文為大家介紹一下如何利用javadoc注釋自動(dòng)生成Swagger注解,感興趣的可以了解下2023-08-08

