利用Java多線程技術(shù)導(dǎo)入數(shù)據(jù)到Elasticsearch的方法步驟
前言

近期接到一個(gè)任務(wù),需要改造現(xiàn)有從mysql往Elasticsearch導(dǎo)入數(shù)據(jù)MTE(mysqlToEs)小工具,由于之前采用單線程導(dǎo)入,千億數(shù)據(jù)需要兩周左右的時(shí)間才能導(dǎo)入完成,導(dǎo)入效率非常低。所以樓主花了3天的時(shí)間,利用java線程池框架Executors中的FixedThreadPool線程池重寫了MTE導(dǎo)入工具,單臺(tái)服務(wù)器導(dǎo)入效率提高十幾倍(合理調(diào)整線程數(shù)據(jù),效率更高)。
關(guān)鍵技術(shù)棧
- Elasticsearch
- jdbc
- ExecutorService\Thread
- sql
工具說明
maven依賴
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
java線程池設(shè)置
默認(rèn)線程池大小為21個(gè),可調(diào)整。其中POR為處理流程已辦數(shù)據(jù)線程池,ROR為處理流程已閱數(shù)據(jù)線程池。
private static int THREADS = 21; public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
定義已辦生產(chǎn)者線程/已閱生產(chǎn)者線程:ZlPendProducer/ZlReadProducer
public class ZlPendProducer implements Runnable {
...
@Override
public void run() {
System.out.println(threadName + "::啟動(dòng)...");
for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
try {
....
int size = 1000;
for (int i = 0; i < count; i += size) {
if (i + size > count) {
//作用為size最后沒有100條數(shù)據(jù)則剩余幾條newList中就裝幾條
size = count - i;
}
String sql = "select * from " + tableName + " limit " + i + ", " + size;
System.out.println(tableName + "::sql::" + sql);
rs = statement.executeQuery(sql);
List<HistPendingEntity> lst = new ArrayList<>();
while (rs.next()) {
HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
lst.add(p);
}
MteExecutor.POR.submit(new ZlPendConsumer(lst));
Thread.sleep(2000);
}
....
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ZlReadProducer implements Runnable {
...已閱生產(chǎn)者處理邏輯同已辦生產(chǎn)者
}
定義已辦消費(fèi)者線程/已閱生產(chǎn)者線程:ZlPendConsumer/ZlReadConsumer
public class ZlPendConsumer implements Runnable {
private String threadName;
private List<HistPendingEntity> lst;
public ZlPendConsumer(List<HistPendingEntity> lst) {
this.lst = lst;
}
@Override
public void run() {
...
lst.forEach(v -> {
try {
String json = new Gson().toJson(v);
EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
Const.COUNTER.LD_P.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
System.out.println("err::PendingId::" + v.getPendingId());
}
});
...
}
}
public class ZlReadConsumer implements Runnable {
//已閱消費(fèi)者處理邏輯同已辦消費(fèi)者
}
定義導(dǎo)入Elasticsearch數(shù)據(jù)監(jiān)控線程:Monitor
監(jiān)控線程-Monitor為了計(jì)算每分鐘導(dǎo)入Elasticsearch的數(shù)據(jù)總條數(shù),利用監(jiān)控線程,可以調(diào)整線程池的線程數(shù)的大小,以便利用多線程更快速的導(dǎo)入數(shù)據(jù)。
public void monitorToES() {
new Thread(() -> {
while (true) {
StringBuilder sb = new StringBuilder();
sb.append("已辦表數(shù)::").append(Const.TBL.TBL_PEND_COUNT)
.append("::已辦總數(shù)::").append(Const.COUNTER.LD_P_TOTAL)
.append("::已辦入庫(kù)總數(shù)::").append(Const.COUNTER.LD_P);
sb.append("~~~~已閱表數(shù)::").append(Const.TBL.TBL_READ_COUNT);
sb.append("::已閱總數(shù)::").append(Const.COUNTER.LD_R_TOTAL)
.append("::已閱入庫(kù)總數(shù)::").append(Const.COUNTER.LD_R);
if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
ldPrevPendCount = Const.COUNTER.LD_P.get();
ldPrevReadCount = Const.COUNTER.LD_R.get();
start = System.currentTimeMillis();
} else {
long end = System.currentTimeMillis();
if ((end - start) / 1000 >= 60) {
start = end;
sb.append("\n#########################################\n");
sb.append("已辦每分鐘TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "條");
sb.append("::已閱每分鐘TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "條");
ldPrevPendCount = Const.COUNTER.LD_P.get();
ldPrevReadCount = Const.COUNTER.LD_R.get();
}
}
System.out.println(sb.toString());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
初始化Elasticsearch:EsClient
String cName = meta.get("cName");//es集群名字
String esNodes = meta.get("esNodes");//es集群ip節(jié)點(diǎn)
Settings esSetting = Settings.builder()
.put("cluster.name", cName)
.put("client.transport.sniff", true)//增加嗅探機(jī)制,找到ES集群
.put("thread_pool.search.size", 5)//增加線程池個(gè)數(shù),暫時(shí)設(shè)為5
.build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
if (node.length() > 0) {
String[] hostPort = node.split(":");
client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
}
}
初始化數(shù)據(jù)庫(kù)連接
conn = DriverManager.getConnection(url, user, password);
啟動(dòng)參數(shù)
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &
參數(shù)說明
ES-Cluster2019 為Elasticsearch集群名字
node1:9300,node2:9300,node3:9300為es的節(jié)點(diǎn)IP
130 130為已辦已閱分表的數(shù)據(jù)
程序入口:MteMain

// 監(jiān)控線程 Monitor monitorService = new Monitor(); monitorService.monitorToES(); // 已辦生產(chǎn)者線程 Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); pendProducerThread.start(); // 已閱生產(chǎn)者線程 Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); readProducerThread.start();
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- JAVA使用ElasticSearch查詢in和not in的實(shí)現(xiàn)方式
- java 使用ElasticSearch完成百萬級(jí)數(shù)據(jù)查詢附近的人功能
- java使用elasticsearch分組進(jìn)行聚合查詢過程解析
- Java ES(Elasticsearch) 中的and 和 or 查詢
- Java如何使用elasticsearch進(jìn)行模糊查詢
- Java中Elasticsearch 實(shí)現(xiàn)分頁(yè)方式(三種方式)
- 使用java操作elasticsearch的具體方法
- 基于Lucene的Java搜索服務(wù)器Elasticsearch安裝使用教程
- elasticsearch+logstash并使用java代碼實(shí)現(xiàn)日志檢索
- 一步步教你JAVA如何優(yōu)化Elastic?Search
相關(guān)文章
IDEA Maven Mybatis generator 自動(dòng)生成代碼(實(shí)例講解)
下面小編就為大家分享一篇IDEA Maven Mybatis generator 自動(dòng)生成代碼的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2017-12-12
解決IDEA修改 .vmoptions 文件后導(dǎo)致無法啟動(dòng)的問題
這篇文章主要介紹了解決IDEA修改 .vmoptions 文件后導(dǎo)致無法啟動(dòng)的問題,需要的朋友可以參考下2020-12-12
SpringBoot中GlobalExceptionHandler異常處理機(jī)制詳細(xì)說明
Spring Boot的GlobalExceptionHandler是一個(gè)全局異常處理器,用于捕獲和處理應(yīng)用程序中發(fā)生的所有異常,這篇文章主要給大家介紹了關(guān)于Java中GlobalExceptionHandler異常處理機(jī)制的相關(guān)資料,需要的朋友可以參考下2024-03-03
Java并發(fā)應(yīng)用之任務(wù)執(zhí)行分析
這篇文章主要為大家詳細(xì)介紹了JavaJava并發(fā)應(yīng)用編程中任務(wù)執(zhí)行分析的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-07-07
Java基于FFmpeg實(shí)現(xiàn)Mp4視頻轉(zhuǎn)GIF
FFmpeg是一套可以用來記錄、轉(zhuǎn)換數(shù)字音頻、視頻,并能將其轉(zhuǎn)化為流的開源計(jì)算機(jī)程序。本文主要介紹了在Java中如何基于FFmpeg進(jìn)行Mp4視頻到Gif動(dòng)圖的轉(zhuǎn)換,感興趣的小伙伴可以了解一下2022-11-11

