Flink自定義Sink端實(shí)現(xiàn)過程講解
Sink介紹
在Fink官網(wǎng)中sink端只是給出了常規(guī)的write api.在我們實(shí)際開發(fā)場景中需要將flink處理的數(shù)據(jù)寫入kafka,hbase kudu等外部系統(tǒng)。
UML關(guān)系
自定義Sink需要實(shí)現(xiàn)父類的接口和繼承抽象類。
上面是Sink的繼承關(guān)系
Flink addSink
// 方法需要SinkFunction的對(duì)象 public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // configure the type if needed if (sinkFunction instanceof InputTypeConfigurable) { ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig()); } StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)); DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator); getExecutionEnvironment().addOperator(sink.getTransformation()); return sink; }
SinkFunction
// SinkFunction是一個(gè)接口 public interface SinkFunction<IN> extends Function, Serializable { //公共方法 default void invoke(IN value, Context context) throws Exception { invoke(value); } }
RichSinkFunction
@Public public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> { private static final long serialVersionUID = 1L; }
其他繼承接口SinkFunction的類:
案例
自定義HbaseSink
public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> { Logger logger = LoggerFactory.getLogger(HbaseSink.class); org.apache.hadoop.conf.Configuration configuration; Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //獲取hbase 的鏈接信息 configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103"); //創(chuàng)建conn connection = ConnectionFactory.createConnection(configuration); logger.info("創(chuàng)建鏈接成功"); } @Override public void invoke(Tuple2<Integer, String> value, Context context) throws Exception { //往habse 里面插入數(shù)據(jù) SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Table table = connection.getTable(TableName.valueOf("torder_count")); Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8)); put.addColumn("info".getBytes(), // 列族 "order_total".getBytes(StandardCharsets.UTF_8), //特征字段 value.f0.toString().getBytes()); //屬性值 put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes()); table.put(put); table.close(); logger.info("=====一條數(shù)據(jù)寫入成功======,時(shí)間:"+value.f1+", 值:"+value.f0); } @Override public void close() throws Exception { super.close(); connection.close(); }
通過以上案例我們熟悉了addSink函數(shù)的操作。
到此這篇關(guān)于Flink自定義Sink端實(shí)現(xiàn)過程講解的文章就介紹到這了,更多相關(guān)Flink自定義Sink內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot啟動(dòng)并初始化執(zhí)行sql腳本問題
這篇文章主要介紹了SpringBoot啟動(dòng)并初始化執(zhí)行sql腳本問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01springmvc開啟異步請(qǐng)求報(bào)錯(cuò)Java code using the Ser
這篇文章主要為大家介紹了springmvc開啟異步請(qǐng)求報(bào)錯(cuò)Java code using the Servlet API or解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-02-02分布式醫(yī)療掛號(hào)系統(tǒng)EasyExcel導(dǎo)入導(dǎo)出數(shù)據(jù)字典的使用
這篇文章主要為大家介紹了分布式醫(yī)療掛號(hào)系統(tǒng)EasyExcel導(dǎo)入導(dǎo)出數(shù)據(jù)字典的使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04SpringBoot、Java 使用 Jsoup 解析 HTML 頁面
這篇文章主要介紹了SpringBoot、Java 使用 Jsoup 解析 HTML 頁面的詳細(xì)步驟,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08Spring IOC相關(guān)注解運(yùn)用(上篇)
這篇文章主要介紹了Spring?IOC相關(guān)注解的運(yùn)用,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05spring4.3 實(shí)現(xiàn)跨域CORS的方法
下面小編就為大家分享一篇spring4.3 實(shí)現(xiàn)跨域CORS的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01springboot項(xiàng)目中controller層與前端的參數(shù)傳遞方式
這篇文章主要介紹了springboot項(xiàng)目中controller層與前端的參數(shù)傳遞方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10