欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Flink自定義Sink端實(shí)現(xiàn)過程講解

 更新時(shí)間:2023年01月20日 09:13:57   作者:Bonyin  
這篇文章主要介紹了Flink自定義Sink端實(shí)現(xiàn)過程,在Fink官網(wǎng)中sink端只是給出了常規(guī)的write api.在我們實(shí)際開發(fā)場景中需要將flink處理的數(shù)據(jù)寫入kafka,hbase kudu等外部系統(tǒng)

Sink介紹

在Fink官網(wǎng)中sink端只是給出了常規(guī)的write api.在我們實(shí)際開發(fā)場景中需要將flink處理的數(shù)據(jù)寫入kafka,hbase kudu等外部系統(tǒng)。

UML關(guān)系

自定義Sink需要實(shí)現(xiàn)父類的接口和繼承抽象類。

上面是Sink的繼承關(guān)系

Flink addSink

// 方法需要SinkFunction的對(duì)象
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}
		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}

SinkFunction

// SinkFunction是一個(gè)接口
public interface SinkFunction<IN> extends Function, Serializable {
   //公共方法
	default void invoke(IN value, Context context) throws Exception {
		invoke(value);
	}
}

RichSinkFunction

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
	private static final long serialVersionUID = 1L;
}

其他繼承接口SinkFunction的類:

案例

自定義HbaseSink

public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
    Logger logger = LoggerFactory.getLogger(HbaseSink.class);
    org.apache.hadoop.conf.Configuration configuration;
    Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //獲取hbase 的鏈接信息
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
        //創(chuàng)建conn
        connection = ConnectionFactory.createConnection(configuration);
        logger.info("創(chuàng)建鏈接成功");
    }
    @Override
    public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
        //往habse 里面插入數(shù)據(jù)
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Table table = connection.getTable(TableName.valueOf("torder_count"));
        Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
        put.addColumn("info".getBytes(), // 列族
                "order_total".getBytes(StandardCharsets.UTF_8), //特征字段
                value.f0.toString().getBytes()); //屬性值
        put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
        table.put(put);
        table.close();
        logger.info("=====一條數(shù)據(jù)寫入成功======,時(shí)間:"+value.f1+", 值:"+value.f0);
    }
    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }

通過以上案例我們熟悉了addSink函數(shù)的操作。

到此這篇關(guān)于Flink自定義Sink端實(shí)現(xiàn)過程講解的文章就介紹到這了,更多相關(guān)Flink自定義Sink內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論