spring 整合kafka監(jiān)聽消費的配置過程
前言
最近項目里有個需求,要消費kafka里的數(shù)據(jù)。之前也手動寫過代碼去消費kafka數(shù)據(jù)。但是轉念一想。既然spring提供了消費kafka的方法。就沒必要再去重復造輪子。于是嘗試使用spring的API。
項目技術背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。
整合過程
1. 引入spring-kafka的依賴包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
2. 在spring的xml文件里增加配置項,也可以單獨創(chuàng)建一個spring-context-XX.xml文件。
<!-- consumer configuration 該配置項可以根據(jù)自己業(yè)務的實際需求做增加或刪除-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="group.id" value="group" />
<entry key="enable.auto.commit" value="true" />
<entry key="auto.commit.interval.ms" value="3000" />
<entry key="session.timeout.ms" value="10000" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- create factory 該類是spring jar包里提供,就這么配置-->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<!-- 自定義的消費類,需要實現(xiàn)spring的接口 -->
<bean id="payPalConsumer"
class="com.chao.service.consumer.PayPalConsumer" />
<!-- 該類也是jar包里提供的,注入的監(jiān)聽類是自己定義的,topic名稱是配置文件引入的-->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${kafka.paypal.topic.name}"/>
<property name="messageListener" ref="payPalConsumer" />
</bean>
<!-- 改類也是jar里提供的,把這個containerProperties和consumerfactory 注入 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
</bean>
2. 自定義消費者類,消費者類依然可以使用注解。
/**
* get msg from kafka
*/
@Component
public class PayPalConsumer implements MessageListener<String, String> {
private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);
@Autowired
private XXService XXService;
@Override
public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
String value = authorizeRecord.value();
if (StringUtils.isEmpty(value)){
logger.warn("receive message from kafka is null");
return;
}
logger.info("receive message from kafka is {}",value);
}
}
使用這個步驟配置,一次性過。非常順利。
到此這篇關于spring 整合kafka監(jiān)聽消費的配置過程的文章就介紹到這了,更多相關spring 整合kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

