欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springKafka生產(chǎn)者、消費(fèi)者,數(shù)據(jù)發(fā)送/接收方式

 更新時(shí)間:2025年09月19日 08:50:42   作者:小風(fēng)010766  
這篇文章主要介紹了springKafka生產(chǎn)者、消費(fèi)者,數(shù)據(jù)發(fā)送/接收方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

配置文件

# 指定kafka server的地址,集群配多個(gè),中間,逗號隔開
spring.kafka.bootstrap-servers=
# 配置kafka 授權(quán)認(rèn)證 --開始
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxxx";
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxx";
# 配置kafka 授權(quán)認(rèn)證 --結(jié)束
#重試次數(shù)
spring.kafka.producer.retries=3
# 重試間隔時(shí)間 (毫秒)
spring.kafka.producer.retry.backoff.ms=10000
#批量發(fā)送的消息數(shù)量
spring.kafka.producer.batch-size=1000
#32MB的批處理緩沖區(qū)
spring.kafka.producer.buffer-memory=335544320

#默認(rèn)消費(fèi)者組
spring.kafka.consumer.group-id=isms-group
#最早未被消費(fèi)的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取數(shù)據(jù)量
spring.kafka.consumer.max-poll-records=500
#自動(dòng)提交時(shí)間間隔,單位ms
spring.kafka.consumer.auto-commit-interval=1000
#批消費(fèi)并發(fā)量,小于或等于Topic的分區(qū)數(shù)
spring.kafka.consumer.batch.concurrency=2
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;
    @Value("${spring.kafka.producer.retry.backoff.ms}")
    private Integer retryBackoff;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.batch.concurrency}")
    private Integer batchConcurrency;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;
    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;
    @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
    private String producerSaslJaasConfig;
    @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
    private String consumerSaslJaasConfig;

    /**
     *  生產(chǎn)者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 消息冪等開關(guān),事務(wù)開啟必須打開消息冪等,但是冪等可以單獨(dú)使用
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, producerSaslJaasConfig);
        return props;
    }

    /**
     *  生產(chǎn)者工廠
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }


    /**
     *  生產(chǎn)者模板
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return  new KafkaTemplate<>(producerFactory());
    }


    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(kafkaTemplate().getProducerFactory().getConfigurationProperties());
    }

    /**
     *  消費(fèi)者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 設(shè)置自動(dòng)提交改成false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, consumerSaslJaasConfig);
        return props;
    }

    /**
     *  消費(fèi)者批量工廠
     */
/*    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);

        return factory;
    }*/

    /**
     * 單個(gè)消費(fèi)者
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> singleConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public KafkaConsumer<String,String> customKafkaConsumer(){
        return new KafkaConsumer<String, String>(consumerConfigs());
    }
}

生產(chǎn)者

@Component
public class SendMessageToKafka {

    private static final Logger log = LoggerFactory.getLogger(SendMessageToKafka.class);

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void sendToKafka(String message,String topicName){
        try {
            String topicDesc = TopicNameEnum.getDescByTopicName(topicName);
            SendResult<String, String> sendResult = kafkaTemplate.send(topicName, null, message).get(60, TimeUnit.SECONDS);
            if(sendResult.getRecordMetadata() != null){
                log.info("topic對象【{}】發(fā)送kafka成功,:{},message sent,topic:{}, partition={}, offset={}",topicDesc,sendResult.getProducerRecord().topic(), sendResult.getRecordMetadata().partition(),
                        sendResult.getRecordMetadata().offset());
            }else {
                log.warn("topic對象【{}】消息發(fā)送kafka失敗,topic:{},進(jìn)行重試。。",topicDesc, topicName);
                // 當(dāng)kafka發(fā)送失敗后,進(jìn)行重試3次。
                FunctionResponse<Boolean> response = FunctionUtil.retryExecute(() -> {
                    try {
                        SendResult<String, String> kafkaResult = kafkaTemplate.send(topicName, null, message).get(60, TimeUnit.SECONDS);
                        if(kafkaResult.getRecordMetadata() != null){
                            return new FunctionResponse<>(true);
                        }else {
                            return new FunctionResponse<>(false);
                        }
                    } catch (Exception e) {
                        return new FunctionResponse<>(false);
                    }
                }, 3, 2000);

                if(!response.getResult()) {
                    log.error("topic對象【{}】 message sent,topic:{},最終重試發(fā)送失敗:{}", topicDesc,topicName, message);
                }
            }
        } catch (Exception e) {
            log.error("topic【{}】寫入kafka失敗,{}",topicName,e);
        }
    }

}

消費(fèi)者

@Component
public class CoreCommandListener {

    private static final Logger log = LoggerFactory.getLogger(CoreCommandListener.class);


    @KafkaListener(topics = {"topic名稱"},containerFactory="singleConsumerFactory")
    private void commandConsumer(ConsumerRecord<Object,String> consumerRecord, Acknowledgment ack){
        DcpMessage message = JSONObject.parseObject(consumerRecord.value(),DcpMessage.class);
        log.info("offset:{},partition:{},消費(fèi)到指令數(shù)據(jù):{}",consumerRecord.offset(),consumerRecord.partition(),message);

        //手動(dòng)提交
        ack.acknowledge();
    }
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論