Flume如何自定義Sink數(shù)據(jù)至MySQL
Flume自定義Sink數(shù)據(jù)至MySQL
一、介紹
Sink不斷地輪詢(xún)Channel中的事件且批量地移除它們,并將這些事件批量寫(xiě)入到存儲(chǔ)或索引系統(tǒng)、或者被發(fā)送到另一個(gè)Flume Agent。
Sink是完全事務(wù)性的。在從Channel批量刪除數(shù)據(jù)之前,每個(gè)Sink用Channel啟動(dòng)一個(gè)事務(wù)。批量事件一旦成功寫(xiě)出到存儲(chǔ)系統(tǒng)或下一個(gè)Flume Agent,Sink就利用Channel提交事務(wù)。事務(wù)一旦被提交,該Channel從自己的內(nèi)部緩沖區(qū)刪除事件。
Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的Sink類(lèi)型已經(jīng)很多,但是有時(shí)候并不能滿(mǎn)足實(shí)際開(kāi)發(fā)當(dāng)中的需求,此時(shí)我們就需要根據(jù)實(shí)際需求自定義某些Sink。
官方也提供了自定義sink的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink根據(jù)官方說(shuō)明自定義MySink需要繼承AbstractSink類(lèi)并實(shí)現(xiàn)Configurable接口。
實(shí)現(xiàn)相應(yīng)方法:
- configure(Context context)//初始化context(讀取配置文件內(nèi)容)
- process()//從Channel讀取獲取數(shù)據(jù)(event),這個(gè)方法將被循環(huán)調(diào)用。
使用場(chǎng)景:
讀取Channel數(shù)據(jù)寫(xiě)入MySQL或者其他文件系統(tǒng)。
二、需求
使用flume接收(id,name,string)數(shù)據(jù),并在Sink端給每條數(shù)據(jù)進(jìn)行切分,編寫(xiě)JDBC驅(qū)動(dòng)將數(shù)據(jù)保存到MySQL數(shù)據(jù)庫(kù)。
三、編寫(xiě)MySink
package com.flume.flume; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class MySink extends AbstractSink implements Configurable { private String msgPrefix; /** * 用來(lái)保存數(shù)據(jù),不斷調(diào)用次方法 * @return * @throws EventDeliveryException */ @Override public Status process() throws EventDeliveryException { //獲取sink對(duì)應(yīng)的channnel Channel channel = getChannel(); Connection connection = null; PreparedStatement statement = null; //獲取事務(wù)對(duì)象 Transaction transaction = channel.getTransaction(); try{ //開(kāi)啟事務(wù) transaction.begin(); //從channel中獲取數(shù)據(jù) Event event = channel.take(); //切割數(shù)據(jù) String data = new String(event.getBody()); String[] arr = data.split(","); String id = arr[0]; String name = arr[1]; int age = Integer.parseInt(arr[2]); //保存到mysql //1、獲取connect connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false","root","123321"); statement = connection.prepareStatement("insert into test values(?,?,?)"); saveToMysql(id,name,age,connection,statement); //模擬數(shù)據(jù)保存 //System.out.println(msgPrefix+":"+new String(take.getBody())); //提交事務(wù) transaction.commit(); return Status.READY; }catch (Exception e){ transaction.rollback(); }finally { //關(guān)閉事務(wù) transaction.close(); if(statement!=null) //5、關(guān)閉 { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } return Status.BACKOFF; } public void saveToMysql(String id,String name,int age,Connection connection,PreparedStatement statement) throws SQLException { //2、獲取statement對(duì)象 //sql注入 【 select * from table where name='zhangsan' or 1=1】 //connection.createStatement(); //3、賦值 statement.setString(1,id); statement.setString(2,name); statement.setInt(3,age); System.out.println(id+","+name+","+age); //4、保存 statement.executeUpdate(); } /** * 獲取sink的配置屬性 * @param context */ @Override public void configure(Context context) { msgPrefix = context.getString("msg.prefix"); } }
四、編寫(xiě)Flume腳本
#定義agent a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定義source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 9999 #定義channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 1000 #定義sink a1.sinks.k1.type = com.atguigu.flume.MySink a1.sinks.k1.msg.prefix = message #定義source、channel、sink之間的綁定關(guān)系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
五、測(cè)試
1.啟動(dòng)flume
[hadoop@hadoop102 ~]$ cd /opt/module/flume/ [hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
2.啟動(dòng)nc端口
[hadoop@hadoop102 ~]$ nc hadoop102 9999 1,ttt,8 OK
3.客戶(hù)端輸出
4.查看MySQL數(shù)據(jù)庫(kù)
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini)
這篇文章主要介紹了MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini),需要的朋友可以參考下2016-07-07MySQL執(zhí)行事務(wù)的語(yǔ)法與流程詳解
這篇文章主要介紹了MySQL執(zhí)行事務(wù)的語(yǔ)法與流程的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01mysql和oracle默認(rèn)排序的方法 - 不指定order by
這篇文章主要介紹了mysql和oracle默認(rèn)排序的方法 - 不指定order by。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07Prometheus 監(jiān)控MySQL使用grafana展示
這篇文章主要介紹prometheus通過(guò)mysql exporter+node exporter監(jiān)控mysql,并使用grafana進(jìn)行圖表展示的相關(guān)內(nèi)容,感興趣的效果版可以參考下文2021-08-08