Springboot系列之kafka操作使用詳解
kafka簡介
ApacheKafka®是一個分布式流媒體平臺。有三個關(guān)鍵功能:
- 發(fā)布和訂閱記錄流,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)。
- 以容錯的持久方式存儲記錄流。
- 記錄發(fā)生時處理流。
Kafka通常用于兩大類應(yīng)用:
- 構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實時流數(shù)據(jù)管道
- 構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實時流應(yīng)用程序
kafka概念
(1)什么是流處理?
所謂流處理,我的理解是流水線處理。例如,電子廠每個人負(fù)責(zé)一個功能,來了就處
理,不來就等著。
(2)partition和replication和broker有關(guān)嗎?
partition和replication是分區(qū)和備份的概念。即使是單機(jī)一個broker也一樣支持。
(3)consumer如何設(shè)置和存儲partition的offset偏移量,有哪幾種消費(fèi)模式,怎么確定消息是否被消費(fèi),將偏移量移到前面會立即消費(fèi)到最后嗎?
使用KafkaConsumer設(shè)置partition和offset。有自動提交和手動ack模式提交偏移量兩種消費(fèi)方式。將偏移量移到前面需要設(shè)置成為消費(fèi)狀態(tài)會立即被消費(fèi)(設(shè)置新消費(fèi)組)。
(4)AckMode模式有哪幾種?
RECORD:處理記錄后,偵聽器返回時提交偏移量
BATCH:在處理poll()返回的所有記錄時提交偏移量
TIME:只要已超過自上次提交以來的ackTime,就會在處理poll()返回的所有記錄時提交偏移量
COUNT:只要自上次提交以來已收到ackCount記錄,就會在處理poll()返回的所有記錄時提交偏移量
COUNT_TIME:與TIME和COUNT類似,但如果任一條件為真,則執(zhí)行提交
MANUAL:消息監(jiān)聽器負(fù)責(zé)確認(rèn)()確認(rèn)。 之后,應(yīng)用與BATCH相同的語義
MANUAL_IMMEDIATE:當(dāng)偵聽器調(diào)用Acknowledgment.acknowledge()方法時,立即提交偏移量
Springboot使用kafka
(1)注入NewTopic自動在broker中添加topic
@Bean
public NewTopic topic() {
return new NewTopic("topic1", 2, (short) 1);
}(2)使用KafkaTemplate發(fā)送消息時,topic自動創(chuàng)建,自動創(chuàng)建的partition是0,長度為1
(3)使用KafkaTemplate發(fā)送消息
@RequestMapping("sendMsgWithTopic")
public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key,
@RequestParam String value) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value);
return "success";
}(4)異步發(fā)送消息
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(Throwable ex) {
handleFailure(data, record, ex);
}
});
}(5)同步發(fā)送消息
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}(6)事務(wù)
(1)Spring事務(wù)支持一起使用(@Transactional,TransactionTemplate等)
(2)使用template執(zhí)行事務(wù)
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});(7)消費(fèi)者
(1)簡單使用
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
(2)配置多個topic和partition,TopicPartition中partitions和PartitionOffset不能同時使用
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
(3)使用ack手動確認(rèn)模式
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
(4)獲取消息的header信息
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
(5)批處理
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
(6)使用@Valid校驗數(shù)據(jù)
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
(7)topic根據(jù)參數(shù)類型映射不同方法
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}Springboot使用kafka踩坑
(1)需要修改server.properties的listener主機(jī)地址不然Java獲取不到消息。
(2)不同服務(wù)配置相同groupId只有一個監(jiān)聽者可以收到消息
kafka圖形化工具 kafka tool
下載地址 http://www.kafkatool.com/down...
以上就是Springboot系列之kafka操作使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Springboot kafka操作的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Alibaba?Nacos配置中心動態(tài)感知原理示例解析
這篇文章主要介紹了Alibaba?Nacos配置中心動態(tài)感知原理示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
IDEA提示:Boolean method ‘xxx‘ is always&nb
這篇文章主要介紹了IDEA提示:Boolean method ‘xxx‘ is always inverted問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08
sentinel?整合spring?cloud限流的過程解析
這篇文章主要介紹了sentinel?整合spring?cloud限流,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-03-03
實例解析Json反序列化之ObjectMapper(自定義實現(xiàn)反序列化方法)
這篇文章主要介紹了實例解析Json反序列化之ObjectMapper,json自定義序列化的方法,需要的朋友可以了解下。2017-09-09

