欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程編程實(shí)戰(zhàn)之模擬大量數(shù)據(jù)同步

 更新時(shí)間:2019年02月14日 10:20:01   作者:沉靜  
這篇文章主要介紹了Java多線程編程實(shí)戰(zhàn)之模擬大量數(shù)據(jù)同步,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

背景

最近對(duì)于 Java 多線程做了一段時(shí)間的學(xué)習(xí),筆者一直認(rèn)為,學(xué)習(xí)東西就是要應(yīng)用到實(shí)際的業(yè)務(wù)需求中的。否則要么無法深入理解,要么硬生生地套用技術(shù)只是達(dá)到炫技的效果。

不過筆者仍舊認(rèn)為自己對(duì)于多線程掌握不夠熟練,不敢輕易應(yīng)用到生產(chǎn)代碼中。這就按照平時(shí)工作中遇到的實(shí)際問題,腦補(bǔ)了一個(gè)很可能存在的業(yè)務(wù)場(chǎng)景:

已知某公司管理著 1000 個(gè)微信服務(wù)號(hào),每個(gè)服務(wù)號(hào)有 1w ~ 50w 粉絲不等。假設(shè)該公司每天都需要將所有微信服務(wù)號(hào)的粉絲數(shù)據(jù)通過調(diào)用微信 API 的方式更新到本地?cái)?shù)據(jù)庫。

需求分析

對(duì)此需求進(jìn)行分析,主要存在以下問題:

  • 單個(gè)服務(wù)號(hào)獲取粉絲 id,只能每次 1w 按順序拉取
  • 微信的 API 對(duì)于服務(wù)商的并發(fā)請(qǐng)求數(shù)量有限制

單個(gè)服務(wù)號(hào)獲取粉絲 id,只能每次 1w 按順序拉取。這個(gè)問題決定了單個(gè)公眾號(hào)在拉取粉絲 id 上,無法分配給多個(gè)線程執(zhí)行。

微信的 API 對(duì)于服務(wù)商的并發(fā)請(qǐng)求數(shù)量有限制。這點(diǎn)最容易被忽略,如果我們同時(shí)有過多的請(qǐng)求,則會(huì)導(dǎo)致接口被封禁。這里可以通過信號(hào)量來控制同時(shí)執(zhí)行的線程數(shù)量。

為了盡快完成數(shù)據(jù)同步,根據(jù)實(shí)際情況:整個(gè)數(shù)據(jù)同步可分為讀數(shù)據(jù)和寫數(shù)據(jù)兩個(gè)部分。讀數(shù)據(jù)是通過 API 獲取,走網(wǎng)絡(luò) IO,速度較慢;寫數(shù)據(jù)是寫到數(shù)據(jù)庫,速度較快。所以得出結(jié)論:需要分配較多的線程進(jìn)行讀數(shù)據(jù),較少的線程進(jìn)行寫數(shù)據(jù)。

設(shè)計(jì)要點(diǎn)

首先,我們需要確定開啟多少個(gè)線程(在生產(chǎn)中往往是使用線程池),線程數(shù)量需要根據(jù)服務(wù)器性能來決定,這里我們定為 40 個(gè)讀取數(shù)據(jù)線程(將 1000 個(gè)公眾號(hào)分為 40 份,分別在 40 個(gè)線程中執(zhí)行),1個(gè)寫入數(shù)據(jù)線程。(具體開多少個(gè)線程,取決于線程池的容量,以及可以分配給此業(yè)務(wù)的數(shù)量。具體的數(shù)字需要根據(jù)實(shí)際情況測(cè)試得出,比服務(wù)器閾值低一些較好。當(dāng)然,配置允許范圍內(nèi)越大越好)

其次,考慮到微信對(duì)于 API 并發(fā)請(qǐng)求的限制,需要限制同時(shí)執(zhí)行的線程數(shù),使用java.util.concurrent.Semaphore進(jìn)行控制,這里我們限制為 20 個(gè)(具體的信號(hào)量憑證數(shù),取決于同一時(shí)間能夠執(zhí)行的線程,跟 API 限制,服務(wù)器性能有關(guān))。

