Java實(shí)現(xiàn)多任務(wù)執(zhí)行助手
本文實(shí)例為大家分享了Java實(shí)現(xiàn)多任務(wù)執(zhí)行助手的具體代碼,供大家參考,具體內(nèi)容如下
1.多線程執(zhí)行任務(wù)類
package com.visy.threadpool; import com.visy.executor.ExecutorFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Configuration; @Configuration public class ThreadPoolConfig { ? ? private TheadPoolProperties theadPoolProperties; ? ? private ThreadPoolExecutor executor; ? ? private ThreadPoolExecutor executorChild; ? ? public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) { ? ? ? ? this.theadPoolProperties = theadPoolProperties; ? ? ? ? this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); ? ? ? ? this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); ? ? } ? ? public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) { ? ? ? ? if (taskList != null && !taskList.isEmpty()) { ? ? ? ? ? ? List<V> resultList = new ArrayList(); ? ? ? ? ? ? List futureList = null; ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) { ? ? ? ? ? ? ? ? ? ? throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size()); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) { ? ? ? ? ? ? ? ? ? ? futureList = executorChilds[0].invokeAll(taskList); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } catch (InterruptedException var6) { ? ? ? ? ? ? ? ? var6.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? ? ? this.doFutureList(resultList, futureList); ? ? ? ? ? ? return resultList; ? ? ? ? } else { ? ? ? ? ? ? return null; ? ? ? ? } ? ? } ? ? <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) { ? ? ? ? if (futureList != null) { ? ? ? ? ? ? Iterator var3 = futureList.iterator(); ? ? ? ? ? ? while(var3.hasNext()) { ? ? ? ? ? ? ? ? Future future = (Future)var3.next(); ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? resultList.add(future.get()); ? ? ? ? ? ? ? ? } catch (ExecutionException | InterruptedException var6) { ? ? ? ? ? ? ? ? ? ? var6.printStackTrace(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) { ? ? ? ? if (taskList != null && !taskList.isEmpty()) { ? ? ? ? ? ? Iterator var2 = taskList.iterator(); ? ? ? ? ? ? while(var2.hasNext()) { ? ? ? ? ? ? ? ? Callable<V> call = (Callable)var2.next(); ? ? ? ? ? ? ? ? this.executor.submit(call); ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? public TheadPoolProperties getTheadPoolProperties() { ? ? ? ? return this.theadPoolProperties; ? ? } ? ? public ThreadPoolExecutor getExecutor() { ? ? ? ? return this.executor; ? ? } ? ? public ThreadPoolExecutor getExecutorChild() { ? ? ? ? return this.executorChild; ? ? } ? ? public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) { ? ? ? ? this.theadPoolProperties = theadPoolProperties; ? ? } ? ? public void setExecutor(ThreadPoolExecutor executor) { ? ? ? ? this.executor = executor; ? ? } ? ? public void setExecutorChild(ThreadPoolExecutor executorChild) { ? ? ? ? this.executorChild = executorChild; ? ? } ? ? public boolean equals(Object o) { ? ? ? ? if (o == this) { ? ? ? ? ? ? return true; ? ? ? ? } else if (!(o instanceof ThreadPoolConfig)) { ? ? ? ? ? ? return false; ? ? ? ? } else { ? ? ? ? ? ? ThreadPoolConfig other = (ThreadPoolConfig)o; ? ? ? ? ? ? if (!other.canEqual(this)) { ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? label47: { ? ? ? ? ? ? ? ? ? ? Object this$theadPoolProperties = this.getTheadPoolProperties(); ? ? ? ? ? ? ? ? ? ? Object other$theadPoolProperties = other.getTheadPoolProperties(); ? ? ? ? ? ? ? ? ? ? if (this$theadPoolProperties == null) { ? ? ? ? ? ? ? ? ? ? ? ? if (other$theadPoolProperties == null) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? break label47; ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } else if (this$theadPoolProperties.equals(other$theadPoolProperties)) { ? ? ? ? ? ? ? ? ? ? ? ? break label47; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? Object this$executor = this.getExecutor(); ? ? ? ? ? ? ? ? Object other$executor = other.getExecutor(); ? ? ? ? ? ? ? ? if (this$executor == null) { ? ? ? ? ? ? ? ? ? ? if (other$executor != null) { ? ? ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } else if (!this$executor.equals(other$executor)) { ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? Object this$executorChild = this.getExecutorChild(); ? ? ? ? ? ? ? ? Object other$executorChild = other.getExecutorChild(); ? ? ? ? ? ? ? ? if (this$executorChild == null) { ? ? ? ? ? ? ? ? ? ? if (other$executorChild != null) { ? ? ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } else if (!this$executorChild.equals(other$executorChild)) { ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? protected boolean canEqual(Object other) { ? ? ? ? return other instanceof ThreadPoolConfig; ? ? } ? ? public int hashCode() { ? ? ? ? int PRIME = true; ? ? ? ? int result = 1; ? ? ? ? Object $theadPoolProperties = this.getTheadPoolProperties(); ? ? ? ? int result = result * 59 + ($theadPoolProperties == null ? 43 : $theadPoolProperties.hashCode()); ? ? ? ? Object $executor = this.getExecutor(); ? ? ? ? result = result * 59 + ($executor == null ? 43 : $executor.hashCode()); ? ? ? ? Object $executorChild = this.getExecutorChild(); ? ? ? ? result = result * 59 + ($executorChild == null ? 43 : $executorChild.hashCode()); ? ? ? ? return result; ? ? } ? ? public String toString() { ? ? ? ? return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties() + ", executor=" + this.getExecutor() + ", executorChild=" + this.getExecutorChild() + ")"; ? ? } }
2.執(zhí)行器工廠類
package com.visy.executor; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ExecutorFactory { ? ? private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class); ? ? private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap(); ? ? private static final int DEFAULT_QUEUE_SIZE = 1000; ? ? private static final String DEFAULT_EXECUTOR_NAME = "default-executor"; ? ? private static final int MAX_THREAD_NUM = 100; ? ? private static final int CORE_THREAD_NUM = 1; ? ? private static volatile ExecutorFactory instance; ? ? private ExecutorFactory() { ? ? } ? ? public static ExecutorFactory getInstance() { ? ? ? ? if (instance == null) { ? ? ? ? ? ? Class var0 = ExecutorFactory.class; ? ? ? ? ? ? synchronized(ExecutorFactory.class) { ? ? ? ? ? ? ? ? if (instance == null) { ? ? ? ? ? ? ? ? ? ? instance = new ExecutorFactory(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? return instance; ? ? } ? ? public ThreadPoolExecutor getThreadPoolExecutorByName(String name) { ? ? ? ? return (ThreadPoolExecutor)threadPoolExecutorMap.get(name); ? ? } ? ? public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() { ? ? ? ? return threadPoolExecutorMap; ? ? } ? ? public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) { ? ? ? ? if (StringUtils.isBlank(threadPoolExecutorName)) { ? ? ? ? ? ? throw new IllegalArgumentException("thread name empty"); ? ? ? ? } else { ? ? ? ? ? ? if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { ? ? ? ? ? ? ? ? Class var5 = ExecutorFactory.class; ? ? ? ? ? ? ? ? synchronized(ExecutorFactory.class) { ? ? ? ? ? ? ? ? ? ? if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { ? ? ? ? ? ? ? ? ? ? ? ? ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor(); ? ? ? ? ? ? ? ? ? ? ? ? threadPoolExecutorMap.put(threadPoolExecutorName, executor); ? ? ? ? ? ? ? ? ? ? ? ? logger.info("thread name: {} executor created", threadPoolExecutorName); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? return (ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName); ? ? ? ? } ? ? } ? ? public <T extends Runnable> void submit(T t) { ? ? ? ? ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); ? ? ? ? defaultExecutor.submit(t); ? ? } ? ? public <T extends Runnable> void submit(String poolName, T t) { ? ? ? ? ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); ? ? ? ? if (executor == null) { ? ? ? ? ? ? logger.error("thread name: {} executor not exist.", poolName); ? ? ? ? ? ? throw new IllegalArgumentException("thread name:" + poolName + " executor not exist."); ? ? ? ? } else { ? ? ? ? ? ? executor.submit(t); ? ? ? ? } ? ? } ? ? public <T extends Callable<Object>> Future<Object> submit(T t) { ? ? ? ? ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); ? ? ? ? return defaultExecutor.submit(t); ? ? } ? ? public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) { ? ? ? ? ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); ? ? ? ? if (executor == null) { ? ? ? ? ? ? logger.error("thread poolName: {} executor not exist.", poolName); ? ? ? ? ? ? throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist."); ? ? ? ? } else { ? ? ? ? ? ? return executor.submit(t); ? ? ? ? } ? ? } ? ? public ThreadPoolExecutor getThreadPoolExecutor() { ? ? ? ? return this.getThreadPoolExecutor("default-executor", 1000, 1, 100); ? ? } }
3.多線程配置類
package com.visy.threadpool; import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.validation.annotation.Validated; @Validated @Configuration @ConfigurationProperties(prefix = "visy.threadpool") public class TheadPoolProperties { ? ? // 執(zhí)行并行任務(wù)時(shí),等待多久時(shí)間超時(shí)(單位:秒) ? ? @NotNull ? ? private Integer timeOut; ? ? // 隊(duì)列大小 ? ? @NotNull ? ? private Integer queueSize;? ? ? // 核心線程數(shù)量 ? ? @NotNull ? ? private Integer coreThreadNum; ? ? // 線程池最大線程數(shù)量 ? ? @NotNull ? ? private Integer maxPoolSize; ? ? // 并行執(zhí)行每組大小 ? ? private Integer groupSize = 20; ? ? public TheadPoolProperties() { ? ? } ? ? public Integer getTimeOut() { ? ? ? ? return this.timeOut; ? ? } ? ? public Integer getQueueSize() { ? ? ? ? return this.queueSize; ? ? } ? ? public Integer getCoreThreadNum() { ? ? ? ? return this.coreThreadNum; ? ? } ? ? public Integer getMaxPoolSize() { ? ? ? ? return this.maxPoolSize; ? ? } ? ? public Integer getGroupSize() { ? ? ? ? return this.groupSize; ? ? } ? ? public void setTimeOut(Integer timeOut) { ? ? ? ? this.timeOut = timeOut; ? ? } ? ? public void setQueueSize(Integer queueSize) { ? ? ? ? this.queueSize = queueSize; ? ? } ? ? public void setCoreThreadNum(Integer coreThreadNum) { ? ? ? ? this.coreThreadNum = coreThreadNum; ? ? } ? ? public void setMaxPoolSize(Integer maxPoolSize) { ? ? ? ? this.maxPoolSize = maxPoolSize; ? ? } ? ? public void setGroupSize(Integer groupSize) { ? ? ? ? this.groupSize = groupSize; ? ? } ? ? public boolean equals(Object o) { ? ? ? ? if (o == this) { ? ? ? ? ? ? return true; ? ? ? ? } else if (!(o instanceof TheadPoolProperties)) { ? ? ? ? ? ? return false; ? ? ? ? } else { ? ? ? ? ? ? TheadPoolProperties other = (TheadPoolProperties)o; ? ? ? ? ? ? if (!other.canEqual(this)) { ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? label71: { ? ? ? ? ? ? ? ? ? ? Object this$timeOut = this.getTimeOut(); ? ? ? ? ? ? ? ? ? ? Object other$timeOut = other.getTimeOut(); ? ? ? ? ? ? ? ? ? ? if (this$timeOut == null) { ? ? ? ? ? ? ? ? ? ? ? ? if (other$timeOut == null) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? break label71; ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } else if (this$timeOut.equals(other$timeOut)) { ? ? ? ? ? ? ? ? ? ? ? ? break label71; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? Object this$queueSize = this.getQueueSize(); ? ? ? ? ? ? ? ? Object other$queueSize = other.getQueueSize(); ? ? ? ? ? ? ? ? if (this$queueSize == null) { ? ? ? ? ? ? ? ? ? ? if (other$queueSize != null) { ? ? ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } else if (!this$queueSize.equals(other$queueSize)) { ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? label57: { ? ? ? ? ? ? ? ? ? ? Object this$coreThreadNum = this.getCoreThreadNum(); ? ? ? ? ? ? ? ? ? ? Object other$coreThreadNum = other.getCoreThreadNum(); ? ? ? ? ? ? ? ? ? ? if (this$coreThreadNum == null) { ? ? ? ? ? ? ? ? ? ? ? ? if (other$coreThreadNum == null) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? break label57; ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } else if (this$coreThreadNum.equals(other$coreThreadNum)) { ? ? ? ? ? ? ? ? ? ? ? ? break label57; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? Object this$maxPoolSize = this.getMaxPoolSize(); ? ? ? ? ? ? ? ? Object other$maxPoolSize = other.getMaxPoolSize(); ? ? ? ? ? ? ? ? if (this$maxPoolSize == null) { ? ? ? ? ? ? ? ? ? ? if (other$maxPoolSize != null) { ? ? ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } else if (!this$maxPoolSize.equals(other$maxPoolSize)) { ? ? ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? Object this$groupSize = this.getGroupSize(); ? ? ? ? ? ? ? ? Object other$groupSize = other.getGroupSize(); ? ? ? ? ? ? ? ? if (this$groupSize == null) { ? ? ? ? ? ? ? ? ? ? if (other$groupSize == null) { ? ? ? ? ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } else if (this$groupSize.equals(other$groupSize)) { ? ? ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? return false; ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? protected boolean canEqual(Object other) { ? ? ? ? return other instanceof TheadPoolProperties; ? ? } ? ? public int hashCode() { ? ? ? ? int PRIME = true; ? ? ? ? int result = 1; ? ? ? ? Object $timeOut = this.getTimeOut(); ? ? ? ? int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode()); ? ? ? ? Object $queueSize = this.getQueueSize(); ? ? ? ? result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode()); ? ? ? ? Object $coreThreadNum = this.getCoreThreadNum(); ? ? ? ? result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode()); ? ? ? ? Object $maxPoolSize = this.getMaxPoolSize(); ? ? ? ? result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode()); ? ? ? ? Object $groupSize = this.getGroupSize(); ? ? ? ? result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode()); ? ? ? ? return result; ? ? } ? ? public String toString() { ? ? ? ? return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")"; ? ? } }
4.列表拆分工具類
package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /** ?* 列表或數(shù)組按指定大小分組,用于批量取一部分?jǐn)?shù)據(jù)循環(huán)處理 ?* ?*/ public class ArraySplitUtil<T> { ? ? /** ? ? ?* 按指定大小對(duì)列表分組 ? ? ?* @param list ? ? ?* @param splitSize ? ? ?* @return ? ? ?*/ ? ? public List<List<T>> splistList(List<T> list, int splitSize) { ? ? ? ? if (null == list || list.size() == 0) { ? ? ? ? ? ? return null; ? ? ? ? } ? ? ? ? int listSize = list.size(); ? ? ? ? List<List<T>> newList = new ArrayList<>(); ? ? ? ? if (listSize < splitSize) { ? ? ? ? ? ? newList.add(list); ? ? ? ? ? ? return newList; ? ? ? ? } ? ? ? ? int addLength = splitSize; ? ? ? ? int times = listSize / splitSize; ? ? ? ? if (listSize % splitSize != 0) { ? ? ? ? ? ? times += 1; ? ? ? ? } ? ? ? ? int start = 0; ? ? ? ? int end = 0; ? ? ? ? int last = times - 1; ? ? ? ? for (int i = 0; i < times; i++) { ? ? ? ? ? ? start = i * splitSize; ? ? ? ? ? ? if (i < last) { ? ? ? ? ? ? ? ? end = start + addLength; ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? end = listSize; ? ? ? ? ? ? } ? ? ? ? ? ? newList.add(list.subList(start, end)); ? ? ? ? } ? ? ? ? return newList; ? ? } ? ? /** ? ? ?* 按指定大小對(duì)數(shù)組分組 ? ? ?* @param array ? ? ?* @param splitSize ? ? ?* @return ? ? ?*/ ? ? public List<T[]> splistArray(T[] array, int splitSize) { ? ? ? ? if (null == array) { ? ? ? ? ? ? return null; ? ? ? ? } ? ? ? ? int listSize = array.length; ? ? ? ? List<T[]> newList = new ArrayList<>(); ? ? ? ? if (listSize < splitSize) { ? ? ? ? ? ? newList.add(array); ? ? ? ? ? ? return newList; ? ? ? ? } ? ? ? ? int addLength = splitSize; ? ? ? ? int times = listSize / splitSize; ? ? ? ? if (listSize % splitSize != 0) { ? ? ? ? ? ? times += 1; ? ? ? ? } ? ? ? ? int start = 0; ? ? ? ? int end = 0; ? ? ? ? int last = times - 1; ? ? ? ? for (int i = 0; i < times; i++) { ? ? ? ? ? ? start = i * splitSize; ? ? ? ? ? ? if (i < last) { ? ? ? ? ? ? ? ? end = start + addLength; ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? end = listSize; ? ? ? ? ? ? } ? ? ? ? ? ? newList.add(Arrays.copyOfRange(array, start, end)); ? ? ? ? } ? ? ? ? return newList; ? ? } ? ? public static <E> ArraySplitUtil<E> build(){ ? ? ? ? return new ArraySplitUtil<>(); ? ? } }
5.多任務(wù)執(zhí)行助手類
package com.visy.helper; import com.baomidou.mybatisplus.toolkit.CollectionUtils; import com.google.common.collect.Lists; import com.visy.utils.ArraySplitUtil; import com.visy.threadpool.ThreadPoolConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; /** ?* 多任務(wù)助手 ?* @author visy.wang ?* @date 2022/5/9 14:38 ?*/ @Service public class MultiTaskHelper { ? ? @Autowired ? ? private ThreadPoolConfig threadPoolConfig; ? ? private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>(); ? ? public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){ ? ? ? ? return createAndRunListTask(list, null, handler); ? ? } ? ? public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){ ? ? ? ? return createAndRunListTask(list, handler, null); ? ? } ? ? public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){ ? ? ? ? createAndRunListTaskWithoutReturn(list, null, handler); ? ? } ? ? public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){ ? ? ? ? createAndRunListTaskWithoutReturn(list, handler, null); ? ? } ? ? /** ? ? ?* 把列表按線程數(shù)分組 ? ? ?* @param list 列表 ? ? ?* @return 分組后的列表 ? ? ?*/ ? ? @SuppressWarnings("unchecked") ? ? private <T> List<List<T>> listSplit(List<T> list){ ? ? ? ? String key = list.get(0).getClass().getName(); ? ? ? ? int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize(); ? ? ? ? ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key); ? ? ? ? if(Objects.isNull(arraySplitUtil)){ ? ? ? ? ? ? arraySplitUtil = ArraySplitUtil.build(); ? ? ? ? ? ? ArraySplitUtilCache.put(key, arraySplitUtil); ? ? ? ? } ? ? ? ? return arraySplitUtil.splistList(list, groupSize); ? ? } ? ? /** ? ? ?* 創(chuàng)建并運(yùn)行多任務(wù) ? ? ?* @param list 輸入數(shù)據(jù)列表 ? ? ?* @param handler1 處理器1 (優(yōu)先級(jí)使用) ? ? ?* @param handler2 處理器2 ? ? ?* @param <I> 輸入數(shù)據(jù)類型 ? ? ?* @param <O> 輸出數(shù)據(jù)類型 ? ? ?* @return 執(zhí)行結(jié)果分組列表 ? ? ?*/ ? ? private <I,O> List<List<O>> createAndRunListTask(List<I> list, ?Function<List<I>, List<O>> handler1, Function<I,O> handler2){ ? ? ? ? List<List<I>> listGroup = listSplit(list); ? ? ? ? //設(shè)定每個(gè)組的任務(wù) ? ? ? ? List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); ? ? ? ? listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { ? ? ? ? ? ? taskList.add(() -> { ? ? ? ? ? ? ? ? if(Objects.nonNull(handler1)){ ? ? ? ? ? ? ? ? ? ? return handler1.apply(subList); ? ? ? ? ? ? ? ? }else if(Objects.nonNull(handler2)){ ? ? ? ? ? ? ? ? ? ? return subList.stream().map(handler2).collect(Collectors.toList()); ? ? ? ? ? ? ? ? }else{ ? ? ? ? ? ? ? ? ? ? return null; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? }); ? ? ? ? }); ? ? ? ? return threadPoolConfig.doConcurrentTask(taskList); ? ? } ? ? /** ? ? ?* 創(chuàng)建并運(yùn)行多任務(wù)(無返回結(jié)果) ? ? ?* @param list 輸入數(shù)據(jù)列表 ? ? ?* @param handler1 處理器1 (優(yōu)先級(jí)更高) ? ? ?* @param handler2 處理器2 ? ? ?* @param <I> 輸入數(shù)據(jù)類型 ? ? ?*/ ? ? private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){ ? ? ? ? List<List<I>> listGroup = listSplit(list); ? ? ? ? //設(shè)定每個(gè)組的任務(wù) ? ? ? ? List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); ? ? ? ? listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { ? ? ? ? ? ? taskList.add(() -> { ? ? ? ? ? ? ? ? if(Objects.nonNull(handler1)){ ? ? ? ? ? ? ? ? ? ? handler1.accept(subList); ? ? ? ? ? ? ? ? }else if(Objects.nonNull(handler2)){ ? ? ? ? ? ? ? ? ? ? subList.forEach(handler2); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? return null; ? ? ? ? ? ? }); ? ? ? ? }); ? ? ? ? threadPoolConfig.doConcurrentTask(taskList); ? ? } }
6.多任務(wù)助手使用:
@Autowired package com.zoom.fleet.schedule.service; import com.visy.helper.MultiTaskHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** ?* 多任務(wù)助手使用示例 ?* @author visy.wang ?* @date 2022/5/13 14:11 ?*/ @Service public class MultiTaskTest { ? ? @Autowired ? ? private MultiTaskHelper multiTaskHelper; ? ? private void test(){ ? ? ? ? //待多任務(wù)執(zhí)行的數(shù)據(jù)列表 ? ? ? ? List<String> idList = new ArrayList<>(); ? ? ? ? //1.有返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯,返回多任務(wù)執(zhí)行結(jié)果和合集 ? ? ? ? List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{ ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼 ? ? ? ? ? ? return Long.valueOf(id); ? ? ? ? }); ? ? ? ? //2.有返回結(jié)果的執(zhí)行方式二, 定義單個(gè)數(shù)線程的處理邏輯,返回多任務(wù)執(zhí)行結(jié)果和合集 ? ? ? ? resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{ ? ? ? ? ? ? //每一個(gè)線程下列表操作的業(yè)務(wù)代碼 ? ? ? ? ? ? return subIdList.stream().map(id->{ ? ? ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼 ? ? ? ? ? ? ? ? return Long.valueOf(id); ? ? ? ? ? ? }).collect(Collectors.toList()); ? ? ? ? }); ? ? ? ? //3.無返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯 ? ? ? ? multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{ ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼... ? ? ? ? }); ? ? ? ? //3.無返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯 ? ? ? ? multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{ ? ? ? ? ? ? subIdList.forEach(id->{ ? ? ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼... ? ? ? ? ? ? }); ? ? ? ? ? ? //繼續(xù)操作subIdList... ? ? ? ? }); ? ? } }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java編程之單元測(cè)試(Junit)實(shí)例分析(附實(shí)例源碼)
這篇文章主要介紹了java編程之單元測(cè)試(Junit),結(jié)合實(shí)例形式較為詳細(xì)的分析總結(jié)了Java單元測(cè)試的原理、步驟及相關(guān)注意事項(xiàng),并附帶了完整代碼供讀者下載參考,需要的朋友可以參考下2015-11-11Java集合基礎(chǔ)知識(shí) List/Set/Map詳解
這篇文章主要介紹了Java集合基礎(chǔ)知識(shí) List/Set/Map,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03spring中@autowired、@Qualifier、@Primary注解的使用說明
這篇文章主要介紹了spring中@autowired、@Qualifier、@Primary注解的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11SpringBoot實(shí)現(xiàn)自定義指標(biāo)監(jiān)控功能
本文主要介紹了SpringBoot實(shí)現(xiàn)自定義指標(biāo)監(jiān)控功能的實(shí)現(xiàn),,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,感興趣的小伙伴跟著著小編來一起來學(xué)習(xí)吧2024-01-01mybatis攔截器實(shí)現(xiàn)數(shù)據(jù)權(quán)限項(xiàng)目實(shí)踐
本文主要介紹了mybatis攔截器實(shí)現(xiàn)數(shù)據(jù)權(quán)限項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06Java8進(jìn)行多個(gè)字段分組統(tǒng)計(jì)的實(shí)例代碼
在本篇文章里小編給大家分享的是關(guān)于Java8進(jìn)行多個(gè)字段分組統(tǒng)計(jì)的實(shí)例代碼,需要的朋友們可以學(xué)習(xí)下。2020-05-05