欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成Kafka 配置工具類的詳細代碼

 更新時間:2022年09月26日 16:35:15   作者:Coder-CT  
spring-kafka 是基于 java版的 kafka client與spring的集成,提供了 KafkaTemplate,封裝了各種方法,方便操作,它封裝了apache的kafka-client,不需要再導入client依賴,這篇文章主要介紹了SpringBoot集成Kafka 配置工具類,需要的朋友可以參考下

spring-kafka 是基于 java版的 kafka client與spring的集成,提供了 KafkaTemplate,封裝了各種方法,方便操作,它封裝了apache的kafka-client,不需要再導入client依賴

<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

YML配置

kafka:
    #bootstrap-servers: server1:9092,server2:9093 #kafka開發(fā)地址,
    #生產(chǎn)者配置
    producer:
      # Kafka提供的序列化和反序列化類
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 1 # 消息發(fā)送重試次數(shù)
      #acks = 0:設置成 表示 producer 完全不理睬 leader broker 端的處理結(jié)果。此時producer 發(fā)送消息后立即開啟下 條消息的發(fā)送,根本不等待 leader broker 端返回結(jié)果
      #acks= all 或者-1 :表示當發(fā)送消息時, leader broker 不僅會將消息寫入本地日志,同時還會等待所有其他副本都成功寫入它們各自的本地日志后,才發(fā)送響應結(jié)果給,消息安全但是吞吐量會比較低。
      #acks = 1:默認的參數(shù)值。 producer 發(fā)送消息后 leader broker 僅將該消息寫入本地日志,然后便發(fā)送響應結(jié)果給producer ,而無須等待其他副本寫入該消息。折中方案,只要leader一直活著消息就不會丟失,同時也保證了吞吐量
      acks: 1 #應答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(可選0、1、all/-1)
      batch-size: 16384 #批量大小
      properties:
        linger:
          ms: 0 #提交延遲
      buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小
    # 消費者配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 分組名稱
      group-id: web
      enable-auto-commit: false
      #提交offset延時(接收到消息后多久提交offset)
      # auto-commit-interval: 1000ms
      #當kafka中沒有初始offset或offset超出范圍時將自動重置offset
      # earliest:重置為分區(qū)中最小的offset;
      # latest:重置為分區(qū)中最新的offset(消費分區(qū)中新產(chǎn)生的數(shù)據(jù));
      # none:只要有一個分區(qū)不存在已提交的offset,就拋出異常;
      auto-offset-reset: latest
      properties:
        #消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
        session.timeout.ms: 15000
        #消費請求超時時間
        request.timeout.ms: 18000
      #批量消費每次最多消費多少條消息
      #每次拉取一條,一條條消費,當然是具體業(yè)務狀況設置
      max-poll-records: 1
      # 指定心跳包發(fā)送頻率,即間隔多長時間發(fā)送一次心跳包,優(yōu)化該值的設置可以減少Rebalance操作,默認時間為3秒;
      heartbeat-interval: 6000
      # 發(fā)出請求時傳遞給服務器的 ID。用于服務器端日志記錄 正常使用后解開注釋,不然只有一個節(jié)點會報錯
      #client-id: mqtt
    listener:
      #消費端監(jiān)聽的topic不存在時,項目啟動會報錯(關掉)
      missing-topics-fatal: false
      #設置消費類型 批量消費 batch,單條消費:single
      type: single
      #指定容器的線程數(shù),提高并發(fā)量
      #concurrency: 3
      #手動提交偏移量 manual達到一定數(shù)據(jù)后批量提交
      #ack-mode: manual
      ack-mode: MANUAL_IMMEDIATE #手動確認消息
        # 認證
    #properties:
      #security:
        #protocol: SASL_PLAINTEXT
      #sasl:
        #mechanism: SCRAM-SHA-256
        #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'

簡單工具類,能滿足正常使用,主題是無法修改的

