springboot使用kafka事務(wù)的示例代碼
先看下下面這種情況,程序都出錯(cuò)了,按理說消息也不應(yīng)該成功
@GetMapping("/send")
public void test9(String message) {
kafkaTemplate.send(topic, message);
throw new RuntimeException("fail");
}但是執(zhí)行結(jié)果是發(fā)生了異常并且消息發(fā)送成功了:

Kafka 同數(shù)據(jù)庫一樣支持事務(wù),當(dāng)發(fā)生異常的時(shí)候可以進(jìn)行回滾,確保消息監(jiān)聽器不會(huì)接收到一些錯(cuò)誤的或者不需要的消息。
kafka事務(wù)屬性是指一系列的生產(chǎn)者生產(chǎn)消息和消費(fèi)者提交偏移量的操作在一個(gè)事務(wù),或者說是是一個(gè)原子操作),同時(shí)成功或者失敗。使用事務(wù)也很簡單,需要先開啟事務(wù)支持,然后再使用。
如何開啟事務(wù)
如果使用默認(rèn)配置只需要在yml添加spring.kafka.producer.transaction-id-prefix配置來開啟事務(wù),之前沒有使用默認(rèn)的配置,自定義的kafkaTemplate,那么需要在ProducerFactory中設(shè)置事務(wù)Id前綴開啟事務(wù)并將KafkaTransactionManager注入到spring中,看下KafkaProducerConfig完整代碼:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
public Map<String,Object> producerConfigs(){
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG,retries);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 配置分區(qū)策略
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");
// 配置生產(chǎn)者攔截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor");
// 配置攔截器消息處理類
SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil();
props.put("interceptorUtil",sendMessageInterceptorUtil);
return props;
}
@Bean
public ProducerFactory<String,String> producerFactory(){
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs());
//設(shè)置事務(wù)Id前綴 開啟事務(wù)
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
} 配置開啟事務(wù)后,使用大體有兩種方式,先記錄下第一種使用事務(wù)方式:使用 executeInTransaction 方法
直接看下代碼:
@GetMapping("/send11")
public void test11(String message) {
kafkaTemplate.executeInTransaction(operations ->{
operations.send(topic,message);
throw new RuntimeException("fail");
});
}當(dāng)然你可以這么寫:
@GetMapping("/send11")
public void test11(String message) {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
@Override
public Object doInOperations(KafkaOperations operations) {
operations.send(topic,message);
throw new RuntimeException("fail");
}
});
}啟動(dòng)項(xiàng)目,訪問http://localhost:8080/send10?message=test10 結(jié)果如下:

如上:消費(fèi)者沒打印消息,說明消息沒發(fā)送成功,并且前面會(huì)報(bào)錯(cuò)org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted 的錯(cuò)誤,說明事務(wù)生效了。
第一種使用事務(wù)方式:使用 @Transactional 注解方式 直接在方法上加上@Transactional注解即可,看下代碼:
@GetMapping("/send12")
@Transactional
public void test12(String message) {
kafkaTemplate.send(topic, message);
throw new RuntimeException("fail");
}如果開啟的事務(wù),則后續(xù)發(fā)送消息必須使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否則拋出異常,異常信息如下:
貼下完整的異常吧:java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
到此這篇關(guān)于springboot使用kafka事務(wù)的文章就介紹到這了,更多相關(guān)springboot kafka事務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- Kafka的安裝及接入SpringBoot的詳細(xì)過程
- springboot使用@KafkaListener監(jiān)聽多個(gè)kafka配置實(shí)現(xiàn)
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
相關(guān)文章
Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟
本文主要介紹了使用Jenkins一鍵打包部署SpringBoot應(yīng)用的方法步驟,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
將Java對(duì)象序列化成JSON和XML格式的實(shí)例
下面小編就為大家分享一篇將Java對(duì)象序列化成JSON和XML格式的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2017-12-12
Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列
廣搜BFS的基本思想是: 首先訪問初始點(diǎn)v并將其標(biāo)志為已經(jīng)訪問。接著通過鄰接關(guān)系將鄰接點(diǎn)入隊(duì)。然后每訪問過一個(gè)頂點(diǎn)則出隊(duì)。按照順序,訪問每一個(gè)頂點(diǎn)的所有未被訪問過的頂點(diǎn)直到所有的頂點(diǎn)均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07