然后,我們需要知道數(shù)據(jù)何時(shí)讀取、寫入完畢,以控制程序邏輯以及終止程序,這里我們使用java.util.concurrent.CountDownLatch進(jìn)行控制。

最后,我們需要一個(gè)數(shù)據(jù)結(jié)構(gòu),用來在多個(gè)線程共享處理的數(shù)據(jù),此處同步數(shù)據(jù)的場(chǎng)景非常適合使用隊(duì)列,這里我們使用線程安全的java.util.concurrent.ConcurrentLinkedQueue來進(jìn)行處理。(需要注意的是,在實(shí)際開發(fā)中,隊(duì)列不能夠無限制地增長,這將會(huì)很快消耗掉內(nèi)存,我們需要根據(jù)實(shí)際情況對(duì)隊(duì)列長度做控制。例如,可以通過控制讀取線程數(shù)和寫入線程數(shù)的比例來控制隊(duì)列的長度)

模擬代碼

由于本文重點(diǎn)關(guān)注多線程的使用,模擬代碼只體現(xiàn)多線程操作的方法。代碼里添加了大量的注釋,方便各位讀者閱讀理解。

JDK:1.8

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * N個(gè)線程向隊(duì)列添加數(shù)據(jù)
 * 一個(gè)線程消費(fèi)隊(duì)列數(shù)據(jù)
 */
public class QueueTest {
  private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");

  private static final int OFFER_COUNT = 40; // 開啟的線程數(shù)量

  private static Semaphore semaphore = new Semaphore(20); // 同一時(shí)間執(zhí)行的線程數(shù)量(大多用于控制API調(diào)用次數(shù)或數(shù)據(jù)庫查詢連接數(shù))

