" />

欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink入門級應(yīng)用域名處理示例

 更新時間:2022年03月21日 17:54:48   作者:andandan  
這篇文章主要介紹了一個比較簡單的入門級Flink應(yīng)用,代碼很容易寫,主要用到的算子有FlatMap、KeyBy、Reduce,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

概述

最近做了一個小任務(wù),要使用Flink處理域名數(shù)據(jù),在4GB的域名文檔中求出每個域名的頂級域名,最后輸出每個頂級域名下的前10個子級域名。一個比較簡單的入門級Flink應(yīng)用,代碼很容易寫,主要用到的算子有FlatMap、KeyBy、Reduce。但是由于Maven打包問題,總是提示找不到入口類,卡了好久,最后也是成功解決了。

主體代碼如下:

public class FlinkStreamingTopDomain {
    public static void main(String[] args) throws Exception{
        // 獲取流處理運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取kafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
        // 從當(dāng)前消費(fèi)組下標(biāo)開始讀取
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource text = env.addSource(kafkaConsumer);

        // 算子
        DataStream<Tuple2<String,String>> windowCount = text.flatMap(new FlatMap())
                .keyBy(0).reduce(new Reduce());
        //把數(shù)據(jù)打印到控制臺
        windowCount.print()
                .setParallelism(16);//使用16個并行度
        //注意:因?yàn)閒link是懶加載的,所以必須調(diào)用execute方法,上面的代碼才會執(zhí)行
        env.execute("streaming topDomain calculate");
    }
}

算子

FlatMap

Flatmap是對一行字符進(jìn)行處理的,官網(wǎng)上的解釋如下

FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

其實(shí)和Hadoop的Map差不多,都是把一行字符串進(jìn)行處理,得到我們想要的<key,value>,不同之處在于Map處理后得到的是<key,values[]>。即Hadoop的Map操作會按key自動的將value處理成數(shù)組的形式,而Flink的FlatMap算子只會把每行數(shù)據(jù)處理成key、value。

下面是我處理業(yè)務(wù)的FlatMap代碼

    // FlatMap分割域名,并輸出二元組<頂級域名,域名>
    public static class FlatMap implements FlatMapFunction<String, Tuple2<String,String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
            String[] values = s.split("\\^");   // 按字符^分割
            if(values.length - 1 < 2) {
                return;
            }
            String domain = values[2];
            out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain));
        }
    }

我這里把數(shù)據(jù)處理成了二元組形式,之后reduce也是對這個二元組進(jìn)行處理。

KeyBy

先來看看官網(wǎng)的解釋

KeyBy
DataStream → KeyedStream
    
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -&gt; value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -&gt; value.f0) // Key by the first element of a Tuple

Attention:A type cannot be a key if:
    1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
    2.it is an array of any type.   

keyBy會按照一個keySelector定義的方式進(jìn)行哈希分區(qū),會將一個流分成多個Partition,相同key的會被分在同一個分區(qū),經(jīng)過keyBy的流變成KeyedStream。

需要注意的有兩點(diǎn):

1.pojo類型作為key,必須重寫hashcode()方法

2.數(shù)組類型不能作為key

Reduce

官網(wǎng)的解釋如下

Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

reduce是進(jìn)行”滾動“處理的,即reduce方法的第一個參數(shù)是當(dāng)前已經(jīng)得到的結(jié)果記為currentResult,第二個參數(shù)是當(dāng)前要處理的<key,value>。流式計(jì)算會一條一條的處理數(shù)據(jù),每處理完一條數(shù)據(jù)就得到新的currentResult。

業(yè)務(wù)處理代碼如下

    // 拼接同一分區(qū)下的ip
    public static class Reduce implements ReduceFunction<Tuple2<String,String>>{
        @Override
        public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception {
            String[] domains = t1.f1.toString().split("\\^");
            if(domains.length == 10){
                return t1;
            }
            t1.f1 = t1.f1.toString() + "^" + t2.f1.toString();
            System.out.println(t1.f1 );
            return t1;
        }
   }

連接socket測試

1.將主體代碼里的kafka獲取數(shù)據(jù),改成socket獲取數(shù)據(jù)

