欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink實(shí)戰(zhàn)之實(shí)現(xiàn)流式數(shù)據(jù)去重

 更新時(shí)間:2025年03月21日 08:59:17   作者:watermark's  
流式數(shù)據(jù)是一種源源不斷產(chǎn)生的數(shù)據(jù),本文探索了一種流式大數(shù)據(jù)的實(shí)時(shí)去重方法,不一定適用于所有場(chǎng)景,不過(guò)或許可以給面對(duì)相似問(wèn)題的你一點(diǎn)點(diǎn)啟發(fā),

流式數(shù)據(jù)是一種源源不斷產(chǎn)生的數(shù)據(jù),沒(méi)有預(yù)定的開(kāi)始與結(jié)束,至少理論上來(lái)說(shuō),它的數(shù)據(jù)輸入永遠(yuǎn)不會(huì)結(jié)束。因此流式數(shù)據(jù)處理與傳統(tǒng)的批處理技術(shù)不同,必須具備持續(xù)不斷地對(duì)到達(dá)的數(shù)據(jù)進(jìn)行處理的能力。

因?yàn)榱魇綌?shù)據(jù)源源不斷地產(chǎn)生,對(duì)流式數(shù)據(jù)做去重就十分困難,因?yàn)橐粭l數(shù)據(jù)重復(fù)與否需要與之前的數(shù)據(jù)痕跡作比對(duì),數(shù)據(jù)是無(wú)窮盡產(chǎn)生的,倘留存之前的數(shù)據(jù),勢(shì)必占據(jù)大量的存儲(chǔ)空間,判重的過(guò)程也會(huì)隨著數(shù)據(jù)量的增加而變得復(fù)雜耗時(shí)。

本文探索了一種流式大數(shù)據(jù)的實(shí)時(shí)去重方法,不一定適用于所有場(chǎng)景,不過(guò)或許可以給面對(duì)相似問(wèn)題的你一點(diǎn)點(diǎn)啟發(fā)。

Bloom 過(guò)濾器

海量數(shù)據(jù)的去重,很容易聯(lián)想到 Bloom 過(guò)濾器。Bloom過(guò)濾器是由一個(gè)長(zhǎng)度為 m 比特的數(shù)組與 k 個(gè)哈希函數(shù)組成的數(shù)據(jù)結(jié)構(gòu)。

當(dāng)要插入一個(gè)元素時(shí),將數(shù)據(jù)分別輸入到 k 個(gè)哈希函數(shù),產(chǎn)生 k 個(gè)哈希值,以哈希值作為位數(shù)組中的索引,將相應(yīng)的比特位置為 1。

如下圖所示,是由 3 個(gè)哈希函數(shù) + 18 個(gè)比特位組成的 Bloom 過(guò)濾器:

當(dāng)元素 "hello" 插入時(shí),3 個(gè)哈希函數(shù)分別計(jì)算得到 3 個(gè)哈希值,將哈希值對(duì)應(yīng)的比特位置為 1。

當(dāng)元素 "world" 插入時(shí),3 個(gè)哈希函數(shù)分別計(jì)算再次得到 3 個(gè)哈希值,將哈希值對(duì)應(yīng)的比特位置為 1。

Bloom 過(guò)濾器的巧妙之處就在于用一張位圖來(lái)留存數(shù)據(jù)的痕跡,無(wú)需存儲(chǔ)數(shù)據(jù)本身,用有限的空間和極低的時(shí)間復(fù)雜度即可完成過(guò)濾。

當(dāng)要查詢一個(gè)元素時(shí),同樣將其輸入 k 個(gè)哈希函數(shù),然后檢查對(duì)應(yīng)的 k 個(gè)比特,如果有任意一個(gè)比特為 0,表明該元素一定不在集合中;如果所有比特均為 1,表明該元素有(較大的)可能性在集合中。為什么無(wú)法百分之百確定元素在集合中呢?以元素 "test" 為例:

我們假設(shè) "test" 經(jīng)過(guò)哈希函數(shù)計(jì)算后得到的哈希值恰好是之前的數(shù)據(jù) "hello" + "world" 的哈希值的子集,此時(shí) Bloom 就會(huì)產(chǎn)生誤判,誤以為 "test" 已經(jīng)在集合中。

