欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA多線程之實現(xiàn)用戶任務排隊并預估排隊時長

 更新時間:2021年12月09日 16:24:12   作者:洛陽泰山  
本文主要介紹了Java多線程之實現(xiàn)用戶任務排隊并預估排隊時長的問題,文中的代碼具有一定的學習和工作價值,感興趣的小伙伴快跟隨小編一起學習一下吧

實現(xiàn)流程

初始化一定數(shù)量的任務處理線程和緩存線程池,用戶每次調用接口,開啟一個線程處理。

假設初始化5個處理器,代碼執(zhí)行 BlockingQueue.take 時候,每次take都會處理器隊列就會減少一個,當處理器隊列為空時,take就是阻塞線程,當用戶處理某某任務完成時候,調用資源釋放接口,在處理器隊列put 一個處理器對象,原來阻塞的take ,就繼續(xù)執(zhí)行。

排隊論簡介

排隊論是研究系統(tǒng)隨機聚散現(xiàn)象和隨機系統(tǒng)工作工程的數(shù)學理論和方法,又稱隨機服務系統(tǒng)理論,為運籌學的一個分支。我們下面對排隊論做下簡化處理,先看下圖:

代碼具體實現(xiàn)

任務隊列初始化 TaskQueue

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * 初始化隊列及線程池
 * @author tarzan
 *
 */
@Component
public class TaskQueue {
    //處理器隊列
    public static BlockingQueue<TaskProcessor> taskProcessors;
    //等待任務隊列
    public static BlockingQueue<CompileTask> waitTasks;
    //處理任務隊列
    public static BlockingQueue<CompileTask> executeTasks;
    //線程池
    public static ExecutorService exec;
    //初始處理器數(shù)(計算機cpu可用線程數(shù))
    public static Integer processorNum=Runtime.getRuntime().availableProcessors();
 
    /**
     * 初始化處理器、等待任務、處理任務隊列及線程池
     */
    @PostConstruct
    public static void initEquipmentAndUsersQueue(){
        exec = Executors.newCachedThreadPool();
        taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
        //將空閑的設備放入設備隊列中
        setFreeDevices(processorNum);
        waitTasks =new LinkedBlockingQueue<CompileTask>();
        executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
    }
 
 
    /**
     * 將空閑的處理器放入處理器隊列中
     */
    private static void setFreeDevices(int num) {
        //獲取可用的設備
        for (int i = 0; i < num; i++) {
            TaskProcessor dc=new TaskProcessor();
            try {
                taskProcessors.put(dc);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
 
 
    public static CompileTask getWaitTask(Long clazzId) {
        return get(TaskQueue.waitTasks,clazzId);
    }
 
    public static CompileTask getExecuteTask(Long clazzId) {
        return get(TaskQueue.executeTasks,clazzId);
    }
 
 
    private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
        CompileTask compileTask =null;
        if (CollectionUtils.isNotEmpty(users)){
            Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
            if(optional.isPresent()){
                compileTask =  optional.get();
            }
        }
        return compileTask;
    }
 
    public static Integer getSort(Long clazzId) {
        AtomicInteger index = new AtomicInteger(-1);
        BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
        if (CollectionUtils.isNotEmpty(compileTasks)){
            compileTasks.stream()
                    .filter(e -> {
                        index.getAndIncrement();
                        return e.getClazzId().longValue() == clazzId.longValue();
                    })
                    .findFirst();
        }
        return index.get();
    }
 
    //單位秒
    public static int estimatedTime(Long clazzId){
        return  estimatedTime(60,getSort(clazzId)+1);
    }
 
    //單位秒
    public static int estimatedTime(int cellMs,int num){
         int a= (num-1)/processorNum;
         int b= cellMs*(a+1);
        return  b;
    }
 
 
 
 
 

編譯任務類 CompileTask

import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;
 
import java.util.Date;
 
 
@Data
public class CompileTask implements Runnable {
    //當前請求的線程對象
    private Long clazzId;
    //用戶id
    private Long userId;
    //當前請求的線程對象
    private Thread thread;
    //綁定處理器
    private TaskProcessor taskProcessor;
    //任務狀態(tài)
    private Integer status;
    //開始時間
    private Date startTime;
    //結束時間
    private Date endTime;
 
    private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);
 
    private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);
 
    @Override
    public void run() {
        compile();
    }
 
    /**
     * 編譯
     */
    public void compile() {
        try {
            //取出一個設備
            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
            //取出一個任務
            CompileTask compileTask = TaskQueue.waitTasks.take();
            //任務和設備綁定
            compileTask.setTaskProcessor(taskProcessor);
            //放入
            TaskQueue.executeTasks.put(compileTask);
            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
            //切換用戶數(shù)據(jù)源
            dataSourceService.switchDataSource(userId);
            //添加進度
            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
        } catch (InterruptedException e) {
            System.err.println( e.getMessage());
        }
    }
 
}

任務處理器 TaskProcessor?

import lombok.Data;
 
import java.util.Date;
 
@Data
public class TaskProcessor {
 
