欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot集成kafka消費手動啟動停止操作

 更新時間:2022年09月07日 11:28:29   作者:zengliangxi  
這篇文章主要介紹了springboot集成kafka消費手動啟動停止操作,本文給大家介紹項目場景及解決分析,結(jié)合實例代碼給大家介紹的非常詳細,需要的朋友可以參考下

項目場景:

在月結(jié),或者某些時候,我們需要停掉kafka所有的消費端,讓其暫時停止消費,而后等月結(jié)完成,再從新對消費監(jiān)聽恢復(fù),進行消費,此動作不需要重啟服務(wù)

解決分析

KafkaListenerEndpointRegistry這是kafka與spring集成的監(jiān)聽注冊bean,可以通過它獲取監(jiān)聽容器對象,然后對監(jiān)聽容器對象實行啟動,暫停,恢復(fù)等操作

/**
 * kafka服務(wù)操作類
 * @author liangxi.zeng
 */
@Service
@Slf4j
public class KafkaService {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 開啟消費
     * @param listenerId
     */
    public void start(String listenerId) {
        MessageListenerContainer messageListenerContainer = registry
                .getListenerContainer(listenerId);
        if(Objects.nonNull(messageListenerContainer)) {
            if(!messageListenerContainer.isRunning()) {
                messageListenerContainer.start();
            } else {
                if(messageListenerContainer.isContainerPaused()) {
                    log.info("listenerId:{},恢復(fù)",listenerId);
                    messageListenerContainer.resume();
                }
            }
        }
    }

    /**
     * 停止消費
     * @param listenerId
     */
    public void pause(String listenerId) {
        MessageListenerContainer messageListenerContainer = registry
                .getListenerContainer(listenerId);
        if(Objects.nonNull(messageListenerContainer) && !messageListenerContainer.isContainerPaused()) {
            log.info("listenerId:{},暫停",listenerId);
            messageListenerContainer.pause();
        }
    }
}

kafka啟動,停止,恢復(fù)觸發(fā)場景

1.通過定時任務(wù)自動觸發(fā),通過@Scheduled,在某個時間點暫停kafka某個監(jiān)聽的消費,也可以在某個時間點恢復(fù)kafka某個監(jiān)聽的消費

/**
 * 卡夫卡配置類
 * @author liangxi.zeng
 */
@Configuration
@EnableScheduling
public class KafkaConfigure {

    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private KafkaConfigParam kafkaConfigParam;

   @Scheduled(cron = "0/10 * * * * ?")
    public void startListener() {
        List<String> topics = kafkaConfigParam.getStartTopics();
        System.out.println("開啟。。。"+topics);
        Optional.ofNullable(topics).orElse(new ArrayList<>()).forEach(topic -> {
            kafkaService.start(topic);
        });

    }

    @Scheduled(cron = "0/10 * * * * ?")
    public void pauseListener() {
        List<String> topics = kafkaConfigParam.getPauseTopics();
        System.out.println("暫停。。。"+topics);
        Optional.ofNullable(topics).orElse(new ArrayList<>()).forEach(topic -> {
            kafkaService.pause(topic);
        });

    }
}

2.通過訪問接口手動觸發(fā)kafka消費的啟動,暫停,恢復(fù)

@RequestMapping("/start/{kafkaId}")
    public String start(@PathVariable String kafkaId) {
        if(!registry.getListenerContainer(kafkaId).isRunning()) {
            registry.getListenerContainer(kafkaId).start();
        } else {
            registry.getListenerContainer(kafkaId).resume();
        }

        return "ok";
    }

    @RequestMapping("/pause/{kafkaId}")
    public String pause(@PathVariable String kafkaId) {
        registry.getListenerContainer(kafkaId).pause();
        return "ok";
    }

3.監(jiān)聽nacos配置文件,完成動態(tài)的啟停操作

/**
 * nacos配置變更監(jiān)聽
 * @author liangxi.zeng
 */
@Component
@Slf4j
public class NacosConfigListener {

    @Autowired
    private NacosConfigManager nacosConfigManager;

    @Autowired
    private KafkaService kafkaService;

