欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合Kafka完成生產消費的方案

 更新時間:2024年12月25日 08:31:00   作者:李博帥  
網上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應用到生產環(huán)境,很多配置都是缺少,或者不正確的,只能當個demo,所以本文給大家介紹了SpringBoot整合Kafka完成生產消費的方案,需要的朋友可以參考下

前言

網上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應用到生產環(huán)境。很多配置都是缺少,或者不正確的,只能當個demo。遂自行查找資料、驗證得到了一套可以完美應對生產環(huán)境的kafka整合springboot的方案。

基礎配置

引入依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

編寫kafka相關配置

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
      # transaction-id-prefix: starlinkRisk-
      retries: 3
      acks: 1
      batch-size: 32768
      buffer-memory: 33554432
    consumer:
      bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
      group-id: slr_connector
      auto-commit-interval: 2000
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 3
    properties:
      max:
        poll:
          interval:
            ms: 600000
      session:
        timeout:
          ms: 10000
    listener:
      concurrency: 9
      missing-topics-fatal: false
      poll-timeout: 600000

編寫kafka生產者配置類

import com.liboshuai.starlink.slr.connector.common.handler.KafkaSendResultHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Kafka生產者配置
 */
@Slf4j
@Configuration
public class KafkaProviderConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
//    @Value("${spring.kafka.producer.transaction-id-prefix}")
//    private String transactionIdPrefix;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Resource
    private KafkaSendResultHandler kafkaSendResultHandler;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。
        //acks=1 : 只要集群的首領節(jié)點收到消息,生產者就會收到一個來自服務器成功響應。
        //acks=all :只有當所有參與復制的節(jié)點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
        //開啟事務必須設為all
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        //發(fā)生錯誤后,消息重發(fā)的次數,開啟事務必須大于0
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //當多個消息發(fā)送到相同分區(qū)時,生產者會將消息打包到一起,以減少請求交互. 而不是一條條發(fā)送
        //批次的大小可以通過batch.size 參數設置.默認是16KB
        //較小的批次大小有可能降低吞吐量(批次大小為0則完全禁用批處理)。
        //比如說,kafka里的消息5秒鐘Batch才湊滿了16KB,才能發(fā)送出去。那這些消息的延遲就是5秒鐘
        //實測batchSize這個參數沒有用
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //有的時刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時就很大,所以需要一個參數. 再設置一個時間,到了這個時間,
        //即使數據沒達到16KB,也將這個批次發(fā)送出去
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        //生產者內存緩沖區(qū)的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //反序列化,和生產者的序列化方式對應
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        //開啟事務,會導致 LINGER_MS_CONFIG 配置失效
//        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

//    @Bean
//    public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
//        return new KafkaTransactionManager<>(producerFactory);
//    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setProducerListener(kafkaSendResultHandler);
        return kafkaTemplate;
    }
}

編寫kafka消費者配置類

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka消費者配置
 */
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自動提交的時間間隔,自動提交開啟時生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
        //earliest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費分區(qū)的記錄
        //latest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區(qū)下的數據(在消費者啟動之后生成的記錄)
        //none:當各分區(qū)都存在已提交的offset時,從提交的offset開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //兩次poll之間的最大間隔,默認值為5分鐘。如果超過這個間隔會觸發(fā)reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        //這個參數定義了poll方法最多可以拉取多少條消息,默認值為500。如果在拉取消息的時候新消息不足500條,那有多少返回多少;如果超過500條,每次只返回500。
        //這個默認值在有些場景下太大,有些場景很難保證能夠在5min內處理完500條消息,
        //如果消費者無法在5分鐘內處理完500條消息的話就會觸發(fā)reBalance,
        //然后這批消息會被分配到另一個消費者中,還是會處理不完,這樣這批消息就永遠也處理不完。
        //要避免出現上述問題,提前評估好處理一條消息最長需要多少時間,然后覆蓋默認的max.poll.records參數
        //注:需要開啟BatchListener批量監(jiān)聽才會生效,如果不開啟BatchListener則不會出現reBalance情況
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //當broker多久沒有收到consumer的心跳請求后就觸發(fā)reBalance,默認值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // 設置反序列化器為 ErrorHandlingDeserializer,防止藥丸信息
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // 配置 ErrorHandlingDeserializer 的委托反序列化器為 StringDeserializer
        propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //在偵聽器容器中運行的線程數,一般設置為 機器數*分區(qū)數
        factory.setConcurrency(concurrency);
        //消費監(jiān)聽接口監(jiān)聽的主題不存在時,默認會報錯,所以設置為false忽略錯誤
        factory.setMissingTopicsFatal(missingTopicsFatal);
        //自動提交關閉,需要設置手動消息確認
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        //設置為批量監(jiān)聽,需要用List接收(相等于listener.type: batch)
        factory.setBatchListener(true);
        return factory;
    }
}

編寫kafka發(fā)送結果回調處理類

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component("kafkaSendResultHandler")
public class KafkaSendResultHandler implements ProducerListener<String, Object> {
    @Override
    public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
        // 記錄成功發(fā)送的消息信息
        if (recordMetadata != null) {
            log.info("Kafka消息發(fā)送成功 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}",
                    producerRecord.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    producerRecord.key(),
                    producerRecord.value());
        } else {
            log.warn("Kafka消息發(fā)送成功,但RecordMetadata為null - 鍵: {}, 值: {}",
                    producerRecord.key(),
                    producerRecord.value());
        }
    }

    @Override
    public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        // 記錄發(fā)送失敗的消息信息及異常
        if (recordMetadata != null) {
            log.error("Kafka消息發(fā)送失敗 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}, 異常: {}",
                    producerRecord.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    producerRecord.key(),
                    producerRecord.value(),
                    exception.getMessage(), exception);
        } else {
            log.error("Kafka消息發(fā)送失敗 - RecordMetadata為null, 鍵: {}, 值: {}, 異常: {}",
                    producerRecord.key(),
                    producerRecord.value(),
                    exception.getMessage(), exception);
        }
    }
}