  public static void main(String[] args) throws InterruptedException {
    Queue<String> queue = new ConcurrentLinkedQueue<>(); // 處理隊(duì)列,需要處理的數(shù)據(jù),放置到此隊(duì)列中

    CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer線程latch,每完成一個(gè),latch減一,lacth的count為0時(shí)表示offer處理完畢
    CountDownLatch pollLatch = new CountDownLatch(1); // poll線程latch,latch的count為0時(shí),表示poll處理完畢

    Runnable offerRunnable = () -> {
      try {
        semaphore.acquire(); // 信號(hào)量控制
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      try {
        for (String datum : data) {
          queue.offer(datum);
          TimeUnit.SECONDS.sleep(2); // 模擬取數(shù)據(jù)很慢的情況
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中執(zhí)行l(wèi)atch.countDown()以及信號(hào)量釋放,避免因異常導(dǎo)致沒有正常釋放
        offerLatch.countDown();
        semaphore.release();
      }
    };

    Runnable pollRunnable = () -> {
      int count = 0;
      try {
        while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未執(zhí)行完,或queue仍舊有數(shù)據(jù),則繼續(xù)循環(huán)
          String poll = queue.poll();
          if (poll != null) {
            System.out.println(poll);
            count++;
          }
          // 無論是否poll到數(shù)據(jù),均暫停一小段時(shí)間,可降低CPU消耗
          TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("total count:" + count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中執(zhí)行l(wèi)atch.countDown(),避免因異常導(dǎo)致沒有正常釋放
        pollLatch.countDown();
      }
    };

    // 啟動(dòng)線程(生產(chǎn)環(huán)境中建議使用線程池)
    new Thread(pollRunnable).start(); // 啟動(dòng)一個(gè)poll線程
    for (int i = 0; i < OFFER_COUNT; i++) {
      new Thread(offerRunnable).start();
    } // 模擬取數(shù)據(jù)很慢,需要開啟40個(gè)線程處理

    // latch等待,會(huì)block主線程直到latch的count為0
    offerLatch.await();
    pollLatch.await();

    System.out.println("===the end===");
  }
}

到這里,本文結(jié)束。以上是筆者腦補(bǔ)的一個(gè)常見需求的解決方案。

注意:多線程編程對(duì)實(shí)際環(huán)境和需求有很大的依賴,需要根據(jù)實(shí)際的需求情況對(duì)各個(gè)參數(shù)做調(diào)整。實(shí)際在使用中,需要盡量模擬生產(chǎn)環(huán)境的數(shù)據(jù)情況來進(jìn)行測(cè)試,對(duì)服務(wù)器執(zhí)行期間的并發(fā)數(shù),CPU、內(nèi)存、網(wǎng)絡(luò) IO、磁盤 IO 做好觀察。并適當(dāng)?shù)卣{(diào)低并發(fā)數(shù),以給服務(wù)器留有處理其他請(qǐng)求的余量。

以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Spring?Boot?整合?Fisco?Bcos的案例分析(區(qū)塊鏈)

    Spring?Boot?整合?Fisco?Bcos的案例分析(區(qū)塊鏈)

    本篇文章介紹的?Spring?Boot?整合?Fisco?Bcos的案例,是在阿里云服務(wù)器上部署驗(yàn)證的。大家可根據(jù)自己的電腦環(huán)境,對(duì)比該案例進(jìn)行開發(fā)即可,具體案例代碼跟隨小編一起看看吧
    2022-01-01
  • Spring Boot的FailureAnalyzer機(jī)制及如何解救應(yīng)用啟動(dòng)危機(jī)

    Spring Boot的FailureAnalyzer機(jī)制及如何解救應(yīng)用啟動(dòng)危機(jī)

    本文探討了FailureAnalyzer工具,它不僅能幫助我們快速識(shí)別和處理代碼中的錯(cuò)誤,還能極大地提升我們的開發(fā)效率,通過詳細(xì)的實(shí)例分析,我們了解了FailureAnalyzer如何通過自定義邏輯應(yīng)對(duì)不同類型的異常,讓程序員能夠更好地定位問題并迅速找到解決方案,感興趣的朋友一起看看吧
    2025-01-01
  • 關(guān)于BindingResult的使用總結(jié)及注意事項(xiàng)

    關(guān)于BindingResult的使用總結(jié)及注意事項(xiàng)

    這篇文章主要介紹了關(guān)于BindingResult的使用總結(jié)及注意事項(xiàng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot 整合 Avro 與 Kafka的詳細(xì)過程

    SpringBoot 整合 Avro 與 Kafka的詳細(xì)過程

    本文介紹了如何在Spring Boot中使用Avro和Kafka進(jìn)行數(shù)據(jù)的序列化和反序列化,并通過MyBatisPlus將數(shù)據(jù)存入數(shù)據(jù)庫,感興趣的朋友跟隨小編一起看看吧
    2024-12-12
  • Spring?Security內(nèi)置過濾器的維護(hù)方法

    Spring?Security內(nèi)置過濾器的維護(hù)方法

    這篇文章主要介紹了Spring?Security的內(nèi)置過濾器是如何維護(hù)的,本文給我們分析一下HttpSecurity維護(hù)過濾器的幾個(gè)方法,需要的朋友可以參考下
    2022-02-02
  • Java生成隨機(jī)時(shí)間的簡單隨機(jī)算法

    Java生成隨機(jī)時(shí)間的簡單隨機(jī)算法

    今天小編就為大家分享一篇關(guān)于Java生成隨機(jī)時(shí)間的簡單隨機(jī)算法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • nacos客戶端如何獲取配置

    nacos客戶端如何獲取配置

    這篇文章主要介紹了nacos客戶端如何獲取配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Maven中optional和scope元素的使用弄明白了嗎

    Maven中optional和scope元素的使用弄明白了嗎

    這篇文章主要介紹了Maven中optional和scope元素的使用弄明白了嗎,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • Java SPI 機(jī)制知識(shí)點(diǎn)總結(jié)

    Java SPI 機(jī)制知識(shí)點(diǎn)總結(jié)

    在本篇文章里小編給大家整理的是一篇關(guān)于Java SPI 機(jī)制知識(shí)點(diǎn)總結(jié)內(nèi)容,需要的朋友們可以參考下。
    2020-02-02
  • 淺談Spring裝配Bean之組件掃描和自動(dòng)裝配

    淺談Spring裝配Bean之組件掃描和自動(dòng)裝配

    本篇文章主要介紹了淺談Spring裝配Bean之組件掃描和自動(dòng)裝配,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-10-10

最新評(píng)論