欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案

 更新時(shí)間:2024年12月25日 08:31:00   作者:李博帥  
網(wǎng)上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應(yīng)用到生產(chǎn)環(huán)境,很多配置都是缺少,或者不正確的,只能當(dāng)個(gè)demo,所以本文給大家介紹了SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案,需要的朋友可以參考下

前言

網(wǎng)上找了很多管理kafka整合springboot的教程,但是很多都沒辦法應(yīng)用到生產(chǎn)環(huán)境。很多配置都是缺少,或者不正確的,只能當(dāng)個(gè)demo。遂自行查找資料、驗(yàn)證得到了一套可以完美應(yīng)對(duì)生產(chǎn)環(huán)境的kafka整合springboot的方案。

基礎(chǔ)配置

引入依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

編寫kafka相關(guān)配置

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
      # transaction-id-prefix: starlinkRisk-
      retries: 3
      acks: 1
      batch-size: 32768
      buffer-memory: 33554432
    consumer:
      bootstrap-servers: localhost:9096,localhost:9097,localhost:9098
      group-id: slr_connector
      auto-commit-interval: 2000
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 3
    properties:
      max:
        poll:
          interval:
            ms: 600000
      session:
        timeout:
          ms: 10000
    listener:
      concurrency: 9
      missing-topics-fatal: false
      poll-timeout: 600000

編寫kafka生產(chǎn)者配置類

import com.liboshuai.starlink.slr.connector.common.handler.KafkaSendResultHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Kafka生產(chǎn)者配置
 */
@Slf4j
@Configuration
public class KafkaProviderConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
//    @Value("${spring.kafka.producer.transaction-id-prefix}")
//    private String transactionIdPrefix;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Resource
    private KafkaSendResultHandler kafkaSendResultHandler;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //acks=0 : 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。
        //acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器成功響應(yīng)。
        //acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。
        //開啟事務(wù)必須設(shè)為all
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        //發(fā)生錯(cuò)誤后,消息重發(fā)的次數(shù),開啟事務(wù)必須大于0
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //當(dāng)多個(gè)消息發(fā)送到相同分區(qū)時(shí),生產(chǎn)者會(huì)將消息打包到一起,以減少請(qǐng)求交互. 而不是一條條發(fā)送
        //批次的大小可以通過batch.size 參數(shù)設(shè)置.默認(rèn)是16KB
        //較小的批次大小有可能降低吞吐量(批次大小為0則完全禁用批處理)。
        //比如說,kafka里的消息5秒鐘Batch才湊滿了16KB,才能發(fā)送出去。那這些消息的延遲就是5秒鐘
        //實(shí)測(cè)batchSize這個(gè)參數(shù)沒有用
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //有的時(shí)刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時(shí)就很大,所以需要一個(gè)參數(shù). 再設(shè)置一個(gè)時(shí)間,到了這個(gè)時(shí)間,
        //即使數(shù)據(jù)沒達(dá)到16KB,也將這個(gè)批次發(fā)送出去
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        //生產(chǎn)者內(nèi)存緩沖區(qū)的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //反序列化,和生產(chǎn)者的序列化方式對(duì)應(yīng)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        //開啟事務(wù),會(huì)導(dǎo)致 LINGER_MS_CONFIG 配置失效
//        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

//    @Bean
//    public KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
//        return new KafkaTransactionManager<>(producerFactory);
//    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setProducerListener(kafkaSendResultHandler);
        return kafkaTemplate;
    }
}

編寫kafka消費(fèi)者配置類

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka消費(fèi)者配置
 */
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //是否自動(dòng)提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自動(dòng)提交的時(shí)間間隔,自動(dòng)提交開啟時(shí)生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
        //earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),從頭開始消費(fèi)分區(qū)的記錄
        //latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
        //none:當(dāng)各分區(qū)都存在已提交的offset時(shí),從提交的offset開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //兩次poll之間的最大間隔,默認(rèn)值為5分鐘。如果超過這個(gè)間隔會(huì)觸發(fā)reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        //這個(gè)參數(shù)定義了poll方法最多可以拉取多少條消息,默認(rèn)值為500。如果在拉取消息的時(shí)候新消息不足500條,那有多少返回多少;如果超過500條,每次只返回500。
        //這個(gè)默認(rèn)值在有些場(chǎng)景下太大,有些場(chǎng)景很難保證能夠在5min內(nèi)處理完500條消息,
        //如果消費(fèi)者無法在5分鐘內(nèi)處理完500條消息的話就會(huì)觸發(fā)reBalance,
        //然后這批消息會(huì)被分配到另一個(gè)消費(fèi)者中,還是會(huì)處理不完,這樣這批消息就永遠(yuǎn)也處理不完。
        //要避免出現(xiàn)上述問題,提前評(píng)估好處理一條消息最長(zhǎng)需要多少時(shí)間,然后覆蓋默認(rèn)的max.poll.records參數(shù)
        //注:需要開啟BatchListener批量監(jiān)聽才會(huì)生效,如果不開啟BatchListener則不會(huì)出現(xiàn)reBalance情況
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //當(dāng)broker多久沒有收到consumer的心跳請(qǐng)求后就觸發(fā)reBalance,默認(rèn)值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // 設(shè)置反序列化器為 ErrorHandlingDeserializer,防止藥丸信息
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // 配置 ErrorHandlingDeserializer 的委托反序列化器為 StringDeserializer
        propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //在偵聽器容器中運(yùn)行的線程數(shù),一般設(shè)置為 機(jī)器數(shù)*分區(qū)數(shù)
        factory.setConcurrency(concurrency);
        //消費(fèi)監(jiān)聽接口監(jiān)聽的主題不存在時(shí),默認(rèn)會(huì)報(bào)錯(cuò),所以設(shè)置為false忽略錯(cuò)誤
        factory.setMissingTopicsFatal(missingTopicsFatal);
        //自動(dòng)提交關(guān)閉,需要設(shè)置手動(dòng)消息確認(rèn)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        //設(shè)置為批量監(jiān)聽,需要用List接收(相等于listener.type: batch)
        factory.setBatchListener(true);
        return factory;
    }
}

