欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Java回顧之多線程同步的使用詳解

 更新時間:2013年05月08日 10:17:31   作者:  
在這篇文章里,我們關(guān)注線程同步的話題。這是比多線程更復雜,稍不留意,我們就會“掉到坑里”,而且和單線程程序不同,多線程的錯誤是否每次都出現(xiàn),也是不固定的,這給調(diào)試也帶來了很大的挑戰(zhàn)

首先闡述什么是同步,不同步有什么問題,然后討論可以采取哪些措施控制同步,接下來我們會仿照回顧網(wǎng)絡通信時那樣,構(gòu)建一個服務器端的“線程池”,JDK為我們提供了一個很大的concurrent工具包,最后我們會對里面的內(nèi)容進行探索。

為什么要線程同步?

說到線程同步,大部分情況下, 我們是在針對“單對象多線程”的情況進行討論,一般會將其分成兩部分,一部分是關(guān)于“共享變量”,一部分關(guān)于“執(zhí)行步驟”。

共享變量

當我們在線程對象(Runnable)中定義了全局變量,run方法會修改該變量時,如果有多個線程同時使用該線程對象,那么就會造成全局變量的值被同時修改,造成錯誤。我們來看下面的代碼:

復制代碼 代碼如下:

共享變量造成同步問題
 class MyRunner implements Runnable
 {
     public int sum = 0;

     public void run()
     {
         System.out.println(Thread.currentThread().getName() + " Start.");
         for (int i = 1; i <= 100; i++)
         {
             sum += i;
         }
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
         System.out.println(Thread.currentThread().getName() + " End.");
     }
 }

 
 private static void sharedVaribleTest() throws InterruptedException
 {
     MyRunner runner = new MyRunner();
     Thread thread1 = new Thread(runner);
     Thread thread2 = new Thread(runner);
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

這個示例中,線程用來計算1到100的和是多少,我們知道正確結(jié)果是5050(好像是高斯小時候玩過這個?),但是上述程序返回的結(jié)果是10100,原因是兩個線程同時對sum進行操作。

執(zhí)行步驟

我們在多個線程運行時,可能需要某些操作合在一起作為“原子操作”,即在這些操作可以看做是“單線程”的,例如我們可能希望輸出結(jié)果的樣子是這樣的:

復制代碼 代碼如下:

線程1:步驟1
 線程1:步驟2
 線程1:步驟3
 線程2:步驟1
 線程2:步驟2
 線程2:步驟3

如果同步控制不好,出來的樣子可能是這樣的:
復制代碼 代碼如下:

線程1:步驟1
線程2:步驟1
線程1:步驟2
線程2:步驟2
線程1:步驟3
線程2:步驟3

這里我們也給出一個示例代碼:
復制代碼 代碼如下:

執(zhí)行步驟混亂帶來的同步問題
 class MyNonSyncRunner implements Runnable
 {
     public void run() {
         System.out.println(Thread.currentThread().getName() + " Start.");
         for(int i = 1; i <= 5; i++)
         {
             System.out.println(Thread.currentThread().getName() + " Running step " + i);
             try
             {
                 Thread.sleep(50);
             }
             catch(InterruptedException ex)
             {
                 ex.printStackTrace();
             }
         }
         System.out.println(Thread.currentThread().getName() + " End.");
     }
 }

 
 private static void syncTest() throws InterruptedException
 {
     MyNonSyncRunner runner = new MyNonSyncRunner();
     Thread thread1 = new Thread(runner);
     Thread thread2 = new Thread(runner);
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

如何控制線程同步

既然線程同步有上述問題,那么我們應該如何去解決呢?針對不同原因造成的同步問題,我們可以采取不同的策略。

控制共享變量

我們可以采取3種方式來控制共享變量。

將“單對象多線程”修改成“多對象多線程”

上文提及,同步問題一般發(fā)生在“單對象多線程”的場景中,那么最簡單的處理方式就是將運行模型修改成“多對象多線程”的樣子,針對上面示例中的同步問題,修改后的代碼如下:

復制代碼 代碼如下:

解決共享變量問題方案一
 private static void sharedVaribleTest2() throws InterruptedException
 {
     Thread thread1 = new Thread(new MyRunner());
     Thread thread2 = new Thread(new MyRunner());
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我們可以看到,上述代碼中兩個線程使用了兩個不同的Runnable實例,它們在運行過程中,就不會去訪問同一個全局變量。
將“全局變量”降級為“局部變量”

既然是共享變量造成的問題,那么我們可以將共享變量改為“不共享”,即將其修改為局部變量。這樣也可以解決問題,同樣針對上面的示例,這種解決方式的代碼如下:

復制代碼 代碼如下:

解決共享變量問題方案二
 class MyRunner2 implements Runnable
 {
     public void run()
     {
         System.out.println(Thread.currentThread().getName() + " Start.");
         int sum = 0;
         for (int i = 1; i <= 100; i++)
         {
             sum += i;
         }
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
         System.out.println(Thread.currentThread().getName() + " End.");
     }
 }

 
 private static void sharedVaribleTest3() throws InterruptedException
 {
     MyRunner2 runner = new MyRunner2();
     Thread thread1 = new Thread(runner);
     Thread thread2 = new Thread(runner);
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我們可以看出,sum變量已經(jīng)由全局變量變?yōu)閞un方法內(nèi)部的局部變量了。
使用ThreadLocal機制

ThreadLocal是JDK引入的一種機制,它用于解決線程間共享變量,使用ThreadLocal聲明的變量,即使在線程中屬于全局變量,針對每個線程來講,這個變量也是獨立的。

我們可以用這種方式來改造上面的代碼,如下所示:

復制代碼 代碼如下:

解決共享變量問題方案三
 class MyRunner3 implements Runnable
 {
     public ThreadLocal<Integer> tl = new ThreadLocal<Integer>();

     public void run()
     {
         System.out.println(Thread.currentThread().getName() + " Start.");
         for (int i = 0; i <= 100; i++)
         {
             if (tl.get() == null)
             {
                 tl.set(new Integer(0));
             }
             int sum = ((Integer)tl.get()).intValue();
             sum+= i;
             tl.set(new Integer(sum));
             try {
                 Thread.sleep(10);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }

         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
         System.out.println(Thread.currentThread().getName() + " End.");
     }
 }

 
 private static void sharedVaribleTest4() throws InterruptedException
 {
     MyRunner3 runner = new MyRunner3();
     Thread thread1 = new Thread(runner);
     Thread thread2 = new Thread(runner);
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

綜上三種方案,第一種方案會降低多線程執(zhí)行的效率,因此,我們推薦使用第二種或者第三種方案。

控制執(zhí)行步驟

說到執(zhí)行步驟,我們可以使用synchronized關(guān)鍵字來解決它。

復制代碼 代碼如下:

執(zhí)行步驟問題解決方案
 class MySyncRunner implements Runnable
 {
     public void run() {
         synchronized(this)
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             for(int i = 1; i <= 5; i++)
             {
                 System.out.println(Thread.currentThread().getName() + " Running step " + i);
                 try
                 {
                     Thread.sleep(50);
                 }
                 catch(InterruptedException ex)
                 {
                     ex.printStackTrace();
                 }
             }
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     }
 }

 
 private static void syncTest2() throws InterruptedException
 {
     MySyncRunner runner = new MySyncRunner();
     Thread thread1 = new Thread(runner);
     Thread thread2 = new Thread(runner);
     thread1.setDaemon(true);
     thread2.setDaemon(true);
     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

在線程同步的話題上,synchronized是一個非常重要的關(guān)鍵字。它的原理和數(shù)據(jù)庫中事務鎖的原理類似。我們在使用過程中,應該盡量縮減synchronized覆蓋的范圍,原因有二:1)被它覆蓋的范圍是串行的,效率低;2)容易產(chǎn)生死鎖。我們來看下面的示例:
復制代碼 代碼如下:

synchronized示例
 private static void syncTest3() throws InterruptedException
 {
     final List<Integer> list = new ArrayList<Integer>();

     Thread thread1 = new Thread()
     {
         public void run()
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             Random r = new Random(100);
             synchronized(list)
             {
                 for (int i = 0; i < 5; i++)
                 {
                     list.add(new Integer(r.nextInt()));
                 }
                 System.out.println("The size of list is " + list.size());
             }
             try
             {
                 Thread.sleep(500);
             }
             catch(InterruptedException ex)
             {
                 ex.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     };

     Thread thread2 = new Thread()
     {
         public void run()
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             Random r = new Random(100);
             synchronized(list)
             {
                 for (int i = 0; i < 5; i++)
                 {
                     list.add(new Integer(r.nextInt()));
                 }
                 System.out.println("The size of list is " + list.size());
             }
             try
             {
                 Thread.sleep(500);
             }
             catch(InterruptedException ex)
             {
                 ex.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     };

     thread1.start();
     thread2.start();
     thread1.join();
     thread2.join();
 }

我們應該把需要同步的內(nèi)容集中在一起,盡量不包含其他不相關(guān)的、消耗大量資源的操作,示例中線程休眠的操作顯然不應該包括在里面。

構(gòu)造線程池

我們在<基于Java回顧之網(wǎng)絡通信的應用分析>中,已經(jīng)構(gòu)建了一個Socket連接池,這里我們在此基礎(chǔ)上,構(gòu)建一個線程池,完成基本的啟動、休眠、喚醒、停止操作。

基本思路還是以數(shù)組的形式保持一系列線程,通過Socket通信,客戶端向服務器端發(fā)送命令,當服務器端接收到命令后,根據(jù)收到的命令對線程數(shù)組中的線程進行操作。

Socket客戶端的代碼保持不變,依然采用構(gòu)建Socket連接池時的代碼,我們主要針對服務器端進行改造。

首先,我們需要定義一個線程對象,它用來執(zhí)行我們的業(yè)務操作,這里簡化起見,只讓線程進行休眠。

復制代碼 代碼如下:

定義線程對象
 enum ThreadStatus
 {
     Initial,
     Running,
     Sleeping,
     Stopped
 }

 enum ThreadTask
 {
     Start,
     Stop,
     Sleep,
     Wakeup
 }

 
 class MyThread extends Thread
 {
     public ThreadStatus status = ThreadStatus.Initial;
     public ThreadTask task;
     public void run()
     {
         status = ThreadStatus.Running;
         while(true)
         {
             try {
                 Thread.sleep(3000);
                 if (status == ThreadStatus.Sleeping)
                 {
                     System.out.println(Thread.currentThread().getName() + " 進入休眠狀態(tài)。");
                     this.wait();
                 }
             } catch (InterruptedException e) {
                 System.out.println(Thread.currentThread().getName() + " 運行過程中出現(xiàn)錯誤。");
                 status = ThreadStatus.Stopped;
             }
         }
     }
 }

然后,我們需要定義一個線程管理器,它用來對線程池中的線程進行管理,代碼如下:
復制代碼 代碼如下:

定義線程池管理對象
 class MyThreadManager
 {
     public static void manageThread(MyThread[] threads, ThreadTask task)
     {
         for (int i = 0; i < threads.length; i++)
         {
             synchronized(threads[i])
             {
                 manageThread(threads[i], task);
             }
         }
         System.out.println(getThreadStatus(threads));
     }

     public static void manageThread(MyThread thread, ThreadTask task)
     {
         if (task == ThreadTask.Start)
         {
             if (thread.status == ThreadStatus.Running)
             {
                 return;
             }
             if (thread.status == ThreadStatus.Stopped)
             {
                 thread = new MyThread();
             }
             thread.status = ThreadStatus.Running;
             thread.start();

         }
         else if (task == ThreadTask.Stop)
         {
             if (thread.status != ThreadStatus.Stopped)
             {
                 thread.interrupt();
                 thread.status = ThreadStatus.Stopped;
             }
         }
         else if (task == ThreadTask.Sleep)
         {
             thread.status = ThreadStatus.Sleeping;
         }
         else if (task == ThreadTask.Wakeup)
         {
             thread.notify();
             thread.status = ThreadStatus.Running;
         }
     }

     public static String getThreadStatus(MyThread[] threads)
     {
         StringBuffer sb = new StringBuffer();
         for (int i = 0; i < threads.length; i++)
         {
             sb.append(threads[i].getName() + "的狀態(tài):" + threads[i].status).append("\r\n");
         }
         return sb.toString();
     }
 }

最后,是我們的服務器端,它不斷接受客戶端的請求,每收到一個連接請求,服務器端會新開一個線程,來處理后續(xù)客戶端發(fā)來的各種操作指令。
復制代碼 代碼如下:

定義服務器端線程池對象
 public class MyThreadPool {

     public static void main(String[] args) throws IOException
     {
         MyThreadPool pool = new MyThreadPool(5);
     }

     private int threadCount;
     private MyThread[] threads = null;

    
     public MyThreadPool(int count) throws IOException
     {
         this.threadCount = count;
         threads = new MyThread[count];
         for (int i = 0; i < threads.length; i++)
         {
             threads[i] = new MyThread();
             threads[i].start();
         }
         Init();
     }

     private void Init() throws IOException
     {
         ServerSocket serverSocket = new ServerSocket(5678);
         while(true)
         {
             final Socket socket = serverSocket.accept();
             Thread thread = new Thread()
             {
                 public void run()
                 {
                     try
                     {
                         System.out.println("檢測到一個新的Socket連接。");
                         BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                         PrintStream ps = new PrintStream(socket.getOutputStream());
                         String line = null;
                         while((line = br.readLine()) != null)
                         {
                             System.out.println(line);
                             if (line.equals("Count"))
                             {
                                 System.out.println("線程池中有5個線程");
                             }
                             else if (line.equals("Status"))
                             {
                                 String status = MyThreadManager.getThreadStatus(threads);
                                 System.out.println(status);
                             }
                             else if (line.equals("StartAll"))
                             {
                                 MyThreadManager.manageThread(threads, ThreadTask.Start);
                             }
                             else if (line.equals("StopAll"))
                             {
                                 MyThreadManager.manageThread(threads, ThreadTask.Stop);
                             }
                             else if (line.equals("SleepAll"))
                             {
                                 MyThreadManager.manageThread(threads, ThreadTask.Sleep);
                             }
                             else if (line.equals("WakeupAll"))
                             {
                                 MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
                             }
                             else if (line.equals("End"))
                             {
                                 break;
                             }
                             else
                             {
                                 System.out.println("Command:" + line);
                             }
                             ps.println("OK");
                             ps.flush();
                         }
                     }
                     catch(Exception ex)
                     {
                         ex.printStackTrace();
                     }
                 }
             };
             thread.start();
         }
     }
 }

探索JDK中的concurrent工具包

為了簡化開發(fā)人員在進行多線程開發(fā)時的工作量,并減少程序中的bug,JDK提供了一套concurrent工具包,我們可以用它來方便的開發(fā)多線程程序。
線程池

我們在上面實現(xiàn)了一個非?!昂喡钡木€程池,concurrent工具包中也提供了線程池,而且使用非常方便。

concurrent工具包中的線程池分為3類:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。

首先我們來定義一個Runnable的對象

復制代碼 代碼如下:

定義Runnable對象
 class MyRunner implements Runnable
 {
     public void run() {
         System.out.println(Thread.currentThread().getName() + "運行開始");
         for(int i = 0; i < 1; i++)
         {
             try
             {
                 System.out.println(Thread.currentThread().getName() + "正在運行");
                 Thread.sleep(200);
             }
             catch(Exception ex)
             {
                 ex.printStackTrace();
             }
         }
         System.out.println(Thread.currentThread().getName() + "運行結(jié)束");
     }
 }

可以看出,它的功能非常簡單,只是輸出了線程的執(zhí)行過程。

ScheduledThreadPool

這和我們平時使用的ScheduledTask比較類似,或者說很像Timer,它可以使得一個線程在指定的一段時間內(nèi)開始運行,并且在間隔另外一段時間后再次運行,直到線程池關(guān)閉。

示例代碼如下:

復制代碼 代碼如下:

ScheduledThreadPool示例
 private static void scheduledThreadPoolTest()
 {
     final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

     MyRunner runner = new MyRunner();

     final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
     final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);

     scheduler.schedule(new Runnable()
     {
         public void run()
         {
             handler1.cancel(true);
             handler2.cancel(true);
             scheduler.shutdown();
         }
     }, 30, TimeUnit.SECONDS
     );
 }

FixedThreadPool

這是一個指定容量的線程池,即我們可以指定在同一時間,線程池中最多有多個線程在運行,超出的線程,需要等線程池中有空閑線程時,才能有機會運行。

來看下面的代碼:

復制代碼 代碼如下:

FixedThreadPool示例
 private static void fixedThreadPoolTest()
 {
     ExecutorService exec = Executors.newFixedThreadPool(3);
     for(int i = 0; i < 5; i++)
     {
         MyRunner runner = new MyRunner();
         exec.execute(runner);
     }
     exec.shutdown();
 }

注意它的輸出結(jié)果:
復制代碼 代碼如下:

pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行開始
pool-1-thread-3正在運行
pool-1-thread-1運行結(jié)束
pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行結(jié)束
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行結(jié)束
pool-1-thread-1運行結(jié)束
pool-1-thread-2運行結(jié)束

可以看到從始至終,最多有3個線程在同時運行。
CachedThreadPool

這是另外一種線程池,它不需要指定容量,只要有需要,它就會創(chuàng)建新的線程。

它的使用方式和FixedThreadPool非常像,來看下面的代碼:

復制代碼 代碼如下:

CachedThreadPool示例
 private static void cachedThreadPoolTest()
 {
     ExecutorService exec = Executors.newCachedThreadPool();
     for(int i = 0; i < 5; i++)
     {
         MyRunner runner = new MyRunner();
         exec.execute(runner);
     }
     exec.shutdown();
 }

它的執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

pool-1-thread-1運行開始
pool-1-thread-1正在運行
pool-1-thread-2運行開始
pool-1-thread-2正在運行
pool-1-thread-3運行開始
pool-1-thread-3正在運行
pool-1-thread-4運行開始
pool-1-thread-4正在運行
pool-1-thread-5運行開始
pool-1-thread-5正在運行
pool-1-thread-1運行結(jié)束
pool-1-thread-2運行結(jié)束
pool-1-thread-3運行結(jié)束
pool-1-thread-4運行結(jié)束
pool-1-thread-5運行結(jié)束

可以看到,它創(chuàng)建了5個線程。
處理線程返回值

在有些情況下,我們需要使用線程的返回值,在上述的所有代碼中,線程這是執(zhí)行了某些操作,沒有任何返回值。

如何做到這一點呢?我們可以使用JDK中的Callable<T>和CompletionService<T>,前者返回單個線程的結(jié)果,后者返回一組線程的結(jié)果。
返回單個線程的結(jié)果

還是直接看代碼吧:

復制代碼 代碼如下:

Callable示例
 private static void callableTest() throws InterruptedException, ExecutionException
 {
     ExecutorService exec = Executors.newFixedThreadPool(1);
     Callable<String> call = new Callable<String>()
     {
         public String call()
         {
             return "Hello World.";
         }
     };
     Future<String> result = exec.submit(call);
     System.out.println("線程的返回值是" + result.get());
     exec.shutdown();
 }

執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

線程的返回值是Hello World.

返回線程池中每個線程的結(jié)果

這里需要使用CompletionService<T>,代碼如下:

復制代碼 代碼如下:

CompletionService示例
 private static void completionServiceTest() throws InterruptedException, ExecutionException
 {
     ExecutorService exec = Executors.newFixedThreadPool(10);
     CompletionService<String> service = new ExecutorCompletionService<String>(exec);
     for (int i = 0; i < 10; i++)
     {
         Callable<String> call = new Callable<String>()
         {
             public String call() throws InterruptedException
             {
                 return Thread.currentThread().getName();
             }
         };
         service.submit(call);
     }

     Thread.sleep(1000);
     for(int i = 0; i < 10; i++)
     {
         Future<String> result = service.take();
         System.out.println("線程的返回值是" + result.get());
     }
     exec.shutdown();
 }

執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

線程的返回值是pool-2-thread-1
線程的返回值是pool-2-thread-2
線程的返回值是pool-2-thread-3
線程的返回值是pool-2-thread-5
線程的返回值是pool-2-thread-4
線程的返回值是pool-2-thread-6
線程的返回值是pool-2-thread-8
線程的返回值是pool-2-thread-7
線程的返回值是pool-2-thread-9
線程的返回值是pool-2-thread-10

實現(xiàn)生產(chǎn)者-消費者模型

對于生產(chǎn)者-消費者模型來說,我們應該都不會陌生,通常我們都會使用某種數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)它。在concurrent工具包中,我們可以使用BlockingQueue來實現(xiàn)生產(chǎn)者-消費者模型,如下:

復制代碼 代碼如下:

BlockingQueue示例
 public class BlockingQueueSample {

     public static void main(String[] args)
     {
         blockingQueueTest();
     }

     private static void blockingQueueTest()
     {
         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
         final int maxSleepTimeForSetter = 10;
         final int maxSleepTimerForGetter = 10;

         Runnable setter = new Runnable()
         {
             public void run()
             {
                 Random r = new Random();
                 while(true)
                 {
                     int value = r.nextInt(100);
                     try
                     {
                         queue.put(new Integer(value));
                         System.out.println(Thread.currentThread().getName() + "---向隊列中插入值" + value);
                         Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
                     }
                     catch(Exception ex)
                     {
                         ex.printStackTrace();
                     }
                 }
             }
         };

         Runnable getter = new Runnable()
         {
             public void run()
             {
                 Random r = new Random();
                 while(true)
                 {
                     try
                     {
                         if (queue.size() == 0)
                         {
                             System.out.println(Thread.currentThread().getName() + "---隊列為空");
                         }
                         else
                         {
                             int value = queue.take().intValue();
                             System.out.println(Thread.currentThread().getName() + "---從隊列中獲取值" + value);
                         }
                         Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
                     }
                     catch(Exception ex)
                     {
                         ex.printStackTrace();
                     }
                 }
             }
         };

         ExecutorService exec = Executors.newFixedThreadPool(2);
         exec.execute(setter);
         exec.execute(getter);
     }
 }

我們定義了兩個線程,一個線程向Queue中添加數(shù)據(jù),一個線程從Queue中取數(shù)據(jù)。我們可以通過控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,來使得程序得出不同的結(jié)果。

可能的執(zhí)行結(jié)果如下:

復制代碼 代碼如下:

pool-1-thread-1---向隊列中插入值88
pool-1-thread-2---從隊列中獲取值88
pool-1-thread-1---向隊列中插入值75
pool-1-thread-2---從隊列中獲取值75
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-1---向隊列中插入值50
pool-1-thread-2---從隊列中獲取值50
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-2---隊列為空
pool-1-thread-1---向隊列中插入值51
pool-1-thread-1---向隊列中插入值92
pool-1-thread-2---從隊列中獲取值51
pool-1-thread-2---從隊列中獲取值92

因為Queue中的值和Thread的休眠時間都是隨機的,所以執(zhí)行結(jié)果也不是固定的。

使用信號量來控制線程

JDK提供了Semaphore來實現(xiàn)“信號量”的功能,它提供了兩個方法分別用于獲取和釋放信號量:acquire和release,示例代碼如下:

復制代碼 代碼如下:

SemaPhore示例
 private static void semaphoreTest()
 {
     ExecutorService exec = Executors.newFixedThreadPool(10);
     final Semaphore semp = new Semaphore(2);

     for (int i = 0; i < 10; i++)
     {
         Runnable runner = new Runnable()
         {
             public void run()
             {
                 try
                 {
                     semp.acquire();
                     System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在執(zhí)行。");
                     Thread.sleep(5000);
                     semp.release();
                 }
                 catch(Exception ex)
                 {
                     ex.printStackTrace();
                 }
             }
         };
         exec.execute(runner);
     }

     exec.shutdown();
 }

執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在執(zhí)行。
Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在執(zhí)行。
Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在執(zhí)行。
Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在執(zhí)行。
Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在執(zhí)行。
Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在執(zhí)行。
Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在執(zhí)行。
Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在執(zhí)行。
Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在執(zhí)行。
Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在執(zhí)行。

可以看出,盡管線程池中創(chuàng)建了10個線程,但是同時運行的,只有2個線程。
控制線程池中所有線程的執(zhí)行步驟

在前面,我們已經(jīng)提到,可以用synchronized關(guān)鍵字來控制單個線程中的執(zhí)行步驟,那么如果我們想要對線程池中的所有線程的執(zhí)行步驟進行控制的話,應該如何實現(xiàn)呢?

我們有兩種方式,一種是使用CyclicBarrier,一種是使用CountDownLatch。

CyclicBarrier使用了類似于Object.wait的機制,它的構(gòu)造函數(shù)中需要接收一個整型數(shù)字,用來說明它需要控制的線程數(shù)目,當在線程的run方法中調(diào)用它的await方法時,它會保證所有的線程都執(zhí)行到這一步,才會繼續(xù)執(zhí)行后面的步驟。

示例代碼如下:

復制代碼 代碼如下:

CyclicBarrier示例
 class MyRunner2 implements Runnable
 {
     private CyclicBarrier barrier = null;
     public MyRunner2(CyclicBarrier barrier)
     {
         this.barrier = barrier;
     }

     public void run() {
         Random r = new Random();
         try
         {
             for (int i = 0; i < 3; i++)
             {
                 Thread.sleep(r.nextInt(10) * 1000);
                 System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");
                 barrier.await();
             }
         }
         catch(Exception ex)
         {
             ex.printStackTrace();
         }
     }

 }

 private static void cyclicBarrierTest()
 {
     CyclicBarrier barrier = new CyclicBarrier(3);

     ExecutorService exec = Executors.newFixedThreadPool(3);
     for (int i = 0; i < 3; i++)
     {
         exec.execute(new MyRunner2(barrier));
     }
     exec.shutdown();
 }

執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。
Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。
Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。
Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。
Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。
Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。
Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。
Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。
Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。

可以看出,thread-2到第1次等待點時,一直等到thread-1到達后才繼續(xù)執(zhí)行。

CountDownLatch則是采取類似”倒計時計數(shù)器”的機制來控制線程池中的線程,它有CountDown和Await兩個方法。示例代碼如下:

復制代碼 代碼如下:

CountDownLatch示例
 private static void countdownLatchTest() throws InterruptedException
 {
     final CountDownLatch begin = new CountDownLatch(1);
     final CountDownLatch end = new CountDownLatch(5);
     ExecutorService exec = Executors.newFixedThreadPool(5);
     for (int i = 0; i < 5; i++)
     {
         Runnable runner = new Runnable()
         {
             public void run()
             {
                 Random r = new Random();
                 try
                 {
                     begin.await();
                     System.out.println(Thread.currentThread().getName() + "運行開始");
                     Thread.sleep(r.nextInt(10)*1000);
                     System.out.println(Thread.currentThread().getName() + "運行結(jié)束");
                 }
                 catch(Exception ex)
                 {
                     ex.printStackTrace();
                 }
                 finally
                 {
                     end.countDown();
                 }
             }
         };
         exec.execute(runner);
     }
     begin.countDown();
     end.await();
     System.out.println(Thread.currentThread().getName() + "運行結(jié)束");
     exec.shutdown();
 }

執(zhí)行結(jié)果如下:
復制代碼 代碼如下:

pool-1-thread-1運行開始
pool-1-thread-5運行開始
pool-1-thread-2運行開始
pool-1-thread-3運行開始
pool-1-thread-4運行開始
pool-1-thread-2運行結(jié)束
pool-1-thread-1運行結(jié)束
pool-1-thread-3運行結(jié)束
pool-1-thread-5運行結(jié)束
pool-1-thread-4運行結(jié)束
main運行結(jié)束

相關(guān)文章

  • MyBatis中$和#的深入講解

    MyBatis中$和#的深入講解

    這篇文章主要給大家介紹了關(guān)于MyBatis中$和#的相關(guān)資料,文中通過圖文介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-10-10
  • 基于JAVA每月運勢api調(diào)用代碼實例

    基于JAVA每月運勢api調(diào)用代碼實例

    這篇文章主要為大家詳細介紹了JAVA每月運勢api調(diào)用代碼實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • java 對象輸入輸出流讀寫文件的操作實例

    java 對象輸入輸出流讀寫文件的操作實例

    這篇文章主要介紹了java 對象輸入輸出流讀寫文件的操作實例的相關(guān)資料,這里使用實現(xiàn)Serializable接口,需要的朋友可以參考下
    2017-07-07
  • SpringBoot中的@EnableAutoConfiguration注解解析

    SpringBoot中的@EnableAutoConfiguration注解解析

    這篇文章主要介紹了SpringBoot中的@EnableAutoConfiguration注解解析,@EnableAutoConfiguration也是借助@Import的幫助,將所有符合自動配置條件的bean定義注冊到IoC容器,需要的朋友可以參考下
    2023-09-09
  • Java中channel用法總結(jié)

    Java中channel用法總結(jié)

    這篇文章主要介紹了Java中channel用法,較為詳細的總結(jié)了channel的定義、類型及使用技巧,需要的朋友可以參考下
    2015-06-06
  • struts2框架入門

    struts2框架入門

    本文主要介紹了struts2框架的基礎(chǔ)入門知識。具有很好的參考價值。下面跟著小編一起來看下吧
    2017-03-03
  • Spring中11個最常用的擴展點總結(jié),你知道幾個

    Spring中11個最常用的擴展點總結(jié),你知道幾個

    我們知道IOC(控制反轉(zhuǎn))和AOP(面向切面編程)是spring的基石,除此之外spring的擴展能力非常強,下面這篇文章主要給大家介紹了關(guān)于Spring中11個最常用的擴展點的相關(guān)資料,需要的朋友可以參考下
    2022-12-12
  • Java高效實現(xiàn)excel轉(zhuǎn)pdf(支持帶圖片的轉(zhuǎn)換)

    Java高效實現(xiàn)excel轉(zhuǎn)pdf(支持帶圖片的轉(zhuǎn)換)

    這篇文章主要為大家詳細介紹了如何用java實現(xiàn)excel轉(zhuǎn)pdf文件,并且支持excel單元格中帶有圖片的轉(zhuǎn)換,文中的示例代碼講解詳細,需要的可以參考下
    2024-01-01
  • 完美解決Eclipse 項目有紅感嘆號的問題

    完美解決Eclipse 項目有紅感嘆號的問題

    下面小編就為大家?guī)硪黄昝澜鉀QEclipse 項目有紅感嘆號的問題。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-01-01
  • 淺析java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別

    淺析java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別

    以下是對java中ArrayList與Vector的區(qū)別以及HashMap與Hashtable的區(qū)別進行了詳細的解析。需要的朋友可以過來參考下
    2013-08-08

最新評論