欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot系列之kafka操作使用詳解

 更新時(shí)間:2023年08月01日 10:05:13   作者:wotrd  
這篇文章主要為大家介紹了Springboot系列之kafka操作使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

kafka簡(jiǎn)介

ApacheKafka®是一個(gè)分布式流媒體平臺(tái)。有三個(gè)關(guān)鍵功能:

  • 發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。
  • 以容錯(cuò)的持久方式存儲(chǔ)記錄流。
  • 記錄發(fā)生時(shí)處理流。

Kafka通常用于兩大類應(yīng)用:

  • 構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時(shí)流數(shù)據(jù)管道
  • 構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實(shí)時(shí)流應(yīng)用程序

kafka概念

(1)什么是流處理?

所謂流處理,我的理解是流水線處理。例如,電子廠每個(gè)人負(fù)責(zé)一個(gè)功能,來了就處
理,不來就等著。

(2)partition和replication和broker有關(guān)嗎?

partition和replication是分區(qū)和備份的概念。即使是單機(jī)一個(gè)broker也一樣支持。

(3)consumer如何設(shè)置和存儲(chǔ)partition的offset偏移量,有哪幾種消費(fèi)模式,怎么確定消息是否被消費(fèi),將偏移量移到前面會(huì)立即消費(fèi)到最后嗎?

使用KafkaConsumer設(shè)置partition和offset。有自動(dòng)提交和手動(dòng)ack模式提交偏移量?jī)煞N消費(fèi)方式。將偏移量移到前面需要設(shè)置成為消費(fèi)狀態(tài)會(huì)立即被消費(fèi)(設(shè)置新消費(fèi)組)。

(4)AckMode模式有哪幾種?

RECORD:處理記錄后,偵聽器返回時(shí)提交偏移量

BATCH:在處理poll()返回的所有記錄時(shí)提交偏移量

TIME:只要已超過自上次提交以來的ackTime,就會(huì)在處理poll()返回的所有記錄時(shí)提交偏移量

COUNT:只要自上次提交以來已收到ackCount記錄,就會(huì)在處理poll()返回的所有記錄時(shí)提交偏移量

COUNT_TIME:與TIME和COUNT類似,但如果任一條件為真,則執(zhí)行提交

MANUAL:消息監(jiān)聽器負(fù)責(zé)確認(rèn)()確認(rèn)。 之后,應(yīng)用與BATCH相同的語義    

MANUAL_IMMEDIATE:當(dāng)偵聽器調(diào)用Acknowledgment.acknowledge()方法時(shí),立即提交偏移量

Springboot使用kafka

(1)注入NewTopic自動(dòng)在broker中添加topic

@Bean
public NewTopic topic() {
    return new NewTopic("topic1", 2, (short) 1);
}

(2)使用KafkaTemplate發(fā)送消息時(shí),topic自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的partition是0,長(zhǎng)度為1

(3)使用KafkaTemplate發(fā)送消息

@RequestMapping("sendMsgWithTopic")
public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key,
                               @RequestParam String value) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value);
    return "success";
}

(4)異步發(fā)送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(data);
            }
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(data, record, ex);
           }
    });
}

(5)同步發(fā)送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
    }catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
    }catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
    }
}

(6)事務(wù)

(1)Spring事務(wù)支持一起使用(@Transactional,TransactionTemplate等)
(2)使用template執(zhí)行事務(wù)
    boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
    });

(7)消費(fèi)者

(1)簡(jiǎn)單使用
 @KafkaListener(id = "myListener", topics = "myTopic",
    autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
 public void listen(String data) {
    ...
 }
(2)配置多個(gè)topic和partition,TopicPartition中partitions和PartitionOffset不能同時(shí)使用
 @KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
 public void listen(ConsumerRecord<?, ?> record) {
    ...
 }
(3)使用ack手動(dòng)確認(rèn)模式
 @KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
 public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
 }
 (4)獲取消息的header信息
 @KafkaListener(id = "qux", topicPattern = "myTopic1")
 public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
    ...
 }