//        int port;
//        try {
//            ParameterTool parameterTool = ParameterTool.fromArgs(args);
//            port = parameterTool.getInt("port");
//        } catch (Exception e){
//            System.out.println("沒有指定port參數(shù),使用默認(rèn)值1112");
//            port = 1112;
//        }

        // 連接socket獲取輸入數(shù)據(jù)
//        DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);

2.在服務(wù)器開啟一個端口號:nc -l -p 1112

3.運(yùn)行代碼

4.服務(wù)器輸入測試數(shù)據(jù)就可以實(shí)時的獲取處理結(jié)果

連接kafka

正式

使用kafka命令創(chuàng)建主題

kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test

kafka建立topic需要先開啟zookeeper

運(yùn)行生產(chǎn)者jar包,用生產(chǎn)者讀取數(shù)據(jù)

java -jar $jar包路徑  $topic $path

測試

另外,還可以使用測試生產(chǎn)者實(shí)現(xiàn)和socket測試相同的效果

/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1

打包上傳服務(wù)器

打包上傳服務(wù)器注意不要使用idea提供的build方式,反正我使用build會一直報錯找不到主類,即便我反編譯jar包發(fā)現(xiàn)主類在里面,并且MF文件也有配置主類信息。這個問題卡了我很久,最后我使用mvn pakage的方式打包并運(yùn)行成功,把我的打包插件貼出來幫助遇到和我相同問題的人

<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<!--							<createDependencyReducedPom>false</createDependencyReducedPom>-->
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

Flink運(yùn)行指令為:

/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar

或者可以訪問Flink集群的8081端口,在提供的UI頁面上傳運(yùn)行

以上就是Flink入門級應(yīng)用域名處理示例的詳細(xì)內(nèi)容,更多關(guān)于Flink域名處理的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java的枚舉,注解和反射(一)

    Java的枚舉,注解和反射(一)

    今天小編就為大家分享一篇關(guān)于Java枚舉,注解與反射原理說明,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2021-07-07
  • 淺談@RequestBody和@RequestParam可以同時使用

    淺談@RequestBody和@RequestParam可以同時使用

    這篇文章主要介紹了@RequestBody和@RequestParam可以同時使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java實(shí)現(xiàn)調(diào)用MySQL存儲過程詳解

    Java實(shí)現(xiàn)調(diào)用MySQL存儲過程詳解

    相信大家都知道存儲過程是在大型數(shù)據(jù)庫系統(tǒng)中,一組為了完成特定功能的SQL語句集。存儲過程是數(shù)據(jù)庫中的一個重要對象,任何一個設(shè)計(jì)良好的數(shù)據(jù)庫應(yīng)用程序都應(yīng)該用到存儲過程。Java調(diào)用mysql存儲過程,實(shí)現(xiàn)如下,有需要的朋友們可以參考借鑒,下面來一起看看吧。
    2016-11-11
  • 淺析java異常棧

    淺析java異常棧

    給大家通過一個簡單的代碼實(shí)例給大家分型了java異常棧問題,需要的朋友參考一下吧。
    2017-12-12
  • java開放地址法和鏈地址法解決hash沖突的方法示例

    java開放地址法和鏈地址法解決hash沖突的方法示例

    這篇文章主要介紹了java開放地址法和鏈地址法解決hash沖突的方法示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • Java中判斷字符串是否相等的實(shí)現(xiàn)

    Java中判斷字符串是否相等的實(shí)現(xiàn)

    這篇文章主要介紹了Java中判斷字符串是否相等的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • MyBatis?Plus如何實(shí)現(xiàn)獲取自動生成主鍵值

    MyBatis?Plus如何實(shí)現(xiàn)獲取自動生成主鍵值

    這篇文章主要介紹了MyBatis?Plus如何實(shí)現(xiàn)獲取自動生成主鍵值問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Java實(shí)現(xiàn)簡單聊天機(jī)器人

    Java實(shí)現(xiàn)簡單聊天機(jī)器人

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡單聊天機(jī)器人,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Mybatis中updateBatch實(shí)現(xiàn)批量更新

    Mybatis中updateBatch實(shí)現(xiàn)批量更新

    本文主要介紹了Mybatis中updateBatch實(shí)現(xiàn)批量更新,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • MyBatisPlus中@TableField注解的基本使用

    MyBatisPlus中@TableField注解的基本使用

    這篇文章主要介紹了MyBatisPlus中@TableField注解的基本使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07

最新評論