詳解Spring Boot對 Apache Pulsar的支持
https://docs.spring.io/spring-boot/docs/3.2.0/reference/htmlsingle/#messaging.pulsar
Apache Pulsar 通過提供 Spring for Apache Pulsar 項目的自動配置而受到支持。
當類路徑中存在 org.springframework.pulsar:spring-pulsar
時,Spring Boot 將自動配置并注冊經典的(命令式)Spring for Apache Pulsar 組件。當類路徑中存在 org.springframework.pulsar:spring-pulsar-reactive
時,Spring Boot 也會對反應式組件執(zhí)行相同的操作。
分別有適用于命令式和反應式使用的 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
“Starters”,可方便地收集依賴項。
連接到Pulsar
當使用 Pulsar 啟動器時,Spring Boot 將自動配置并注冊一個 PulsarClient
bean。
默認情況下,應用程序嘗試連接到位于 pulsar://localhost:6650
的本地 Pulsar 實例。這可以通過將 spring.pulsar.client.service-url
屬性設置為不同的值來進行調整。
注意:該值必須是有效的 Pulsar 協(xié)議 URL。
可以通過指定任何以 spring.pulsar.client.*
開頭的應用程序屬性來配置客戶端。
如果需要更多控制權來配置 PulsarClient,請考慮注冊一個或多個 PulsarClientBuilderCustomizer
bean。
認證(Authentication)
要連接到需要認證的 Pulsar 集群,需要指定要使用哪個認證插件,通過設置 pluginClassName
和插件所需的任何參數??梢詫翟O置為參數名稱到參數值的映射。以下示例顯示了如何配置 AuthenticationOAuth2
插件。
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
注意:
需要確保在 spring.pulsar.client.authentication.param.*
下定義的名稱與認證插件所期望的名稱完全匹配(通常是駝峰命名法)。Spring Boot 不會嘗試對這些條目進行任何形式的寬松綁定。
例如,如果想為 AuthenticationOAuth2
認證插件配置issuer URL,則必須使用 spring.pulsar.client.authentication.param.issuerUrl
。如果使用其他形式,如 issuerurl
或 issuer-url
,則設置將不會應用于插件。
SSL
默認情況下,Pulsar客戶端以明文形式與Pulsar服務進行通信。以下部分描述了如何配置Pulsar客戶端以使用TLS加密(SSL)。一個先決條件是Broker也已經配置為使用TLS加密。
Spring Boot自動配置目前不支持任何TLS/SSL配置屬性。相反,你可以提供一個PulsarClientBuilderCustomizer
,該定制器會在Pulsar客戶端構建器上設置必要的屬性。Pulsar支持Privacy Enhanced Mail(PEM)和Java KeyStore(JKS)兩種證書格式。
按照以下步驟配置TLS:
- 調整Pulsar客戶端服務URL以使用
pulsar+ssl://
scheme 和TLS端口(通常為6651
)。 - 調整管理客戶端服務URL以使用
https://
scheme 和TLS Web端口(通常為8443
)。 - 提供客戶端構建器定制器,該定制器會在構建器上設置相關屬性。
以響應式方式連接到Pulsar
當Reactive自動配置被激活時,Spring Boot將自動配置并注冊一個ReactivePulsarClient
bean。
連接到Pulsar管理界面
Spring for Apache Pulsar的PulsarAdministration
客戶端也實現了自動配置。
默認情況下,應用程序嘗試連接到位于http://localhost:8080
的本地Pulsar實例??梢酝ㄟ^將spring.pulsar.admin.service-url
屬性設置為(http|https)://<host>:<port>
的不同值來調整此設置。
如果需要更多控制權來配置PulsarAdmin
,請考慮注冊一個或多個PulsarAdminBuilderCustomizer
bean。
認證
當訪問需要身份驗證的Pulsar集群時,管理客戶端需要與普通Pulsar客戶端相同的安全配置。可以通過將spring.pulsar.client.authentication
替換為spring.pulsar.admin.authentication
來使用上述身份驗證配置。
提示:在啟動時創(chuàng)建主題,請?zhí)砑右粋€類型為PulsarTopic
的bean。如果主題已經存在,則該bean將被忽略。
發(fā)送消息
Spring的PulsarTemplate
實現了自動配置,可以使用它來發(fā)送消息,如下所示:
import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final PulsarTemplate<String> pulsarTemplate; public MyBean(PulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() throws PulsarClientException { this.pulsarTemplate.send("someTopic", "Hello"); } }
PulsarTemplate
依賴于PulsarProducerFactory
來創(chuàng)建底層的Pulsar生產者。Spring Boot的自動配置也提供了這個生產者工廠,默認情況下,它會緩存所創(chuàng)建的生產者。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應用屬性來配置生產者工廠和緩存設置。
如果你需要對生產者工廠的配置進行更多的控制,考慮注冊一個或多個ProducerBuilderCustomizer
bean。這些定制器會應用于所有創(chuàng)建的生產者。你也可以在發(fā)送消息時傳入一個ProducerBuilderCustomizer
,只影響當前的生產者。
如果你需要對正在發(fā)送的消息進行更多的控制,你可以在發(fā)送消息時傳入一個TypedMessageBuilderCustomizer
。
以響應式方式發(fā)送消息
當Reactive自動配置被激活時,Spring的ReactivePulsarTemplate
也會實現自動配置,可以使用它來發(fā)送消息,如下所示:
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarTemplate<String> pulsarTemplate; public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() { this.pulsarTemplate.send("someTopic", "Hello").subscribe(); } }
ReactivePulsarTemplate
依賴于ReactivePulsarSenderFactory
來實際創(chuàng)建底層的發(fā)送器。Spring Boot的自動配置也提供了這個發(fā)送器工廠,默認情況下,它會緩存所創(chuàng)建的發(fā)送器。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應用屬性來配置發(fā)送器工廠和緩存設置。
如果你需要對發(fā)送器工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageSenderBuilderCustomizer
bean。這些定制器會應用于所有創(chuàng)建的發(fā)送器。你也可以在發(fā)送消息時傳入一個ReactiveMessageSenderBuilderCustomizer
,只影響當前的發(fā)送器。
如果你需要對正在發(fā)送的消息進行更多的控制,你可以在發(fā)送消息時傳入一個MessageSpecBuilderCustomizer
。
接收消息
當存在Apache Pulsar基礎設施時,任何bean都可以通過添加@PulsarListener
注解來創(chuàng)建監(jiān)聽器端點。以下組件在someTopic
主題上創(chuàng)建了一個監(jiān)聽器端點:
import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarListener(topics = "someTopic") public void processMessage(String content) { // ... } }
Spring Boot的自動配置為PulsarListener
提供了所有必要的組件,如PulsarListenerContainerFactory
和用于構建底層Pulsar消費者的消費者工廠。你可以通過指定任何以spring.pulsar.listener.*
和spring.pulsar.consumer.*
為前綴的應用屬性來配置這些組件。
如果你需要對消費者工廠的配置進行更多的控制,考慮注冊一個或多個ConsumerBuilderCustomizer
bean。這些定制器會應用于工廠創(chuàng)建的所有消費者,因此適用于所有@PulsarListener
實例。你還可以通過設置@PulsarListener
注解的consumerCustomizer
屬性來定制單個監(jiān)聽器。
以響應式方式接收消息
當存在Apache Pulsar基礎設施且Reactive自動配置被激活時,任何bean都可以通過添加@ReactivePulsarListener
注解來創(chuàng)建響應式監(jiān)聽器端點。以下組件在someTopic
主題上創(chuàng)建了一個響應式監(jiān)聽器端點:
import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @ReactivePulsarListener(topics = "someTopic") public Mono<Void> processMessage(String content) { // ... return Mono.empty(); } }
Spring Boot的自動配置為ReactivePulsarListener
提供了所有必要的組件,如ReactivePulsarListenerContainerFactory
和用于構建底層響應式Pulsar消費者的消費者工廠。你可以通過指定任何以spring.pulsar.listener.
和spring.pulsar.consumer.
為前綴的應用屬性來配置這些組件。
如果你需要對消費者工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageConsumerBuilderCustomizer
bean。這些定制器會應用于工廠創(chuàng)建的所有消費者,因此適用于所有@ReactivePulsarListener
實例。你還可以通過設置@ReactivePulsarListener
注解的consumerCustomizer
屬性來定制單個監(jiān)聽器。
讀取消息
Pulsar的讀取器接口使應用程序能夠手動管理游標。當你使用讀取器連接到主題時,你需要指定當讀取器連接到主題時從哪個消息開始讀取。
當存在Apache Pulsar基礎設施時,任何bean都可以通過添加@PulsarReader
注解來使用讀取器消費消息。以下組件創(chuàng)建了一個讀取器端點,該端點從someTopic
主題的開頭開始讀取消息:
import org.springframework.pulsar.annotation.PulsarReader; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarReader(topics = "someTopic", startMessageId = "earliest") public void processMessage(String content) { // ... } }
@PulsarReader
依賴于PulsarReaderFactory
來創(chuàng)建底層的Pulsar讀取器。Spring Boot的自動配置提供了這個讀取器工廠,可以通過設置任何以spring.pulsar.reader.*
為前綴的應用屬性來定制它。
如果你需要對讀取器工廠的配置進行更多的控制,考慮注冊一個或多個ReaderBuilderCustomizer
bean。這些定制器會應用于工廠創(chuàng)建的所有讀取器,因此適用于所有@PulsarReader
實例。你還可以通過設置@PulsarReader
注解的readerCustomizer
屬性來定制單個監(jiān)聽器。
以響應式方式讀取消息
當存在Apache Pulsar基礎設施且Reactive自動配置被激活時,Spring會提供ReactivePulsarReaderFactory
,你可以使用它來創(chuàng)建讀取器,以響應式的方式讀取消息。以下組件使用提供的工廠創(chuàng)建一個讀取器,并從someTopic
主題中讀取5秒鐘前的一條消息:
import java.time.Instant; import java.util.List; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.StartAtSpec; import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarReaderFactory<String> pulsarReaderFactory; public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) { this.pulsarReaderFactory = pulsarReaderFactory; } public void someMethod() { ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder .topic("someTopic") .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))); Mono<Message<String>> message = this.pulsarReaderFactory .createReader(Schema.STRING, List.of(readerBuilderCustomizer)) .readOne(); // ... } }
Spring Boot的自動配置提供了這個讀取器工廠,可以通過設置任何以spring.pulsar.reader.*
為前綴的應用屬性來定制它。
如果你需要對讀取器工廠的配置進行更多的控制,當使用工廠創(chuàng)建讀取器時,考慮傳入一個或多個ReactiveMessageReaderBuilderCustomizer
實例。
如果你需要對讀取器工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageReaderBuilderCustomizer
bean。這些定制器會應用于所有創(chuàng)建的讀取器。你也可以在創(chuàng)建讀取器時傳入一個或多個ReactiveMessageReaderBuilderCustomizer
,只將定制應用于創(chuàng)建的讀取器。
額外的Pulsar屬性
只有Pulsar支持的屬性子集才能直接通過PulsarProperties
類使用。如果你希望使用額外的屬性來調整自動配置的組件,而這些屬性不被直接支持,你可以使用前面提到的每個組件支持的定制器。
到此這篇關于詳解Spring Boot對 Apache Pulsar的支持的文章就介紹到這了,更多相關Spring Boot Apache Pulsar內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
servlet的url-pattern匹配規(guī)則詳細描述(小結)
在利用servlet或Filter進行url請求的匹配時,很關鍵的一點就是匹配規(guī)則。這篇文章主要介紹了servlet的url-pattern匹配規(guī)則詳細描述(小結),非常具有實用價值,需要的朋友可以參考下2018-07-07springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)
這篇文章主要介紹了springboot + rabbitmq 如何實現消息確認機制,本文給大家分享小編實際開發(fā)中的一點踩坑經驗,內容簡單易懂,需要的朋友可以參考下2020-07-07mybatis-plus的selectById(或者selectOne)在根據主鍵ID查詢實體對象的時候偶爾會出現nul
這篇文章主要介紹了mybatis-plus的selectById(或者selectOne)在根據主鍵ID查詢實體對象的時候偶爾會出現null的問題記錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09Spring Cloud OAuth2 實現用戶認證及單點登錄的示例代碼
這篇文章主要介紹了Spring Cloud OAuth2 實現用戶認證及單點登錄的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-10-10SparkSQL使用IDEA快速入門DataFrame與DataSet的完美教程
本文給大家介紹使用idea開發(fā)Spark SQL 的詳細過程,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-08-08