欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot使用kafka的兩種方式

 更新時(shí)間:2023年11月08日 09:02:15   作者:香菜菜  
在公司用kafka比較多,今天整理下Springboot使用kafka的兩種方式,Kafka作為一個(gè)消息發(fā)布訂閱系統(tǒng),就包括消息生成者和消息消費(fèi)者,文中通過代碼示例介紹的非常詳細(xì),具有一定的參考價(jià)值,需要的朋友可以參考下

1、創(chuàng)建實(shí)驗(yàn)項(xiàng)目

第一步創(chuàng)建一個(gè)Springboot項(xiàng)目,引入spring-kafka依賴,這是后面的基礎(chǔ)。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

kafka配置

spring:
  kafka:
    bootstrap-servers: kafka.tyjt.com:9092
    consumer:
      auto-offset-reset: earliest
      group-id: sharingan-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

2、自動(dòng)檔

為了方便使用kafka,Springboot提供了spring-kafka 這個(gè)包,在已開始我們已經(jīng)導(dǎo)入了,下面直接使用吧

Spring項(xiàng)目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的監(jiān)聽器)即可完成生產(chǎn)者-消費(fèi)者的代碼開發(fā)

2.1 監(jiān)聽listener

為了使創(chuàng)建 kafka 監(jiān)聽器更加簡(jiǎn)單,Spring For Kafka 提供了 @KafkaListener 注解,

@KafkaListener 注解配置方法上,凡是此注解的方法就會(huì)被標(biāo)記為是 Kafka 消息監(jiān)聽器,所以可以用

@KafkaListener 注解快速創(chuàng)建消息監(jiān)聽器。

@Configuration
@EnableKafka
public class ConsumerConfigDemo {
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void kafkaListener(String topic,String message){
        System.out.println("消息:"+message);
    }
}

2.2 發(fā)布消息

發(fā)布消息通過kafkaTemplate,kafkaTemplate是spring-kafka 的封裝

@Slf4j
@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String topic, String key, String message) throws Exception {
        kafkaTemplate.send(topic,key,message);
    }
}

kafkaTemplate 有很多不同的發(fā)送方法,根據(jù)自己的需求使用,這里只記錄最簡(jiǎn)單的狀況。

3、手動(dòng)檔

3.1 手動(dòng)創(chuàng)建consumer

關(guān)于consumer的主要的封裝在ConcurrentKafkaListenerContainerFactory這個(gè)里頭,

本身的KafkaConsumer是線程不安全的,無法并發(fā)操作,這里spring又在包裝了一層,根據(jù)配置的spring.kafka.listener.concurrency來生成多個(gè)并發(fā)的KafkaMessageListenerContainer實(shí)例

package com.tyjt.sharingan.kafka;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 啟動(dòng)kafka consumer
 *
 * @author 種鑫
 * @date 2023/10/18 17:26
 */
@EnableKafka
@Component
@Slf4j
public class KafkaConsumerMgr {
    @Resource
    ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory;
 
    Map<String, ConcurrentMessageListenerContainer<?, ?>> containerMap = new ConcurrentHashMap<>();
    public void startListener(KafkaProtoConsumer kafkaConsumer) {
        //  停止相同的
        if (containerMap.containsKey(kafkaConsumer.getTopic())) {
            containerMap.get(kafkaConsumer.getTopic()).stop();
        }
        ConcurrentMessageListenerContainer<String, byte[]> container = createListenerContainer(kafkaConsumer);
        container.start();
        containerMap.put(kafkaConsumer.getTopic(), container);
    }
 
    private ConcurrentMessageListenerContainer<String, byte[]> createListenerContainer(KafkaProtoConsumer consumer) {
        ConcurrentMessageListenerContainer<String, byte[]> container = containerFactory.createContainer(consumer.topic());
        container.setBeanName(consumer.group() + "-" + consumer.topic());
        container.setConcurrency(consumer.getPartitionCount());
        consumer.deployContainer(container);
        // 防止被修改的配置
        ContainerProperties containerProperties = container.getContainerProperties();
        containerProperties.setMessageListener(new Listener<>(consumer));
        containerProperties.setAsyncAcks(false);
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setGroupId(consumer.group());
        return container;
    }
 
    /**
     * 定義監(jiān)聽
     */
    private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> {
        private final KafkaConsumer<T> kafkaConsumer;
 
        public Listener(KafkaConsumer<T> consumer) {
            this.kafkaConsumer = consumer;
        }
 
        @Override
        public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            log.info("group【{}】接收到來自topic【{}】的消息", kafkaConsumer.group(), data.topic());
            // 處理數(shù)據(jù)
            kafkaConsumer.process(data.value());
            // 提交offset
            log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic());
            consumer.commitSync();
        }
    }
}

這個(gè)可以根據(jù)需要?jiǎng)討B(tài)的啟動(dòng)消費(fèi)者

3.2 手動(dòng)創(chuàng)建KafkaProducer

