欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

kafka生產(chǎn)者發(fā)送消息流程深入分析講解

 更新時(shí)間:2023年03月30日 09:17:06   作者:william_cr7  
本文將介紹kafka的一條消息的發(fā)送流程,從消息的發(fā)送到服務(wù)端的存儲(chǔ)。上文說到kafak分為客戶端與服務(wù)端,要發(fā)送消息就涉及到了網(wǎng)絡(luò)通訊,kafka采用TCP協(xié)議進(jìn)行客戶端與服務(wù)端的通訊協(xié)議

消息發(fā)送過程

消息的發(fā)送可能會(huì)經(jīng)過攔截器、序列化、分區(qū)器等過程。消息發(fā)送的主要涉及兩個(gè)線程,分別為main線程和sender線程。

如圖所示,主線程由 afkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器RecordAccumulator (也稱為消息收集器)中。 Sender 線程負(fù)責(zé)從RecordAccumulator 獲取消息并將其發(fā)送到 Kafka中。

攔截器

在消息序列化之前會(huì)經(jīng)過消息攔截器,自定義攔截器需要實(shí)現(xiàn)ProducerInterceptor接口,接口主要有兩個(gè)方案#onSend和#onAcknowledgement,在消息發(fā)送之前會(huì)調(diào)用前者方法,可以在發(fā)送之前假如處理邏輯,比如計(jì)費(fèi)。在收到服務(wù)端ack響應(yīng)后會(huì)觸發(fā)后者方法。需要注意的是攔截器中不要加入過多的復(fù)雜業(yè)務(wù)邏輯,以免影響發(fā)送效率。

消息分區(qū)

消息ProducerRecord會(huì)將消息路由到那個(gè)分區(qū)中,分兩種情況:

1.指定了partition字段

如果消息ProducerRecord中指定了 partition字段,那么就不需要走分區(qū)器,直接發(fā)往指定得partition分區(qū)中。

2.沒有指定partition,但自定義了分區(qū)器

3.沒指定parittion,也沒有自定義分區(qū)器,但key不為空

4.沒指定parittion,也沒有自定義分區(qū)器,key也為空

看源碼

// KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//指定分區(qū)partition則直接返回,否則走分區(qū)器
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(),                 serializedValue, cluster);
}
//DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

partition 方法中定義了分區(qū)分配邏輯 如果 ke 不為 null , 那 么默認(rèn)的分區(qū)器會(huì)對(duì) key 進(jìn)行哈 希(采 MurmurHash2 算法 ,具備高運(yùn)算性能及 低碰 撞率),最終根據(jù)得到 哈希值來 算分區(qū)號(hào), 有相同 key 的消息會(huì)被寫入同一個(gè)分區(qū) 如果 key null ,那么消息將會(huì)以輪詢的方式發(fā)往主題內(nèi)的各個(gè)可用分區(qū)。

消息累加器

分區(qū)確定好了之后,消息并不是直接發(fā)送給broker,因?yàn)橐粋€(gè)個(gè)發(fā)送網(wǎng)絡(luò)消耗太大,而是先緩存到消息累加器RecordAccumulator,RecordAccumulator主要用來緩存消息 Sender 線程可以批量發(fā)送,進(jìn) 減少網(wǎng)絡(luò)傳輸 的資源消耗以提升性能 RecordAccumulator 緩存的大 小可以通過生產(chǎn)者客戶端參數(shù) buffer memory 配置,默認(rèn)值為 33554432B ,即 32MB如果生產(chǎn)者發(fā)送消息的速度超過發(fā) 送到服務(wù)器的速度 ,則會(huì)導(dǎo)致生產(chǎn)者空間不足,這個(gè)時(shí)候 KafkaProducer的send()方法調(diào)用要么 被阻塞,要么拋出異常,這個(gè)取決于參數(shù) max block ms 的配置,此參數(shù)的默認(rèn)值為 60秒。

消息累加器本質(zhì)上是個(gè)ConcurrentMap,

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

發(fā)送流程源碼分析

//KafkaProducer
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
	// intercept the record, which can be potentially modified; this method does not throw exceptions
    //首先執(zhí)行攔截器鏈
	ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
	return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
	try {
		throwIfProducerClosed();
		// first make sure the metadata for the topic is available
		long nowMs = time.milliseconds();
		ClusterAndWaitTime clusterAndWaitTime;
		try {
			clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
		} catch (KafkaException e) {
			if (metadata.isClosed())
				throw new KafkaException("Producer closed while send in progress", e);
			throw e;
		}
		nowMs += clusterAndWaitTime.waitedOnMetadataMs;
		long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
		Cluster cluster = clusterAndWaitTime.cluster;
		byte[] serializedKey;
		try {
			//key序列化
			serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in key.serializer", cce);
		}
		byte[] serializedValue;
		try {
			//value序列化
			serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in value.serializer", cce);
		}
		//獲取分區(qū)partition
		int partition = partition(record, serializedKey, serializedValue, cluster);
		tp = new TopicPartition(record.topic(), partition);
		setReadOnly(record.headers());
		Header[] headers = record.headers().toArray();
		//消息壓縮
		int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
				compressionType, serializedKey, serializedValue, headers);
		//判斷消息是否超過最大允許大小,消息緩存空間是否已滿
		ensureValidRecordSize(serializedSize);
		long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
		if (log.isTraceEnabled()) {
			log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
		}
		// producer callback will make sure to call both 'callback' and interceptor callback
		Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
		if (transactionManager != null && transactionManager.isTransactional()) {
			transactionManager.failIfNotReadyForSend();
		}
		//將消息緩存在消息累加器RecordAccumulator中
		RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        //開辟新的ProducerBatch
		if (result.abortForNewBatch) {
			int prevPartition = partition;
			partitioner.onNewBatch(record.topic(), cluster, prevPartition);
			partition = partition(record, serializedKey, serializedValue, cluster);
			tp = new TopicPartition(record.topic(), partition);
			if (log.isTraceEnabled()) {
				log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
			}
			// producer callback will make sure to call both 'callback' and interceptor callback
			interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
			result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
		}
		if (transactionManager != null && transactionManager.isTransactional())
			transactionManager.maybeAddPartitionToTransaction(tp);
		//判斷消息是否已滿,喚醒sender線程進(jìn)行發(fā)送消息
		if (result.batchIsFull || result.newBatchCreated) {
			log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
			this.sender.wakeup();
		}
		return result.future;
		// handling exceptions and record the errors;
		// for API exceptions return them in the future,
		// for other exceptions throw directly
	} catch (Exception e) {
		// we notify interceptor about all exceptions, since onSend is called before anything else in this method
		this.interceptors.onSendError(record, tp, e);
		throw e;
	}
}