不過(guò)這個(gè)誤判率可以通過(guò)增加哈希函數(shù)的個(gè)數(shù)和位圖的大小來(lái)控制在極低的范圍內(nèi),給定預(yù)計(jì)輸入的元素總數(shù) n 和預(yù)期的假陽(yáng)性率 p,經(jīng)過(guò)嚴(yán)格的數(shù)學(xué)推導(dǎo)可以得到哈希函數(shù)的個(gè)數(shù) k 和位圖的大小 m 的理論值:

Bloom 過(guò)濾器去重流數(shù)據(jù)

使用 Bloom 對(duì)流式數(shù)據(jù)去重時(shí),由于 Bloom 的位圖空間有限而流數(shù)據(jù)是源源不斷產(chǎn)生的,有限的位圖空間無(wú)法應(yīng)對(duì)無(wú)限的數(shù)據(jù),而如果定時(shí)重置過(guò)濾器,重置將導(dǎo)致已保存狀態(tài)位的丟失,從而引入重復(fù)記錄,無(wú)法做到 "無(wú)縫" 銜接。示意圖如下:

在 t1 時(shí)刻重置過(guò)濾器時(shí),將導(dǎo)致 t1 時(shí)刻之前的 01,03 數(shù)據(jù)標(biāo)記丟失,重置后再次出現(xiàn)的數(shù)據(jù) 03 將穿透過(guò)濾器,同理在 t2 時(shí)刻、t3 時(shí)刻、t4 時(shí)刻重置過(guò)濾器后,數(shù)據(jù) 06、08、09 也將穿透過(guò)濾器,造成去重結(jié)果不準(zhǔn)確。

Bloom 過(guò)濾器隊(duì)列去重流數(shù)據(jù)

既然一個(gè) Bloom 無(wú)法應(yīng)對(duì)流數(shù)據(jù)的去重,如果用多個(gè) Bloom 過(guò)濾器能否實(shí)現(xiàn)預(yù)期效果呢?

我們采用 Bloom 過(guò)濾器隊(duì)列對(duì)數(shù)據(jù)流進(jìn)行去重,隊(duì)列中的 Bloom 過(guò)濾器是按時(shí)間依次補(bǔ)位到隊(duì)列中的,重點(diǎn)在 “依次”,每個(gè)過(guò)濾器的 TTL (Time To Live) 相同,但存活的起止時(shí)間不同。

如圖所示:

過(guò)濾器-1 的存活起止時(shí)間是[t0, t3];

過(guò)濾器-2 在 t1 時(shí)刻補(bǔ)充到隊(duì)列中,存活起止時(shí)間是 [t1, t4];

過(guò)濾器-3 在 t2 時(shí)刻補(bǔ)位到隊(duì)列中,存活起止時(shí)間是 [t2, t5];

過(guò)濾器-4 在 t3 時(shí)刻補(bǔ)位到隊(duì)列中,存活起止時(shí)間是 [t3, t6],t3 時(shí)刻,過(guò)濾器-1 的生命周期結(jié)束,從過(guò)濾器隊(duì)首移除,新的隊(duì)首是 過(guò)濾器-2;

過(guò)濾器-5 在 t4 時(shí)刻補(bǔ)位到隊(duì)列中,存活起止時(shí)間是 [t4, t7],t4 時(shí)刻,過(guò)濾器-2 的聲明周期結(jié)束,從過(guò)濾器隊(duì)首移除,新的隊(duì)首是 過(guò)濾器-3;

過(guò)濾器-6 在 t5 時(shí)刻補(bǔ)位到隊(duì)列中,存活起止時(shí)間是 [t5, t8],t5 時(shí)刻,過(guò)濾器-3 的聲明周期結(jié)束,從過(guò)濾器隊(duì)首移除,新的隊(duì)首是 過(guò)濾器-4;

過(guò)濾器隊(duì)列中每隔固定時(shí)間間隔從隊(duì)首移除一個(gè)舊的過(guò)濾器,同時(shí)補(bǔ)位到隊(duì)尾一個(gè)新的過(guò)濾器,隊(duì)列的規(guī)模一直保持固定的規(guī)模 (本例中為 3);

