java發(fā)送kafka事務(wù)消息的實現(xiàn)方法
前言
事務(wù)對java開發(fā)的同學來說并不陌生,我們使用事務(wù)的目的在于避免產(chǎn)生重復(fù)數(shù)據(jù)或者說利用數(shù)據(jù)存儲中間件的事務(wù)特性確保數(shù)據(jù)的精準性,比如大家熟悉的mysql,我們在程序開始時,只需要在程序中添加上事務(wù)注解即可
kafka客戶端事務(wù),直接使用客戶端提供的相關(guān)的API即可,和jdbc事務(wù)的使用很類似,主要包含下面5個API
// 1 初始化事務(wù) void initTransactions(); // 2 開啟事務(wù) void beginTransaction() throws ProducerFencedException; // 3 在事務(wù)內(nèi)提交已經(jīng)消費的偏移量(主要用于消費者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事務(wù) void commitTransaction() throws ProducerFencedException; // 5 放棄事務(wù)(類似于回滾事務(wù)的操作) void abortTransaction() throws ProducerFencedException;
下面結(jié)合實際的代碼以及效果演示進行說明
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerTransaction { public static void main(String[] args) throws Exception { // 1. 創(chuàng)建 kafka 生產(chǎn)者的配置對象 Properties properties = new Properties(); // 2. 給 kafka 配置對象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 設(shè)置事務(wù) id(必須),事務(wù) id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 創(chuàng)建 kafka 生產(chǎn)者對象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事務(wù) kafkaProducer.initTransactions(); // 開啟事務(wù) kafkaProducer.beginTransaction(); System.out.println("開始發(fā)送消息"); try { // 4. 調(diào)用 send 方法,發(fā)送消息 for (int i = 0; i < 5; i++) { // 發(fā)送消息 kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i)); } //int i = 1 / 0; // 提交事務(wù) kafkaProducer.commitTransaction(); } catch (Exception e) { System.out.println(e); // 終止事務(wù) kafkaProducer.abortTransaction(); } finally { // 5. 關(guān)閉資源 kafkaProducer.close(); } } }
運行上面的代碼,正常是可以發(fā)送到指定的topic下
接下來,我們將上面的代碼中的 1/0 放開,再次運行程序,可以看到,程序中拋異常了,但是消息并沒有發(fā)送到kafka的broker,說明事務(wù)的配置生效了
到此這篇關(guān)于java發(fā)送kafka事務(wù)消息的實現(xiàn)方法的文章就介紹到這了,更多相關(guān)java發(fā)送kafka事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring boot搭建web應(yīng)用集成thymeleaf模板實現(xiàn)登陸
這篇文章主要介紹了Spring boot搭建web應(yīng)用集成thymeleaf模板實現(xiàn)登陸,頁面使用bootstrap,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12Java版超大整數(shù)階乘算法代碼詳解-10,0000級
這篇文章主要介紹了Java版超大整數(shù)階乘算法代碼詳解-10,0000級,具有一定借鑒價值,需要的朋友可以參考下2018-01-01Spring中的@EnableScheduling定時任務(wù)注解
這篇文章主要介紹了Spring中的@EnableScheduling注解,@EnableScheduling是 Spring Framework 提供的一個注解,用于啟用 Spring 的定時任務(wù)功能,通過使用這個注解,可以在 Spring 應(yīng)用程序中創(chuàng)建定時任務(wù),需要的朋友可以參考下2024-01-01詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題
本文主要介紹了詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04idea?springBoot項目自動注入mapper為空報錯的解決方法
這篇文章主要介紹了idea?springBoot項目自動注入mapper為空報錯的解決方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-03-03