Flink自定義Sink端實現(xiàn)過程講解
Sink介紹
在Fink官網(wǎng)中sink端只是給出了常規(guī)的write api.在我們實際開發(fā)場景中需要將flink處理的數(shù)據(jù)寫入kafka,hbase kudu等外部系統(tǒng)。
UML關系
自定義Sink需要實現(xiàn)父類的接口和繼承抽象類。
上面是Sink的繼承關系
Flink addSink
// 方法需要SinkFunction的對象 public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // configure the type if needed if (sinkFunction instanceof InputTypeConfigurable) { ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig()); } StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)); DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator); getExecutionEnvironment().addOperator(sink.getTransformation()); return sink; }
SinkFunction
// SinkFunction是一個接口 public interface SinkFunction<IN> extends Function, Serializable { //公共方法 default void invoke(IN value, Context context) throws Exception { invoke(value); } }
RichSinkFunction
@Public public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> { private static final long serialVersionUID = 1L; }
其他繼承接口SinkFunction的類:
案例
自定義HbaseSink
public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> { Logger logger = LoggerFactory.getLogger(HbaseSink.class); org.apache.hadoop.conf.Configuration configuration; Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //獲取hbase 的鏈接信息 configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103"); //創(chuàng)建conn connection = ConnectionFactory.createConnection(configuration); logger.info("創(chuàng)建鏈接成功"); } @Override public void invoke(Tuple2<Integer, String> value, Context context) throws Exception { //往habse 里面插入數(shù)據(jù) SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Table table = connection.getTable(TableName.valueOf("torder_count")); Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8)); put.addColumn("info".getBytes(), // 列族 "order_total".getBytes(StandardCharsets.UTF_8), //特征字段 value.f0.toString().getBytes()); //屬性值 put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes()); table.put(put); table.close(); logger.info("=====一條數(shù)據(jù)寫入成功======,時間:"+value.f1+", 值:"+value.f0); } @Override public void close() throws Exception { super.close(); connection.close(); }
通過以上案例我們熟悉了addSink函數(shù)的操作。
到此這篇關于Flink自定義Sink端實現(xiàn)過程講解的文章就介紹到這了,更多相關Flink自定義Sink內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot啟動并初始化執(zhí)行sql腳本問題
這篇文章主要介紹了SpringBoot啟動并初始化執(zhí)行sql腳本問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01springmvc開啟異步請求報錯Java code using the Ser
這篇文章主要為大家介紹了springmvc開啟異步請求報錯Java code using the Servlet API or解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-02-02分布式醫(yī)療掛號系統(tǒng)EasyExcel導入導出數(shù)據(jù)字典的使用
這篇文章主要為大家介紹了分布式醫(yī)療掛號系統(tǒng)EasyExcel導入導出數(shù)據(jù)字典的使用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04SpringBoot、Java 使用 Jsoup 解析 HTML 頁面
這篇文章主要介紹了SpringBoot、Java 使用 Jsoup 解析 HTML 頁面的詳細步驟,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08springboot項目中controller層與前端的參數(shù)傳遞方式
這篇文章主要介紹了springboot項目中controller層與前端的參數(shù)傳遞方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-10-10