重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

如何进行ThreadPoolExecutor源码解析

如何进行ThreadPoolExecutor 源码解析,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

创新互联主要从事成都做网站、成都网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务万载,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108

一、线程

  1. 线程是CPU 调度的最小操作单位,线程模型分为KLT 模型和ULT 模型,JVM 使用的是KLT 模型。

  2. 线程的状态 :NEW,RUNNABLE,BLOCKED,TERMINATED

二、线程池

1. 线程池解决的两大核心问题:
  • 在执行大量异步运算的时候,线程池用优化系统性能,减少线程的反复创建所带来的的系统开销

  • 提供了一种限制和管理资源的方法

2. 7 大核心参数:
  1. corePoolSize :线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。

  2. maximumPoolSize :线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

  3. keepAliveTime :线程池维护线程所允许的空闲时间。当线程池中的线程数量大corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime;

  4. unit: keepAliveTime的单位;

  5. workQueue: 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

  • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene; -

  • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;

  • priorityBlockingQuene:具有优先级的无界阻塞队列;

  1. threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。

  2. handler: 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略:

  • AbortPolicy:直接抛出异常,默认策略;

  • CallerRunsPolicy:用调用者所在的线程来执行任务;

  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

  • DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。

3. 线程池的生命周期状态 :
  • NEW

  • RUNNABLE

  • WATING

  • BLOCKED

  • TIMED_WATING

  • TERMINATED

4. 线程池的重要属性 :ctl
  1. ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两 部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这 里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常 量表示workerCount的上限值,大约是5亿。

  2. runState 主要提供线程池生命周期的控制,主要值包括:

  • RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行 处理。