這個(gè)過(guò)濾器隊(duì)列如何判別重復(fù)呢?

當(dāng)接收到一個(gè)數(shù)據(jù)元素時(shí),用過(guò)濾器隊(duì)列中的 每個(gè)過(guò)濾器 來(lái)判斷該數(shù)據(jù)是否出現(xiàn)過(guò),只有當(dāng)隊(duì)列中的每個(gè)過(guò)濾器都判定為 "未出現(xiàn)過(guò)" 時(shí),才認(rèn)為是非重復(fù)數(shù)據(jù),允許通過(guò);只要隊(duì)列中有任何一個(gè)過(guò)濾器判斷為 "已出現(xiàn)過(guò)",則攔截該數(shù)據(jù)。

無(wú)論攔截或是放行該條數(shù)據(jù),都在在當(dāng)前隊(duì)列中的 First 2 個(gè)過(guò)濾器中留存該數(shù)據(jù)記錄的 "痕跡"(圖中用相同位置的綠色 bit 標(biāo)識(shí)數(shù)據(jù)的痕跡)。

還是以上圖為例,介紹一下過(guò)濾器隊(duì)列的工作過(guò)程:

[t0, t1] 時(shí)間段,隊(duì)列中只有 1 個(gè)過(guò)濾器:過(guò)濾器-1,數(shù)據(jù) 01,01,03 依次到達(dá)后,經(jīng) 過(guò)濾器-1去重后的結(jié)果是 01,03,在 過(guò)濾器-1 中記錄 [t0, t1] 時(shí)間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位;

[t1, t2] 時(shí)間段,隊(duì)列中有 2 個(gè)過(guò)濾器:過(guò)濾器-1、過(guò)濾器-2,當(dāng)數(shù)據(jù) 03,03,04 依次到達(dá)后,03 被 過(guò)濾器-1 攔截,04 可以通過(guò)過(guò)濾器隊(duì)列,因此去重后的結(jié)果是 04,同時(shí)在 過(guò)濾器-1 和 過(guò)濾器-2 中記錄 [t1, t2] 時(shí)間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位;

[t2, t3] 時(shí)間段,隊(duì)列中有 3 個(gè)過(guò)濾器:過(guò)濾器-1、過(guò)濾器-2、過(guò)濾器-3。當(dāng)數(shù)據(jù) 04,06,06 依次到達(dá)后,04 被 過(guò)濾器-1、過(guò)濾器-2 攔截,06 可以通過(guò)過(guò)濾器隊(duì)列,因此去重后的結(jié)果是 06,同時(shí)在 過(guò)濾器-1 和 過(guò)濾器-2 中記錄 [t2, t3] 時(shí)間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位,過(guò)濾器-2 就是過(guò)濾器-1 在 [t1, t3] 時(shí)間段的備份;因?yàn)?[t2, t3] 時(shí)刻 過(guò)濾器-1 的狀態(tài)已經(jīng)復(fù)制到了 過(guò)濾器-2 中,過(guò)濾器-3 在[t2, t3] 時(shí)間段就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);

t3 時(shí)刻,過(guò)濾器-4 補(bǔ)位到隊(duì)尾,過(guò)濾器-1從隊(duì)首移除 (t3 時(shí)刻之后,如果還有 t3 時(shí)刻之前出現(xiàn)過(guò)的數(shù)據(jù)再次出現(xiàn),將會(huì)穿透過(guò)濾器隊(duì)列,我們可以通過(guò)設(shè)置過(guò)濾器的存活時(shí)間和隊(duì)列的大小來(lái)盡量避免這一情況的發(fā)生);

