詳解Spring Boot對(duì) Apache Pulsar的支持
https://docs.spring.io/spring-boot/docs/3.2.0/reference/htmlsingle/#messaging.pulsar
Apache Pulsar 通過提供 Spring for Apache Pulsar 項(xiàng)目的自動(dòng)配置而受到支持。
當(dāng)類路徑中存在 org.springframework.pulsar:spring-pulsar
時(shí),Spring Boot 將自動(dòng)配置并注冊(cè)經(jīng)典的(命令式)Spring for Apache Pulsar 組件。當(dāng)類路徑中存在 org.springframework.pulsar:spring-pulsar-reactive
時(shí),Spring Boot 也會(huì)對(duì)反應(yīng)式組件執(zhí)行相同的操作。
分別有適用于命令式和反應(yīng)式使用的 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
“Starters”,可方便地收集依賴項(xiàng)。
連接到Pulsar
當(dāng)使用 Pulsar 啟動(dòng)器時(shí),Spring Boot 將自動(dòng)配置并注冊(cè)一個(gè) PulsarClient
bean。
默認(rèn)情況下,應(yīng)用程序嘗試連接到位于 pulsar://localhost:6650
的本地 Pulsar 實(shí)例。這可以通過將 spring.pulsar.client.service-url
屬性設(shè)置為不同的值來進(jìn)行調(diào)整。
注意:該值必須是有效的 Pulsar 協(xié)議 URL。
可以通過指定任何以 spring.pulsar.client.*
開頭的應(yīng)用程序?qū)傩詠砼渲每蛻舳恕?/p>
如果需要更多控制權(quán)來配置 PulsarClient,請(qǐng)考慮注冊(cè)一個(gè)或多個(gè) PulsarClientBuilderCustomizer
bean。
認(rèn)證(Authentication)
要連接到需要認(rèn)證的 Pulsar 集群,需要指定要使用哪個(gè)認(rèn)證插件,通過設(shè)置 pluginClassName
和插件所需的任何參數(shù)??梢詫?shù)設(shè)置為參數(shù)名稱到參數(shù)值的映射。以下示例顯示了如何配置 AuthenticationOAuth2
插件。
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
注意:
需要確保在 spring.pulsar.client.authentication.param.*
下定義的名稱與認(rèn)證插件所期望的名稱完全匹配(通常是駝峰命名法)。Spring Boot 不會(huì)嘗試對(duì)這些條目進(jìn)行任何形式的寬松綁定。
例如,如果想為 AuthenticationOAuth2
認(rèn)證插件配置issuer URL,則必須使用 spring.pulsar.client.authentication.param.issuerUrl
。如果使用其他形式,如 issuerurl
或 issuer-url
,則設(shè)置將不會(huì)應(yīng)用于插件。
SSL
默認(rèn)情況下,Pulsar客戶端以明文形式與Pulsar服務(wù)進(jìn)行通信。以下部分描述了如何配置Pulsar客戶端以使用TLS加密(SSL)。一個(gè)先決條件是Broker也已經(jīng)配置為使用TLS加密。
Spring Boot自動(dòng)配置目前不支持任何TLS/SSL配置屬性。相反,你可以提供一個(gè)PulsarClientBuilderCustomizer
,該定制器會(huì)在Pulsar客戶端構(gòu)建器上設(shè)置必要的屬性。Pulsar支持Privacy Enhanced Mail(PEM)和Java KeyStore(JKS)兩種證書格式。
按照以下步驟配置TLS:
- 調(diào)整Pulsar客戶端服務(wù)URL以使用
pulsar+ssl://
scheme 和TLS端口(通常為6651
)。 - 調(diào)整管理客戶端服務(wù)URL以使用
https://
scheme 和TLS Web端口(通常為8443
)。 - 提供客戶端構(gòu)建器定制器,該定制器會(huì)在構(gòu)建器上設(shè)置相關(guān)屬性。
以響應(yīng)式方式連接到Pulsar
當(dāng)Reactive自動(dòng)配置被激活時(shí),Spring Boot將自動(dòng)配置并注冊(cè)一個(gè)ReactivePulsarClient
bean。
連接到Pulsar管理界面
Spring for Apache Pulsar的PulsarAdministration
客戶端也實(shí)現(xiàn)了自動(dòng)配置。
默認(rèn)情況下,應(yīng)用程序嘗試連接到位于http://localhost:8080
的本地Pulsar實(shí)例??梢酝ㄟ^將spring.pulsar.admin.service-url
屬性設(shè)置為(http|https)://<host>:<port>
的不同值來調(diào)整此設(shè)置。
如果需要更多控制權(quán)來配置PulsarAdmin
,請(qǐng)考慮注冊(cè)一個(gè)或多個(gè)PulsarAdminBuilderCustomizer
bean。
認(rèn)證
當(dāng)訪問需要身份驗(yàn)證的Pulsar集群時(shí),管理客戶端需要與普通Pulsar客戶端相同的安全配置??梢酝ㄟ^將spring.pulsar.client.authentication
替換為spring.pulsar.admin.authentication
來使用上述身份驗(yàn)證配置。
提示:在啟動(dòng)時(shí)創(chuàng)建主題,請(qǐng)?zhí)砑右粋€(gè)類型為PulsarTopic
的bean。如果主題已經(jīng)存在,則該bean將被忽略。
發(fā)送消息
Spring的PulsarTemplate
實(shí)現(xiàn)了自動(dòng)配置,可以使用它來發(fā)送消息,如下所示:
import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final PulsarTemplate<String> pulsarTemplate; public MyBean(PulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() throws PulsarClientException { this.pulsarTemplate.send("someTopic", "Hello"); } }
PulsarTemplate
依賴于PulsarProducerFactory
來創(chuàng)建底層的Pulsar生產(chǎn)者。Spring Boot的自動(dòng)配置也提供了這個(gè)生產(chǎn)者工廠,默認(rèn)情況下,它會(huì)緩存所創(chuàng)建的生產(chǎn)者。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應(yīng)用屬性來配置生產(chǎn)者工廠和緩存設(shè)置。
如果你需要對(duì)生產(chǎn)者工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ProducerBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于所有創(chuàng)建的生產(chǎn)者。你也可以在發(fā)送消息時(shí)傳入一個(gè)ProducerBuilderCustomizer
,只影響當(dāng)前的生產(chǎn)者。
如果你需要對(duì)正在發(fā)送的消息進(jìn)行更多的控制,你可以在發(fā)送消息時(shí)傳入一個(gè)TypedMessageBuilderCustomizer
。
以響應(yīng)式方式發(fā)送消息
當(dāng)Reactive自動(dòng)配置被激活時(shí),Spring的ReactivePulsarTemplate
也會(huì)實(shí)現(xiàn)自動(dòng)配置,可以使用它來發(fā)送消息,如下所示:
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarTemplate<String> pulsarTemplate; public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() { this.pulsarTemplate.send("someTopic", "Hello").subscribe(); } }
ReactivePulsarTemplate
依賴于ReactivePulsarSenderFactory
來實(shí)際創(chuàng)建底層的發(fā)送器。Spring Boot的自動(dòng)配置也提供了這個(gè)發(fā)送器工廠,默認(rèn)情況下,它會(huì)緩存所創(chuàng)建的發(fā)送器。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應(yīng)用屬性來配置發(fā)送器工廠和緩存設(shè)置。
如果你需要對(duì)發(fā)送器工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ReactiveMessageSenderBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于所有創(chuàng)建的發(fā)送器。你也可以在發(fā)送消息時(shí)傳入一個(gè)ReactiveMessageSenderBuilderCustomizer
,只影響當(dāng)前的發(fā)送器。
如果你需要對(duì)正在發(fā)送的消息進(jìn)行更多的控制,你可以在發(fā)送消息時(shí)傳入一個(gè)MessageSpecBuilderCustomizer
。
接收消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施時(shí),任何bean都可以通過添加@PulsarListener
注解來創(chuàng)建監(jiān)聽器端點(diǎn)。以下組件在someTopic
主題上創(chuàng)建了一個(gè)監(jiān)聽器端點(diǎn):
import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarListener(topics = "someTopic") public void processMessage(String content) { // ... } }
Spring Boot的自動(dòng)配置為PulsarListener
提供了所有必要的組件,如PulsarListenerContainerFactory
和用于構(gòu)建底層Pulsar消費(fèi)者的消費(fèi)者工廠。你可以通過指定任何以spring.pulsar.listener.*
和spring.pulsar.consumer.*
為前綴的應(yīng)用屬性來配置這些組件。
如果你需要對(duì)消費(fèi)者工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ConsumerBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于工廠創(chuàng)建的所有消費(fèi)者,因此適用于所有@PulsarListener
實(shí)例。你還可以通過設(shè)置@PulsarListener
注解的consumerCustomizer
屬性來定制單個(gè)監(jiān)聽器。
以響應(yīng)式方式接收消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施且Reactive自動(dòng)配置被激活時(shí),任何bean都可以通過添加@ReactivePulsarListener
注解來創(chuàng)建響應(yīng)式監(jiān)聽器端點(diǎn)。以下組件在someTopic
主題上創(chuàng)建了一個(gè)響應(yīng)式監(jiān)聽器端點(diǎn):
import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @ReactivePulsarListener(topics = "someTopic") public Mono<Void> processMessage(String content) { // ... return Mono.empty(); } }
Spring Boot的自動(dòng)配置為ReactivePulsarListener
提供了所有必要的組件,如ReactivePulsarListenerContainerFactory
和用于構(gòu)建底層響應(yīng)式Pulsar消費(fèi)者的消費(fèi)者工廠。你可以通過指定任何以spring.pulsar.listener.
和spring.pulsar.consumer.
為前綴的應(yīng)用屬性來配置這些組件。
如果你需要對(duì)消費(fèi)者工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ReactiveMessageConsumerBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于工廠創(chuàng)建的所有消費(fèi)者,因此適用于所有@ReactivePulsarListener
實(shí)例。你還可以通過設(shè)置@ReactivePulsarListener
注解的consumerCustomizer
屬性來定制單個(gè)監(jiān)聽器。
讀取消息
Pulsar的讀取器接口使應(yīng)用程序能夠手動(dòng)管理游標(biāo)。當(dāng)你使用讀取器連接到主題時(shí),你需要指定當(dāng)讀取器連接到主題時(shí)從哪個(gè)消息開始讀取。
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施時(shí),任何bean都可以通過添加@PulsarReader
注解來使用讀取器消費(fèi)消息。以下組件創(chuàng)建了一個(gè)讀取器端點(diǎn),該端點(diǎn)從someTopic
主題的開頭開始讀取消息:
import org.springframework.pulsar.annotation.PulsarReader; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarReader(topics = "someTopic", startMessageId = "earliest") public void processMessage(String content) { // ... } }
@PulsarReader
依賴于PulsarReaderFactory
來創(chuàng)建底層的Pulsar讀取器。Spring Boot的自動(dòng)配置提供了這個(gè)讀取器工廠,可以通過設(shè)置任何以spring.pulsar.reader.*
為前綴的應(yīng)用屬性來定制它。
如果你需要對(duì)讀取器工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ReaderBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于工廠創(chuàng)建的所有讀取器,因此適用于所有@PulsarReader
實(shí)例。你還可以通過設(shè)置@PulsarReader
注解的readerCustomizer
屬性來定制單個(gè)監(jiān)聽器。
以響應(yīng)式方式讀取消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施且Reactive自動(dòng)配置被激活時(shí),Spring會(huì)提供ReactivePulsarReaderFactory
,你可以使用它來創(chuàng)建讀取器,以響應(yīng)式的方式讀取消息。以下組件使用提供的工廠創(chuàng)建一個(gè)讀取器,并從someTopic
主題中讀取5秒鐘前的一條消息:
import java.time.Instant; import java.util.List; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.StartAtSpec; import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarReaderFactory<String> pulsarReaderFactory; public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) { this.pulsarReaderFactory = pulsarReaderFactory; } public void someMethod() { ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder .topic("someTopic") .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))); Mono<Message<String>> message = this.pulsarReaderFactory .createReader(Schema.STRING, List.of(readerBuilderCustomizer)) .readOne(); // ... } }
Spring Boot的自動(dòng)配置提供了這個(gè)讀取器工廠,可以通過設(shè)置任何以spring.pulsar.reader.*
為前綴的應(yīng)用屬性來定制它。
如果你需要對(duì)讀取器工廠的配置進(jìn)行更多的控制,當(dāng)使用工廠創(chuàng)建讀取器時(shí),考慮傳入一個(gè)或多個(gè)ReactiveMessageReaderBuilderCustomizer
實(shí)例。
如果你需要對(duì)讀取器工廠的配置進(jìn)行更多的控制,考慮注冊(cè)一個(gè)或多個(gè)ReactiveMessageReaderBuilderCustomizer
bean。這些定制器會(huì)應(yīng)用于所有創(chuàng)建的讀取器。你也可以在創(chuàng)建讀取器時(shí)傳入一個(gè)或多個(gè)ReactiveMessageReaderBuilderCustomizer
,只將定制應(yīng)用于創(chuàng)建的讀取器。
額外的Pulsar屬性
只有Pulsar支持的屬性子集才能直接通過PulsarProperties
類使用。如果你希望使用額外的屬性來調(diào)整自動(dòng)配置的組件,而這些屬性不被直接支持,你可以使用前面提到的每個(gè)組件支持的定制器。
到此這篇關(guān)于詳解Spring Boot對(duì) Apache Pulsar的支持的文章就介紹到這了,更多相關(guān)Spring Boot Apache Pulsar內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
servlet的url-pattern匹配規(guī)則詳細(xì)描述(小結(jié))
在利用servlet或Filter進(jìn)行url請(qǐng)求的匹配時(shí),很關(guān)鍵的一點(diǎn)就是匹配規(guī)則。這篇文章主要介紹了servlet的url-pattern匹配規(guī)則詳細(xì)描述(小結(jié)),非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-07-07springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
這篇文章主要介紹了springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,本文給大家分享小編實(shí)際開發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),內(nèi)容簡單易懂,需要的朋友可以參考下2020-07-07mybatis-plus的selectById(或者selectOne)在根據(jù)主鍵ID查詢實(shí)體對(duì)象的時(shí)候偶爾會(huì)出現(xiàn)nul
這篇文章主要介紹了mybatis-plus的selectById(或者selectOne)在根據(jù)主鍵ID查詢實(shí)體對(duì)象的時(shí)候偶爾會(huì)出現(xiàn)null的問題記錄,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Jmeter跨線程組傳值調(diào)用實(shí)現(xiàn)圖解
這篇文章主要介紹了Jmeter跨線程組傳值調(diào)用實(shí)現(xiàn)圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Spring Cloud OAuth2 實(shí)現(xiàn)用戶認(rèn)證及單點(diǎn)登錄的示例代碼
這篇文章主要介紹了Spring Cloud OAuth2 實(shí)現(xiàn)用戶認(rèn)證及單點(diǎn)登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10Java HashTable的原理與實(shí)現(xiàn)
Java中的HashTable是一種線程安全的哈希表實(shí)現(xiàn),它可以高效地存儲(chǔ)和快速查找數(shù)據(jù),本文將介紹Java中的HashTable的實(shí)現(xiàn)原理、常用方法和測試用例,需要的小伙伴可以參考一下2023-09-09SparkSQL使用IDEA快速入門DataFrame與DataSet的完美教程
本文給大家介紹使用idea開發(fā)Spark SQL 的詳細(xì)過程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-08-08詳解SpringBoot啟動(dòng)代碼和自動(dòng)裝配源碼分析
這篇文章主要介紹了SpringBoot啟動(dòng)代碼和自動(dòng)裝配源碼分析,使用SpringBoot很簡單,在主類中添加一個(gè)@SpringBootApplication,以及調(diào)用SpringApplication.run()并傳入主類,本文通過示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Springboot利于第三方服務(wù)進(jìn)行ip定位獲取省份城市
本文主要介紹了Springboot利于第三方服務(wù)進(jìn)行ip定位獲取省份城市,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07