(5)批處理
 @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
 public void listen(List<String> list,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
 }
(6)使用@Valid校驗(yàn)數(shù)據(jù)
 @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
   containerFactory = "kafkaJsonListenerContainerFactory")
 public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
 }
 @Bean
 public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
 }
(7)topic根據(jù)參數(shù)類型映射不同方法
 @KafkaListener(id = "multi", topics = "myTopic")
 static class MultiListenerBean {
    @KafkaHandler
    public void listen(String cat) {
        ...
    }
    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }
    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }
 }

Springboot使用kafka踩坑

(1)需要修改server.properties的listener主機(jī)地址不然Java獲取不到消息。

(2)不同服務(wù)配置相同groupId只有一個(gè)監(jiān)聽者可以收到消息

kafka圖形化工具 kafka tool

下載地址 http://www.kafkatool.com/down...

以上就是Springboot系列之kafka操作使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Springboot kafka操作的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 一文詳解Spring是怎樣處理循環(huán)依賴的

    一文詳解Spring是怎樣處理循環(huán)依賴的

    循環(huán)依賴簡(jiǎn)單理解就是A,B 兩個(gè)bean相互依賴,A依賴B,B依賴A,A->B、B->A大概就是這樣,這篇文章主要介紹了Spring是怎樣處理循環(huán)依賴的,文中通過代碼示例給大家介紹的非常詳細(xì),具有一定的參考價(jià)值,需要的朋友可以參考下
    2024-01-01
  • maven環(huán)境變量配置講解

    maven環(huán)境變量配置講解

    這篇文章主要介紹了maven環(huán)境變量配置講解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • java多線程:基礎(chǔ)詳解

    java多線程:基礎(chǔ)詳解

    這篇文章主要介紹了java多線程編程實(shí)例,分享了幾則多線程的實(shí)例代碼,具有一定參考價(jià)值,加深多線程編程的理解還是很有幫助的,需要的朋友可以參考下。
    2021-08-08
  • SpringBoot整合FreeMarker的過程詳解

    SpringBoot整合FreeMarker的過程詳解

    FreeMarker 是一個(gè)模板引擎,可以將模板與數(shù)據(jù)結(jié)合生成文本輸出,本文給大家介紹SpringBoot整合FreeMarker的過程,感興趣的朋友一起看看吧
    2024-01-01
  • 最最常用的 100 個(gè) Java類分享

    最最常用的 100 個(gè) Java類分享

    這篇文章主要介紹了最最常用的 100 個(gè) Java類分享,需要的朋友可以參考下
    2015-04-04
  • 詳解Java中Duration類的使用方法

    詳解Java中Duration類的使用方法

    Duration類通過秒和納秒相結(jié)合來描述一個(gè)時(shí)間量,最高精度是納秒。本文將通過示例詳細(xì)為大家講講Duration類的使用,需要的可以參考一下
    2022-05-05
  • Alibaba?Nacos配置中心動(dòng)態(tài)感知原理示例解析

    Alibaba?Nacos配置中心動(dòng)態(tài)感知原理示例解析

    這篇文章主要介紹了Alibaba?Nacos配置中心動(dòng)態(tài)感知原理示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • IDEA提示:Boolean method ‘xxx‘ is always inverted問題

    IDEA提示:Boolean method ‘xxx‘ is always&nb

    這篇文章主要介紹了IDEA提示:Boolean method ‘xxx‘ is always inverted問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • sentinel?整合spring?cloud限流的過程解析

    sentinel?整合spring?cloud限流的過程解析

    這篇文章主要介紹了sentinel?整合spring?cloud限流,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-03-03
  • 實(shí)例解析Json反序列化之ObjectMapper(自定義實(shí)現(xiàn)反序列化方法)

    實(shí)例解析Json反序列化之ObjectMapper(自定義實(shí)現(xiàn)反序列化方法)

    這篇文章主要介紹了實(shí)例解析Json反序列化之ObjectMapper,json自定義序列化的方法,需要的朋友可以了解下。
    2017-09-09

最新評(píng)論