欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot如何處理@KafkaListener消息

 更新時間:2024年12月12日 09:16:30   作者:小豹子的技術(shù)筆記  
Spring通過KafkaMessageListenerContainer、ConcurrentMessageListenerContainer等組件實現(xiàn)Kafka消息的監(jiān)聽和處理,并通過@KafkaListener注解將業(yè)務(wù)邏輯與Kafka消費者連接起來,Spring?Boot自動配置Kafka相關(guān)組件,簡化了Kafka的使用

消息監(jiān)聽容器

1、KafkaMessageListenerContainer

由spring提供用于監(jiān)聽以及拉取消息,并將這些消息按指定格式轉(zhuǎn)換后交給由@KafkaListener注解的方法處理,相當于一個消費者;

看看其整體代碼結(jié)構(gòu):

可以發(fā)現(xiàn)其入口方法為doStart(), 往上追溯到實現(xiàn)了SmartLifecycle接口,很明顯,由spring管理其start和stop操作;

ListenerConsumer, 內(nèi)部真正拉取消息消費的是這個結(jié)構(gòu),其 實現(xiàn)了Runable接口,簡言之,它就是一個后臺線程輪訓拉取并處理消息(while true死循環(huán)拉取消息)。

在doStart方法中會創(chuàng)建ListenerConsumer并交給線程池處理

以上步驟就開啟了消息監(jiān)聽過程。

KafkaMessageListenerContainer#doStart

protected void doStart() {
		if (isRunning()) {
			return;
		}
		ContainerProperties containerProperties = getContainerProperties();
		if (!this.consumerFactory.isAutoCommit()) {
			AckMode ackMode = containerProperties.getAckMode();
			if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
				Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
			}
			if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
					&& containerProperties.getAckTime() == 0) {
				containerProperties.setAckTime(5000);
			}
		}

		Object messageListener = containerProperties.getMessageListener();
		Assert.state(messageListener != null, "A MessageListener is required");
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
		this.listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
		if (this.listener instanceof DelegatingMessageListener) {
			Object delegating = this.listener;
			while (delegating instanceof DelegatingMessageListener) {
				delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
			}
			listenerType = ListenerUtils.determineListenerType(delegating);
		}
        // 這里創(chuàng)建了監(jiān)聽消費者對象
		this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
		setRunning(true);
        // 將消費者對象放入到線程池中執(zhí)行
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
	}

KafkaMessageListenerContainer.ListenerConsumer#run