[t3, t4] 時(shí)間段,隊(duì)列中有 3 個(gè)過(guò)濾器:過(guò)濾器-2、過(guò)濾器-3、過(guò)濾器-4,當(dāng)數(shù)據(jù) 06,08,07 依次到達(dá)后,06 被 過(guò)濾器-2 攔截,08 和 07 可以通過(guò)過(guò)濾器隊(duì)列,因此去重后的結(jié)果是 08,07,同時(shí)在 過(guò)濾器-2 和 過(guò)濾器-3 中記錄 [t3, t4] 時(shí)間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過(guò)濾器-3 作為 過(guò)濾器-2 在 [t3, t4] 時(shí)間段的備份),因?yàn)?[t3, t4] 時(shí)刻 過(guò)濾器-2 的狀態(tài)已經(jīng)復(fù)制到了 過(guò)濾器-3 中,過(guò)濾器-4 在[t3, t4] 時(shí)間段就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);

t4 時(shí)刻,過(guò)濾器-5 補(bǔ)位到隊(duì)尾,過(guò)濾器-2 從隊(duì)首移除 (t4 時(shí)刻之后,如果還有 t2 時(shí)刻之前出現(xiàn)過(guò)的數(shù)據(jù)再次出現(xiàn),將會(huì)穿透過(guò)濾器隊(duì)列,我們可以通過(guò)設(shè)置過(guò)濾器的存活時(shí)間和隊(duì)列的大小來(lái)避免這一情況的發(fā)生);

[t4, t5] 時(shí)間段,隊(duì)列中有 3 個(gè)過(guò)濾器:過(guò)濾器-3、過(guò)濾器-4、過(guò)濾器-5,當(dāng)數(shù)據(jù) 08,08,09依次到達(dá)后,08 被 過(guò)濾器-3 攔截,09 可以通過(guò)過(guò)濾器隊(duì)列,因此去重后的結(jié)果是 09,同時(shí)在 過(guò)濾器-3 和 過(guò)濾器-4 中記錄 [t3, t4] 時(shí)刻流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過(guò)濾器-4 作為 過(guò)濾器-3 在 [t4, t5] 時(shí)間段的備份),因?yàn)?[t4, t5] 時(shí)間段 過(guò)濾器-3 的狀態(tài)已經(jīng)復(fù)制到了 過(guò)濾器-4 中,過(guò)濾器-5 在 [t4, t5] 時(shí)刻就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);

t5 時(shí)刻,過(guò)濾器-6 補(bǔ)位到隊(duì)尾,過(guò)濾器-3 從隊(duì)首移除 (t5時(shí)刻之后,如果還有 t3 時(shí)刻之前出現(xiàn)過(guò)的數(shù)據(jù)再次出現(xiàn),將會(huì)穿透過(guò)濾器隊(duì)列,我們可以通過(guò)設(shè)置過(guò)濾器的存活時(shí)間和隊(duì)列的大小來(lái)避免這一情況的發(fā)生);

[t5, t6] 時(shí)間段,隊(duì)列中有 3 個(gè)過(guò)濾器:過(guò)濾器-4、過(guò)濾器-5、過(guò)濾器-6,當(dāng)數(shù)據(jù) 09,09,10 依次到達(dá)后,09 被 過(guò)濾器-4 攔截,10 可以通過(guò)過(guò)濾器隊(duì)列,因此去重后的結(jié)果是 10,同時(shí)在 過(guò)濾器-4 和 過(guò)濾器-5 中記錄 [t5, t6] 時(shí)刻流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過(guò)濾器-5 作為 過(guò)濾器-4 在 [t5, t6] 時(shí)刻的備份),因?yàn)?[t5, t6] 時(shí)刻過(guò)濾器-4 的狀態(tài)已經(jīng)復(fù)制到了 過(guò)濾器-5 中,過(guò)濾器-6 在[t5, t6] 時(shí)刻就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);

實(shí)現(xiàn)

如何把上述設(shè)計(jì)在 Flink 中實(shí)現(xiàn)呢,Bloom 過(guò)濾器隊(duì)列是隨著時(shí)間動(dòng)態(tài)變化的,因此需要用到 Flink 的 定時(shí)器。KeyedProcessFunction 算子的 TimerService 就提供了定時(shí)器注冊(cè)功能,可以注冊(cè) EventTimeTimer 或 ProcessingTimeTimer。

BloomFilterProcessFunction.java:

package org.example.flink.operator;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.example.flink.data.Trace;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;


public class BloomFilterProcessFunction extends KeyedProcessFunction<String, Trace, Trace> {

	private static final long serialVersionUID = 1L;
	