    @Autowired
    private KafkaStartPauseParam kafkaStartPauseParam;

    /**
     * 分隔符
     */
    private static final String SPLIT=",";

    private static final String GROUP = "DEFAULT_GROUP";


    /**
     * nacos 配置文件監(jiān)聽
     * @throws NacosException
     */
    @PostConstruct
    private void reloadConfig() throws NacosException {
        nacosConfigManager.getConfigService().addListener(kafkaStartPauseParam.getDataId(), GROUP, new AbstractConfigChangeListener() {
            @Override
            public void receiveConfigChange(final ConfigChangeEvent event) {
                ConfigChangeItem pauseListeners = event.getChangeItem(KafkaStartPauseParam.PREFIX+".pause-listener");
                ConfigChangeItem startListeners = event.getChangeItem(KafkaStartPauseParam.PREFIX+".start-listener");
                if(Objects.nonNull(pauseListeners)) {
                    pause(pauseListeners);
                }

                if(Objects.nonNull(startListeners)) {
                    start(startListeners);
                }
            }
        });
    }

    /**
     * 暫停消費
     * @param pauseListeners
     */
    private void pause(ConfigChangeItem pauseListeners) {
        String pauseValue = pauseListeners.getNewValue();
        log.info("暫停listener:{}",pauseValue);
        if(!StringUtils.isEmpty(pauseValue)) {
            String[] pauseListenerIds = pauseValue.split(SPLIT);
            for(String pauseListenerId:pauseListenerIds) {
                kafkaService.pause(pauseListenerId);
            }
        }
    }

    /**
     * 恢復(fù)消費
     * @param startListeners
     */
    private void start(ConfigChangeItem startListeners) {
        String startValue = startListeners.getNewValue();
        log.info("啟動listener:{}",startValue);
        if(!StringUtils.isEmpty(startValue)) {
            String[] startListenerIds = startValue.split(SPLIT);
            for(String startListenerId:startListenerIds) {
                kafkaService.start(startListenerId);
            }
        }
    }

}

配置類

/**
 * kafka配置參數(shù)
 * @author liangxi.zeng
 */
@ConfigurationProperties(prefix = KafkaStartPauseParam.PREFIX)
@Data
@Component
@RefreshScope
public class KafkaStartPauseParam {

    public static final String PREFIX = "tcl.kafka";

    private String pauseListener;

    private String startListener;

    private String dataId;
}

源碼分析

1.springboot集成kafka,集成配置類org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

2.@Import({KafkaAnnotationDrivenConfiguration.class})

@Configuration(
        proxyBeanMethods = false
    )
    @EnableKafka
    @ConditionalOnMissingBean(
        name = {"org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor"}
    )
    static class EnableKafkaConfiguration {
        EnableKafkaConfiguration() {
        }
    }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

	@Override
	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
		return new String[] { KafkaBootstrapConfiguration.class.getName() };
	}

}
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
		if (!registry.containsBeanDefinition(
				KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
		}

		if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
		}
	}

}

3.KafkaListenerAnnotationBeanPostProcessor這個類,就是消費監(jiān)聽的解析類,在此類中,將監(jiān)聽的方法放入了監(jiān)聽容器MessageListenerContainer

4.監(jiān)聽容器中有ListenerConsumer監(jiān)聽消費者的屬性,此內(nèi)部內(nèi)實現(xiàn)了SchedulingAwareRunnable接口,此接口繼承了Runnable接口,完成了定時異步消費等操作

@Override
		public void run() {
			
			while (isRunning()) {
				try {
					pollAndInvoke();
				}
			}
			wrapUp();
		}

protected void pollAndInvoke() {
			if (!this.autoCommit && !this.isRecordAck) {
				processCommits();
			}
			idleBetweenPollIfNecessary();
			if (this.seeks.size() > 0) {
				processSeeks();
			}
			pauseConsumerIfNecessary();
			this.lastPoll = System.currentTimeMillis();
			this.polling.set(true);
			ConsumerRecords<K, V> records = doPoll();
			if (!this.polling.compareAndSet(true, false) && records != null) {
				/*
				 * There is a small race condition where wakeIfNecessary was called between
				 * exiting the poll and before we reset the boolean.
				 */
				if (records.count() > 0) {
					this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
				}
				return;
			}
			resumeConsumerIfNeccessary();
			debugRecords(records);
			if (records != null && records.count() > 0) {
				if (this.containerProperties.getIdleEventInterval() != null) {
					this.lastReceive = System.currentTimeMillis();
				}
				invokeListener(records);
			}
			else {
				checkIdle();
			}
		}

