Spring?Boot?基于?SCRAM?認證集成?Kafka?的過程詳解
一、說明
在現(xiàn)代微服務(wù)架構(gòu)中,Kafka
作為消息中間件被廣泛使用,而安全性則是其中的一個關(guān)鍵因素。在本篇文章中,我們將探討如何在 Spring Boot
應(yīng)用中集成 Kafka
并使用 SCRAM
認證機制進行安全連接;并實現(xiàn)動態(tài)創(chuàng)建賬號、ACL 權(quán)限、Topic,以及生產(chǎn)者和消費者等操作。
需要準備一個配置了 SCRAM 認證的 Kafka 環(huán)境,可參考《基于 SASL/SCRAM 讓 Kafka 實現(xiàn)動態(tài)授權(quán)認證》 進行部署。
二、添加依賴
在 Spring Boot
項目的 pom.xml
中添加 spring-kafka
依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
三、配置 Kafka
在 application.yml
中配置 Kafka 的相關(guān)屬性,包括服務(wù)器地址、認證信息等。
spring: kafka: bootstrap-servers: localhost:9092 properties: security.protocol: SASL_PLAINTEXT sasl.mechanism: SCRAM-SHA-256 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="your_username" password="your_password"; consumer: group-id: test-consumer-group auto-offset-reset: earliest properties: sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test"; producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers
Kafka 的集群地址security.protocol
通訊協(xié)議指定啟用SASLsasl.mechanism
指定 SASL 使用的具體身份驗證機制sasl.jaas.config
指定認證模塊的處理類以及 用戶名 和 密碼auto-offset-reset
指定偏移量的邏輯,earliest 代表新加入的消費者都是從頭開始消費
四、動態(tài)管理資源
4.1. 創(chuàng)建 KafkaAdminClient
KafkaAdminClient
用于管理 Kafka 資源(用戶、ACL、主題等)。以下是示例代碼:
@Configuration public class KafkaConfig { @Bean public KafkaAdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) { return (KafkaAdminClient) KafkaAdminClient.create(kafkaAdmin.getConfigurationProperties()); } }
4.2. 動態(tài)創(chuàng)建用戶和設(shè)置權(quán)限
使用 Kafka AdminClient API
實現(xiàn)動態(tài)創(chuàng)建用戶和設(shè)置 ACL 權(quán)限:
/** * 創(chuàng)建用戶 */ public void createUser(String userName, String password) throws ExecutionException, InterruptedException { // 構(gòu)造Scram認證機制信息 ScramCredentialInfo info = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192); //用戶信息 UserScramCredentialAlteration userScramCredentialAdd = new UserScramCredentialUpsertion(userName, info, password); AlterUserScramCredentialsResult result = kafkaAdminClient.alterUserScramCredentials(List.of(userScramCredentialAdd)); result.all().get(); } /** * 配置用戶只讀權(quán)限 */ public void createAcl(String account, String topicName, String consumerGroup) { AclBinding aclBindingTopic = genAclBinding(account, ResourceType.TOPIC, topicName, AclOperation.READ); AclBinding aclBindingGroup = genAclBinding(account, ResourceType.GROUP, consumerGroup, AclOperation.READ); kafkaAdminClient.createAcls(List.of(aclBindingTopic, aclBindingGroup)); }
4.3. 動態(tài)創(chuàng)建主題
public void createTopic(String topicName, int partitions, short replicationFactor) throws ExecutionException, InterruptedException { NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); CreateTopicsResult result = kafkaAdminClient.createTopics(List.of(newTopic)); result.all().get(); }
五、生產(chǎn)者和消費者配置
5.1. 生產(chǎn)者配置
配置 Kafka 生產(chǎn)者,用于發(fā)送消息:
@Service public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { kafkaTemplate.send("test", message); } }
5.2. 消費者配置
使用 @KafkaListener
注解實現(xiàn)消費消息方法:
@Service public class KafkaConsumer { @KafkaListener(topics = "test", groupId = "test-consumer-group") public void consume(String message) { System.out.println("Received message: " + message); } }
六、總結(jié)
通過以上步驟,我們成功地在 Spring Boot 應(yīng)用中集成了 Kafka,并使用 SCRAM 認證機制進行安全連接;確保在生產(chǎn)環(huán)境中妥善管理用戶憑證,并根據(jù)需要調(diào)整 Kafka 的安全配置。
到此這篇關(guān)于Spring Boot 基于 SCRAM 認證集成 Kafka 的詳解的文章就介紹到這了,更多相關(guān)Spring Boot SCRAM 認證集成 Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot整合Kafka教程詳解
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對應(yīng)問題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡單實現(xiàn)
- SpringBoot集成Kafka的步驟
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- Springboot集成Kafka實現(xiàn)producer和consumer的示例代碼
相關(guān)文章
IntelliJ IDEA Tomcat控制臺中文亂碼問題的四種解決方案
這篇文章主要給大家分享了4種方法完美解決IntelliJ IDEA Tomcat控制臺中文亂碼問題,文中有詳細的圖文介紹,對我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-08-08java?LeetCode刷題稍有難度的貪心構(gòu)造算法
這篇文章主要為大家介紹了java?LeetCode刷題稍有難度的貪心構(gòu)造題解示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02SpringBoot三種方法接口返回日期格式化小結(jié)
本文介紹了三種在Spring Boot中格式化接口返回日期的方法,包含使用@JsonFormat注解、全局配置JsonConfig、以及在yml文件中配置時區(qū),具有一定的參考價值,感興趣的可以了解一下2025-01-01Java實現(xiàn)對華北、華南、華東和華中四個區(qū)域的劃分
在Java中,通過定義枚舉類、編寫主程序和進行測試,本文詳細介紹了如何劃分華北、華南、華東和華中四個區(qū)域,首先定義枚舉類標識區(qū)域,然后通過主程序接收用戶輸入并返回相應(yīng)區(qū)域,最后通過測試用例確保正確性,文章還介紹了甘特圖和餅狀圖的使用2024-09-09Java spring事務(wù)及事務(wù)不生效的原因詳解
在日常編碼過程中常常涉及到事務(wù),在前兩天看到一篇文章提到了Spring事務(wù),那么在此總結(jié)下在Spring環(huán)境下事務(wù)失效的幾種原因2021-09-09