欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證

 更新時(shí)間:2024年11月06日 10:22:29   作者:Alina_shu  
在使用Kafka進(jìn)行數(shù)據(jù)推送時(shí),遇到認(rèn)證問題導(dǎo)致連接失敗,本文詳細(xì)介紹了Kafka的認(rèn)證配置過程,包括配置文件的引入和參數(shù)設(shè)置,實(shí)際測(cè)試表明,需要正確配置sasl.jaas.config以及其他認(rèn)證參數(shù),探討了配置文件是否可以同時(shí)存在多個(gè)配置塊的可能性,并提出了實(shí)際操作中的注意事項(xiàng)

遇到的問題

在實(shí)際開發(fā)過程中,因?yàn)橥扑蛿?shù)據(jù)需要用到kafka,為了比較方便與后續(xù)其他需求需要使用kafka,所以開發(fā)的過程中是設(shè)想能寫一個(gè)工具類,方便后續(xù)的使用,但是,測(cè)試不帶認(rèn)證的kafka服務(wù)端的時(shí)候,發(fā)送是正常的,但是實(shí)際情況是,對(duì)方的服務(wù)器需要認(rèn)證,導(dǎo)致遇到連不上對(duì)方服務(wù),推送失敗的問題,需要找對(duì)方確認(rèn)對(duì)方的認(rèn)證配置信息。并且在查詢?cè)趺刺幚淼臅r(shí)候,驗(yàn)證也出現(xiàn)了比較奇葩的情況,那本次文章就簡(jiǎn)單寫遇到的問題和驗(yàn)證結(jié)果。

碰到的天坑

1.度的時(shí)候,總是說引入了配置文件就完事,其他的配置不需要再配sasl.jaas.config,但是我實(shí)際測(cè)試下來不行啊。還是得引入配置文件+配置參數(shù)設(shè)置。
2.第二個(gè)基于未來思考的問題是,這個(gè)引入的配置文件內(nèi)容,是否可以多個(gè)。據(jù)度來度去的結(jié)果說是可以的,按順序會(huì)去逐一匹配直到匹配成功,那就是下面配置的KafkaClient配置塊是可以多個(gè),就名稱不一樣就行。但是礙于條件限制,沒試過。??

處理步驟

加載配置
度了很多文章,都提到了需要在服務(wù)啟動(dòng)的時(shí)候引入認(rèn)證配置文件,設(shè)置屬性 java.security.auth.login.config,度了一下是有兩種方式。
a). 在服務(wù)啟動(dòng)的時(shí)候用參數(shù)引入,命令如下:
java -Djava.security.auth.login.config=(具體地址自己填,因?yàn)槲遗渲梦募歉鷗est.jar同級(jí)目錄所以無前綴)kafka_jaas_config.config -jar test.jar
b). 在代碼中去引入,如:(這方式我是沒試的)
System.setProperty(“java.security.auth.login.config”, “kafka_jaas_config.config”);

c). 配置文件的內(nèi)容(注意password后面那個(gè)該死的分號(hào)是要的,配置參數(shù)設(shè)置的時(shí)候也是要的)

KafkaClient {  
    org.apache.kafka.common.security.scram.ScramLoginModule required  
    username="user"  
    password="123";  
};

設(shè)置認(rèn)證參數(shù)(如果有的話)
類似要填的參數(shù)是:
security.protocol,sasl.mechanism,sasl.jaas.config(,sasl.username和sasl.password,這兩個(gè)我看是高版本直接配置據(jù)說能認(rèn)證,不需要sasl.jaas.config,但是低版本是需要sasl.jaas.config,所以建議是可以都配上)
such as :

props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"123\";");

工具類 

package platform.cars.utils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @Auther: Ms.y
 */
@Configuration
public class KafkaUtil {
    private static final ConcurrentHashMap<String, KafkaTemplate<String, String>> templateCache = new ConcurrentHashMap<>();
    private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (otherConfigs != null){
            otherConfigs.forEach(props::put);
        }
        return props;
    }
    public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        return templateCache.computeIfAbsent(servers, bs -> createKafkaTemplate(bs, otherConfigs));
    }
    private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate<>(producerFactory);
    }
    @PreDestroy
    public void destroy() {
        for (KafkaTemplate<String, String> template : templateCache.values()) {
            template.destroy();
        }
    }
}

怎么用

1.正常的autowired就行
2.自己查配置還有是否需要認(rèn)證的信息
3.獲取template去發(fā)送數(shù)據(jù)
4.處理結(jié)果

Map<String,Object> configs = new HashMap<>();
//自己給configs 填值
KafkaTemplate kafkaTemplate = kafkaUtil.getKafkaTemplate("ip:port",configs);
kafkaTemplate.send("topic名稱", "消息內(nèi)容");

廢話

我這是因?yàn)闉榱朔奖慵恿藗€(gè)緩存隊(duì)列,存儲(chǔ)了kafka已經(jīng)連過的服務(wù),不需要的話完全可以自己改造去掉這部分。如果有可以精進(jìn)的問題可以提啊,歡迎挑刺。

到此這篇關(guān)于springboot使用kafka推送數(shù)據(jù)到服務(wù)端,帶認(rèn)證的文章就介紹到這了,更多相關(guān)springboot kafka推送數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論