    /**
     * 釋放
     */
    public  static Boolean release(CompileTask task)  {
        Boolean flag=false;
        Thread thread=task.getThread();
        synchronized (thread) {
            try {
                if(null!=task.getTaskProcessor()){
                    TaskQueue.taskProcessors.put(task.getTaskProcessor());
                    TaskQueue.executeTasks.remove(task);
                    task.setEndTime(new Date());
                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
                    flag=true;
                    System.out.println("用戶"+task.getClazzId()+"耗時"+intervalMilli+"ms");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return flag;
        }
    }
 
}

Controller控制器接口實現(xiàn)

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;
 
import java.util.Date;
 
 
@RestController
@RequestMapping("task")
@Api(value = "數(shù)據(jù)編譯任務", tags = "數(shù)據(jù)編譯任務")
public class CompileTaskController {
 
    @ApiOperation(value = "添加等待請求 @author Tarzan Liu")
    @PostMapping("compile/{clazzId}")
    public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
        if(checkUser!=null){
            return  R.fail("已經(jīng)正在排隊!");
        }
        checkUser=TaskQueue.getExecuteTask(clazzId);
        if(checkUser!=null){
            return  R.fail("正在執(zhí)行編譯!");
        }
        //獲取當前的線程
        Thread thread=Thread.currentThread();
        //創(chuàng)建當前的用戶請求對象
        CompileTask compileTask =new CompileTask();
        compileTask.setThread(thread);
        compileTask.setClazzId(clazzId);
        compileTask.setStartTime(new Date());
        //將當前用戶請求對象放入隊列中
        try {
            TaskQueue.waitTasks.put(compileTask);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TaskQueue.exec.execute(compileTask);
        return R.data(TaskQueue.waitTasks.size()-1);
    }
 
    @ApiOperation(value = "查詢當前任務前還有多少任務等待 @author Tarzan Liu")
    @PostMapping("sort/{clazzId}")
    public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.getSort(clazzId));
    }
 
    @ApiOperation(value = "查詢當前任務預估時長 @author Tarzan Liu")
    @PostMapping("estimate/time/{clazzId}")
    public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.estimatedTime(clazzId));
    }
 
    @ApiOperation(value = "任務釋放 @author Tarzan Liu")
    @PostMapping("release/{clazzId}")
    public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
        CompileTask task=TaskQueue.getExecuteTask(clazzId);
        if(task==null){
            return  R.fail("資源釋放異常");
        }
        return R.status(TaskProcessor.release(task));
    }
 
    @ApiOperation(value = "執(zhí)行 @author Tarzan Liu")
    @PostMapping("exec")
    public R exec() {
        Long start=System.currentTimeMillis();
        for (Long i = 1L; i < 100; i++) {
            compile(i);
        }
        System.out.println("消耗時間:"+(System.currentTimeMillis()-start)+"ms");
        return R.status(true);
    }
}

