Flume如何自定義Sink數(shù)據(jù)至MySQL
Flume自定義Sink數(shù)據(jù)至MySQL
一、介紹
Sink不斷地輪詢Channel中的事件且批量地移除它們,并將這些事件批量寫入到存儲或索引系統(tǒng)、或者被發(fā)送到另一個Flume Agent。
Sink是完全事務(wù)性的。在從Channel批量刪除數(shù)據(jù)之前,每個Sink用Channel啟動一個事務(wù)。批量事件一旦成功寫出到存儲系統(tǒng)或下一個Flume Agent,Sink就利用Channel提交事務(wù)。事務(wù)一旦被提交,該Channel從自己的內(nèi)部緩沖區(qū)刪除事件。
Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的Sink類型已經(jīng)很多,但是有時候并不能滿足實(shí)際開發(fā)當(dāng)中的需求,此時我們就需要根據(jù)實(shí)際需求自定義某些Sink。
官方也提供了自定義sink的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink根據(jù)官方說明自定義MySink需要繼承AbstractSink類并實(shí)現(xiàn)Configurable接口。
實(shí)現(xiàn)相應(yīng)方法:
- configure(Context context)//初始化context(讀取配置文件內(nèi)容)
- process()//從Channel讀取獲取數(shù)據(jù)(event),這個方法將被循環(huán)調(diào)用。
使用場景:
讀取Channel數(shù)據(jù)寫入MySQL或者其他文件系統(tǒng)。
二、需求
使用flume接收(id,name,string)數(shù)據(jù),并在Sink端給每條數(shù)據(jù)進(jìn)行切分,編寫JDBC驅(qū)動將數(shù)據(jù)保存到MySQL數(shù)據(jù)庫。
三、編寫MySink
package com.flume.flume; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class MySink extends AbstractSink implements Configurable { private String msgPrefix; /** * 用來保存數(shù)據(jù),不斷調(diào)用次方法 * @return * @throws EventDeliveryException */ @Override public Status process() throws EventDeliveryException { //獲取sink對應(yīng)的channnel Channel channel = getChannel(); Connection connection = null; PreparedStatement statement = null; //獲取事務(wù)對象 Transaction transaction = channel.getTransaction(); try{ //開啟事務(wù) transaction.begin(); //從channel中獲取數(shù)據(jù) Event event = channel.take(); //切割數(shù)據(jù) String data = new String(event.getBody()); String[] arr = data.split(","); String id = arr[0]; String name = arr[1]; int age = Integer.parseInt(arr[2]); //保存到mysql //1、獲取connect connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false","root","123321"); statement = connection.prepareStatement("insert into test values(?,?,?)"); saveToMysql(id,name,age,connection,statement); //模擬數(shù)據(jù)保存 //System.out.println(msgPrefix+":"+new String(take.getBody())); //提交事務(wù) transaction.commit(); return Status.READY; }catch (Exception e){ transaction.rollback(); }finally { //關(guān)閉事務(wù) transaction.close(); if(statement!=null) //5、關(guān)閉 { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } return Status.BACKOFF; } public void saveToMysql(String id,String name,int age,Connection connection,PreparedStatement statement) throws SQLException { //2、獲取statement對象 //sql注入 【 select * from table where name='zhangsan' or 1=1】 //connection.createStatement(); //3、賦值 statement.setString(1,id); statement.setString(2,name); statement.setInt(3,age); System.out.println(id+","+name+","+age); //4、保存 statement.executeUpdate(); } /** * 獲取sink的配置屬性 * @param context */ @Override public void configure(Context context) { msgPrefix = context.getString("msg.prefix"); } }
四、編寫Flume腳本
#定義agent a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定義source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 9999 #定義channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 1000 #定義sink a1.sinks.k1.type = com.atguigu.flume.MySink a1.sinks.k1.msg.prefix = message #定義source、channel、sink之間的綁定關(guān)系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
五、測試
1.啟動flume
[hadoop@hadoop102 ~]$ cd /opt/module/flume/ [hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
2.啟動nc端口
[hadoop@hadoop102 ~]$ nc hadoop102 9999 1,ttt,8 OK
3.客戶端輸出
4.查看MySQL數(shù)據(jù)庫
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini)
這篇文章主要介紹了MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini),需要的朋友可以參考下2016-07-07mysql和oracle默認(rèn)排序的方法 - 不指定order by
這篇文章主要介紹了mysql和oracle默認(rèn)排序的方法 - 不指定order by。具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07Prometheus 監(jiān)控MySQL使用grafana展示
這篇文章主要介紹prometheus通過mysql exporter+node exporter監(jiān)控mysql,并使用grafana進(jìn)行圖表展示的相關(guān)內(nèi)容,感興趣的效果版可以參考下文2021-08-08