SpringCloudStream中的消息分區(qū)數(shù)詳解
一、前言
本文僅針對(duì) Kafka 來(lái)聊消息分區(qū)數(shù)相關(guān)的話題。
SpringCloudStream 中的消息分區(qū)數(shù)如何配置?
或者說(shuō)消息分區(qū)數(shù)會(huì)受到哪些配置的影響。
- SpringCloudStream:Greenwich.SR2
- Kafka:kafka_2.12-2.3.0
二、影響因素
2.1 Kafka服務(wù)端
首先應(yīng)該想到的,Kafka 配置文件 server.properties 中默認(rèn)每一個(gè) topic 的分區(qū)數(shù) num.partitions=1
# The default number of log partitions per topic. More partitions allow greater num.partitions=1
2.2 生產(chǎn)者端
從SpringCloudStream的配置中可以看到,生產(chǎn)者可以指定分區(qū)數(shù),默認(rèn)1:
spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n
【說(shuō)明】:當(dāng)分區(qū)功能開啟時(shí),使用該參數(shù)來(lái)配置消息數(shù)據(jù)的分區(qū)數(shù)。
如果消息生產(chǎn)者已經(jīng)配置了分區(qū)鍵的生成策略,那么它的值必須大于1。
2.3 消費(fèi)者端
SpringCloudStream 允許通過(guò)配置,使得消費(fèi)者能夠自動(dòng)創(chuàng)建分區(qū)。
#輸入通道消費(fèi)者的并發(fā)數(shù),默認(rèn)1 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
若想以上配置生效,還需添加如下通用配置:
#Kafka綁定器允許在需要的時(shí)候自動(dòng)創(chuàng)建分區(qū)。默認(rèn)false spring.cloud.stream.kafka.binder.autoAddPartitions=true
消費(fèi)者端如此配置以后,將表現(xiàn)為一個(gè)消費(fèi)者服務(wù)或進(jìn)程中,會(huì)有2個(gè)線程各自消費(fèi)1個(gè)分區(qū),即2個(gè)消費(fèi)者線程同時(shí)消費(fèi)。
以下是該配置的效果驗(yàn)證步驟:
消費(fèi)者代碼:
1個(gè) @StreamListener 消費(fèi)自己的 topic 或自己的輸出channel:
@EnableBinding(SpiderSink.class) @Slf4j public class SpiderSinkReceiver { @Autowired private SpiderMessageService spiderMessageService; @StreamListener(SpiderSink.INPUT) public void receive(Object payload) { log.info("SPIDER-SINK received: {}", payload); } }
方式一:通過(guò)日志驗(yàn)證:
通過(guò)在 log4j 日志中,打印線程名稱的方式,驗(yàn)證 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置確確實(shí)實(shí)會(huì)新增1個(gè)消費(fèi)者線程。
[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50) [INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
方式二:直接查看分區(qū)數(shù)來(lái)驗(yàn)證:
另外,也可在啟動(dòng)一個(gè)生產(chǎn)者服務(wù)時(shí),等待自動(dòng)創(chuàng)建一個(gè)新 topic 后(此時(shí)默認(rèn)分區(qū)數(shù)為1),比如我們創(chuàng)建的 topic 為“topic-spider-dev”,此時(shí)通過(guò)kafka命令查看分區(qū)數(shù),此時(shí)分區(qū)數(shù)為1:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:1 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1
然后,配置消費(fèi)者服務(wù)的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,啟動(dòng)一個(gè)消費(fèi)者服務(wù),再次查看分區(qū)數(shù),已經(jīng)變?yōu)?了:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:2 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1 ? ? ? ? Topic: topic-spider-dev Partition: 1 ? ?Leader: 2 ? ? ? Replicas: 2 ? ? Isr: 2
同時(shí)查看消費(fèi)者端的應(yīng)用日志,看到2個(gè)消費(fèi)者線程各自分配了一個(gè)分區(qū):
[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-0] [INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-1]
最終,確確實(shí)實(shí)地驗(yàn)證了 concurrency 配置對(duì)消費(fèi)者線程數(shù)和分區(qū)數(shù)的影響。
2.4 其他因素
比如,SpringCloudStream 中 Kafka 綁定器的配置中,也有一個(gè)相關(guān)的影響因素:
#最小分區(qū)數(shù),默認(rèn)1 spring.cloud.stream.kafka.binder.minPartitionCount=n
【說(shuō)明】:該參數(shù)僅在設(shè)置了 autoCreateTopics 和 autoAddPartitions 時(shí)生效,用來(lái)設(shè)置該綁定器所使用主題的全局分區(qū)最小數(shù)量。
如果當(dāng)生產(chǎn)者的 partitionCount 參數(shù)或 instanceCount * concurrency 設(shè)置大于該參數(shù)配置時(shí),該參數(shù)值將被覆蓋。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringCloudStream原理和深入使用小結(jié)
- SpringCloud中的Stream服務(wù)間消息傳遞詳解
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動(dòng)消息確認(rèn)問(wèn)題
- 關(guān)于SpringCloudStream配置問(wèn)題
- Spring Cloud Stream 高級(jí)特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實(shí)現(xiàn)分布式事務(wù)的實(shí)踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
Java NIO原理圖文分析及代碼實(shí)現(xiàn)
本文主要介紹Java NIO原理的知識(shí),這里整理了詳細(xì)資料及簡(jiǎn)單示例代碼和原理圖,有需要的小伙伴可以參考下2016-09-09Java時(shí)區(qū)轉(zhuǎn)換實(shí)例代碼解析
這篇文章主要介紹了Java時(shí)區(qū)轉(zhuǎn)換實(shí)例代碼解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03java?常規(guī)輪詢長(zhǎng)輪詢Long?polling實(shí)現(xiàn)示例詳解
這篇文章主要為大家介紹了java?常規(guī)輪詢長(zhǎng)輪詢Long?polling實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12java實(shí)現(xiàn)解析二進(jìn)制文件的方法(字符串、圖片)
本篇文章主要介紹了java實(shí)現(xiàn)解析二進(jìn)制文件的方法(字符串、圖片),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-02-02Matlab及Java實(shí)現(xiàn)小時(shí)鐘效果
這篇文章主要為大家詳細(xì)介紹了Matlab及Java實(shí)現(xiàn)小時(shí)鐘效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-05-05