@Component
@Slf4j
public class KafkaUtils<K, V> {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] servers;

    /**
     * 獲取連接
     * @return
     */
    private Admin getAdmin() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        // 正式環(huán)境需要添加賬號密碼
        return Admin.create(properties);
    }

    /**
     * 增加topic
     *
     * @param name      主題名字
     * @param partition 分區(qū)數(shù)量
     * @param replica   副本數(shù)量
     * @date 2022-06-23 chens
     */
    public R addTopic(String name, Integer partition, Integer replica) {
        Admin admin = getAdmin();
        if (replica > servers.length) {
            return R.error("副本數(shù)量不允許超過Broker數(shù)量");
        }
        try {
            NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString()));
            admin.createTopics(Collections.singleton(topic));
        } finally {
            admin.close();
        }
        return R.ok();
    }

    /**
     * 刪除主題
     *
     * @param names 主題名字集合
     * @date 2022-06-23 chens
     */
    public void deleteTopic(List<String> names) {
        Admin admin = getAdmin();
        try {
            admin.deleteTopics(names);
        } finally {
            admin.close();
        }
    }

    /**
     * 查詢所有主題
     *
     * @date 2022-06-24 chens
     */
    public Set<String> queryTopic() {
        Admin admin = getAdmin();
        try {
            ListTopicsResult topics = admin.listTopics();
            Set<String> set = topics.names().get();
            return set;
        } catch (Exception e) {
            log.error("查詢主題錯誤!");
        } finally {
            admin.close();
        }
        return null;
    }

    // 向所有分區(qū)發(fā)送消息
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        return kafkaTemplate.send(topic, data);
    }
    
    // 指定key發(fā)送消息,相同key保證消息在同一個分區(qū)
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, key, data);
    }

    // 指定分區(qū)和key發(fā)送。
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        return kafkaTemplate.send(topic, partition, key, data);
    }
}

發(fā)送消息 使用異步

@GetMapping("/{topic}")
    public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException {

        ListenableFuture future = null;
        Chenshuang user = new Chenshuang(i, "陳爽", "123456", new Date());
        String s = JSON.toJSONString(user);
        KafkaUtils utils = new KafkaUtils();
        future = kafkaUtils.send(topic, s);
        // 異步回調(diào),同步get,會等待 不推薦同步!
        future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("發(fā)送失敗");
            }
            @Override
            public void onSuccess(Object result) {
                System.out.println("發(fā)送成功:" + result);
            }
        });
        return "發(fā)送成功";
    }

建立主題

如果broker端配置auto.create.topics.enable為true(默認為true),當收到客戶端的元數(shù)據(jù)請求時則會創(chuàng)建topic。

向一個不存在的主題發(fā)送和消費都會創(chuàng)建一個新的主題,很多時候,非預期的創(chuàng)建主題,會導致很多意想不到的問題,建議關掉該特性。

Topic主題用來區(qū)分不同類型的消息,實際也就是適用于不同的業(yè)務場景,默認消息保存一周時間;

同一個Topic主題下,默認是一個partition分區(qū),也就是只能有一個消費者來消費,如果想提升消費能力,就需要增加分區(qū);

同一個Topic的多個分區(qū),可以有三種方式分派消息(key,value)到不同的分區(qū),指定分區(qū)、HASH路由、默認,同一個分區(qū)內(nèi)的消息ID唯一,并順序;

消費者消費partition分區(qū)內(nèi)的消息時,是通過offsert來標識消息的位置;

GroupId用來解決同一個Topic主題下重復消費問題,比如一條消費需要多個消費者接收到,就可以通過設置不同的GroupId實現(xiàn),

實際消息是存一份的,只是通過邏輯上設置標識來區(qū)分,系統(tǒng)會記錄Topic主題下–》GroupId分組下–》partition分區(qū)下的offsert,來標識是否消費過。

發(fā)送消息的高可用—

集群模式,多副本方式實現(xiàn);一條消息的提交,可能通過設置acks標識實現(xiàn)不同的可用性,=0時,發(fā)送成功就OK;=1時,master成功響應才OK,=all時,一半以上的響應才OK(真正的高可用)

消費消息的高可用—

可以關閉自動標識offsert模式,先拉取消息,消費完成后,再去設置offsert位置,來解決消費高可用

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopic {
    // yml自定義主題,項目啟動就創(chuàng)建,
    @Value("${spring.kafka.topic}")
    String topic;
    @Value("${spring.kafka.bootstrap-servers}")
    String[] server;
    /**
     * 項目啟動 初始化主題,如果存在不會覆蓋主題的
     */
    @Bean
    public NewTopic batchTopic() {
        // 最大復制因子 <= 經(jīng)紀人broker數(shù)量.
        return new NewTopic(topic, 10, (short) server.length);
    }
}