編寫kafka消費異常處理類

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component("kafkaConsumerExceptionHandler")
public class KafkaConsumerExceptionHandler implements KafkaListenerErrorHandler {

    /**
     * 處理錯誤,不帶 Consumer 對象
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        log.error("kafka消費消息時發(fā)生錯誤。消息內容: {}, 錯誤信息: {}", message, e.getMessage(), e);
        // 可以根據需要選擇是否拋出異常
        // 例如:return null; 表示忽略錯誤
        // throw e;    // 拋出異常以觸發(fā)重試機制
        return null;
    }

    /**
     * 處理錯誤,帶 Consumer 對象
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        log.error("kafka消費消息時發(fā)生錯誤。消息內容: {}, 錯誤信息: {}", message, exception.getMessage(), exception);
        // 可以根據需要選擇處理方式,例如手動提交偏移量,或其他操作
        // 這里僅記錄日志并返回 null
        return null;
    }
}

生產者示例

import com.liboshuai.starlink.slr.engine.api.dto.KafkaEventDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class KafkaEventProvider {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${slr-connector.kafka.provider_topic}")
    private String providerTopic;

    /**
     * 批量上送事件信息到kafka
     */
    public void batchSend(List<KafkaEventDTO> kafkaEventDTOList) {
        if (CollectionUtils.isEmpty(kafkaEventDTOList)) {
            return;
        }
        kafkaEventDTOList.forEach(eventUploadDTO -> kafkaTemplate.send(providerTopic, eventUploadDTO));
    }
}

消費者實例

package com.liboshuai.starlink.slr.connector.dao.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class KafkaEventListener {

    @KafkaListener(
            topics = "${slr-connector.kafka.consumer_topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory",
            errorHandler = "kafkaConsumerExceptionHandler"
    )
    public void onAlert(List<ConsumerRecord<String, String>> consumerRecordList, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : consumerRecordList) {
            // 打印消費的詳細信息
            log.info("Consumed message - Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
        // 手動提交偏移量
        ack.acknowledge();
    }
}

結語

對于生產者、消費者提供的例子較少,這方面網上倒是很多,我就不多寫了。主要是重要的配置部分解決即可。

以上就是SpringBoot整合Kafka完成生產消費的方案的詳細內容,更多關于SpringBoot Kafka生產消費的資料請關注腳本之家其它相關文章!

相關文章

  • JavaWeb實現學生信息管理系統(tǒng)(2)

    JavaWeb實現學生信息管理系統(tǒng)(2)

    這篇文章主要介紹了JavaWeb實現學生信息管理系統(tǒng)的第二篇,實現學生管理系統(tǒng)的查找和添加功能,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Java?CompletableFuture實現多線程異步編排

    Java?CompletableFuture實現多線程異步編排

    這篇文章主要為大家介紹了Java?CompletableFuture實現多線程異步編排,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-09-09
  • RabbitMQ消息丟失解決方案

    RabbitMQ消息丟失解決方案

    把這篇文章主要為大家介紹了如何保證RabbitMQ消息不丟失的解決方發(fā),分從從丟失的三種情況給大家介紹不同的解決方案,感興趣的小伙伴可以參考閱讀本文
    2023-07-07
  • Java集合操作之List接口及其實現方法詳解

    Java集合操作之List接口及其實現方法詳解

    這篇文章主要介紹了Java集合操作之List接口及其實現方法,詳細分析了Java集合操作中List接口原理、功能、用法及操作注意事項,需要的朋友可以參考下
    2015-07-07
  • Java中SSM框架實現增刪改查功能代碼詳解

    Java中SSM框架實現增刪改查功能代碼詳解

    這篇文章主要介紹了Java中SSM框架實現增刪改查功能代碼詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • 解析ConcurrentHashMap: 紅黑樹的代理類(TreeBin)

    解析ConcurrentHashMap: 紅黑樹的代理類(TreeBin)

    ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment的結構和HashMap類似,是一種數組和鏈表結構,今天給大家普及java面試常見問題---ConcurrentHashMap知識,一起看看吧
    2021-06-06
  • SpringBoot中使用@Async注解失效場景及說明

    SpringBoot中使用@Async注解失效場景及說明

    在Spring?Boot中,@Async注解就像一把刀,能幫你輕松處理那些耗時的任務,讓主線程可以繼續(xù)忙別的事兒,不過,跟所有強大的工具一樣,用不好它也可能出岔子,為了避免這些坑,咱們得深入了解下@Async注解,接下來,咱們就來聊聊7種常見的@Async失效情況,需要的朋友可以參考下
    2024-07-07
  • java 字符串池的深入理解

    java 字符串池的深入理解

    這篇文章主要介紹了java 字符串池的深入理解的相關資料,這里提供實例代碼幫助大家學習理解這部分內容,希望大家能夠掌握,需要的朋友可以參考下
    2017-08-08
  • java重試機制使用RPC必須考慮冪等性原理解析

    java重試機制使用RPC必須考慮冪等性原理解析

    這篇文章主要為大家介紹了java重試機制使用RPC必須考慮冪等性原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03
  • Java打成各種壓縮包的方法詳細匯總

    Java打成各種壓縮包的方法詳細匯總

    在工作過程中,需要將一個文件夾生成壓縮文件,然后提供給用戶下載,下面這篇文章主要給大家介紹了關于Java打成各種壓縮包的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-06-06

最新評論