@Bean
    public KafkaProducer<String, byte[]> kafkaProducer() {
 
        Properties props = new Properties();
 
        // 這里可以配置幾臺(tái)broker即可,他會(huì)自動(dòng)從broker去拉取元數(shù)據(jù)進(jìn)行緩存
        props.put("bootstrap.servers", bootstrapServers);
        // 這個(gè)就是負(fù)責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
        props.put("key.serializer", keySerializer);
        // 這個(gè)就是負(fù)責(zé)把你發(fā)送的實(shí)際的message從字符串序列化為字節(jié)數(shù)組
        props.put("value.serializer", valueSerializer);
        // 默認(rèn)是32兆=33554432
        props.put("buffer.memory", bufferMemory);
        // 一般來說是要自己手動(dòng)設(shè)置的,不是純粹依靠默認(rèn)值的,16kb
        props.put("batch.size", batchSize);
        // 發(fā)送一條消息出去,100ms內(nèi)還沒有湊成一個(gè)batch發(fā)送,必須立即發(fā)送出去
        props.put("linger.ms", lingerMs);
        // 這個(gè)是說你可以發(fā)送的最大的請(qǐng)求的大小 默認(rèn)是1m=1048576
//        props.put("max.request.size", 10485760);
        // follower有沒有同步成功你就不管了
        props.put("acks", acks);
        // 這個(gè)重試,一般來說,給個(gè)3次~5次就足夠了,可以cover住一般的異常場(chǎng)景
        props.put("retries", retries);
        // 每次重試間隔100ms
        props.put("retry.backoff.ms", retryBackOffMs);
 
        props.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
 
        return new KafkaProducer<>(props);
    }

4、總結(jié)

4.1 區(qū)別

KafkaProducer是Kafka-client提供的原生Java Kafka客戶端發(fā)送消息的API。

KafkaTemplate是Spring Kafka中提供的一個(gè)高級(jí)工具類,用于可以方便地發(fā)送消息到Kafka。它封裝了KafkaProducer,提供了更多的便利方法和更高級(jí)的消息發(fā)送方式。

org.apache.kafka.clients.producer.KafkaProducer

org.springframework.kafka.core.KafkaTemplate

4.2 場(chǎng)景選擇

在spring應(yīng)用中如果需要訂閱kafka消息,通常情況下我們不會(huì)直接使用kafka-client, 而是使用更方便的一層封裝spring-kafka。

不需要?jiǎng)討B(tài)的選擇時(shí)候可以使用Spring-kafka,在需要?jiǎng)討B(tài)創(chuàng)建時(shí)可以使用kafka-client的api進(jìn)行處理

4.3 ConsumerRecord和ProducerRecord

兩者都是kafka-client的類,在Spring-kafka中依然可以使用,可以發(fā)送和接受

以上就是Springboot使用kafka的兩種方式的詳細(xì)內(nèi)容,更多關(guān)于Springboot使用kafka的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么

    詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么

    這篇文章主要介紹了詳解SpringBoot中添加@ResponseBody注解會(huì)發(fā)生什么,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • 解讀JVM的組成部分有什么

    解讀JVM的組成部分有什么

    JVM主要由類加載器子系統(tǒng)、運(yùn)行時(shí)數(shù)據(jù)區(qū)、執(zhí)行引擎和本地庫(kù)接口等組成,類加載器負(fù)責(zé)加載類文件,運(yùn)行時(shí)數(shù)據(jù)區(qū)管理內(nèi)存,執(zhí)行引擎執(zhí)行字節(jié)碼指令和垃圾回收,本地庫(kù)接口連接其他語(yǔ)言
    2025-03-03
  • 使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象

    使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象

    這篇文章主要為大家詳細(xì)介紹了使用BeanFactory實(shí)現(xiàn)創(chuàng)建對(duì)象,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-08-08
  • Mybatisplus更新某個(gè)字段為null問題

    Mybatisplus更新某個(gè)字段為null問題

    Mybatisplus更新某個(gè)字段為null時(shí),可以使用@TableField(updateStrategy=FieldStrategy.IGNORED)注解,該注解提供了字段映射、忽略非表字段、自動(dòng)填充策略、字段條件處理等功能
    2025-02-02
  • java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié)

    java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié)

    這篇文章主要介紹了java調(diào)用微信現(xiàn)金紅包接口的心得與體會(huì)總結(jié),有需要的朋友可以了解一下。
    2016-11-11
  • Springboot使用maven打包指定mainClass問題

    Springboot使用maven打包指定mainClass問題

    這篇文章主要介紹了Springboot使用maven打包指定mainClass問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • Spring @Valid和@Validated區(qū)別和用法實(shí)例

    Spring @Valid和@Validated區(qū)別和用法實(shí)例

    這篇文章主要介紹了Spring @Valid和@Validated區(qū)別和用法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • JAVA提高第七篇 類加載器解析

    JAVA提高第七篇 類加載器解析

    這篇文章主要為大家詳細(xì)介紹了JAVA提高第七篇類加載器的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • Mybatis-Plus中的MetaObjectHandler組件的使用

    Mybatis-Plus中的MetaObjectHandler組件的使用

    MetaObjectHandler是Mybatis-Plus中一個(gè)實(shí)用組件,專門用于自動(dòng)處理實(shí)體對(duì)象中的特定字段,如創(chuàng)建時(shí)間、更新時(shí)間、創(chuàng)建人和修改人等,該接口允許開發(fā)者在不修改業(yè)務(wù)代碼的情況下,實(shí)現(xiàn)自動(dòng)填充功能,極大地簡(jiǎn)化了代碼的復(fù)雜性,感興趣的可以了解一下
    2024-10-10
  • Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題

    Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題

    本文主要介紹了Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07

最新評(píng)論