(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处 于RUNNING状态,并且线程池中的任务数为0!

  • SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> -SHUTDOWN。

  • STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中 断正在处理的任务。

(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

  • TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。

(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的 任务为空时,就会由STOP -> TIDYING。

  • TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。

(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING - > TERMINATED。 进入TERMINATED的条件如下: 线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 如果线程池状态是SHUTDOWN并且workerQueue为空; workerCount为0,设置TIDYING状态成功。

如何进行ThreadPoolExecutor 源码解析

  1. ctl相关 API

  • runStateOf():获取运行状态;

  • workerCountOf():获取活动线程数;

  • ctlOf():获取运行状态和活动线程数的值。

5. 线程池的行为
  • execute(Runnable command):执行Ruannable类型的任务

  • submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象

  • shutdown():在完成已提交的任务后封闭办事,不再接管新任务,

  • shutdownNow():停止所有正在履行的任务并封闭办事。

  • isTerminated():测试是否所有任务都履行完毕了。

  • isShutdown():测试是否该ExecutorService已被关闭。

6. 常用线程池的具体实现
ThreadPoolExecutor 默认线程池 
ScheduledThreadPoolExecutor 定时线程池
7. 线程池监控API
  • public long getTaskCount() //线程池已执行与未执行的任务总数

  • public long getCompletedTaskCount() //已完成的任务数

  • public int getPoolSize() //线程池当前的线程数

  • public int getActiveCount() //线程池中正在执行任务的线程数量

三、源码解析

execute() 方法
//在将来的某个时间执行给定的任务。任务可以是新起一个新线程或者复用现有池线程中的线程去执行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 执行过程还是分为 3 步:
         *
         * 1.执行任务: 
         * 如果小于核心线程数,尝试创建一个新线程来执行给定的任务。
         * 方法 addWorker() 就是真正的创建一个新线程来执行任务的方法。
         * addWorker()方法会对 runState 和 workerCount进行原子检查。
         * addWorker()方法会返回一个 boolean 值,通过返回 false 值来防止在不应该添加线程的情况下发出错误警报
         * 
         *
         * 2.添加到阻塞队列:
         * 未能满足条件执行完步骤 1 则添加到阻塞队列。
         * 如果任务可以成功排队,会再次进行检查,检查是否应该添加线程(因为现有线程自上次检查后就死了),
         * 或者自进入此方法以来该池已关闭。因此,需要重新检查状态,并在停止的情况下在必要时回滚队列,如果没有,则启动一个新线程。 
         *
         * 3.拒绝任务: 
         * 如果无法将任务添加至阻塞队列,最大线程数也未达到最大会尝试添加一个新的线程。如果失败,说明线程池已关闭或处于饱和状态,因此拒绝该任务。
         */
         
         //clt记录着runState和workerCount
        int c = ctl.get();
        /*
         * workerCountOf方法取出低29位的值,表示当前活动的线程数;
         * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
             * 如果为true,根据corePoolSize来判断;
             * 如果为false,则根据maximumPoolSize来判断
             */
            if (addWorker(command, true))
                return;
            //如果添加失败,则重新获取ctl值    
            c = ctl.get();
        }
        //执行到此处说明从核心线程里给当前任务分配线程失败
        //如果当前线程池是运行状态并且任务添加到队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //重新获取ctl值。即使添加队列成功也要再次检查,如果不是运行状态,由于之前已经把任务添加到workerQueue 中了,所以要移除该任务,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
                /*
                 * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法 这里传入的参数表示:
                 * 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
                 * 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPolSize,添加线程时根据maximumPoolSize来判断;
                 * 如果判断workerCount大于0,则直接返回,在workQueue中新增的comman 会在将来的某个时刻被执行。
                 */
            //因为 任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中 获取任务。     
            else if (workerCountOf(recheck) == 0)
                //执行到这里说明任务已经添加到阻塞队列里了,最大线程数也未饱和,则创建一个新的线程去阻塞队列里拿任务
                //这步操作也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中 获取任务。
                //为什么要这样做呢?是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
                addWorker(null, false);
        }
        /*
         * 如果执行到这里,有两种情况:
         * 1. 线程池已经不是RUNNING状态;
         * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
         * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的 有限线程数量的上限设置为maximumPoolSize;
         * 如果失败则拒绝该任务
         */
        else if (!addWorker(command, false))
            reject(command);
    }
addWorker() 方法
/**
 * 检查是否可以根据当前线程池的状态添加一个新的工作线程去执行任务。
 * addWorker(runnable,true)表示从核心工作线程数中分配线程执行传进来的任务;
 * addWorker(null,false)表示从最大线程数中分配线程执行阻塞队列中的任务。
 * 线程池如果停止或者关闭则直接返回 false,如果线程池创建新线程失败同样也会返回 false。
 * 如果创建线程失败,或者线程工厂返回 null,或者执行当前 addWorker()的线程抛出异常,(注意是当前线程抛出异常,当前线程抛出异常只与当前任务有关,并不影响其他任务的执行),线程池的相关属性会立即回滚
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层 for 循环就是为了能给任务分配线程做准备,判断状态--> 原子递增 workerCount
    // 直到线程池状态不符合条件返回 false ,或者自增成功跳出 for 循环
    // 同样的,getTask()从阻塞队列中获取任务的时候也是这么个逻辑,先对 workerCount 原子递减,再去执行任务
    for (;;) {
        //可以看到,每一步操作都会对线程池的状态参数做判断
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //也是对线程池状态,队列状态做检查
        /**
         * 这里的状态判断也很好理解:
         * 线程池状态为SHUTDOWN,不会再接受新的任务了,返回 false
         * 想城池状态不为SHUTDOWN,传进来的任务为空,并且阻塞队列里也没任务,那还执行个锤子任务,同样返回 false 
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //前面已经判断过满足为任务分配一个线程去执行任务
        //这个 for 循环就是为了创建任务做准备,先去原子性的递增 workerCount,workerCount 递增成功了才会去真正的为任务分配线程去执行
        for (;;) {
            //当前工作线程数
            int wc = workerCountOf(c);
            //当前工作线程数大于corePoolSize 或者 maximumPoolSize (跟谁比较就是根据传进来的参数 core 判断),
            //说明也没有分配的线程可以执行任务了,返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /** 
             * 执行到这里说明满足条件了,可以分配出来线程去执行任务了
             * 尝试增加workerCount,如果成功,则跳出第一个for循环
             * 这里是进行 CAS 自增 ctl 的 workerCount(先把数量自增,再跳出 for 循环创建新的线程去执行任务)
             * 该方法内部也是调用了原子类 AtomicInteger.compareAndSet()方法,保证原子递增
             */    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //如果尝试添加新的工作线程失败则会继续判断当前线程池的状态,状态满足继续尝试为当前线程分配工作线程    
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //跳出 for 循环之后说明线程池的工作线程数 workerCount 已经调节过了,接下来要做到就是真正的分配线程,执行任务
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Worker 对象是个内部类,其实就是用threatFactory 生成一个新的线程
        //继承 AQS 类,实现Runable 接口,重写 run()方法,重写的 run()方法也很重要,后面会讲
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //加锁保证同步
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //还是先进行一通的线程池状态检查
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //这个 workers 是个 HashSet,线程池也是通过维护这个 workers 控制任务的执行
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        //largestPoolSize记录着线程池中出现过的最大线程数量
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //终于,调用线程的 start() 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果创建线程失败,就要回滚线程池的状态了
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker 类

Worker 类是用来干嘛的,存在的意义

回头再看 Worker 类,线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker对象(HashSet)。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属 性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创 建的线程,是用来处理任务的线程。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来 实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;

  2. 如果正在执行任务,则不应该中断线程;

  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;

  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是否是空闲状态;

  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的 线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。 此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

Worker 类中以及涉及到的重要的方法

  • tryAcquire(int unused) 方法

  /**
    * 用于判断线程是否空闲以及是否可以被中断
    */
   protected boolean tryAcquire(int unused) {
           //cas 修改状态,不可重入
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

tryAcquire方法是根据state是否是0来判断的,所以,将state设置为-1是 为了禁止在执行任务前对线程进行中断。

正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。

  • runWorker(Worker w)方法

/**
 * Worker 类实现 Runnable 接口,重写的 run()方法
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允许中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //while 循环就是不断的去执行任务,当自己的任务(firstTask)执行完之后依然会从阻塞队列里拿任务去执行,就这样的操作保证了线程的重用
        //task 为空则从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 如果线程池正在停止,那么要保证当前线程是中断状态,如果不是的话,则要保证当前线程不是中断状态
             * 这里为什么要这么做呢?考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会 把状态设置为STOP,
             * 回顾一下STOP状态:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,
             * 调用 shutdownNow() 方法会使线程池进入到STOP状态。
             * STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在 RUNNING或者SHUTDOWN状态时线程是非中断状态的,
             * 因为Thread.interrupted()方法会重置中断的状态。
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

总结一下runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;

  2. getTask()方法从阻塞队列中取任务;

  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是 中断状态;

  4. 调用task.run()执行任务;

  5. 如果task为null则跳出循环,执行processWorkerExit()方法;

  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

  • getTask()方法

/**
 * 从阻塞队列中获取任务,返回值是 Runnable
 * 线程池状态不满足执行条件时直接返回 null
 */
private Runnable getTask() {
    //timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?
    
    //这里两个 for 循环操作和 addWorker() 方法里的两个 for 循环操作思想一样
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //仍然检查线程池状态
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //timed变量用于判断是否需要进行超时控制
        //allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时
        //wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
        //对于超过核心线程数量的这些线程,需要进行超时控制
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //和 addWorker() 里流程一样,也是先对线程池中 workerCount 进行控制,再进行后面的执行任务操作
        //满足条件则 workerCount 数量减一
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null
            //否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回
            timedOut = false;
        }
    }
}
processWorkerExit() 
/**
 * getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行 processWorkerExit方法。
 * 做线程池的善后工作
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1
    //如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount减1了,这里就不需要再减了
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工作线程
        // workers 是前面提到的 HashSet,线程池就是通过维护这个 worker()来保证线程池运作的 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();


    /**
     * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生 命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程, runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入 processWorkerExit方法,整个线程结束

四、思考

  1. 线程池如何实现线程重用的?

就是在重写的 run()方法里,通过 while 循环,执行完 firstTask 之后依然从阻塞队列里获取任务去执行。

  1. 线程超时怎么处理?

当前面任务抛出异常,后面的线程还会执行吗? 答案是会。也是 while 循环里找答案,当前线程抛出异常只会对当前线程产生影响,对线程池里其他任务不会有影响。

  1. 什么时候会销毁?

是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

  1. 阻塞队列选取?在JDK中提供了如下阻塞队列:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

  • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

  • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;

  • PriorityBlockingQuene:具有优先级的无界阻塞队列;

  1. 丢弃策略选取?线程池提供了4种策略:

  • AbortPolicy:直接抛出异常,默认策略;

  • CallerRunsPolicy:用调用者所在的线程来执行任务;

  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

  • DiscardPolicy:直接丢弃任务;

  1. 线程数如何设置?

  • 一般设法是会根据我们任务的类型去设置,简单分为: CPU 密集型 :CPU 核数 + 1 IO 密集型:2*CPU 核数 + 1

《Java并发编程实战》中最原始的公式是这样的: Nthreads=Ncpu∗Ucpu∗(1+WC)Nthreads=Ncpu∗Ucpu∗(1+CW);

  • Ncpu代表CPU的个数,

  • Ucpu代表CPU利用率的期望值(0

  • WCCW仍然是等待时间与计算时间的比例。

上面提供的公式相当于目标CPU利用率为100%。 通常系统中不止一个线程池,所以实际配置线程数应该将目标CPU利用率计算进去。

关于如何进行ThreadPoolExecutor 源码解析问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。


分享名称:如何进行ThreadPoolExecutor源码解析
网站地址:http://cqcxhl.cn/article/gjsiep.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP