Flume如何自定義Sink數(shù)據(jù)至MySQL
Flume自定義Sink數(shù)據(jù)至MySQL
一、介紹
Sink不斷地輪詢Channel中的事件且批量地移除它們,并將這些事件批量寫入到存儲或索引系統(tǒng)、或者被發(fā)送到另一個Flume Agent。
Sink是完全事務(wù)性的。在從Channel批量刪除數(shù)據(jù)之前,每個Sink用Channel啟動一個事務(wù)。批量事件一旦成功寫出到存儲系統(tǒng)或下一個Flume Agent,Sink就利用Channel提交事務(wù)。事務(wù)一旦被提交,該Channel從自己的內(nèi)部緩沖區(qū)刪除事件。
Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的Sink類型已經(jīng)很多,但是有時候并不能滿足實際開發(fā)當(dāng)中的需求,此時我們就需要根據(jù)實際需求自定義某些Sink。
官方也提供了自定義sink的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink根據(jù)官方說明自定義MySink需要繼承AbstractSink類并實現(xiàn)Configurable接口。
實現(xiàn)相應(yīng)方法:
- configure(Context context)//初始化context(讀取配置文件內(nèi)容)
- process()//從Channel讀取獲取數(shù)據(jù)(event),這個方法將被循環(huán)調(diào)用。
使用場景:
讀取Channel數(shù)據(jù)寫入MySQL或者其他文件系統(tǒng)。
二、需求
使用flume接收(id,name,string)數(shù)據(jù),并在Sink端給每條數(shù)據(jù)進行切分,編寫JDBC驅(qū)動將數(shù)據(jù)保存到MySQL數(shù)據(jù)庫。
三、編寫MySink
package com.flume.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MySink extends AbstractSink implements Configurable {
private String msgPrefix;
/**
* 用來保存數(shù)據(jù),不斷調(diào)用次方法
* @return
* @throws EventDeliveryException
*/
@Override
public Status process() throws EventDeliveryException {
//獲取sink對應(yīng)的channnel
Channel channel = getChannel();
Connection connection = null;
PreparedStatement statement = null;
//獲取事務(wù)對象
Transaction transaction = channel.getTransaction();
try{
//開啟事務(wù)
transaction.begin();
//從channel中獲取數(shù)據(jù)
Event event = channel.take();
//切割數(shù)據(jù)
String data = new String(event.getBody());
String[] arr = data.split(",");
String id = arr[0];
String name = arr[1];
int age = Integer.parseInt(arr[2]);
//保存到mysql
//1、獲取connect
connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false","root","123321");
statement = connection.prepareStatement("insert into test values(?,?,?)");
saveToMysql(id,name,age,connection,statement);
//模擬數(shù)據(jù)保存
//System.out.println(msgPrefix+":"+new String(take.getBody()));
//提交事務(wù)
transaction.commit();
return Status.READY;
}catch (Exception e){
transaction.rollback();
}finally {
//關(guān)閉事務(wù)
transaction.close();
if(statement!=null)
//5、關(guān)閉
{
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(connection!=null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return Status.BACKOFF;
}
public void saveToMysql(String id,String name,int age,Connection connection,PreparedStatement statement) throws SQLException {
//2、獲取statement對象
//sql注入 【 select * from table where name='zhangsan' or 1=1】
//connection.createStatement();
//3、賦值
statement.setString(1,id);
statement.setString(2,name);
statement.setInt(3,age);
System.out.println(id+","+name+","+age);
//4、保存
statement.executeUpdate();
}
/**
* 獲取sink的配置屬性
* @param context
*/
@Override
public void configure(Context context) {
msgPrefix = context.getString("msg.prefix");
}
}
四、編寫Flume腳本
#定義agent a1.sources = r1 a1.channels = c1 a1.sinks = k1 #定義source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 9999 #定義channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 1000 #定義sink a1.sinks.k1.type = com.atguigu.flume.MySink a1.sinks.k1.msg.prefix = message #定義source、channel、sink之間的綁定關(guān)系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
五、測試
1.啟動flume
[hadoop@hadoop102 ~]$ cd /opt/module/flume/ [hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysik.config -Dflume.root.logger=INFO,console
2.啟動nc端口
[hadoop@hadoop102 ~]$ nc hadoop102 9999 1,ttt,8 OK
3.客戶端輸出

4.查看MySQL數(shù)據(jù)庫

總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini)
這篇文章主要介紹了MySQL 5.6 & 5.7最優(yōu)配置文件模板(my.ini),需要的朋友可以參考下2016-07-07
mysql和oracle默認(rèn)排序的方法 - 不指定order by
這篇文章主要介紹了mysql和oracle默認(rèn)排序的方法 - 不指定order by。具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07
Prometheus 監(jiān)控MySQL使用grafana展示
這篇文章主要介紹prometheus通過mysql exporter+node exporter監(jiān)控mysql,并使用grafana進行圖表展示的相關(guān)內(nèi)容,感興趣的效果版可以參考下文2021-08-08

