Flink支持哪些數(shù)據(jù)類型?
一、支持的數(shù)據(jù)類型
Flink 對可以在 DataSet 或 DataStream 中的元素類型進行了一些限制。這樣做的原因是系統(tǒng)會分析類型以確定有效的執(zhí)行策略。
1.Java Tuple 和 Scala Case類;
2.Java POJO;
3.基本類型;
4.通用類;
5.值;
6.Hadoop Writables;
7.特殊類型
二、Flink之Tuple類型
Tuple類型 Tuple 是flink 一個很特殊的類型 (元組類型),是一個抽象類,共26個Tuple子類繼承Tuple 他們是 Tuple0一直到Tuple25
package org.apache.flink.api.java.tuple;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;
@Public
public abstract class Tuple implements Serializable {
private static final long serialVersionUID = 1L;
public static final int MAX_ARITY = 25;
private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};
public Tuple() {
}
public abstract <T> T getField(int var1);
public <T> T getFieldNotNull(int pos) {
T field = this.getField(pos);
if (field != null) {
return field;
} else {
throw new NullFieldException(pos);
}
}
public abstract <T> void setField(T var1, int var2);
public abstract int getArity();
public abstract <T extends Tuple> T copy();
public static Class<? extends Tuple> getTupleClass(int arity) {
if (arity >= 0 && arity <= 25) {
return CLASSES[arity];
} else {
throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
}
}
public static Tuple newInstance(int arity) {
switch(arity) {
case 0:
return Tuple0.INSTANCE;
case 1:
return new Tuple1();
case 2:
return new Tuple2();
case 3:
return new Tuple3();
case 4:
return new Tuple4();
case 5:
return new Tuple5();
case 6:
return new Tuple6();
case 7:
return new Tuple7();
case 8:
return new Tuple8();
case 9:
return new Tuple9();
case 10:
return new Tuple10();
case 11:
return new Tuple11();
case 12:
return new Tuple12();
case 13:
return new Tuple13();
case 14:
return new Tuple14();
case 15:
return new Tuple15();
case 16:
return new Tuple16();
case 17:
return new Tuple17();
case 18:
return new Tuple18();
case 19:
return new Tuple19();
case 20:
return new Tuple20();
case 21:
return new Tuple21();
case 22:
return new Tuple22();
case 23:
return new Tuple23();
case 24:
return new Tuple24();
case 25:
return new Tuple25();
default:
throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
}
}
}
查看源碼我們看到Tuple0一直到Tuple25
我們看flink為我們?yōu)槲覀儤?gòu)造好了0-25個字段的模板類
ackage org.apache.flink.api.java.tuple;
import java.io.ObjectStreamException;
import org.apache.flink.annotation.Public;
@Public
public class Tuple0 extends Tuple {
private static final long serialVersionUID = 1L;
public static final Tuple0 INSTANCE = new Tuple0();
public Tuple0() {
}
public int getArity() {
return 0;
}
public <T> T getField(int pos) {
throw new IndexOutOfBoundsException(String.valueOf(pos));
}
public <T> void setField(T value, int pos) {
throw new IndexOutOfBoundsException(String.valueOf(pos));
}
public Tuple0 copy() {
return new Tuple0();
}
public String toString() {
return "()";
}
public boolean equals(Object o) {
return this == o || o instanceof Tuple0;
}
public int hashCode() {
return 0;
}
private Object readResolve() throws ObjectStreamException {
return INSTANCE;
}
}
三、Tuple的使用
方式一:初始化元組
可使用靜態(tài)方法 newInstance進行元組構(gòu)造 指定元組空間大小;
ex: 1 則元組只有一個空間,則實際使用的Tuple1 字段只有f0
ex: 12 則元組只有兩個空間,則實際使用的Tuple2 字段只有f0,f1
指定 Tuple元組空間大小 (可理解為字段個數(shù))
Tuple tuple = Tuple.newInstance(1);
方式一:構(gòu)造元組
使用Tuple.newInstance(xx),指定元組空間大小的話,這樣存取雖然能夠?qū)崿F(xiàn),但會存在存儲索引位置使用不正確的情況,可能由于失誤操作編寫出索引越界異常,而且使用不太方便,使用Tuplex.of(數(shù)據(jù))方法構(gòu)造Tuple元組
Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2");
System.out.println(tuple3.f0); // test0
System.out.println(tuple3.f1); // test1
System.out.println(tuple3.f2); // test2
四、Flink之POJO類型
Java和Scala的類在滿足下列條件時,將會被Flink視作特殊的POJO數(shù)據(jù)類型專門進行處理:
1.是公共類;
2.無參構(gòu)造是公共的;
3.所有的屬性都是可獲得的(聲明為公共的,或提供get,set方法);
4.字段的類型必須是Flink支持的。Flink會用Avro來序列化任意的對象。
Flink會分析POJO類型的結(jié)構(gòu)獲知POJO的字段。POJO類型要比一般類型好用。此外,F(xiàn)link訪問POJO要比一般類型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");
五、Flink之基本類型
Flink支持Java和Scala所有的基本數(shù)據(jù)類型,比如 Integer,String,和Double。
六、Flink之通用類型
Flink支持大多數(shù)的Java,Scala類(API和自定義)。包含不能序列化字段的類在增加一些限制后也可支持。遵循Java Bean規(guī)范的類一般都可以使用。
所有不能視為POJO的類Flink都會當做一般類處理。這些數(shù)據(jù)類型被視作黑箱,其內(nèi)容是不可見的。通用類使用Kryo進行序列/反序列化。
七、Flink之值類型Values
通過實現(xiàn)org.apache.flinktypes.Value接口的read和write方法提供自定義代碼來進行序列化/反序列化,而不是使用通用的序列化框架。
Flink預(yù)定義的值類型與原生數(shù)據(jù)類型是一一對應(yīng)的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。這些值類型作為原生數(shù)據(jù)類型的可變變體,他們的值是可以改變的,允許程序重用對象從而緩解GC的壓力。
八、Flink之Hadoop的Writable類
它實現(xiàn)org.apache.hadoop.Writable接口的類型,該類型的序列化邏輯在write()和readFields()方法中實現(xiàn)。
九、Flink之特殊類型
Flink比較特殊的類型有以下兩種:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either實現(xiàn)。
Java Api 與 Scala 的 類似Either,它表示兩種可能類型的值,Left或Right。Either對于錯誤處理或需要輸出兩種不同類型的記錄的運算符很有用。
類型擦除和類型推理
Java編譯器在編譯之后會丟棄很多泛型類型信息。這在Java中稱為類型擦除。這意味著在運行時,對象的實例不再知道其泛型類型。
例如,在JVM中,DataStream<String>和DataStream<Long>的實例看起來是相同的。
List<String> l1 = new ArrayList<String>(); List<Integer> l2 = new ArrayList<Integer>(); System.out.println(l1.getClass() == l2.getClass());
泛型:一種較為準確的說法就是為了參數(shù)化類型,或者說可以將類型當作參數(shù)傳遞給一個類或者是方法。Flink 的Java API會試圖去重建(可以做類型推理)這些被丟棄的類型信息,并將它們明確地存儲在數(shù)據(jù)集以及操作中。你可以通過DataStream.getType()方法來獲取類型,這個方法將返回一個TypeInformation的實例,這個實例是Flink內(nèi)部表示類型的方式。
到此這篇關(guān)于Flink支持哪些數(shù)據(jù)類型?的文章就介紹到這了,更多相關(guān)Flink的數(shù)據(jù)類型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA版使用Java操作Redis數(shù)據(jù)庫的方法
這篇文章主要介紹了IDEA版使用Java操作Redis數(shù)據(jù)庫的方法,首先需要下載jedis.jar包,然后再工程中設(shè)置具體操作步驟跟隨小編一起學習下吧2021-08-08
SpringBoot+Mybatis plus+React實現(xiàn)條件選擇切換搜索實踐
本文主要介紹了SpringBoot+Mybatis plus+React實現(xiàn)條件選擇切換搜索實踐,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09
java后端如何調(diào)用第三方接口(往header和body中的參數(shù)傳參)
這篇文章主要介紹了java后端如何調(diào)用第三方接口(往header和body中的參數(shù)傳參),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
Java數(shù)據(jù)結(jié)構(gòu)之鏈表詳解
本篇文章我們將講解一種新型的數(shù)據(jù)結(jié)構(gòu)—鏈表,鏈表是一種使用廣泛的通用數(shù)據(jù)結(jié)構(gòu),它可以用來作為實現(xiàn)棧,隊列等數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ).文中有非常詳細的介紹,需要的朋友可以參考下2021-05-05
SpringBoot接收接口入?yún)⒌姆绞叫〗Y(jié)
這篇文章主要給大家介紹了SpringBoot接收接口入?yún)⒌膸追N方式,我們從調(diào)用方的視角去看待這個問題,對調(diào)用方來說,它在調(diào)用接口時有好幾種傳參方式,下面,將會依次對這幾種參數(shù)方式進行講解和代碼示例,需要的朋友可以參考下2024-01-01
流讀取導致StringBuilder.toString()亂碼的問題及解決
這篇文章主要介紹了流讀取導致StringBuilder.toString()亂碼的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11
springboot3.0整合mybatis-flex實現(xiàn)逆向工程的示例代碼
逆向工程先創(chuàng)建數(shù)據(jù)庫表,由框架負責根據(jù)數(shù)據(jù)庫表,自動生成mybatis所要執(zhí)行的代碼,本文就來介紹一下springboot mybatis-flex逆向工程,感興趣的可以了解一下2024-06-06
SpringBoot實現(xiàn)elasticsearch索引操作的代碼示例
這篇文章主要給大家介紹了SpringBoot如何實現(xiàn)elasticsearch 索引操作,文中有詳細的代碼示例,感興趣的同學可以參考閱讀下2023-07-07