監(jiān)聽類 ,一條消息,各分組內(nèi)的消費者只有一個消費者消費一次,如果消息在1區(qū),指定分區(qū)1監(jiān)聽也會消費
也可以同個方法監(jiān)聽不同的主題,指定位移監(jiān)聽
同組會均勻消費,不同組會重復消費。

1、單播模式,只有一個消費者組

(1)topic只有1個partition,該組內(nèi)有多個消費者時,此時同一個partition內(nèi)的消息只能被該組中的一 個consumer消費。當消費者數(shù)量多于partition數(shù)量時,多余的消費者是處于空閑狀態(tài)的,如圖1所示。topic,test只有一個partition,并且只有1個group,G1,該group內(nèi)有多個consumer,只能被其中一個消費者消費,其他的處于空閑狀態(tài)。

在這里插入圖片描述

圖一

(2)該topic有多個partition,該組內(nèi)有多個消費者,比如test 有3個partition,該組內(nèi)有2個消費者,那么可能就是C0對應消費p0,p1內(nèi)的數(shù)據(jù),c1對應消費p2的數(shù)據(jù);如果有3個消費者,就是一個消費者對應消費一個partition內(nèi)的數(shù)據(jù)了。圖解分別如圖2,圖3.這種模式在集群模式下使用是非常普遍的,比如我們可以起3個服務,對應的topic設置3個partiition,這樣就可以實現(xiàn)并行消費,大大提高處理消息的效率。

在這里插入圖片描述

圖二

在這里插入圖片描述

圖三

2、廣播模式,多個消費者組

如果想實現(xiàn)廣播的模式就需要設置多個消費者組,這樣當一個消費者組消費完這個消息后,絲毫不影響其他組內(nèi)的消費者進行消費,這就是廣播的概念。

(1)多個消費者組,1個partition

該topic內(nèi)的數(shù)據(jù)被多個消費者組同時消費,當某個消費者組有多個消費者時也只能被一個消費者消費,如圖4所示:

在這里插入圖片描述

圖四

(2)多個消費者組,多個partition

該topic內(nèi)的數(shù)據(jù)可被多個消費者組多次消費,在一個消費者組內(nèi),每個消費者又可對應該topic內(nèi)的一個或者多個partition并行消費,如圖五:

在這里插入圖片描述

注意: 消費者的數(shù)量并不能決定一個topic的并行度。它是由分區(qū)的數(shù)目決定的。
再多的消費者,分區(qū)數(shù)少,也是浪費!
一個組的最大并行度將等于該主題的分區(qū)數(shù)。

@Component
@Slf4j
public class Consumer {
    // 監(jiān)聽主題 分組a
    @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a")
    public  void  getMessage(ConsumerRecord message, Acknowledgment ack) {
        //確認收到消息
        ack.acknowledge();
    }
    // 監(jiān)聽主題 分組a
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a")
    public  void getMessage2(ConsumerRecord message, Acknowledgment ack) {
        //確認收到消息
        ack.acknowledge();
    }
    // 監(jiān)聽主題 分組b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage3(ConsumerRecord message, Acknowledgment ack) {
        //確認收到消息//確認收到消息
        ack.acknowledge();
    }
    // 監(jiān)聽主題 分組b
    @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b")
    public  void getMessage4(ConsumerRecord message, Acknowledgment ack) {
        //確認收到消息//確認收到消息
        ack.acknowledge();
    }

    // 指定監(jiān)聽分區(qū)1的消息
    @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})})
    public void getMessage5(ConsumerRecord message, Acknowledgment ack) {
        Long id = JSONObject.parseObject(message.value().toString()).getLong("id");
        //確認收到消息//確認收到消息
        ack.acknowledge();
    }
    
    /**
     * @Title 指定topic、partition、offset消費
     * @Description 同時監(jiān)聽topic1和topic2,監(jiān)聽topic1的0號分區(qū)、topic2的 "0號和1號" 分區(qū),指向1號分區(qū)的offset初始值為8
     * 注意:topics和topicPartitions不能同時使用;
     **/
    @KafkaListener(id = "c1",groupId = "c",topicPartitions = {
            @TopicPartition(topic = "t1", partitions = { "0" }),
            @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))})
    public void getMessage6(ConsumerRecord record,Acknowledgment ack) {
        //確認收到消息
        ack.acknowledge();
    }
    
    /**        
     * 批量消費監(jiān)聽goods變更消息
     * yml配置listener:type 要改為batch
     * ymk配置consumer:max-poll-records: ??(每次拉取多少條數(shù)據(jù)消費)
     * concurrency = "2" 啟動多少線程執(zhí)行,應小于等于broker數(shù)量,避免資源浪費
     */
    @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4")
    public void getMessage7(List<ConsumerRecord<String, String>> records){
        for (ConsumerRecord<String, String> msg:records) {
            GoodsChangeMsg changeMsg = null;
            try {
                changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
                syncGoodsProcessor.handle(changeMsg);
            }catch (Exception exception) {
                log.error("解析失敗{}", msg, exception);
            }
        }
    }
}