public void run() {
			this.consumerThread = Thread.currentThread();
			if (this.genericListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
			}
			if (this.transactionManager != null) {
				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			if (isRunning() && this.definedPartitions != null) {
				try {
					initPartitionsIfNeeded();
				}
				catch (Exception e) {
					this.logger.error("Failed to set initial offsets", e);
				}
			}
			long lastReceive = System.currentTimeMillis();
			long lastAlertAt = lastReceive;
			while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					if (!this.consumerPaused && isPaused()) {
						this.consumer.pause(this.consumer.assignment());
						this.consumerPaused = true;
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Paused consumption from: " + this.consumer.paused());
						}
						publishConsumerPausedEvent(this.consumer.assignment());
					}
                    // 拉取信息
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();
					if (this.consumerPaused && !isPaused()) {
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Resuming consumption from: " + this.consumer.paused());
						}
						Set<TopicPartition> paused = this.consumer.paused();
						this.consumer.resume(paused);
						this.consumerPaused = false;
						publishConsumerResumedEvent(paused);
					}
					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
						if (records.count() > 0 && this.logger.isTraceEnabled()) {
							this.logger.trace(records.partitions().stream()
								.flatMap(p -> records.records(p).stream())
								// map to same format as send metadata toString()
								.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
								.collect(Collectors.toList()));
						}
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}
					else {
						if (this.containerProperties.getIdleEventInterval() != null) {
							long now = System.currentTimeMillis();
							if (now > lastReceive + this.containerProperties.getIdleEventInterval()
									&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
								publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
										? this.consumer : null, this.consumerPaused);
								lastAlertAt = now;
								if (this.genericListener instanceof ConsumerSeekAware) {
									seekPartitions(getAssignedPartitions(), true);
								}
							}
						}
					}
				}
				catch (WakeupException e) {
					// Ignore, we're stopping
				}
				catch (NoOffsetForPartitionException nofpe) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
					break;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
			}
			ProducerFactoryUtils.clearConsumerGroupId();
			if (!this.fatalError) {
				if (this.kafkaTxManager == null) {
					commitPendingAcks();
					try {
						this.consumer.unsubscribe();
					}
					catch (WakeupException e) {
						// No-op. Continue process
					}
				}
			}
			else {
				ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
				KafkaMessageListenerContainer.this.stop();
			}
			this.monitorTask.cancel(true);
			if (!this.taskSchedulerExplicitlySet) {
				((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
			}
			this.consumer.close();
			this.logger.info("Consumer stopped");
		}

2、ConcurrentMessageListenerContainer

并發(fā)消息監(jiān)聽,相當于創(chuàng)建消費者;其底層邏輯仍然是通過KafkaMessageListenerContainer實現(xiàn)處理;從實現(xiàn)上看就是在KafkaMessageListenerContainer上做了層包裝,有多少的concurrency就創(chuàng)建多個KafkaMessageListenerContainer,也就是concurrency個消費者。

	protected void doStart() {
		if (!isRunning()) {
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);

            // 創(chuàng)建多個消費者
			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties);
				}
				else {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties, partitionSubset(containerProperties, i));
				}
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
				container.start();
				this.containers.add(container);
			}
		}
	}

3、@KafkaListener底層監(jiān)聽原理

上面已經(jīng)介紹了KafkaMessageListenerContainer的作用是拉取并處理消息,但還缺少關(guān)鍵的一步,即 如何將我們的業(yè)務(wù)邏輯與KafkaMessageListenerContainer的處理邏輯聯(lián)系起來?

那么這個橋梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 從后綴BeanPostProcessor就可以知道這是Spring IOC初始化bean相關(guān)的操作,當然這里也是;

此類會掃描帶@KafkaListener注解的類或者方法,通過 KafkaListenerContainerFactory工廠創(chuàng)建對應(yīng)的KafkaMessageListenerContainer,并調(diào)用start方法啟動監(jiān)聽,也就是這樣打通了這條路…

4、Spring Boot 自動加載kafka相關(guān)配置

1、KafkaAutoConfiguration

自動生成kafka相關(guān)配置,比如當缺少這些bean的時候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默認創(chuàng)建bean實例

2、KafkaAnnotationDrivenConfiguration

主要是針對于spring-kafka提供的注解背后的相關(guān)操作,比如 @KafkaListener;

在開啟了@EnableKafka注解后,spring會掃描到此配置并創(chuàng)建缺少的bean實例,比如當配置的工廠beanName不是kafkaListenerContainerFactory的時候,就會默認創(chuàng)建一個beanName為kafkaListenerContainerFactory的實例,這也是為什么在springboot中不用定義consumer的相關(guān)配置也可以通過@KafkaListener正常的處理消息

5、消息處理

單條消息處理

@Configuration
public class KafkaConsumerConfiguration {

  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
        // poll 一次拉取的阻塞的最大時長,單位:毫秒。這里指的是阻塞拉取需要滿足至少 fetch-min-size 大小的消息
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
        return props;
    }

}

這種方式的@KafkaLisener中的參數(shù)是單條的。

批量處理

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 增加開啟批量處理
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

// 注意:這里接受的是集合類型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

這種方式的@KafkaLisener中的參數(shù)是多條的。

6、線程池相關(guān)

如果沒有額外給Kafka指定線程池,底層默認用的是SimpleAsyncTaskExecutor類,它不使用線程池,而是為每個任務(wù)創(chuàng)建新線程。相當于一個消費者用一個獨立的線程來跑。

總結(jié)