	// bloom預(yù)計(jì)插入的數(shù)據(jù)量
	private static final long EXPECTED_INSERTIONS = 5000000L;
	// bloom的假陽(yáng)性率
	private static final double FPP = 0.001;
	// bloom過(guò)濾器TTL
	private static final long TTL = 60 * 1000;
	// bloom過(guò)濾器隊(duì)列size
	private static final int FILTER_QUEUE_SIZE = 10;
	// bloom過(guò)濾器隊(duì)列
	private List<BloomFilter<String>> bloomFilterList;
	// 是否已經(jīng)注冊(cè)定時(shí)器
	private boolean registeredTimerTask = false;
	
	@Override
	public void open(Configuration parameters) throws Exception {
		bloomFilterList = new ArrayList<>(FILTER_QUEUE_SIZE);
		BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),
				EXPECTED_INSERTIONS, FPP);
		bloomFilterList.add(bloomFilter);
	}
	
	@Override
	public void processElement(Trace trace, KeyedProcessFunction<String, Trace, Trace>.Context context,
			Collector<Trace> out) throws Exception {
		BloomFilter<String> firstBloomFilter = bloomFilterList.get(0);
		String key = trace.getGid();
		// 只要有一個(gè)bloom未hit該元素,就意味著該元素從未出現(xiàn)過(guò),在隊(duì)列中的所有過(guò)濾器留下該元素的標(biāo)記
		if (!firstBloomFilter.mightContain(key)) {
			for (BloomFilter<String> bloomFilter : bloomFilterList) {
				bloomFilter.put(key);
			}
			// 該元素從未出現(xiàn)過(guò),為非重復(fù)數(shù)據(jù)
			out.collect(trace);
		}
		if (!registeredTimerTask) {
			long current = context.timerService().currentProcessingTime();
			// 注冊(cè)處理時(shí)間定時(shí)器
			context.timerService().registerProcessingTimeTimer(current + TTL);
			registeredTimerTask = true;
		}
	}
	
	@Override
	public void onTimer(long timestamp, OnTimerContext context, Collector<Trace> out) throws Exception {
		// append新的bloomFilter到bloom過(guò)濾器隊(duì)列
		bloomFilterList
				.add(BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), EXPECTED_INSERTIONS, FPP));
		// 清理第一個(gè)bloomFilter
		if (bloomFilterList.size() > FILTER_QUEUE_SIZE) {
			bloomFilterList.remove(0);
		}
		// 創(chuàng)建一個(gè)新的timer task
		context.timerService().registerProcessingTimeTimer(timestamp + TTL);
	}

	@Override
	public void close() throws Exception {
		bloomFilterList = null;
	}
}

以下是主程序入口,實(shí)驗(yàn)場(chǎng)景還是設(shè)定為從 Kafka 消費(fèi)數(shù)據(jù),去重后寫入到 MySQL:

StreamDeduplication.java:

package org.example.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.flink.data.Trace;
import org.example.flink.operator.BloomFilterProcessFunction;

import com.google.gson.Gson;


public class StreamDeduplication {

	public static void main(String[] args) throws Exception {
		// 1. prepare
		Configuration configuration = new Configuration();
		configuration.setString("rest.port", "9091");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
		env.enableCheckpointing(2 * 60 * 1000);
		env.setStateBackend(new EmbeddedRocksDBStateBackend());  // 使用rocksDB作為狀態(tài)后端
		
		// 2. Kafka Source
		KafkaSource<String> source = KafkaSource.<String>builder()
			.setBootstrapServers("127.0.0.1:9092")
			.setTopics("trace")
			.setGroupId("group-01")
		    .setStartingOffsets(OffsetsInitializer.latest())
		    .setProperty("commit.offsets.on.checkpoint", "true")
		    .setValueOnlyDeserializer(new SimpleStringSchema())
		    .build();

		DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
				"Kafka Source");
		sourceStream.setParallelism(1);	// 設(shè)置source算子的并行度為1
		