遺留問題

在對kafka消費監(jiān)聽啟停的過程中,發(fā)現(xiàn)當暫停消費的時候,對于存量的topic還是會消費完,不會立即停止,只是對于新產(chǎn)生的topic不會再消費了

到此這篇關(guān)于springboot集成kafka消費手動啟動停止的文章就介紹到這了,更多相關(guān)springboot集成kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java線程狀態(tài)及jstack命令詳解

    Java線程狀態(tài)及jstack命令詳解

    jstack是Java虛擬機(JVM)提供的一個非常有用的命令行工具,它允許開發(fā)人員和系統(tǒng)管理員在運行時獲取Java應(yīng)用程序的線程堆棧跟蹤,在某些情況下,可能需要以管理員或root用戶的身份運行jstack命令,這篇文章主要介紹了Java線程狀態(tài)及jstack命令詳解,需要的朋友可以參考下
    2024-03-03
  • sentinel整合ribbon與fallback流程分步講解

    sentinel整合ribbon與fallback流程分步講解

    這篇文章主要介紹了sentinel整合ribbon與fallback分步流程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-08-08
  • java對圖片進行壓縮和resize縮放的方法

    java對圖片進行壓縮和resize縮放的方法

    本篇文章主要介紹了java對圖片進行壓縮和resize調(diào)整的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-07-07
  • Java?CopyOnWriteArrayList源碼超詳細分析

    Java?CopyOnWriteArrayList源碼超詳細分析

    為了將讀取的性能發(fā)揮到極致,jdk中提供了CopyOnWriteArrayList類,下面這篇文章主要給大家介紹了關(guān)于java中CopyOnWriteArrayList源碼解析的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-11-11
  • IDEA中如何查找jar包之間的依賴關(guān)系并忽略依賴的某個包

    IDEA中如何查找jar包之間的依賴關(guān)系并忽略依賴的某個包

    這篇文章主要介紹了IDEA中如何查找jar包之間的依賴關(guān)系并忽略依賴的某個包?本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-08-08
  • java文件復(fù)制代碼片斷(java實現(xiàn)文件拷貝)

    java文件復(fù)制代碼片斷(java實現(xiàn)文件拷貝)

    本文介紹java實現(xiàn)文件拷貝的代碼片斷,大家可以直接放到程序里運行
    2014-01-01
  • Spring?question問題小結(jié)

    Spring?question問題小結(jié)

    在AppConfig配置類中,通過@Bean注解創(chuàng)建了Service和Controller的實例,Spring會自動將這些實例納入容器的管理,并處理它們之間的依賴關(guān)系,本文給大家介紹Spring?question問題小結(jié),感興趣的朋友跟隨小編一起看看吧
    2023-10-10
  • 劍指Offer之Java算法習題精講字符串操作與數(shù)組及二叉搜索樹

    劍指Offer之Java算法習題精講字符串操作與數(shù)組及二叉搜索樹

    跟著思路走,之后從簡單題入手,反復(fù)去看,做過之后可能會忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化
    2022-03-03
  • 詳解Spring Boot 自定義PropertySourceLoader

    詳解Spring Boot 自定義PropertySourceLoader

    這篇文章主要介紹了詳解Spring Boot 自定義PropertySourceLoader,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • spring.profiles.active配置使用小結(jié)

    spring.profiles.active配置使用小結(jié)

    spring.profiles.active?配置使得應(yīng)用程序能夠在不同的環(huán)境中使用不同的配置,本文主要介紹了spring.profiles.active配置使用小結(jié),具有一定的參考價值,感興趣的可以了解一下
    2024-07-07

最新評論