spring為了將kafka融入其生態(tài),方便在spring大環(huán)境下使用kafka,開發(fā)了spring-kafa這一模塊,本質(zhì)上是為了幫助開發(fā)者更好的以spring的方式使用kafka

@KafkaListener就是這么一個工具,在同一個項目中既可以有單條的消息處理,也可以配置多條的消息處理,稍微改變下配置即可實現(xiàn),很是方便

當然,@KafkaListener單條或者多條消息處理仍然是spring自行封裝處理,與kafka-client客戶端的拉取機制無關(guān);比如一次性拉取50條消息,對于單條處理來說就是循環(huán)50次處理,而多條消息處理則可以一次性處理50條;本質(zhì)上來說這套邏輯都是spring處理的,并不是說單條消費就是通過kafka-client一次只拉取一條消息

在使用過程中需要注意spring自動的創(chuàng)建的一些bean實例,當然也可以覆蓋其自動創(chuàng)建的實例以滿足特定的需求場景。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • springboot寶塔簡單部署的實現(xiàn)示例

    springboot寶塔簡單部署的實現(xiàn)示例

    通過使用Spring Boot,可以快速構(gòu)建出高效、可擴展的應(yīng)用程序,而寶塔面板則提供了簡單易用的網(wǎng)站管理和維護工具,本文將詳細介紹如何將Spring Boot應(yīng)用程序與寶塔面板進行集成,實現(xiàn)自動化部署、配置管理等操作
    2023-11-11
  • java中的Arrays這個工具類你真的會用嗎(一文秒懂)

    java中的Arrays這個工具類你真的會用嗎(一文秒懂)

    這篇文章主要介紹了java中的Arrays這個工具類你真的會用嗎,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-06-06
  • JDBC示例代碼

    JDBC示例代碼

    本教程提供了如何創(chuàng)建一個簡單的JDBC應(yīng)用程序的示例。演示如何打開一個數(shù)據(jù)庫連接,執(zhí)行SQL查詢,并顯示結(jié)果
    2014-03-03
  • 整理java讀書筆記十五之java中的內(nèi)部類

    整理java讀書筆記十五之java中的內(nèi)部類

    內(nèi)部類是指在一個外部類的內(nèi)部再定義一個類。類名不需要和文件夾相同。本文給大家分享java讀書筆記十五之java中的內(nèi)部類,對java讀書筆記相關(guān)知識感興趣的朋友一起學習吧
    2015-12-12
  • IDEA插件EasyCode及MyBatis最優(yōu)配置步驟詳解

    IDEA插件EasyCode及MyBatis最優(yōu)配置步驟詳解

    這篇文章主要介紹了IDEA插件EasyCode MyBatis最優(yōu)配置步驟詳解,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • SpringMVC 使用JSR-303進行校驗 @Valid示例

    SpringMVC 使用JSR-303進行校驗 @Valid示例

    本篇文章主要介紹了SpringMVC 使用JSR-303進行校驗 @Valid示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02
  • JPA如何使用nativequery多表關(guān)聯(lián)查詢返回自定義實體類

    JPA如何使用nativequery多表關(guān)聯(lián)查詢返回自定義實體類

    這篇文章主要介紹了JPA如何使用nativequery多表關(guān)聯(lián)查詢返回自定義實體類,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • java常見事件響應(yīng)方法實例匯總

    java常見事件響應(yīng)方法實例匯總

    這篇文章主要介紹了java常見事件響應(yīng)方法,對于初學者有很好的參考借鑒價值,分享給大家,需要的朋友可以參考下
    2014-08-08
  • Java找不到或無法加載主類及編碼錯誤問題的解決方案

    Java找不到或無法加載主類及編碼錯誤問題的解決方案

    今天小編就為大家分享一篇關(guān)于Java找不到或無法加載主類及編碼錯誤問題的解決方案,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-02-02
  • SpringCloud如何利用Feign訪問外部http請求

    SpringCloud如何利用Feign訪問外部http請求

    這篇文章主要介紹了SpringCloud如何利用Feign訪問外部http請求,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論