欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java發(fā)送kafka事務(wù)消息的實現(xiàn)方法

 更新時間:2022年07月15日 09:52:58   作者:逆風飛翔的小叔  
本文主要介紹了java發(fā)送kafka事務(wù)消息的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

前言

事務(wù)對java開發(fā)的同學來說并不陌生,我們使用事務(wù)的目的在于避免產(chǎn)生重復(fù)數(shù)據(jù)或者說利用數(shù)據(jù)存儲中間件的事務(wù)特性確保數(shù)據(jù)的精準性,比如大家熟悉的mysql,我們在程序開始時,只需要在程序中添加上事務(wù)注解即可

kafka客戶端事務(wù),直接使用客戶端提供的相關(guān)的API即可,和jdbc事務(wù)的使用很類似,主要包含下面5個API

// 1 初始化事務(wù)
void initTransactions();


// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;


// 3 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量(主要用于消費者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;


// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;


// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;

下面結(jié)合實際的代碼以及效果演示進行說明

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class ProducerTransaction {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 創(chuàng)建 kafka 生產(chǎn)者的配置對象
        Properties properties = new Properties();
        // 2. 給 kafka 配置對象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
 
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        // 設(shè)置事務(wù) id(必須),事務(wù) id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
 
        // 3. 創(chuàng)建 kafka 生產(chǎn)者對象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
 
        // 初始化事務(wù)
        kafkaProducer.initTransactions();
        // 開啟事務(wù)
        kafkaProducer.beginTransaction();
        System.out.println("開始發(fā)送消息");
        try {
            // 4. 調(diào)用 send 方法,發(fā)送消息
            for (int i = 0; i < 5; i++) {
                // 發(fā)送消息
                kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
            }
            //int i = 1 / 0;
            // 提交事務(wù)
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(e);
            // 終止事務(wù)
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 關(guān)閉資源
            kafkaProducer.close();
        }
    }
 
}

運行上面的代碼,正常是可以發(fā)送到指定的topic下

接下來,我們將上面的代碼中的 1/0 放開,再次運行程序,可以看到,程序中拋異常了,但是消息并沒有發(fā)送到kafka的broker,說明事務(wù)的配置生效了

 到此這篇關(guān)于java發(fā)送kafka事務(wù)消息的實現(xiàn)方法的文章就介紹到這了,更多相關(guān)java發(fā)送kafka事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring boot搭建web應(yīng)用集成thymeleaf模板實現(xiàn)登陸

    Spring boot搭建web應(yīng)用集成thymeleaf模板實現(xiàn)登陸

    這篇文章主要介紹了Spring boot搭建web應(yīng)用集成thymeleaf模板實現(xiàn)登陸,頁面使用bootstrap,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • Java 高并發(fā)一:前言

    Java 高并發(fā)一:前言

    本系列基于煉數(shù)成金課程,為了更好的學習,做了系列的記錄。 本文主要介紹 1.高并發(fā)的概念,為以后系列知識做鋪墊。 2.兩個重要的定理
    2016-09-09
  • Java如何基于反射機制獲取不同的類

    Java如何基于反射機制獲取不同的類

    這篇文章主要介紹了Java如何基于反射機制獲取不同的類,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-08-08
  • Java語法關(guān)于泛型與類型擦除的分析

    Java語法關(guān)于泛型與類型擦除的分析

    泛型沒有其看起來那么深不可測,它并不神秘與神奇,泛型是Java 中一個很小巧的概念,但同時也是一個很容易讓人迷惑的知識點,它讓人迷惑的地方在于它的許多表現(xiàn)有點違反直覺
    2021-09-09
  • Java版超大整數(shù)階乘算法代碼詳解-10,0000級

    Java版超大整數(shù)階乘算法代碼詳解-10,0000級

    這篇文章主要介紹了Java版超大整數(shù)階乘算法代碼詳解-10,0000級,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • Spring中的@EnableScheduling定時任務(wù)注解

    Spring中的@EnableScheduling定時任務(wù)注解

    這篇文章主要介紹了Spring中的@EnableScheduling注解,@EnableScheduling是 Spring Framework 提供的一個注解,用于啟用 Spring 的定時任務(wù)功能,通過使用這個注解,可以在 Spring 應(yīng)用程序中創(chuàng)建定時任務(wù),需要的朋友可以參考下
    2024-01-01
  • Java中為什么this可以調(diào)用當前實例

    Java中為什么this可以調(diào)用當前實例

    本文主要介紹了為什么可以通過this關(guān)鍵字訪問到當前對象呢,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • 解決BeanUtils.copyProperties之大坑

    解決BeanUtils.copyProperties之大坑

    這篇文章主要介紹了解決BeanUtils.copyProperties之大坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題

    詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題

    本文主要介紹了詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-04-04
  • idea?springBoot項目自動注入mapper為空報錯的解決方法

    idea?springBoot項目自動注入mapper為空報錯的解決方法

    這篇文章主要介紹了idea?springBoot項目自動注入mapper為空報錯的解決方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-03-03

最新評論