Kafka?在?java?中的基本使用步驟
1. 使用 kafka 原生客戶端
現(xiàn)在基本都直接使用 springboot 版本,但了解原生客戶端,能更好的理解 springboot 版的 kafka 客戶端原理。
步驟1:pom 引入核心依賴:
引入依賴時,盡量選擇和 kafka 版本對應(yīng)的依賴版本。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>4.0.0</version>
</dependency>步驟2:提供者客戶端代碼:
提供者客戶端要做三件事:
- 設(shè)置提供者客戶端屬性(可選屬性都被定義在 ProducerConfig 類中)
- 設(shè)置要發(fā)送的消息
- 發(fā)送(有三種發(fā)送方式,下面代碼中都有)
public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 第一步:設(shè)置提供者屬性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// 第二步:設(shè)置要發(fā)送的消息
ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");
// 第三部:發(fā)送消息
// send(producer, record);
// sendSync(producer, record);
sendASync(producer, record);
}
}
/**
* 發(fā)送方式1:單向推送,不關(guān)心服務(wù)器的應(yīng)答
*/
private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {
producer.send(record);
}
/**
* 發(fā)送方式2:同步推送,得到服務(wù)器的應(yīng)答前會阻塞當前線程
*/
private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
}
/**
* 發(fā)送方式3:異步推送,不需等待服務(wù)器應(yīng)答,當服務(wù)器有應(yīng)答后會觸發(fā)函數(shù)回調(diào)
*/
private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
throw new RuntimeException("向 kafka 推送失敗", exception);
}
System.out.println(metadata.topic());
System.out.println(metadata.partition());
System.out.println(metadata.offset());
});
}
}步驟3:消費者客戶端代碼:
消費者客戶端要做三件事:
- 設(shè)置消費者客戶端屬性(可選屬性都被定義在 ConsumerConfig 類中)
- 設(shè)置消費者訂閱的主題
- 拉取消息
- 提交 offset(有兩種提交方式,下面代碼中都有)
public class MyConsumer {
public static void main(String[] args) {
// 第一步:設(shè)置消費者屬性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
// 第二步:設(shè)置要訂閱的主題
consumer.subscribe(Collections.singletonList("testTopic"));
while (true) {
// 第三步:拉取消息,100 代表最大等待時間,如果時間到了還沒有拉取到消息就不阻塞了繼續(xù)往后執(zhí)行
ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
// 第四步:提交 offset
// consumer.commitSync(); // 同步提交,表示必須等到 offset 提交完畢,再去消費下?批數(shù)據(jù)
consumer.commitSync(); // 異步提交,表示發(fā)送完提交 offset 請求后,就開始消費下?批數(shù)據(jù)了。不?等到Broker的確認。
}
}
}
}2. Kafka 集成 springboot
springboot 版本是最常用的,比原生客戶端使用方便。但是道理是一樣的,底層也是原生客戶端。
pom 引入核心依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>yaml 配置文件:
這一步無非就是把原生客戶端中的屬性配置,寫在 yaml 中
spring:
kafka:
bootstrap-servers: 192.168.2.28:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: testGroup
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer提供者客戶端代碼:
只需要兩步:
- 注入 KafkaTemplate
- 發(fā)送
@RestController
public class ProducerController {
/**
* kafka
*/
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@GetMapping("/test")
public void send() {
// 發(fā)送 kafka 消息
kafkaTemplate.send("testTopic", "testKey", "testValue");
}
}消費者客戶端代碼:
只需要監(jiān)聽主題就可以
@RestController
public class ConsumerController {
// 監(jiān)聽 kafka 消息
@KafkaListener(topics = {"testTopic"})
public void test(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}到此這篇關(guān)于Kafka 在 java 中的基本使用的文章就介紹到這了,更多相關(guān)Kafka java使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中基于AOP和Semaphore實現(xiàn)API限流
調(diào)用速率限制是 Web API 中的常見要求,旨在防止濫用并確保公平使用資源,借助Spring Boot 中的 AOP,我們可以通過攔截方法調(diào)用并限制在特定時間范圍內(nèi)允許的請求數(shù)量來實現(xiàn)速率限制,需要的朋友可以參考下2024-10-10
java數(shù)據(jù)結(jié)構(gòu)和算法之馬踏棋盤算法
這篇文章主要為大家詳細介紹了java數(shù)據(jù)結(jié)構(gòu)和算法之馬踏棋盤算法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
spring?java?動態(tài)獲取consul?K/V的方法
這篇文章主要介紹了spring?java?動態(tài)獲取consul?K/V的相關(guān)資料,主要包括springConsul配置kv路徑以及自動注入consulKV到服務(wù)中,本文給大家介紹的非常詳細,需要的朋友可以參考下2023-10-10
Java二維數(shù)組與稀疏數(shù)組相互轉(zhuǎn)換實現(xiàn)詳解
在某些應(yīng)用場景中需要大量的二維數(shù)組來進行數(shù)據(jù)存儲,但是二維數(shù)組中卻有著大量的無用的位置占據(jù)著內(nèi)存空間,稀疏數(shù)組就是為了優(yōu)化二維數(shù)組,節(jié)省內(nèi)存空間2022-09-09
Spring Boot中slf4j日志依賴關(guān)系示例詳解
在項目開發(fā)中,記錄日志是必做的一件事情。而當我們使用Springboot框架時,記錄日志就變得極其簡單了。下面這篇文章主要給大家介紹了關(guān)于Spring Boot中slf4j日志依賴關(guān)系的相關(guān)資料,需要的朋友可以參考下2018-11-11
Mybatis中resultMap標簽和sql標簽的設(shè)置方式
這篇文章主要介紹了Mybatis中resultMap標簽和sql標簽的設(shè)置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
springboot 使用zookeeper實現(xiàn)分布式隊列的基本步驟
這篇文章主要介紹了springboot 使用zookeeper實現(xiàn)分布式隊列,通過ZooKeeper的協(xié)調(diào)和同步機制,多個應(yīng)用程序可以共享一個隊列,并按照先進先出的順序處理隊列中的消息,需要的朋友可以參考下2023-08-08