		// 3. 轉(zhuǎn)換為Trace對(duì)象
		SingleOutputStreamOperator<Trace> mapStream = sourceStream.map(new MapFunction<String, Trace>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Trace map(String value) throws Exception {
				Gson gson = new Gson();
				Trace trace = gson.fromJson(value, Trace.class);
				return trace;
			}
		});
		mapStream.name("Map to Trace");
		mapStream.setParallelism(1);	// 設(shè)置map算子的并行度為1
		
		// 4. Bloom過(guò)濾器去重, 在去重之前要keyBy處理,保障同一gid的數(shù)據(jù)全都交由同一個(gè)線程處理
		SingleOutputStreamOperator<Trace> deduplicatedStream = mapStream.keyBy(
				new KeySelector<Trace, String>() {
					
					private static final long serialVersionUID = 1L;

					@Override
					public String getKey(Trace trace) throws Exception {
						return trace.getGid();
					}
				})
			.process(new BloomFilterProcessFunction());
		deduplicatedStream.name("Bloom filter process for distinct gid");
		deduplicatedStream.setParallelism(2);	// 設(shè)置去重算子的并行度為2
		
		// 5. 將去重結(jié)果寫入DataBase
		DataStreamSink<Trace> sinkStream = deduplicatedStream.addSink(
				JdbcSink.sink("insert into flink.deduplication(gid, timestamp) values (?, ?);",
						(statement, trace) -> {
							statement.setString(1, trace.getGid());
                            statement.setLong(2, trace.getTimestamp());
                        },
						JdbcExecutionOptions.builder()
                        		.withBatchSize(1000)
                        		.withBatchIntervalMs(200)
                        		.withMaxRetries(5)
                        		.build(), 
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            	.withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                            	.withUsername("username")
                            	.withPassword("password")
                            	.build())
				);
		sinkStream.name("Sink DB");
		sinkStream.setParallelism(1);
		
		// 執(zhí)行
		env.execute("Stream Real-Time Deduplication");
	}
}

測(cè)試

以下是向 Kafka 生產(chǎn)重復(fù)數(shù)據(jù)的測(cè)試程序,程序中模擬了數(shù)據(jù)亂序到達(dá)的情況。

public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    String topic = "trace";
    props.put("bootstrap.servers", "127.0.0.1:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    InputStream inputStream = KafkaDataProducer.class.getClassLoader().getResourceAsStream(TEST_DATA);
    Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name());
    String content = scanner.useDelimiter("\\A").next();
    scanner.close();
    JSONObject jsonContent = JSONObject.parseObject(content);

    int nonDuplicateNum = 100000;
    int repeatNum = 100;
    Random r = new Random();
    for (int i = 0; i < nonDuplicateNum; i++) {
        String id = jsonContent.getString(GID);
        String newId = increase(id, String.valueOf(i));
        jsonContent.put(GID, newId);
        // 制造重復(fù)數(shù)據(jù)
        for (int j = 0; j < repeatNum; j++) {
            // 對(duì)時(shí)間進(jìn)行隨機(jī)擾動(dòng),模擬數(shù)據(jù)亂序到達(dá)
            long current = System.currentTimeMillis() - r.nextInt(60) * 1000;
            jsonContent.put(TIMESTAMP, current);
            producer.send(new ProducerRecord<String, String>(topic, jsonContent.toString()));
        }
        // wait some time
        Thread.sleep(5);
    }
    Thread.sleep(2000);
    System.out.println("\n");
    System.out.println("finished");
    producer.close();
}

共生產(chǎn)了 10, 000, 000 條 ID,其中非重復(fù)的 ID 共計(jì) 100, 000 個(gè)。我們看一下 Flink 是否能做到實(shí)時(shí)去重,將 100, 000 個(gè)非重復(fù) ID 的結(jié)果正確寫入到數(shù)據(jù)庫(kù)。實(shí)驗(yàn)過(guò)程耗時(shí)較長(zhǎng),簡(jiǎn)單看一下動(dòng)態(tài)效果圖:

可以看到,F(xiàn)link 的處理速度非???,去重結(jié)果的數(shù)值和 Kafka 中實(shí)際的 distinct id 值跟的非常緊,幾乎是毫秒延遲!

