SpringBoot整合Pulsar的實(shí)現(xiàn)示例
一、添加pom.xml依賴
<parent> ? ? <groupId>org.springframework.boot</groupId> ? ? <artifactId>spring-boot-starter-parent</artifactId> ? ? <version>2.7.0</version> </parent> <dependencies> ? ? <dependency> ? ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? ? <artifactId>spring-boot-starter-web</artifactId> ? ? </dependency> ? ? <dependency> ? ? ? ? <groupId>org.apache.pulsar</groupId> ? ? ? ? <artifactId>pulsar-client</artifactId> ? ? ? ? <version>2.10.0</version> ? ? </dependency> ? ? <dependency> ? ? ? ? <groupId>org.projectlombok</groupId> ? ? ? ? <artifactId>lombok</artifactId> ? ? ? ? <version>1.18.24</version> ? ? ? ? <scope>provided</scope> ? ? </dependency> </dependencies> <build> ? ? <plugins> ? ? ? ? <plugin> ? ? ? ? ? ? <groupId>org.apache.maven.plugins</groupId> ? ? ? ? ? ? <artifactId>maven-compiler-plugin</artifactId> ? ? ? ? ? ? <configuration> ? ? ? ? ? ? ? ? <source>8</source> ? ? ? ? ? ? ? ? <target>8</target> ? ? ? ? ? ? </configuration> ? ? ? ? </plugin> ? ? </plugins> </build> ? ?
二、Pulsar 參數(shù)類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:32
?* @Description: Pulsar 參數(shù)類
?*/
@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
@Data
public class PulsarProperties {
? ? /**
? ? ?* 接入地址
? ? ?*/
? ? private String serviceurl;
? ? /**
? ? ?* 命名空間tdc
? ? ?*/
? ? private String tdcNamespace;
? ? /**
? ? ?* 角色tdc的token
? ? ?*/
? ? private String tdcToken;
? ? /**
? ? ?* 集群name
? ? ?*/
? ? private String cluster;
? ? /**
? ? ?* topicMap
? ? ?*/
? ? private Map<String, String> topicMap;
? ? /**
? ? ?* 訂閱
? ? ?*/
? ? private Map<String, String> subMap;
? ? /**
? ? ?* 開關(guān) on:Consumer可用 ||||| off:Consumer斷路
? ? ?*/
? ? private String onOff;
}三、Pulsar 配置類
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:33
?* @Description: Pulsar 配置類
?*/
@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {
? ? @Autowired
? ? PulsarProperties pulsarProperties;
? ? @Bean
? ? public PulsarClient getPulsarClient() {
? ? ? ? try {
? ? ? ? ? ? return PulsarClient.builder()
? ? ? ? ? ? ? ? ? ? .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
? ? ? ? ? ? ? ? ? ? .serviceUrl(pulsarProperties.getServiceurl())
? ? ? ? ? ? ? ? ? ? .build();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? System.out.println(e);
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Client失敗");
? ? ? ? }
? ? }
}四、不同消費(fèi)數(shù)據(jù)類型的監(jiān)聽器
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:37
?* @Description:
?*/
@Component
public class UserMessageListener implements MessageListener<User> {
? ? @Override
? ? public void received(Consumer<User> consumer, Message<User> msg) {
? ? ? ? try {
? ? ? ? ? ? User user = msg.getValue();
? ? ? ? ? ? System.out.println(user);
? ? ? ? ? ? consumer.acknowledge(msg);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? consumer.negativeAcknowledge(msg);
? ? ? ? }
? ? }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:37
?* @Description:
?*/
@Component
public class StringMessageListener implements MessageListener<String> {
? ? @Override
? ? public void received(Consumer<String> consumer, Message<String> msg) {
? ? ? ? try {
? ? ? ? ? ? System.out.println(msg.getValue());
? ? ? ? ? ? consumer.acknowledge(msg);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? consumer.negativeAcknowledge(msg);
? ? ? ? }
? ? }
}五、Pulsar的核心服務(wù)類
import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:35
?* @Description: Pulsar的核心服務(wù)類
?*/
@Component
public class PulsarCommon {
? ? @Autowired
? ? private PulsarProperties pulsarProperties;
? ? @Autowired
? ? private PulsarClient client;
? ? @Autowired
? ? private UserMessageListener userMessageListener;
? ? @Autowired
? ? private StringMessageListener stringMessageListener;
? ? /**
? ? ?* 創(chuàng)建一個(gè)生產(chǎn)者?
? ? ?* @param topic ? ? topic name
? ? ?* @param schema ? ?schema方式
? ? ?* @param <T> ? ? ? 泛型
? ? ?* @return ? ? ? ? ?Producer生產(chǎn)者
? ? ?*/
? ? public <T> Producer<T> createProducer(String topic, Schema<T> schema) {
? ? ? ? try {
? ? ? ? ? ? return client.newProducer(schema)
? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
? ? ? ? ? ? ? ? ? ? .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
? ? ? ? ? ? ? ? ? ? .sendTimeout(10, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? ? ? .blockIfQueueFull(true)
? ? ? ? ? ? ? ? ? ? .create();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Producer失敗");
? ? ? ? }
? ? }
? ? /**
? ? ?*?
? ? ?* @param topic ? ? ? ? ? ? topic name
? ? ?* @param subscription ? ? ?sub name
? ? ?* @param messageListener ? MessageListener的自定義實(shí)現(xiàn)類
? ? ?* @param schema ? ? ? ? ? ?schema消費(fèi)方式
? ? ?* @param <T> ? ? ? ? ? ? ? 泛型
? ? ?* @return ? ? ? ? ? ? ? ? ?Consumer消費(fèi)者
? ? ?*/
? ? public <T> Consumer<T> createConsumer(String topic, String subscription,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MessageListener<T> messageListener, Schema<T> schema) {
? ? ? ? try {
? ? ? ? ? ? return client.newConsumer(schema)
? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
? ? ? ? ? ? ? ? ? ? .subscriptionName(subscription)
? ? ? ? ? ? ? ? ? ? .ackTimeout(10, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? ? ? .subscriptionType(SubscriptionType.Shared)
? ? ? ? ? ? ? ? ? ? .messageListener(messageListener)
? ? ? ? ? ? ? ? ? ? .subscribe();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Consumer失敗");
? ? ? ? }
? ? }
? ??
? ? /**
? ? ?* 異步發(fā)送一條消息
? ? ?* @param message ? ? ? 消息體
? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例
? ? ?* @param <T> ? ? ? ? ? 消息泛型
? ? ?*/
? ? public <T> void sendAsyncMessage(T message, Producer<T> producer) {
? ? ? ? producer.sendAsync(message).thenAccept(msgId -> {
? ? ? ? });
? ? }
? ??
? ??
? ? /**
? ? ?* 同步發(fā)送一條消息
? ? ?* @param message ? ? ? 消息體
? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例
? ? ?* @param <T> ? ? ? ? ? 泛型
? ? ?* @throws PulsarClientException
? ? ?*/
? ? public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
? ? ? ? MessageId send = producer.send(message);
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println(send);
? ? }
? ??
? ? //-----------consumer-----------
? ? @Bean(name = "comment-publish-topic-consumer")
? ? public Consumer<String> getCommentPublishTopicConsumer() {
? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("comment-publish-topic-test"),
? ? ? ? ? ? ? ? stringMessageListener, Schema.STRING);
? ? }
? ? @Bean(name = "reply-publish-topic-consumer")
? ? public Consumer<User> getReplyPublishTopicConsumer() {
? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("reply-publish-topic-test"),
? ? ? ? ? ? ? ? userMessageListener, AvroSchema.of(User.class));
? ? }
? ? //-----------producer-----------
? ? @Bean(name = "comment-publish-topic-producer")
? ? public Producer<String> getCommentPublishTopicProducer() {
? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);
? ? }
? ? @Bean(name = "reply-publish-topic-producer")
? ? public Producer<User> getReplyPublishTopicProducer() {
? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
? ? }
}六、Pulsar整合Spring Cloud
后來發(fā)現(xiàn)如上代碼會(huì)導(dǎo)致BUG-> 在更新Nacos配置之后 Consumer會(huì)掛掉
經(jīng)排查發(fā)現(xiàn)結(jié)果是由于@RefreshScope注解導(dǎo)致,此注解將摧毀Bean,PulsarConsumer和Producer都將被摧毀,只是說Producer將在下?次調(diào)?中完成重啟,Consumer則不能重啟,因?yàn)闆]有調(diào)?,那么怎么解決呢?
就是發(fā)布系列事件以刷新容器
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:34
?* @Description:
?*/
@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener {
? ? @Autowired
? ? ApplicationContext applicationContext;
? ? @Override
? ? public void onApplicationEvent(ApplicationEvent event) {
? ? ? ? if (event.getSource().equals("__refreshAll__")) {
? ? ? ? ? ? log.info("Nacos配置中心配置修改 重啟Pulsar====================================");
? ? ? ? ? ? log.info("重啟PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));
? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));
? ? ? ? }
? ? }
}參考:
https://wenku.baidu.com/view/4d3337ab6b0203d8ce2f0066f5335a8102d266a7.html
https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/
https://blog.csdn.net/weixin_56227932/article/details/122897075
http://www.zzvips.com/article/219361.html
https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ
到此這篇關(guān)于SpringBoot整合Pulsar的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot整合Pulsar內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析
這篇文章主要介紹了Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析,Java多線程實(shí)現(xiàn)方式主要有三種,繼承Thread類、實(shí)現(xiàn)Runnable接口、使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程,需要的朋友可以參考下2023-12-12
MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼
這篇文章主要介紹了MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
Spring與Struts整合之讓Spring管理控制器操作示例
這篇文章主要介紹了Spring與Struts整合之讓Spring管理控制器操作,結(jié)合實(shí)例形式詳細(xì)分析了Spring管理控制器相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01
springBoot?啟動(dòng)指定配置文件環(huán)境多種方案(最新推薦)
springBoot?啟動(dòng)指定配置文件環(huán)境理論上是有多種方案的,一般都是結(jié)合我們的實(shí)際業(yè)務(wù)選擇不同的方案,比如,有pom.xml文件指定、maven命令行指定、配置文件指定、啟動(dòng)jar包時(shí)指定等方案,今天我們一一分享一下,需要的朋友可以參考下2023-09-09
springboot整合vue項(xiàng)目(小試牛刀)
這篇文章主要介紹了springboot整合vue項(xiàng)目(小試牛刀),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-09-09
Java中使用Preconditions來檢查傳入?yún)?shù)介紹
這篇文章主要介紹了Java中使用Preconditions來檢查傳入?yún)?shù)介紹,本文只是作為一個(gè)簡單的用法介紹,需要的朋友可以參考下2015-06-06
在RedisTemplate中使用scan代替keys指令操作
這篇文章主要介紹了在RedisTemplate中使用scan代替keys指令操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11

