java集成kafka實(shí)例代碼
java集成kafka
要在 Java 項(xiàng)目中集成 Apache Kafka 以實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi),步驟如下:
1. 引入 Maven 依賴
在您的 pom.xml 文件中添加以下依賴,以包含 Kafka 客戶端庫(kù):
<dependencies>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- 如果使用 Spring Boot,可添加以下依賴 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>2. 配置 Kafka 生產(chǎn)者
首先,設(shè)置生產(chǎn)者的配置屬性:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置屬性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 創(chuàng)建生產(chǎn)者
Producer<String, String> producer = new KafkaProducer<>(props);
// 發(fā)送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);
producer.send(record);
}
// 關(guān)閉生產(chǎn)者
producer.close();
}
}3. 配置 Kafka 消費(fèi)者
接下來,設(shè)置消費(fèi)者的配置屬性,并訂閱主題以消費(fèi)消息:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置屬性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 創(chuàng)建消費(fèi)者
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.subscribe(Collections.singletonList("your_topic"));
// 持續(xù)消費(fèi)消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());
});
}
} finally {
// 關(guān)閉消費(fèi)者
consumer.close();
}
}
}4. 使用 Spring Boot 集成 Kafka
如果您使用 Spring Boot,可以通過配置 KafkaTemplate(用于生產(chǎn)消息)和使用 @KafkaListener 注解(用于消費(fèi)消息)來簡(jiǎn)化 Kafka 的集成。
生產(chǎn)者配置:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}使用 KafkaTemplate 發(fā)送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String value) {
kafkaTemplate.send(topic, key, value);
}
}消費(fèi)者配置:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}使用 @KafkaListener 消費(fèi)消息:
在 Spring Boot 中,@KafkaListener 注解用于監(jiān)聽指定的 Kafka 主題,并在收到消息時(shí)觸發(fā)相應(yīng)的方法。
以下是一個(gè)基本示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "your_topic", groupId = "your_group_id")
public void listen(String message) {
System.out.println("Received message: " + message);
// 在此處添加處理邏輯
}
}
在上述代碼中:
topics:指定要監(jiān)聽的 Kafka 主題。groupId:指定消費(fèi)者組 ID。
listen 方法:當(dāng)有新消息發(fā)布到指定主題時(shí),該方法會(huì)被調(diào)用,message 參數(shù)包含消息的內(nèi)容。
批量消費(fèi)消息
如果希望一次處理多條消息,可以啟用批量監(jiān)聽。
首先,需要配置一個(gè)支持批量消費(fèi)的 KafkaListenerContainerFactory:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true); // 啟用批量監(jiān)聽
return factory;
}
}然后,在消費(fèi)者服務(wù)中使用 @KafkaListener 注解,并指定使用上述配置的工廠:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class KafkaBatchConsumerService {
@KafkaListener(
topics = "your_topic",
groupId = "your_group_id",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(List<String> messages) {
System.out.println("Received batch messages: " + messages);
// 在此處添加批量處理邏輯
}
}在上述代碼中:
containerFactory:指定使用支持批量消費(fèi)的工廠。
listen 方法的參數(shù)類型為 List<String>,用于接收一批消息。
控制消費(fèi)者的啟動(dòng)和停止
在某些情況下,可能需要在運(yùn)行時(shí)控制 Kafka 消費(fèi)者的啟動(dòng)和停止。
可以通過 KafkaListenerEndpointRegistry 來實(shí)現(xiàn):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class KafkaListenerManager {
@Autowired
private KafkaListenerEndpointRegistry registry;
// 啟動(dòng)監(jiān)聽器
public void startListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && !listenerContainer.isRunning()) {
listenerContainer.start();
}
}
// 停止監(jiān)聽器
public void stopListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
}在上述代碼中:
startListener方法用于啟動(dòng)指定的監(jiān)聽器。stopListener方法用于停止指定的監(jiān)聽器。listenerId對(duì)應(yīng)于@KafkaListener注解中的id屬性。
通過這種方式,可以在應(yīng)用運(yùn)行時(shí)根據(jù)需要?jiǎng)討B(tài)地控制 Kafka 消費(fèi)者的行為。
通過上述配置和代碼示例,可以在 Spring Boot 項(xiàng)目中有效地集成 Kafka,實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)功能。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法
今天小編就為大家分享一篇Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-06-06
Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)二維數(shù)組與稀疏數(shù)組轉(zhuǎn)換詳解
稀疏數(shù)組是用于優(yōu)化,壓縮具有以下特點(diǎn)的二維數(shù)組:當(dāng)二維數(shù)組中的元素大部分相同,有意義的數(shù)據(jù)元素較少時(shí),可以使用稀疏數(shù)組進(jìn)行簡(jiǎn)化,節(jié)省存儲(chǔ)空間2021-10-10
JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄
這篇文章主要介紹了JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
springboot內(nèi)嵌Tomcat安全漏洞修復(fù)方式
針對(duì)CVE-2020-1938漏洞,建議升級(jí)Tomcat至安全版本以避免受影響,影響版本包括:Apache Tomcat 9.x小于9.0.31、Apache Tomcat 8.x小于8.5.51、Apache Tomcat 7.x小于7.0.100及Apache Tomcat 6.x,2024-10-10
Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之二叉樹
今天給大家?guī)淼氖顷P(guān)于Java數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識(shí),文章圍繞著Java二叉樹展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
Java并發(fā)編程之詳解CyclicBarrier線程同步
在之前的文章中已經(jīng)為大家介紹了java并發(fā)編程的工具:BlockingQueue接口,ArrayBlockingQueue,DelayQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,BlockingDeque接口,ConcurrentHashMap,CountDownLatch,本文為系列文章第十篇,需要的朋友可以參考下2021-06-06
idea自帶database連接mysql失敗問題的解決辦法
在IDEA?帶的數(shù)據(jù)庫(kù)連接?具中,可以連接MySQL數(shù)據(jù)庫(kù),但是有的時(shí)候連接出現(xiàn)錯(cuò)誤,連接不上數(shù)據(jù)庫(kù),下面這篇文章主要給大家介紹了關(guān)于idea自帶database連接mysql失敗問題的解決辦法,需要的朋友可以參考下2023-06-06