接口測試

根據(jù)任務id查詢該任務前還有多少個任務待執(zhí)行

根據(jù)任務id查詢該任務預估執(zhí)行完成的剩余時間,單位秒

補充知識

BlockingQueue

BlockingQueue即阻塞隊列,它是基于ReentrantLock,依據(jù)它的基本原理,我們可以實現(xiàn)Web中的長連接聊天功能,當然其最常用的還是用于實現(xiàn)生產(chǎn)者與消費者模式,大致如下圖所示:

在Java中,BlockingQueue是一個接口,它的實現(xiàn)類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區(qū)別主要體現(xiàn)在存儲結構上或對元素操作上的不同,但是對于take與put操作的原理,卻是類似的。

阻塞與非阻塞

入隊

offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞

put(E e):如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞

offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現(xiàn)以下三種情況:-->阻塞

被喚醒

等待時間超時

當前線程被中斷

出隊

poll():如果沒有元素,直接返回null;如果有元素,出隊

take():如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷-->阻塞

poll(long timeout, TimeUnit unit):如果隊列不空,出隊;如果隊列已空且已經(jīng)超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現(xiàn)以下三種情況:

被喚醒

等待時間超時

當前線程被中斷?

到此這篇關于JAVA多線程之實現(xiàn)用戶任務排隊并預估排隊時長的文章就介紹到這了,更多相關JAVA 多線程 用戶任務排隊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java8 LocalDate 使用詳解

    java8 LocalDate 使用詳解

    這篇文章主要介紹了java8 LocalDate 使用詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • Java基礎之匿名內(nèi)部類、包裝類

    Java基礎之匿名內(nèi)部類、包裝類

    這篇文章主要給大家介紹了關于Java中方法使用的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-08-08
  • Java實現(xiàn)按中文首字母排序的具體實例

    Java實現(xiàn)按中文首字母排序的具體實例

    這篇文章主要介紹了Java實現(xiàn)按中文首字母排序的具體實例,有需要的朋友可以參考一下
    2013-12-12
  • spring cloud consul使用ip注冊服務的方法示例

    spring cloud consul使用ip注冊服務的方法示例

    這篇文章主要介紹了spring cloud consul使用ip注冊服務的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03
  • Java實現(xiàn)添加文字水印和圖片水印功能

    Java實現(xiàn)添加文字水印和圖片水印功能

    為圖片添加水印是一種常用的圖片處理技術,本文主要介紹了Java實現(xiàn)添加文字水印和圖片水印功能,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-05-05
  • 詳解Spring Boot下Druid連接池的使用配置分析

    詳解Spring Boot下Druid連接池的使用配置分析

    本篇文章主要介紹了詳解Spring Boot下Druid連接池的使用配置分析,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • Java Swing null絕對布局的實現(xiàn)示例

    Java Swing null絕對布局的實現(xiàn)示例

    這篇文章主要介紹了Java Swing null絕對布局的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-12-12
  • java發(fā)送heartbeat心跳包(byte轉16進制)

    java發(fā)送heartbeat心跳包(byte轉16進制)

    這篇文章主要介紹了java發(fā)送heartbeat心跳包(byte轉16進制),需要的朋友可以參考下
    2014-05-05
  • 在springboot中使用AOP進行全局日志記錄

    在springboot中使用AOP進行全局日志記錄

    這篇文章主要介紹就在springboot中使用AOP進行全局日志記錄,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java更新調度器(update scheduler)的使用詳解

    Java更新調度器(update scheduler)的使用詳解

    Java更新調度器是Java中的一個特性,可以自動化Java應用程序的更新過程,它提供了一種方便的方式來安排Java應用程序的更新,確保其與最新的功能、錯誤修復和安全補丁保持同步,本文將深入介紹如何使用Java更新調度器,并解釋它對Java開發(fā)人員和用戶的好處
    2023-11-11

最新評論