Flink實戰(zhàn)之實現(xiàn)流式數(shù)據(jù)去重
流式數(shù)據(jù)是一種源源不斷產(chǎn)生的數(shù)據(jù),沒有預(yù)定的開始與結(jié)束,至少理論上來說,它的數(shù)據(jù)輸入永遠(yuǎn)不會結(jié)束。因此流式數(shù)據(jù)處理與傳統(tǒng)的批處理技術(shù)不同,必須具備持續(xù)不斷地對到達(dá)的數(shù)據(jù)進(jìn)行處理的能力。
因為流式數(shù)據(jù)源源不斷地產(chǎn)生,對流式數(shù)據(jù)做去重就十分困難,因為一條數(shù)據(jù)重復(fù)與否需要與之前的數(shù)據(jù)痕跡作比對,數(shù)據(jù)是無窮盡產(chǎn)生的,倘留存之前的數(shù)據(jù),勢必占據(jù)大量的存儲空間,判重的過程也會隨著數(shù)據(jù)量的增加而變得復(fù)雜耗時。
本文探索了一種流式大數(shù)據(jù)的實時去重方法,不一定適用于所有場景,不過或許可以給面對相似問題的你一點點啟發(fā)。
Bloom 過濾器
海量數(shù)據(jù)的去重,很容易聯(lián)想到 Bloom 過濾器。Bloom過濾器是由一個長度為 m 比特的數(shù)組與 k 個哈希函數(shù)組成的數(shù)據(jù)結(jié)構(gòu)。
當(dāng)要插入一個元素時,將數(shù)據(jù)分別輸入到 k 個哈希函數(shù),產(chǎn)生 k 個哈希值,以哈希值作為位數(shù)組中的索引,將相應(yīng)的比特位置為 1。
如下圖所示,是由 3 個哈希函數(shù) + 18 個比特位組成的 Bloom 過濾器:
當(dāng)元素 "hello" 插入時,3 個哈希函數(shù)分別計算得到 3 個哈希值,將哈希值對應(yīng)的比特位置為 1。
當(dāng)元素 "world" 插入時,3 個哈希函數(shù)分別計算再次得到 3 個哈希值,將哈希值對應(yīng)的比特位置為 1。
Bloom 過濾器的巧妙之處就在于用一張位圖來留存數(shù)據(jù)的痕跡,無需存儲數(shù)據(jù)本身,用有限的空間和極低的時間復(fù)雜度即可完成過濾。
當(dāng)要查詢一個元素時,同樣將其輸入 k 個哈希函數(shù),然后檢查對應(yīng)的 k 個比特,如果有任意一個比特為 0,表明該元素一定不在集合中;如果所有比特均為 1,表明該元素有(較大的)可能性在集合中。為什么無法百分之百確定元素在集合中呢?以元素 "test" 為例:
我們假設(shè) "test" 經(jīng)過哈希函數(shù)計算后得到的哈希值恰好是之前的數(shù)據(jù) "hello" + "world" 的哈希值的子集,此時 Bloom 就會產(chǎn)生誤判,誤以為 "test" 已經(jīng)在集合中。
不過這個誤判率可以通過增加哈希函數(shù)的個數(shù)和位圖的大小來控制在極低的范圍內(nèi),給定預(yù)計輸入的元素總數(shù) n 和預(yù)期的假陽性率 p,經(jīng)過嚴(yán)格的數(shù)學(xué)推導(dǎo)可以得到哈希函數(shù)的個數(shù) k 和位圖的大小 m 的理論值:
Bloom 過濾器去重流數(shù)據(jù)
使用 Bloom 對流式數(shù)據(jù)去重時,由于 Bloom 的位圖空間有限而流數(shù)據(jù)是源源不斷產(chǎn)生的,有限的位圖空間無法應(yīng)對無限的數(shù)據(jù),而如果定時重置過濾器,重置將導(dǎo)致已保存狀態(tài)位的丟失,從而引入重復(fù)記錄,無法做到 "無縫" 銜接。示意圖如下:
在 t1 時刻重置過濾器時,將導(dǎo)致 t1 時刻之前的 01,03 數(shù)據(jù)標(biāo)記丟失,重置后再次出現(xiàn)的數(shù)據(jù) 03 將穿透過濾器,同理在 t2 時刻、t3 時刻、t4 時刻重置過濾器后,數(shù)據(jù) 06、08、09 也將穿透過濾器,造成去重結(jié)果不準(zhǔn)確。
Bloom 過濾器隊列去重流數(shù)據(jù)
既然一個 Bloom 無法應(yīng)對流數(shù)據(jù)的去重,如果用多個 Bloom 過濾器能否實現(xiàn)預(yù)期效果呢?
我們采用 Bloom 過濾器隊列對數(shù)據(jù)流進(jìn)行去重,隊列中的 Bloom 過濾器是按時間依次補(bǔ)位到隊列中的,重點在 “依次”,每個過濾器的 TTL (Time To Live) 相同,但存活的起止時間不同。
如圖所示:
過濾器-1 的存活起止時間是[t0, t3];
過濾器-2 在 t1 時刻補(bǔ)充到隊列中,存活起止時間是 [t1, t4];
過濾器-3 在 t2 時刻補(bǔ)位到隊列中,存活起止時間是 [t2, t5];
過濾器-4 在 t3 時刻補(bǔ)位到隊列中,存活起止時間是 [t3, t6],t3 時刻,過濾器-1 的生命周期結(jié)束,從過濾器隊首移除,新的隊首是 過濾器-2;
過濾器-5 在 t4 時刻補(bǔ)位到隊列中,存活起止時間是 [t4, t7],t4 時刻,過濾器-2 的聲明周期結(jié)束,從過濾器隊首移除,新的隊首是 過濾器-3;
過濾器-6 在 t5 時刻補(bǔ)位到隊列中,存活起止時間是 [t5, t8],t5 時刻,過濾器-3 的聲明周期結(jié)束,從過濾器隊首移除,新的隊首是 過濾器-4;
過濾器隊列中每隔固定時間間隔從隊首移除一個舊的過濾器,同時補(bǔ)位到隊尾一個新的過濾器,隊列的規(guī)模一直保持固定的規(guī)模 (本例中為 3);
這個過濾器隊列如何判別重復(fù)呢?
當(dāng)接收到一個數(shù)據(jù)元素時,用過濾器隊列中的 每個過濾器 來判斷該數(shù)據(jù)是否出現(xiàn)過,只有當(dāng)隊列中的每個過濾器都判定為 "未出現(xiàn)過" 時,才認(rèn)為是非重復(fù)數(shù)據(jù),允許通過;只要隊列中有任何一個過濾器判斷為 "已出現(xiàn)過",則攔截該數(shù)據(jù)。
無論攔截或是放行該條數(shù)據(jù),都在在當(dāng)前隊列中的 First 2 個過濾器中留存該數(shù)據(jù)記錄的 "痕跡"(圖中用相同位置的綠色 bit 標(biāo)識數(shù)據(jù)的痕跡)。
還是以上圖為例,介紹一下過濾器隊列的工作過程:
[t0, t1] 時間段,隊列中只有 1 個過濾器:過濾器-1,數(shù)據(jù) 01,01,03 依次到達(dá)后,經(jīng) 過濾器-1去重后的結(jié)果是 01,03,在 過濾器-1 中記錄 [t0, t1] 時間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位;
[t1, t2] 時間段,隊列中有 2 個過濾器:過濾器-1、過濾器-2,當(dāng)數(shù)據(jù) 03,03,04 依次到達(dá)后,03 被 過濾器-1 攔截,04 可以通過過濾器隊列,因此去重后的結(jié)果是 04,同時在 過濾器-1 和 過濾器-2 中記錄 [t1, t2] 時間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位;
[t2, t3] 時間段,隊列中有 3 個過濾器:過濾器-1、過濾器-2、過濾器-3。當(dāng)數(shù)據(jù) 04,06,06 依次到達(dá)后,04 被 過濾器-1、過濾器-2 攔截,06 可以通過過濾器隊列,因此去重后的結(jié)果是 06,同時在 過濾器-1 和 過濾器-2 中記錄 [t2, t3] 時間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位,過濾器-2 就是過濾器-1 在 [t1, t3] 時間段的備份;因為 [t2, t3] 時刻 過濾器-1 的狀態(tài)已經(jīng)復(fù)制到了 過濾器-2 中,過濾器-3 在[t2, t3] 時間段就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);
t3 時刻,過濾器-4 補(bǔ)位到隊尾,過濾器-1從隊首移除 (t3 時刻之后,如果還有 t3 時刻之前出現(xiàn)過的數(shù)據(jù)再次出現(xiàn),將會穿透過濾器隊列,我們可以通過設(shè)置過濾器的存活時間和隊列的大小來盡量避免這一情況的發(fā)生);
[t3, t4] 時間段,隊列中有 3 個過濾器:過濾器-2、過濾器-3、過濾器-4,當(dāng)數(shù)據(jù) 06,08,07 依次到達(dá)后,06 被 過濾器-2 攔截,08 和 07 可以通過過濾器隊列,因此去重后的結(jié)果是 08,07,同時在 過濾器-2 和 過濾器-3 中記錄 [t3, t4] 時間段流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過濾器-3 作為 過濾器-2 在 [t3, t4] 時間段的備份),因為 [t3, t4] 時刻 過濾器-2 的狀態(tài)已經(jīng)復(fù)制到了 過濾器-3 中,過濾器-4 在[t3, t4] 時間段就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);
t4 時刻,過濾器-5 補(bǔ)位到隊尾,過濾器-2 從隊首移除 (t4 時刻之后,如果還有 t2 時刻之前出現(xiàn)過的數(shù)據(jù)再次出現(xiàn),將會穿透過濾器隊列,我們可以通過設(shè)置過濾器的存活時間和隊列的大小來避免這一情況的發(fā)生);
[t4, t5] 時間段,隊列中有 3 個過濾器:過濾器-3、過濾器-4、過濾器-5,當(dāng)數(shù)據(jù) 08,08,09依次到達(dá)后,08 被 過濾器-3 攔截,09 可以通過過濾器隊列,因此去重后的結(jié)果是 09,同時在 過濾器-3 和 過濾器-4 中記錄 [t3, t4] 時刻流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過濾器-4 作為 過濾器-3 在 [t4, t5] 時間段的備份),因為 [t4, t5] 時間段 過濾器-3 的狀態(tài)已經(jīng)復(fù)制到了 過濾器-4 中,過濾器-5 在 [t4, t5] 時刻就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);
t5 時刻,過濾器-6 補(bǔ)位到隊尾,過濾器-3 從隊首移除 (t5時刻之后,如果還有 t3 時刻之前出現(xiàn)過的數(shù)據(jù)再次出現(xiàn),將會穿透過濾器隊列,我們可以通過設(shè)置過濾器的存活時間和隊列的大小來避免這一情況的發(fā)生);
[t5, t6] 時間段,隊列中有 3 個過濾器:過濾器-4、過濾器-5、過濾器-6,當(dāng)數(shù)據(jù) 09,09,10 依次到達(dá)后,09 被 過濾器-4 攔截,10 可以通過過濾器隊列,因此去重后的結(jié)果是 10,同時在 過濾器-4 和 過濾器-5 中記錄 [t5, t6] 時刻流經(jīng)所有數(shù)據(jù)記錄的狀態(tài)位 (過濾器-5 作為 過濾器-4 在 [t5, t6] 時刻的備份),因為 [t5, t6] 時刻過濾器-4 的狀態(tài)已經(jīng)復(fù)制到了 過濾器-5 中,過濾器-6 在[t5, t6] 時刻就不必留存數(shù)據(jù)記錄了 (圖中用灰色表示);
實現(xiàn)
如何把上述設(shè)計在 Flink 中實現(xiàn)呢,Bloom 過濾器隊列是隨著時間動態(tài)變化的,因此需要用到 Flink 的 定時器。KeyedProcessFunction
算子的 TimerService
就提供了定時器注冊功能,可以注冊 EventTimeTimer
或 ProcessingTimeTimer
。
BloomFilterProcessFunction.java:
package org.example.flink.operator; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import org.example.flink.data.Trace; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; public class BloomFilterProcessFunction extends KeyedProcessFunction<String, Trace, Trace> { private static final long serialVersionUID = 1L; // bloom預(yù)計插入的數(shù)據(jù)量 private static final long EXPECTED_INSERTIONS = 5000000L; // bloom的假陽性率 private static final double FPP = 0.001; // bloom過濾器TTL private static final long TTL = 60 * 1000; // bloom過濾器隊列size private static final int FILTER_QUEUE_SIZE = 10; // bloom過濾器隊列 private List<BloomFilter<String>> bloomFilterList; // 是否已經(jīng)注冊定時器 private boolean registeredTimerTask = false; @Override public void open(Configuration parameters) throws Exception { bloomFilterList = new ArrayList<>(FILTER_QUEUE_SIZE); BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), EXPECTED_INSERTIONS, FPP); bloomFilterList.add(bloomFilter); } @Override public void processElement(Trace trace, KeyedProcessFunction<String, Trace, Trace>.Context context, Collector<Trace> out) throws Exception { BloomFilter<String> firstBloomFilter = bloomFilterList.get(0); String key = trace.getGid(); // 只要有一個bloom未hit該元素,就意味著該元素從未出現(xiàn)過,在隊列中的所有過濾器留下該元素的標(biāo)記 if (!firstBloomFilter.mightContain(key)) { for (BloomFilter<String> bloomFilter : bloomFilterList) { bloomFilter.put(key); } // 該元素從未出現(xiàn)過,為非重復(fù)數(shù)據(jù) out.collect(trace); } if (!registeredTimerTask) { long current = context.timerService().currentProcessingTime(); // 注冊處理時間定時器 context.timerService().registerProcessingTimeTimer(current + TTL); registeredTimerTask = true; } } @Override public void onTimer(long timestamp, OnTimerContext context, Collector<Trace> out) throws Exception { // append新的bloomFilter到bloom過濾器隊列 bloomFilterList .add(BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), EXPECTED_INSERTIONS, FPP)); // 清理第一個bloomFilter if (bloomFilterList.size() > FILTER_QUEUE_SIZE) { bloomFilterList.remove(0); } // 創(chuàng)建一個新的timer task context.timerService().registerProcessingTimeTimer(timestamp + TTL); } @Override public void close() throws Exception { bloomFilterList = null; } }
以下是主程序入口,實驗場景還是設(shè)定為從 Kafka 消費數(shù)據(jù),去重后寫入到 MySQL:
StreamDeduplication.java:
package org.example.flink; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.example.flink.data.Trace; import org.example.flink.operator.BloomFilterProcessFunction; import com.google.gson.Gson; public class StreamDeduplication { public static void main(String[] args) throws Exception { // 1. prepare Configuration configuration = new Configuration(); configuration.setString("rest.port", "9091"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.enableCheckpointing(2 * 60 * 1000); env.setStateBackend(new EmbeddedRocksDBStateBackend()); // 使用rocksDB作為狀態(tài)后端 // 2. Kafka Source KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("127.0.0.1:9092") .setTopics("trace") .setGroupId("group-01") .setStartingOffsets(OffsetsInitializer.latest()) .setProperty("commit.offsets.on.checkpoint", "true") .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); sourceStream.setParallelism(1); // 設(shè)置source算子的并行度為1 // 3. 轉(zhuǎn)換為Trace對象 SingleOutputStreamOperator<Trace> mapStream = sourceStream.map(new MapFunction<String, Trace>() { private static final long serialVersionUID = 1L; @Override public Trace map(String value) throws Exception { Gson gson = new Gson(); Trace trace = gson.fromJson(value, Trace.class); return trace; } }); mapStream.name("Map to Trace"); mapStream.setParallelism(1); // 設(shè)置map算子的并行度為1 // 4. Bloom過濾器去重, 在去重之前要keyBy處理,保障同一gid的數(shù)據(jù)全都交由同一個線程處理 SingleOutputStreamOperator<Trace> deduplicatedStream = mapStream.keyBy( new KeySelector<Trace, String>() { private static final long serialVersionUID = 1L; @Override public String getKey(Trace trace) throws Exception { return trace.getGid(); } }) .process(new BloomFilterProcessFunction()); deduplicatedStream.name("Bloom filter process for distinct gid"); deduplicatedStream.setParallelism(2); // 設(shè)置去重算子的并行度為2 // 5. 將去重結(jié)果寫入DataBase DataStreamSink<Trace> sinkStream = deduplicatedStream.addSink( JdbcSink.sink("insert into flink.deduplication(gid, timestamp) values (?, ?);", (statement, trace) -> { statement.setString(1, trace.getGid()); statement.setLong(2, trace.getTimestamp()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://127.0.0.1:3306/flink") .withUsername("username") .withPassword("password") .build()) ); sinkStream.name("Sink DB"); sinkStream.setParallelism(1); // 執(zhí)行 env.execute("Stream Real-Time Deduplication"); } }
測試
以下是向 Kafka 生產(chǎn)重復(fù)數(shù)據(jù)的測試程序,程序中模擬了數(shù)據(jù)亂序到達(dá)的情況。
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); String topic = "trace"; props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); InputStream inputStream = KafkaDataProducer.class.getClassLoader().getResourceAsStream(TEST_DATA); Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name()); String content = scanner.useDelimiter("\\A").next(); scanner.close(); JSONObject jsonContent = JSONObject.parseObject(content); int nonDuplicateNum = 100000; int repeatNum = 100; Random r = new Random(); for (int i = 0; i < nonDuplicateNum; i++) { String id = jsonContent.getString(GID); String newId = increase(id, String.valueOf(i)); jsonContent.put(GID, newId); // 制造重復(fù)數(shù)據(jù) for (int j = 0; j < repeatNum; j++) { // 對時間進(jìn)行隨機(jī)擾動,模擬數(shù)據(jù)亂序到達(dá) long current = System.currentTimeMillis() - r.nextInt(60) * 1000; jsonContent.put(TIMESTAMP, current); producer.send(new ProducerRecord<String, String>(topic, jsonContent.toString())); } // wait some time Thread.sleep(5); } Thread.sleep(2000); System.out.println("\n"); System.out.println("finished"); producer.close(); }
共生產(chǎn)了 10, 000, 000 條 ID,其中非重復(fù)的 ID 共計 100, 000 個。我們看一下 Flink 是否能做到實時去重,將 100, 000 個非重復(fù) ID 的結(jié)果正確寫入到數(shù)據(jù)庫。實驗過程耗時較長,簡單看一下動態(tài)效果圖:
可以看到,F(xiàn)link 的處理速度非???,去重結(jié)果的數(shù)值和 Kafka 中實際的 distinct id 值跟的非常緊,幾乎是毫秒延遲!
以上就是Flink實戰(zhàn)之實現(xiàn)流式數(shù)據(jù)去重的詳細(xì)內(nèi)容,更多關(guān)于Flink流式數(shù)據(jù)去重的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis-Plus+達(dá)夢數(shù)據(jù)庫實現(xiàn)高效數(shù)據(jù)持久化的示例
這篇文章主要介紹了MyBatis-Plus和達(dá)夢數(shù)據(jù)庫實現(xiàn)高效數(shù)據(jù)持久化,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08SpringBoot+WebSocket實現(xiàn)即時通訊的方法詳解
這篇文章主要為大家詳細(xì)介紹了如何利用SpringBoot+WebSocket實現(xiàn)即時通訊功能,文中示例代碼講解詳細(xì),對我們學(xué)習(xí)或工作有一定參考價值,需要的可以參考一下2022-05-05Spring Boot 實現(xiàn)https ssl免密登錄(X.509 pki登錄)
這篇文章主要介紹了Spring Boot 實現(xiàn)https ssl免密登錄(X.509 pki登錄),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01java中hasNextInt判斷后無限循環(huán)輸出else項的解決方法
這篇文章主要介紹了java中hasNextInt判斷后無限循環(huán)輸出else項的解決方法的相關(guān)資料,需要的朋友可以參考下2016-10-10Mybatis?MappedStatement類核心原理詳解
這篇文章主要介紹了Mybatis?MappedStatement類,mybatis的mapper文件最終會被解析器,解析成MappedStatement,其中insert|update|delete|select每一個標(biāo)簽分別對應(yīng)一個MappedStatement2022-11-11SpringBoot集成Validation參數(shù)校驗
這篇文章主要為大家詳細(xì)介紹了SpringBoot集成Validation參數(shù)校驗,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01從零開始Java實現(xiàn)Parser?Combinator
這篇文章主要為大家介紹了從零開始Java實現(xiàn)Parser?Combinator過程及原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05