以上就是Flink實(shí)戰(zhàn)之實(shí)現(xiàn)流式數(shù)據(jù)去重的詳細(xì)內(nèi)容,更多關(guān)于Flink流式數(shù)據(jù)去重的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • MyBatis-Plus+達(dá)夢(mèng)數(shù)據(jù)庫(kù)實(shí)現(xiàn)高效數(shù)據(jù)持久化的示例

    MyBatis-Plus+達(dá)夢(mèng)數(shù)據(jù)庫(kù)實(shí)現(xiàn)高效數(shù)據(jù)持久化的示例

    這篇文章主要介紹了MyBatis-Plus和達(dá)夢(mèng)數(shù)據(jù)庫(kù)實(shí)現(xiàn)高效數(shù)據(jù)持久化,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-08-08
  • SpringBoot+WebSocket實(shí)現(xiàn)即時(shí)通訊的方法詳解

    SpringBoot+WebSocket實(shí)現(xiàn)即時(shí)通訊的方法詳解

    這篇文章主要為大家詳細(xì)介紹了如何利用SpringBoot+WebSocket實(shí)現(xiàn)即時(shí)通訊功能,文中示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)或工作有一定參考價(jià)值,需要的可以參考一下
    2022-05-05
  • Spring Boot 實(shí)現(xiàn)https ssl免密登錄(X.509 pki登錄)

    Spring Boot 實(shí)現(xiàn)https ssl免密登錄(X.509 pki登錄)

    這篇文章主要介紹了Spring Boot 實(shí)現(xiàn)https ssl免密登錄(X.509 pki登錄),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • java中hasNextInt判斷后無(wú)限循環(huán)輸出else項(xiàng)的解決方法

    java中hasNextInt判斷后無(wú)限循環(huán)輸出else項(xiàng)的解決方法

    這篇文章主要介紹了java中hasNextInt判斷后無(wú)限循環(huán)輸出else項(xiàng)的解決方法的相關(guān)資料,需要的朋友可以參考下
    2016-10-10
  • Mybatis之#{}與${}的區(qū)別使用詳解

    Mybatis之#{}與${}的區(qū)別使用詳解

    這篇文章主要介紹了Mybatis之#{}與${}的區(qū)別詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Mybatis優(yōu)化檢索的方法詳解

    Mybatis優(yōu)化檢索的方法詳解

    MyBatis是一款優(yōu)秀的基于Java的持久層框架,它可以將 SQL 語(yǔ)句和數(shù)據(jù)庫(kù)中的記錄映射成為 Java 對(duì)象,并且支持靈活的 SQL 查詢語(yǔ)句,在Mybatis中,可以使用動(dòng)態(tài)SQL來(lái)靈活構(gòu)造SQL語(yǔ)句,從而滿足各種不同的檢索需求,本文介紹Mybatis如何優(yōu)化檢索,需要的朋友可以參考下
    2024-05-05
  • 一文搞懂Java創(chuàng)建線程的五種方法

    一文搞懂Java創(chuàng)建線程的五種方法

    本文主要為大家詳細(xì)介紹一下Java實(shí)現(xiàn)線程創(chuàng)建的五種常見(jiàn)方式,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)有一定的幫助,感興趣的可以跟隨小編學(xué)習(xí)一下
    2022-06-06
  • Mybatis?MappedStatement類核心原理詳解

    Mybatis?MappedStatement類核心原理詳解

    這篇文章主要介紹了Mybatis?MappedStatement類,mybatis的mapper文件最終會(huì)被解析器,解析成MappedStatement,其中insert|update|delete|select每一個(gè)標(biāo)簽分別對(duì)應(yīng)一個(gè)MappedStatement
    2022-11-11
  • SpringBoot集成Validation參數(shù)校驗(yàn)

    SpringBoot集成Validation參數(shù)校驗(yàn)

    這篇文章主要為大家詳細(xì)介紹了SpringBoot集成Validation參數(shù)校驗(yàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • 從零開(kāi)始Java實(shí)現(xiàn)Parser?Combinator

    從零開(kāi)始Java實(shí)現(xiàn)Parser?Combinator

    這篇文章主要為大家介紹了從零開(kāi)始Java實(shí)現(xiàn)Parser?Combinator過(guò)程及原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05

最新評(píng)論