生產(chǎn)消息的可靠性

消息發(fā)送到broker,什么情況下生產(chǎn)者才確定消息寫入成功了呢?ack是生產(chǎn)者一個(gè)重要的參數(shù),它有三個(gè)值,ack=1表示leader副本寫入成功服務(wù)端即可返回給生產(chǎn)者,是吞吐量和消息可靠性的平衡方案;ack=0表示生產(chǎn)者發(fā)送消息之后不需要等服務(wù)端響應(yīng),這種消息丟失風(fēng)險(xiǎn)最大;ack=-1表示生產(chǎn)者需要等等ISR中所有副本寫入成功后才能收到響應(yīng),這種消息可靠性最高但吞吐量也是最小的。

到此這篇關(guān)于kafka生產(chǎn)者發(fā)送消息流程深入分析講解的文章就介紹到這了,更多相關(guān)kafka發(fā)送消息流程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 基于雪花算法實(shí)現(xiàn)增強(qiáng)版ID生成器詳解

    基于雪花算法實(shí)現(xiàn)增強(qiáng)版ID生成器詳解

    這篇文章主要為大家詳細(xì)介紹了如何基于雪花算法實(shí)現(xiàn)增強(qiáng)版ID生成器,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)具有一定的借鑒價(jià)值,需要的可以了解一下
    2022-10-10
  • Java?IO篇之Reactor?網(wǎng)絡(luò)模型的概念

    Java?IO篇之Reactor?網(wǎng)絡(luò)模型的概念

    Reactor?模式也叫做反應(yīng)器設(shè)計(jì)模式,是一種為處理服務(wù)請(qǐng)求并發(fā)提交到一個(gè)或者多個(gè)服務(wù)處理器的事件設(shè)計(jì)模式,Reactor?模式主要由?Reactor?和處理器?Handler?這兩個(gè)核心部分組成,本文給大家介紹Java?IO篇之Reactor?網(wǎng)絡(luò)模型的概念,感興趣的朋友一起看看吧
    2022-01-01
  • Java如何實(shí)現(xiàn)自定義異常類

    Java如何實(shí)現(xiàn)自定義異常類

    這篇文章主要介紹了Java如何實(shí)現(xiàn)自定義異常類,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • 詳解java調(diào)用存儲(chǔ)過程并封裝成map

    詳解java調(diào)用存儲(chǔ)過程并封裝成map

    這篇文章主要介紹了詳解java調(diào)用存儲(chǔ)過程并封裝成map的相關(guān)資料,希望通過本文能幫助到大家實(shí)現(xiàn)這樣的功能,需要的朋友可以參考下
    2017-09-09
  • 帶你了解Java Maven的打包操作

    帶你了解Java Maven的打包操作

    這篇文章主要介紹了Maven打包的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • Java中split根據(jù)"."分割字符串問題舉例

    Java中split根據(jù)"."分割字符串問題舉例

    split表達(dá)式其實(shí)就是一個(gè)正則表達(dá)式,* | . ^ 等符號(hào)在正則表達(dá)式中屬于一種有特殊含義的字符,下面這篇文章主要給大家介紹了關(guān)于Java中split根據(jù)“.“分割字符串問題的相關(guān)資料,需要的朋友可以參考下
    2022-10-10
  • Java利用廣度優(yōu)先搜索實(shí)現(xiàn)抓牛問題

    Java利用廣度優(yōu)先搜索實(shí)現(xiàn)抓牛問題

    廣度優(yōu)先搜索是最簡(jiǎn)便的圖的搜索算法之一,這一算法也是很多重要的圖的算法的原型。本文將利用廣度優(yōu)先搜索實(shí)現(xiàn)抓牛問題,感興趣的可以了解下
    2022-06-06
  • Java基于接口實(shí)現(xiàn)模擬動(dòng)物聲音代碼實(shí)例

    Java基于接口實(shí)現(xiàn)模擬動(dòng)物聲音代碼實(shí)例

    這篇文章主要介紹了Java基于接口實(shí)現(xiàn)模擬動(dòng)物聲音代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Java中泛型的示例詳解

    Java中泛型的示例詳解

    泛型機(jī)制在項(xiàng)目中一直都在使用,不僅如此,很多源碼中都用到了泛型機(jī)制。本文將通過一些示例帶大家深入了解一下Java的泛型機(jī)制,需要的可以了解一下
    2022-10-10
  • MyBatis-Plus中使用EntityWrappe進(jìn)行列表數(shù)據(jù)倒序設(shè)置方式

    MyBatis-Plus中使用EntityWrappe進(jìn)行列表數(shù)據(jù)倒序設(shè)置方式

    這篇文章主要介紹了MyBatis-Plus中使用EntityWrappe進(jìn)行列表數(shù)據(jù)倒序設(shè)置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評(píng)論