springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
遇到的問題
在實(shí)際開發(fā)過程中,因?yàn)橥扑蛿?shù)據(jù)需要用到kafka,為了比較方便與后續(xù)其他需求需要使用kafka,所以開發(fā)的過程中是設(shè)想能寫一個(gè)工具類,方便后續(xù)的使用,但是,測(cè)試不帶認(rèn)證的kafka服務(wù)端的時(shí)候,發(fā)送是正常的,但是實(shí)際情況是,對(duì)方的服務(wù)器需要認(rèn)證,導(dǎo)致遇到連不上對(duì)方服務(wù),推送失敗的問題,需要找對(duì)方確認(rèn)對(duì)方的認(rèn)證配置信息。并且在查詢?cè)趺刺幚淼臅r(shí)候,驗(yàn)證也出現(xiàn)了比較奇葩的情況,那本次文章就簡(jiǎn)單寫遇到的問題和驗(yàn)證結(jié)果。
碰到的天坑
1.度的時(shí)候,總是說引入了配置文件就完事,其他的配置不需要再配sasl.jaas.config,但是我實(shí)際測(cè)試下來不行啊。還是得引入配置文件+配置參數(shù)設(shè)置。
2.第二個(gè)基于未來思考的問題是,這個(gè)引入的配置文件內(nèi)容,是否可以多個(gè)。據(jù)度來度去的結(jié)果說是可以的,按順序會(huì)去逐一匹配直到匹配成功,那就是下面配置的KafkaClient配置塊是可以多個(gè),就名稱不一樣就行。但是礙于條件限制,沒試過。??
處理步驟
加載配置
度了很多文章,都提到了需要在服務(wù)啟動(dòng)的時(shí)候引入認(rèn)證配置文件,設(shè)置屬性 java.security.auth.login.config,度了一下是有兩種方式。
a). 在服務(wù)啟動(dòng)的時(shí)候用參數(shù)引入,命令如下:
java -Djava.security.auth.login.config=(具體地址自己填,因?yàn)槲遗渲梦募歉鷗est.jar同級(jí)目錄所以無前綴)kafka_jaas_config.config -jar test.jar
b). 在代碼中去引入,如:(這方式我是沒試的)
System.setProperty(“java.security.auth.login.config”, “kafka_jaas_config.config”);
c). 配置文件的內(nèi)容(注意password后面那個(gè)該死的分號(hào)是要的,配置參數(shù)設(shè)置的時(shí)候也是要的)
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="123"; };
設(shè)置認(rèn)證參數(shù)(如果有的話)
類似要填的參數(shù)是:
security.protocol,sasl.mechanism,sasl.jaas.config(,sasl.username和sasl.password,這兩個(gè)我看是高版本直接配置據(jù)說能認(rèn)證,不需要sasl.jaas.config,但是低版本是需要sasl.jaas.config,所以建議是可以都配上)
such as :
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"123\";");
工具類
package platform.cars.utils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import javax.annotation.PreDestroy; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Auther: Ms.y */ @Configuration public class KafkaUtil { private static final ConcurrentHashMap<String, KafkaTemplate<String, String>> templateCache = new ConcurrentHashMap<>(); private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (otherConfigs != null){ otherConfigs.forEach(props::put); } return props; } public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) { return templateCache.computeIfAbsent(servers, bs -> createKafkaTemplate(bs, otherConfigs)); } private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) { Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs); ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs); return new KafkaTemplate<>(producerFactory); } @PreDestroy public void destroy() { for (KafkaTemplate<String, String> template : templateCache.values()) { template.destroy(); } } }
怎么用
1.正常的autowired就行
2.自己查配置還有是否需要認(rèn)證的信息
3.獲取template去發(fā)送數(shù)據(jù)
4.處理結(jié)果
Map<String,Object> configs = new HashMap<>(); //自己給configs 填值 KafkaTemplate kafkaTemplate = kafkaUtil.getKafkaTemplate("ip:port",configs); kafkaTemplate.send("topic名稱", "消息內(nèi)容");
廢話
我這是因?yàn)闉榱朔奖慵恿藗€(gè)緩存隊(duì)列,存儲(chǔ)了kafka已經(jīng)連過的服務(wù),不需要的話完全可以自己改造去掉這部分。如果有可以精進(jìn)的問題可以提啊,歡迎挑刺。
到此這篇關(guān)于springboot使用kafka推送數(shù)據(jù)到服務(wù)端,帶認(rèn)證的文章就介紹到這了,更多相關(guān)springboot kafka推送數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
- SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案
- SpringBoot 整合 Avro 與 Kafka的詳細(xì)過程
- SpringBoot使用Kafka來優(yōu)化接口請(qǐng)求的并發(fā)方式
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- Spring Boot 集成 Kafka的詳細(xì)步驟
- SpringKafka錯(cuò)誤處理(重試機(jī)制與死信隊(duì)列)
相關(guān)文章
java實(shí)現(xiàn)簡(jiǎn)單的ATM項(xiàng)目
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)單的ATM項(xiàng)目,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-10-10SpringBoot集成RbbitMQ隊(duì)列踩坑記錄
這篇文章主要介紹了SpringBoot集成RbbitMQ隊(duì)列踩坑記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04Java實(shí)現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組
這篇文章主要介紹了Java實(shí)現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09Java實(shí)現(xiàn)幀動(dòng)畫的實(shí)例代碼
這篇文章主要介紹了Java實(shí)現(xiàn)幀動(dòng)畫的實(shí)例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05Spring?Boot緩存實(shí)戰(zhàn)之Redis?設(shè)置有效時(shí)間和自動(dòng)刷新緩存功能(時(shí)間支持在配置文件中配置)
這篇文章主要介紹了Spring?Boot緩存實(shí)戰(zhàn)?Redis?設(shè)置有效時(shí)間和自動(dòng)刷新緩存,時(shí)間支持在配置文件中配置,需要的朋友可以參考下2023-05-05java類實(shí)現(xiàn)日期的時(shí)間差的實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于java類實(shí)現(xiàn)日期的時(shí)間差的實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-01-01詳解Java七大阻塞隊(duì)列之SynchronousQueue
SynchronousQueue不需要存儲(chǔ)線程間交換的數(shù)據(jù),它的作用像是一個(gè)匹配器,使生產(chǎn)者和消費(fèi)者一一匹配。本文詳細(xì)講解了Java七大阻塞隊(duì)列之一SynchronousQueue,需要了解的小伙伴可以參考一下這篇文章2021-09-09SpringBoot整合阿里云視頻點(diǎn)播的過程詳解
視頻點(diǎn)播(ApsaraVideo for VoD)是集音視頻采集、編輯、上傳、自動(dòng)化轉(zhuǎn)碼處理、媒體資源管理、分發(fā)加速于一體的一站式音視頻點(diǎn)播解決方案。這篇文章主要介紹了SpringBoot整合阿里云視頻點(diǎn)播的詳細(xì)過程,需要的朋友可以參考下2021-12-12