編寫kafka發(fā)送結(jié)果回調(diào)處理類

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component("kafkaSendResultHandler")
public class KafkaSendResultHandler implements ProducerListener<String, Object> {
    @Override
    public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
        // 記錄成功發(fā)送的消息信息
        if (recordMetadata != null) {
            log.info("Kafka消息發(fā)送成功 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}",
                    producerRecord.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    producerRecord.key(),
                    producerRecord.value());
        } else {
            log.warn("Kafka消息發(fā)送成功,但RecordMetadata為null - 鍵: {}, 值: {}",
                    producerRecord.key(),
                    producerRecord.value());
        }
    }

    @Override
    public void onError(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        // 記錄發(fā)送失敗的消息信息及異常
        if (recordMetadata != null) {
            log.error("Kafka消息發(fā)送失敗 - 主題: {}, 分區(qū): {}, 偏移量: {}, 鍵: {}, 值: {}, 異常: {}",
                    producerRecord.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    producerRecord.key(),
                    producerRecord.value(),
                    exception.getMessage(), exception);
        } else {
            log.error("Kafka消息發(fā)送失敗 - RecordMetadata為null, 鍵: {}, 值: {}, 異常: {}",
                    producerRecord.key(),
                    producerRecord.value(),
                    exception.getMessage(), exception);
        }
    }
}

編寫kafka消費(fèi)異常處理類

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component("kafkaConsumerExceptionHandler")
public class KafkaConsumerExceptionHandler implements KafkaListenerErrorHandler {

    /**
     * 處理錯(cuò)誤,不帶 Consumer 對(duì)象
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        log.error("kafka消費(fèi)消息時(shí)發(fā)生錯(cuò)誤。消息內(nèi)容: {}, 錯(cuò)誤信息: {}", message, e.getMessage(), e);
        // 可以根據(jù)需要選擇是否拋出異常
        // 例如:return null; 表示忽略錯(cuò)誤
        // throw e;    // 拋出異常以觸發(fā)重試機(jī)制
        return null;
    }

    /**
     * 處理錯(cuò)誤,帶 Consumer 對(duì)象
     */
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        log.error("kafka消費(fèi)消息時(shí)發(fā)生錯(cuò)誤。消息內(nèi)容: {}, 錯(cuò)誤信息: {}", message, exception.getMessage(), exception);
        // 可以根據(jù)需要選擇處理方式,例如手動(dòng)提交偏移量,或其他操作
        // 這里僅記錄日志并返回 null
        return null;
    }
}

生產(chǎn)者示例

import com.liboshuai.starlink.slr.engine.api.dto.KafkaEventDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class KafkaEventProvider {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${slr-connector.kafka.provider_topic}")
    private String providerTopic;

    /**
     * 批量上送事件信息到kafka
     */
    public void batchSend(List<KafkaEventDTO> kafkaEventDTOList) {
        if (CollectionUtils.isEmpty(kafkaEventDTOList)) {
            return;
        }
        kafkaEventDTOList.forEach(eventUploadDTO -> kafkaTemplate.send(providerTopic, eventUploadDTO));
    }
}

消費(fèi)者實(shí)例

package com.liboshuai.starlink.slr.connector.dao.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class KafkaEventListener {

    @KafkaListener(
            topics = "${slr-connector.kafka.consumer_topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory",
            errorHandler = "kafkaConsumerExceptionHandler"
    )
    public void onAlert(List<ConsumerRecord<String, String>> consumerRecordList, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : consumerRecordList) {
            // 打印消費(fèi)的詳細(xì)信息
            log.info("Consumed message - Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
        // 手動(dòng)提交偏移量
        ack.acknowledge();
    }
}

結(jié)語(yǔ)

對(duì)于生產(chǎn)者、消費(fèi)者提供的例子較少,這方面網(wǎng)上倒是很多,我就不多寫了。主要是重要的配置部分解決即可。

以上就是SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Kafka生產(chǎn)消費(fèi)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論