欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

利用Java多線程技術(shù)導入數(shù)據(jù)到Elasticsearch的方法步驟

 更新時間:2019年07月16日 09:47:06   作者:Wooola  
這篇文章主要介紹了利用Java多線程技術(shù)導入數(shù)據(jù)到Elasticsearch的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

前言


近期接到一個任務(wù),需要改造現(xiàn)有從mysql往Elasticsearch導入數(shù)據(jù)MTE(mysqlToEs)小工具,由于之前采用單線程導入,千億數(shù)據(jù)需要兩周左右的時間才能導入完成,導入效率非常低。所以樓主花了3天的時間,利用java線程池框架Executors中的FixedThreadPool線程池重寫了MTE導入工具,單臺服務(wù)器導入效率提高十幾倍(合理調(diào)整線程數(shù)據(jù),效率更高)。

關(guān)鍵技術(shù)棧

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具說明

maven依賴

<dependency> 
 <groupId>mysql</groupId> 
 <artifactId>mysql-connector-java</artifactId> 
 <version>${mysql.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.elasticsearch</groupId> 
 <artifactId>elasticsearch</artifactId> 
 <version>${elasticsearch.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.elasticsearch.client</groupId> 
 <artifactId>transport</artifactId> 
 <version>${elasticsearch.version}</version> 
</dependency> 
<dependency> 
 <groupId>org.projectlombok</groupId> 
 <artifactId>lombok</artifactId> 
 <version>${lombok.version}</version> 
</dependency> 
<dependency> 
 <groupId>com.alibaba</groupId> 
 <artifactId>fastjson</artifactId> 
 <version>${fastjson.version}</version> 
</dependency> 

java線程池設(shè)置

默認線程池大小為21個,可調(diào)整。其中POR為處理流程已辦數(shù)據(jù)線程池,ROR為處理流程已閱數(shù)據(jù)線程池。

private static int THREADS = 21; 
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 

定義已辦生產(chǎn)者線程/已閱生產(chǎn)者線程:ZlPendProducer/ZlReadProducer

public class ZlPendProducer implements Runnable { 
 ... 
 @Override 
 public void run() { 
 System.out.println(threadName + "::啟動..."); 
 for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
 try { 
 .... 
 int size = 1000; 
 for (int i = 0; i < count; i += size) { 
 if (i + size > count) { 
 //作用為size最后沒有100條數(shù)據(jù)則剩余幾條newList中就裝幾條 
 size = count - i; 
 } 
 String sql = "select * from " + tableName + " limit " + i + ", " + size; 
 System.out.println(tableName + "::sql::" + sql); 
 rs = statement.executeQuery(sql); 
 List<HistPendingEntity> lst = new ArrayList<>(); 
 while (rs.next()) { 
 HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
 lst.add(p); 
 } 
 MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
 Thread.sleep(2000); 
 } 
 .... 
 } catch (Exception e) { 
 e.printStackTrace(); 
 } 
 } 
} 
public class ZlReadProducer implements Runnable { 
 ...已閱生產(chǎn)者處理邏輯同已辦生產(chǎn)者 
} 

定義已辦消費者線程/已閱生產(chǎn)者線程:ZlPendConsumer/ZlReadConsumer

public class ZlPendConsumer implements Runnable { 
 private String threadName; 
 private List<HistPendingEntity> lst; 
 public ZlPendConsumer(List<HistPendingEntity> lst) { 
 this.lst = lst; 
 } 
 @Override 
 public void run() { 
 ... 
 lst.forEach(v -> { 
 try { 
 String json = new Gson().toJson(v); 
 EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
 Const.COUNTER.LD_P.incrementAndGet(); 
 } catch (Exception e) { 
 e.printStackTrace(); 
 System.out.println("err::PendingId::" + v.getPendingId()); 
 } 
 }); 
 ... 
 } 
} 
public class ZlReadConsumer implements Runnable { 
 //已閱消費者處理邏輯同已辦消費者 
} 

定義導入Elasticsearch數(shù)據(jù)監(jiān)控線程:Monitor

監(jiān)控線程-Monitor為了計算每分鐘導入Elasticsearch的數(shù)據(jù)總條數(shù),利用監(jiān)控線程,可以調(diào)整線程池的線程數(shù)的大小,以便利用多線程更快速的導入數(shù)據(jù)。

public void monitorToES() { 
 new Thread(() -> { 
 while (true) { 
 StringBuilder sb = new StringBuilder(); 
 sb.append("已辦表數(shù)::").append(Const.TBL.TBL_PEND_COUNT) 
 .append("::已辦總數(shù)::").append(Const.COUNTER.LD_P_TOTAL) 
 .append("::已辦入庫總數(shù)::").append(Const.COUNTER.LD_P); 
 sb.append("~~~~已閱表數(shù)::").append(Const.TBL.TBL_READ_COUNT); 
 sb.append("::已閱總數(shù)::").append(Const.COUNTER.LD_R_TOTAL) 
 .append("::已閱入庫總數(shù)::").append(Const.COUNTER.LD_R); 
 if (ldPrevPendCount == 0 && ldPrevReadCount == 0) { 
 ldPrevPendCount = Const.COUNTER.LD_P.get(); 
 ldPrevReadCount = Const.COUNTER.LD_R.get(); 
 start = System.currentTimeMillis(); 
 } else { 
 long end = System.currentTimeMillis(); 
 if ((end - start) / 1000 >= 60) { 
 start = end; 
 sb.append("\n#########################################\n"); 
 sb.append("已辦每分鐘TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "條"); 
 sb.append("::已閱每分鐘TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "條"); 
 ldPrevPendCount = Const.COUNTER.LD_P.get(); 
 ldPrevReadCount = Const.COUNTER.LD_R.get(); 
 } 
 } 
 System.out.println(sb.toString()); 
 try { 
 Thread.sleep(3000); 
 } catch (InterruptedException e) { 
 e.printStackTrace(); 
 } 
 } 
 }).start(); 
} 

初始化Elasticsearch:EsClient

String cName = meta.get("cName");//es集群名字 
String esNodes = meta.get("esNodes");//es集群ip節(jié)點 
Settings esSetting = Settings.builder() 
 .put("cluster.name", cName) 
 .put("client.transport.sniff", true)//增加嗅探機制,找到ES集群 
 .put("thread_pool.search.size", 5)//增加線程池個數(shù),暫時設(shè)為5 
 .build(); 
String[] nodes = esNodes.split(","); 
client = new PreBuiltTransportClient(esSetting); 
for (String node : nodes) { 
 if (node.length() > 0) { 
 String[] hostPort = node.split(":"); 
 client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 
 } 
} 

初始化數(shù)據(jù)庫連接

conn = DriverManager.getConnection(url, user, password); 

啟動參數(shù)

nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

參數(shù)說明

ES-Cluster2019 為Elasticsearch集群名字

node1:9300,node2:9300,node3:9300為es的節(jié)點IP

130 130為已辦已閱分表的數(shù)據(jù)

程序入口:MteMain

// 監(jiān)控線程 
Monitor monitorService = new Monitor(); 
monitorService.monitorToES(); 
// 已辦生產(chǎn)者線程 
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer")); 
pendProducerThread.start(); 
// 已閱生產(chǎn)者線程 
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer")); 
readProducerThread.start(); 

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • IDEA Maven Mybatis generator 自動生成代碼(實例講解)

    IDEA Maven Mybatis generator 自動生成代碼(實例講解)

    下面小編就為大家分享一篇IDEA Maven Mybatis generator 自動生成代碼的實例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2017-12-12
  • Java多線程——基礎(chǔ)概念

    Java多線程——基礎(chǔ)概念

    這篇文章主要介紹了java多線程編程實例,分享了幾則多線程的實例代碼,具有一定參考價值,加深多線程編程的理解還是很有幫助的,需要的朋友可以參考下,希望可以幫到你
    2021-07-07
  • resty upload無需依賴的文件上傳與下載

    resty upload無需依賴的文件上傳與下載

    這篇文章主要為大家介紹了resty upload中無需依賴的文件上傳與下載過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助祝大家多多進步,早日升職加薪
    2022-03-03
  • 解決IDEA修改 .vmoptions 文件后導致無法啟動的問題

    解決IDEA修改 .vmoptions 文件后導致無法啟動的問題

    這篇文章主要介紹了解決IDEA修改 .vmoptions 文件后導致無法啟動的問題,需要的朋友可以參考下
    2020-12-12
  • Java設(shè)計模式之外觀模式示例詳解

    Java設(shè)計模式之外觀模式示例詳解

    外觀模式為多個復雜的子系統(tǒng),提供了一個一致的界面,使得調(diào)用端只和這個接口發(fā)生調(diào)用,而無須關(guān)系這個子系統(tǒng)內(nèi)部的細節(jié)。本文將通過示例詳細為大家講解一下外觀模式,需要的可以參考一下
    2022-03-03
  • SpringBoot中GlobalExceptionHandler異常處理機制詳細說明

    SpringBoot中GlobalExceptionHandler異常處理機制詳細說明

    Spring Boot的GlobalExceptionHandler是一個全局異常處理器,用于捕獲和處理應(yīng)用程序中發(fā)生的所有異常,這篇文章主要給大家介紹了關(guān)于Java中GlobalExceptionHandler異常處理機制的相關(guān)資料,需要的朋友可以參考下
    2024-03-03
  • Java并發(fā)應(yīng)用之任務(wù)執(zhí)行分析

    Java并發(fā)應(yīng)用之任務(wù)執(zhí)行分析

    這篇文章主要為大家詳細介紹了JavaJava并發(fā)應(yīng)用編程中任務(wù)執(zhí)行分析的相關(guān)知識,文中的示例代碼講解詳細,感興趣的小伙伴可以了解一下
    2023-07-07
  • 關(guān)于springboot2.4跨域配置問題

    關(guān)于springboot2.4跨域配置問題

    這篇文章主要介紹了springboot2.4跨域配置的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-07-07
  • Java基于FFmpeg實現(xiàn)Mp4視頻轉(zhuǎn)GIF

    Java基于FFmpeg實現(xiàn)Mp4視頻轉(zhuǎn)GIF

    FFmpeg是一套可以用來記錄、轉(zhuǎn)換數(shù)字音頻、視頻,并能將其轉(zhuǎn)化為流的開源計算機程序。本文主要介紹了在Java中如何基于FFmpeg進行Mp4視頻到Gif動圖的轉(zhuǎn)換,感興趣的小伙伴可以了解一下
    2022-11-11
  • idea 默認路徑修改從C盤更改到D盤

    idea 默認路徑修改從C盤更改到D盤

    本文主要介紹了idea 默認路徑修改從C盤更改到D盤,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-07-07

最新評論