欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

為什么程序中突然多了 200 個 Dubbo-thread 線程的說明

 更新時間:2020年09月25日 10:47:05   作者:回歸心靈  
這篇文章主要介紹了為什么程序中突然多了 200 個 Dubbo-thread 線程的說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

背景

在某次查看程序線程堆棧信息時,偶然發(fā)現(xiàn)有 200 個 Dubbo-thread 線程,而且大部分都處于 WAITING 狀態(tài),如下所示:

"Dubbo-thread-200" #160932 daemon prio=5 os_prio=0 tid=0x00007f5af9b54800 nid=0x79a6 waiting on condition [0x00007f5a9acd5000]
  java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x00000000c78f1240> (a java.util.concurrent.SynchronousQueue$TransferStack)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
 at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
 at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

  Locked ownable synchronizers:
 - None

為什么會有這么多 Dubbo-thread 線程呢?這些線程有什么作用呢?帶著疑問就去研究了下源碼。

源碼分析

Dubbo (2.7.5 版本)的線程池 ThreadPool 有四種具體的實(shí)現(xiàn)類型:

fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool

程序通過調(diào)用具體實(shí)現(xiàn)類的 getExecutor(URL url) 方法來創(chuàng)建線程池。而調(diào)用該方法的只有 DefaultExecutorRepository 類的 createExecutor 方法,該方法會根據(jù) url 上的參數(shù) threadpool=cached 來決定創(chuàng)建那種類型的線程池。createExecutor 是一個私有方法,調(diào)用它的有下面兩個方法:

 /**
   * Get called when the server or client instance initiating.
   *
   * @param url
   * @return
   */
  public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
      componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
    Integer portKey = url.getPort();
    ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
    // If executor has been shut down, create a new one
    if (executor.isShutdown() || executor.isTerminated()) {
      executors.remove(portKey);
      executor = createExecutor(url);
      executors.put(portKey, executor);
    }
    return executor;
  }

  public ExecutorService getExecutor(URL url) {
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
      componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.get(componentKey);

    /**
     * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
     * have Executor instances generated and stored.
     */
    if (executors == null) {
      logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
          "before coming to here.");
      return null;
    }

    Integer portKey = url.getPort();
    ExecutorService executor = executors.get(portKey);
    if (executor != null) {
      if (executor.isShutdown() || executor.isTerminated()) {
        executors.remove(portKey);
        executor = createExecutor(url);
        executors.put(portKey, executor);
      }
    }
    return executor;
  }

對于上面第一個方法,備注已經(jīng)說明在服務(wù)提供者或者服務(wù)消費(fèi)者初始化的時候會調(diào)用,通過debug 可以得出:服務(wù)提供者初始化會創(chuàng)建線程名為 DubboServerHandler-10.12.16.67:20880-thread 的線程池,服務(wù)消費(fèi)者會創(chuàng)建線程名為 DubboClientHandler-10.12.16.67:20880-thread 的線程池。

這里需要說明下,Dubbo 創(chuàng)建的線程池會存儲在 Map 中共享使用:

private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();

外面的 key 表示服務(wù)提供方還是消費(fèi)方,里面的 key 表示服務(wù)暴露的端口號,也就是說消費(fèi)方對于相同端口號的服務(wù)只會創(chuàng)建一個線程池,共享同一個線程池進(jìn)行服務(wù)請求和消息接收后一系列處理。

顯然和 Dubbo-thread 名不一樣,那就很有可能是通過調(diào)用第二個方法創(chuàng)建的線程池。第二個方法的調(diào)用往上追溯就比較分散了,找不到什么有用的信息。

再看方法具體內(nèi)容,當(dāng)已經(jīng)創(chuàng)建的線程池關(guān)閉或終止時會重新創(chuàng)建新的線程池。然后就推測什么情況下線程池會被關(guān)閉或終止,在服務(wù)重啟后輸出堆棧信息并沒有 Dubbo-thread 線程,然后就猜測消費(fèi)方和提供方連接斷開會不會觸發(fā)線程池關(guān)閉,于是重啟了服務(wù)提供方,果然重現(xiàn)了Dubbo-thread 線程。

然后在 Dubbo 的具體線程池創(chuàng)建方法中添加日志,輸出調(diào)用棧信息(通過產(chǎn)生一個異常輸出調(diào)用信息)。

如下圖:

在這里插入圖片描述可以看到當(dāng) channel 失效時會調(diào)用 disconnected 方法,最終會調(diào)用 DefaultExecutorRepository 類的 getExecutor 創(chuàng)建線程池,當(dāng)服務(wù)提供者重啟時,消費(fèi)方相應(yīng)的線程池會被shutdown。

重現(xiàn)創(chuàng)建線程池所用的 URL 是 WrappedChannelHandler 類的 URL,該值是在服務(wù)啟動初始化時設(shè)置的,該值的設(shè)置要早于 AbstractClient 客戶端 Executor 初始化。

因此由于 channel 斷開而重新創(chuàng)建的線程池所用的 URL 和客戶端初始創(chuàng)建線程池用的 URL 可能是不同的,特別是在沒有配置 consumer 的線程池類型時,初始創(chuàng)建的 Cached 類型線程池,線程名稱是 DubboClientHandler…。

而重新創(chuàng)建所用 URL 是沒有經(jīng)過下面方法設(shè)置的,因此就會創(chuàng)建默認(rèn)類型為 fixed 的線程池,線程數(shù)為默認(rèn) 200,線程名為 Dubbo…。

private void initExecutor(URL url) {
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
    executor = executorRepository.createExecutorIfAbsent(url);
  }

總結(jié)

那么,就可以知道 Dubbo-thread 線程池的創(chuàng)建是由于服務(wù)消費(fèi)方和提供方之間連接斷開而創(chuàng)建的線程池,代替程序啟動初始化時創(chuàng)建的 DubboClientHandler 線程池。主要做一些 channel 斷開后續(xù)一些處理,還有接收服務(wù)端消息后的反序列化等操作,具體的可以看類 ThreadlessExecutor(同步調(diào)用處理類) 、ChannelEventRunnable(channel 不同狀態(tài)處理,包括:連接、接收到消息、斷開鏈接等)。

還有一個要注意到點(diǎn)是,如果沒有配置consumer.threadpool 類型、therads 等信息,那么斷開連接后再創(chuàng)建的線程池將會是 fixed 類型的線程池,線程數(shù)為默認(rèn) 200。

以上這篇為什么程序中突然多了 200 個 Dubbo-thread 線程的說明就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論