到此這篇關于SpringBoot集成Kafka 配置工具類的文章就介紹到這了,更多相關SpringBoot集成Kafka 配置工具類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java 中復合機制的實例詳解

    java 中復合機制的實例詳解

    這篇文章主要介紹了java 中復合機制的實例詳解的相關資料,希望通過本文大家能了解繼承和復合的區(qū)別并應用復合這種機制,需要的朋友可以參考下
    2017-09-09
  • 詳解Java中的println輸入和toString方法的重寫問題

    詳解Java中的println輸入和toString方法的重寫問題

    這篇文章主要介紹了Java中的println輸入和toString方法的重寫,一個對象數(shù)組在調(diào)用Arrays.toString打印時,相當于遍歷數(shù)組,然后打印里邊每個對象,這再打印對象就調(diào)用對象自己的toString了,需要的朋友可以參考下
    2022-04-04
  • Springboot中yml文件沒有葉子圖標的解決

    Springboot中yml文件沒有葉子圖標的解決

    這篇文章主要介紹了Springboot中yml文件沒有葉子圖標的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Java中Lambda表達式和函數(shù)式接口的使用和特性

    Java中Lambda表達式和函數(shù)式接口的使用和特性

    Java Lambda表達式是一種函數(shù)式編程的特性,可簡化匿名內(nèi)部類的寫法,與函數(shù)式接口搭配使用,實現(xiàn)代碼簡潔、可讀性高、易于維護的特點,適用于集合操作、多線程編程等場景
    2023-04-04
  • dubbo新手學習之事件通知實踐教程

    dubbo新手學習之事件通知實踐教程

    這篇文章主要給大家介紹了關于dubbo新手學習之事件通知實踐的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09
  • Java 中限制方法的返回時間最新方法

    Java 中限制方法的返回時間最新方法

    最近在研究 ChatGPT 的 API 調(diào)用,因為 ChatGPT 的 API 調(diào)用時間通常超過 30 秒,所以我們希望在程序中限制這個方法的執(zhí)行時間,不要讓方法花太長時間去執(zhí)行了,今天通過本文給大家分享Java 中如何限制方法的返回時間,感興趣的朋友跟隨小編一起看看吧
    2023-05-05
  • java使用UDP實現(xiàn)點對點通信

    java使用UDP實現(xiàn)點對點通信

    這篇文章主要為大家詳細介紹了java使用UDP實現(xiàn)點對點通信,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Java多線程中的Future類詳細解讀

    Java多線程中的Future類詳細解讀

    這篇文章主要介紹了Java多線程中的Future類詳細解讀,Future表示一個可能還沒有完成的異步任務的結(jié)果,針對這個結(jié)果可以添加Callback以便在任務執(zhí)行成功或失敗后作出相應的操作,需要的朋友可以參考下
    2023-11-11
  • SpringBoot預防XSS攻擊的實現(xiàn)

    SpringBoot預防XSS攻擊的實現(xiàn)

    XSS攻擊是一種在web應用中的計算機安全漏洞,它允許惡意web用戶將代碼植入到提供給其它用戶使用的頁面,本文主要介紹了SpringBoot預防XSS攻擊的實現(xiàn),感興趣的可以了解一下
    2023-08-08
  • 一文帶你掌握Java8中函數(shù)式接口的使用和自定義

    一文帶你掌握Java8中函數(shù)式接口的使用和自定義

    函數(shù)式接口是?Java?8?引入的一種接口,用于支持函數(shù)式編程,下面我們就來深入探討函數(shù)式接口的概念、用途以及如何創(chuàng)建和使用函數(shù)式接口吧
    2023-08-08

最新評論