java集成kafka實(shí)例代碼
java集成kafka
要在 Java 項(xiàng)目中集成 Apache Kafka 以實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi),步驟如下:
1. 引入 Maven 依賴
在您的 pom.xml
文件中添加以下依賴,以包含 Kafka 客戶端庫:
<dependencies> <!-- Kafka Clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> <!-- 如果使用 Spring Boot,可添加以下依賴 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.0</version> </dependency> </dependencies>
2. 配置 Kafka 生產(chǎn)者
首先,設(shè)置生產(chǎn)者的配置屬性:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { // 配置屬性 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 創(chuàng)建生產(chǎn)者 Producer<String, String> producer = new KafkaProducer<>(props); // 發(fā)送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i); producer.send(record); } // 關(guān)閉生產(chǎn)者 producer.close(); } }
3. 配置 Kafka 消費(fèi)者
接下來,設(shè)置消費(fèi)者的配置屬性,并訂閱主題以消費(fèi)消息:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { // 配置屬性 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 創(chuàng)建消費(fèi)者 Consumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱主題 consumer.subscribe(Collections.singletonList("your_topic")); // 持續(xù)消費(fèi)消息 try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n", record.key(), record.value(), record.offset()); }); } } finally { // 關(guān)閉消費(fèi)者 consumer.close(); } } }
4. 使用 Spring Boot 集成 Kafka
如果您使用 Spring Boot,可以通過配置 KafkaTemplate
(用于生產(chǎn)消息)和使用 @KafkaListener
注解(用于消費(fèi)消息)來簡化 Kafka 的集成。
生產(chǎn)者配置:
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
使用 KafkaTemplate
發(fā)送消息:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String key, String value) { kafkaTemplate.send(topic, key, value); } }
消費(fèi)者配置:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
使用 @KafkaListener
消費(fèi)消息:
在 Spring Boot 中,@KafkaListener
注解用于監(jiān)聽指定的 Kafka 主題,并在收到消息時(shí)觸發(fā)相應(yīng)的方法。
以下是一個(gè)基本示例:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = "your_topic", groupId = "your_group_id") public void listen(String message) { System.out.println("Received message: " + message); // 在此處添加處理邏輯 } }
在上述代碼中:
topics
:指定要監(jiān)聽的 Kafka 主題。groupId
:指定消費(fèi)者組 ID。
listen
方法:當(dāng)有新消息發(fā)布到指定主題時(shí),該方法會(huì)被調(diào)用,message
參數(shù)包含消息的內(nèi)容。
批量消費(fèi)消息
如果希望一次處理多條消息,可以啟用批量監(jiān)聽。
首先,需要配置一個(gè)支持批量消費(fèi)的 KafkaListenerContainerFactory
:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; @EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); // 啟用批量監(jiān)聽 return factory; } }
然后,在消費(fèi)者服務(wù)中使用 @KafkaListener
注解,并指定使用上述配置的工廠:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import java.util.List; @Service public class KafkaBatchConsumerService { @KafkaListener( topics = "your_topic", groupId = "your_group_id", containerFactory = "kafkaListenerContainerFactory" ) public void listen(List<String> messages) { System.out.println("Received batch messages: " + messages); // 在此處添加批量處理邏輯 } }
在上述代碼中:
containerFactory
:指定使用支持批量消費(fèi)的工廠。
listen
方法的參數(shù)類型為 List<String>
,用于接收一批消息。
控制消費(fèi)者的啟動(dòng)和停止
在某些情況下,可能需要在運(yùn)行時(shí)控制 Kafka 消費(fèi)者的啟動(dòng)和停止。
可以通過 KafkaListenerEndpointRegistry
來實(shí)現(xiàn):
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.listener.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @Service public class KafkaListenerManager { @Autowired private KafkaListenerEndpointRegistry registry; // 啟動(dòng)監(jiān)聽器 public void startListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && !listenerContainer.isRunning()) { listenerContainer.start(); } } // 停止監(jiān)聽器 public void stopListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && listenerContainer.isRunning()) { listenerContainer.stop(); } } }
在上述代碼中:
startListener
方法用于啟動(dòng)指定的監(jiān)聽器。stopListener
方法用于停止指定的監(jiān)聽器。listenerId
對應(yīng)于@KafkaListener
注解中的id
屬性。
通過這種方式,可以在應(yīng)用運(yùn)行時(shí)根據(jù)需要?jiǎng)討B(tài)地控制 Kafka 消費(fèi)者的行為。
通過上述配置和代碼示例,可以在 Spring Boot 項(xiàng)目中有效地集成 Kafka,實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)功能。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法
今天小編就為大家分享一篇Java 獲取Html文本中的img標(biāo)簽下src中的內(nèi)容方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06Java數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)二維數(shù)組與稀疏數(shù)組轉(zhuǎn)換詳解
稀疏數(shù)組是用于優(yōu)化,壓縮具有以下特點(diǎn)的二維數(shù)組:當(dāng)二維數(shù)組中的元素大部分相同,有意義的數(shù)據(jù)元素較少時(shí),可以使用稀疏數(shù)組進(jìn)行簡化,節(jié)省存儲(chǔ)空間2021-10-10JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄
這篇文章主要介紹了JavaWeb如何實(shí)現(xiàn)限制單個(gè)賬號(hào)多處登錄問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08springboot內(nèi)嵌Tomcat安全漏洞修復(fù)方式
針對CVE-2020-1938漏洞,建議升級Tomcat至安全版本以避免受影響,影響版本包括:Apache Tomcat 9.x小于9.0.31、Apache Tomcat 8.x小于8.5.51、Apache Tomcat 7.x小于7.0.100及Apache Tomcat 6.x,2024-10-10Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之二叉樹
今天給大家?guī)淼氖顷P(guān)于Java數(shù)據(jù)結(jié)構(gòu)的相關(guān)知識(shí),文章圍繞著Java二叉樹展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Java并發(fā)編程之詳解CyclicBarrier線程同步
在之前的文章中已經(jīng)為大家介紹了java并發(fā)編程的工具:BlockingQueue接口,ArrayBlockingQueue,DelayQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,BlockingDeque接口,ConcurrentHashMap,CountDownLatch,本文為系列文章第十篇,需要的朋友可以參考下2021-06-06idea自帶database連接mysql失敗問題的解決辦法
在IDEA?帶的數(shù)據(jù)庫連接?具中,可以連接MySQL數(shù)據(jù)庫,但是有的時(shí)候連接出現(xiàn)錯(cuò)誤,連接不上數(shù)據(jù)庫,下面這篇文章主要給大家介紹了關(guān)于idea自帶database連接mysql失敗問題的解決辦法,需要的朋友可以參考下2023-06-06