Java多線程并發(fā)執(zhí)行demo代碼實(shí)例
主類:MultiThread,執(zhí)行并發(fā)類
package java8test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @param <H> 為被處理的數(shù)據(jù)類型
* @param <T>返回?cái)?shù)據(jù)類型
* 知識(shí)點(diǎn)1:X,T為泛型,為什么要用泛型,泛型和Object的區(qū)別請(qǐng)看:https://www.cnblogs.com/xiaoxiong2015/p/12705815.html
*/
public abstract class MultiThread<X, T> {
public static int i = 0;
// 知識(shí)點(diǎn)2:線程池:https://www.cnblogs.com/xiaoxiong2015/p/12706153.html
private final ExecutorService exec; // 線程池
// 知識(shí)點(diǎn)3:@author Doung Lea 隊(duì)列:https://www.cnblogs.com/xiaoxiong2015/p/12825636.html
private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>();
// 知識(shí)點(diǎn)4:計(jì)數(shù)器,還是并發(fā)包大神 @author Doug Lea 編寫(xiě)。是一個(gè)原子安全的計(jì)數(shù)器,可以利用它實(shí)現(xiàn)發(fā)令槍
private final CountDownLatch startLock = new CountDownLatch(1); // 啟動(dòng)門(mén),當(dāng)所有線程就緒時(shí)調(diào)用countDown
private final CountDownLatch endLock; // 結(jié)束門(mén)
private final List<X> listData;// 被處理的數(shù)據(jù)
/**
* @param list list.size()為多少個(gè)線程處理,list里面的H為被處理的數(shù)據(jù)
*/
public MultiThread(List<X> list) {
if (list != null && list.size() > 0) {
this.listData = list;
exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 創(chuàng)建線程池,線程池共有nThread個(gè)線程
endLock = new CountDownLatch(list.size()); // 設(shè)置結(jié)束門(mén)計(jì)數(shù)器,當(dāng)一個(gè)線程結(jié)束時(shí)調(diào)用countDown
} else {
listData = null;
exec = null;
endLock = null;
}
}
/**
*
* @return 獲取每個(gè)線程處理結(jié)速的數(shù)組
* @throws InterruptedException
* @throws ExecutionException
*/
public List<T> getResult() throws InterruptedException, ExecutionException {
List<T> resultList = new ArrayList<>();
if (listData != null && listData.size() > 0) {
int nThread = listData.size(); // 線程數(shù)量
for (int i = 0; i < nThread; i++) {
X data = listData.get(i);
Future<T> future = exec.submit(new Task(i, data) {
@Override
public T execute(int currentThread, X data) {
return outExecute(currentThread, data);
}
}); // 將任務(wù)提交到線程池
queue.add(future); // 將Future實(shí)例添加至隊(duì)列
}
startLock.countDown(); // 所有任務(wù)添加完畢,啟動(dòng)門(mén)計(jì)數(shù)器減1,這時(shí)計(jì)數(shù)器為0,所有添加的任務(wù)開(kāi)始執(zhí)行
endLock.await(); // 主線程阻塞,直到所有線程執(zhí)行完成
for (Future<T> future : queue) {
resultList.add(future.get());
}
exec.shutdown(); // 關(guān)閉線程池
}
return resultList;
}
/**
* 每一個(gè)線程執(zhí)行的功能,需要調(diào)用者來(lái)實(shí)現(xiàn)
* @param currentThread 線程號(hào)
* @param data 每個(gè)線程被處理的數(shù)據(jù)
* @return T返回對(duì)象
*/
public abstract T outExecute(int currentThread, X data);
/**
* 線程類
*/
private abstract class Task implements Callable<T> {
private int currentThread;// 當(dāng)前線程號(hào)
private X data;
public Task(int currentThread, X data) {
this.currentThread = currentThread;
this.data = data;
}
@Override
public T call() throws Exception {
// startLock.await(); // 線程啟動(dòng)后調(diào)用await,當(dāng)前線程阻塞,只有啟動(dòng)門(mén)計(jì)數(shù)器為0時(shí)當(dāng)前線程才會(huì)往下執(zhí)行
T t = null;
try {
t = execute(currentThread, data);
} finally {
endLock.countDown(); // 線程執(zhí)行完畢,結(jié)束門(mén)計(jì)數(shù)器減1
}
return t;
}
/**
* 每一個(gè)線程執(zhí)行的功能
* @param currentThread 線程號(hào)
* @param data 每個(gè)線程被處理的數(shù)據(jù)
* @return T返回對(duì)象
*/
public abstract T execute(int currentThread, X data);
}
}
結(jié)果類:ResultVO,保存返回結(jié)果,根據(jù)實(shí)際情況替換成自己的
package java8test;
public class ResultVo {
int i;
public ResultVo(int i) {
this.i = i;
}
public ResultVo() {
// TODO Auto-generated constructor stub
}
}
參數(shù)類:ParamVO,傳入?yún)?shù)類,根據(jù)實(shí)際情況替換成自己的
package java8test;
public class ParamVo {
private int i;
ParamVo(int i) {
this.i = i;
}
public int getI() {
return i;
}
@Override
public String toString() {
return String.valueOf(i) + " " + hashCode();
}
}
測(cè)試類:new兩個(gè)MultiThread,可以看到MultiThread這個(gè)類不存在線程安全問(wèn)題。
package java8test;
import java.util.ArrayList;
import java.util.List;
public class Test {
public static void main(String[] args) {
try {
List<ParamVo> splitList = new ArrayList<ParamVo>();
for (int i = 0; i < 100; i++) {
splitList.add(new ParamVo(i));
}
List<ParamVo> splitList1 = new ArrayList<ParamVo>();
for (int i = 200; i < 300; i++) {
splitList1.add(new ParamVo(i));
}
MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) {
@Override
public ResultVo outExecute(int currentThread, ParamVo data) {
System.out.println("當(dāng)前線程名稱:" + Thread.currentThread().getName() + "當(dāng)前線程號(hào)=" + currentThread
+ " data=" + data);
i--;
return new ResultVo(data.getI());
}
};
MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) {
@Override
public ResultVo outExecute(int currentThread, ParamVo data) {
System.out.println("當(dāng)前線程名稱:" + Thread.currentThread().getName() + "當(dāng)前線程號(hào)=" + currentThread
+ " data=" + data);
i--;
return new ResultVo(data.getI());
}
};
List<ResultVo> list = multiThread.getResult();
List<ResultVo> list1 = multiThread1.getResult();
// 獲取每一批次處理結(jié)果
System.out.println("獲取處理結(jié)果........................");
for (ResultVo vo : list) {
System.out.println(vo.i);
}
System.out.println("獲取1處理結(jié)果........................");
for (ResultVo vo : list1) {
System.out.println(vo.i);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
這個(gè)類也用在了生產(chǎn)當(dāng)中,用來(lái)并發(fā)插入數(shù)據(jù)。但是事務(wù)不能被管控,需要自己保證最終事務(wù)一致。需要注意。


以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- 簡(jiǎn)述Java中進(jìn)程與線程的關(guān)系_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
- java中進(jìn)程與線程_三種實(shí)現(xiàn)方式總結(jié)(必看篇)
- java多線程之線程,進(jìn)程和Synchronized概念初解
- java高并發(fā)之理解進(jìn)程和線程
- Java多線程高并發(fā)中解決ArrayList與HashSet和HashMap不安全的方案
- java多線程之并發(fā)工具類CountDownLatch,CyclicBarrier和Semaphore
- Java的線程與進(jìn)程以及線程的四種創(chuàng)建方式
- Java并發(fā)編程之volatile與JMM多線程內(nèi)存模型
- Java多線程并發(fā)與并行和線程與進(jìn)程案例
相關(guān)文章
Lucene實(shí)現(xiàn)索引和查詢的實(shí)例講解
下面小編就為大家分享一篇Lucene實(shí)現(xiàn)索引和查詢的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-12-12
Spring中的FactoryBean與BeanFactory詳細(xì)解析
這篇文章主要介紹了Spring中的FactoryBean與BeanFactory詳細(xì)解析,在Spring框架中,FactoryBean和BeanFactory是兩個(gè)關(guān)鍵的接口,用于創(chuàng)建和管理對(duì)象實(shí)例,它們?cè)赟pring的IoC(Inversion of Control,控制反轉(zhuǎn))容器中發(fā)揮著重要的作用,需要的朋友可以參考下2023-11-11
Hutool開(kāi)發(fā)MapUtil工具類使用示例
這篇文章主要為大家介紹了Hutool開(kāi)發(fā)MapUtil工具類使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10
老生常談spring boot 1.5.4 日志管理(必看篇)
下面小編就為大家?guī)?lái)一篇老生常談spring boot 1.5.4 日志管理(必看篇)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06
Springboot項(xiàng)目因?yàn)閗ackson版本問(wèn)題啟動(dòng)報(bào)錯(cuò)解決方案
這篇文章主要介紹了Springboot項(xiàng)目因?yàn)閗ackson版本問(wèn)題啟動(dòng)報(bào)錯(cuò)解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
SpringBoot Shiro 權(quán)限注解不起作用的解決方法
本文主要介紹了SpringBoot Shiro 權(quán)限注解不起作用的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07
MyBatis入門(mén)學(xué)習(xí)教程-MyBatis快速入門(mén)
MyBatis是一個(gè)支持普通SQL查詢,存儲(chǔ)過(guò)程和高級(jí)映射的優(yōu)秀持久層框架,這篇文章主要給大家分享MyBatis的一篇快速入門(mén)教程2021-06-06

