来源:Java并发---- Executor并发框架--ThreadToolExecutor类详解(execute方法,关闭方法)

作者:Chang_Wen_Liu

构造方法

源码详解

创建线程池并执行任务

关闭线程池

在看这篇文章之前,建议大家先温习一下上篇:Java并发/Executor并发框架/线程池,ThreadToolExecutor初步理解

1、构造方法

请参考上篇文章:Java并发/Executor并发框架/线程池,ThreadToolExecutor初步理解

2、源码详解

线程池能够复用线程,减少线程创建,销毁,恢复等状态切换的开销,提高程序的性能。一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行的任务的队列。

ThreadPoolExecutor类中定义了一些与线程状态与活动线程数相关的一些变量,如下:

线程池内部有一些状态,先来了解下这些状态的机制。以下用代码注释的方式来解释其中的含义。

/这个是用一个int来表示workerCount和runState的,其中runState占int的高3位,其它29位为workerCount的值。workerCount:当前活动的线程数;runState:线程池的当前状态。用AtomicInteger是因为其在并发下使用compareAndSet效率非常高;当改变当前活动的线程数时只对低29位操作,如每次加一减一,workerCount的值变了,但不会影响高3位的runState的值。当改变当前状态的时候,只对高3位操作,不会改变低29位的计数值。这里有一个假设,就是当前活动的线程数不会超过29位能表示的值,即不会超过536870911,就目前以及可预见的很长一段时间来讲,这个值是足够用了/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//COUNT_BITS,就是用来表示workerCount占用一个int的位数,其值为前面说的29private static final int COUNT_BITS = Integer.SIZE - 3;/CAPACITY为29位能表示的最大容量,即workerCount实际能用的最大值。其值的二进制为:00011111111111111111111111111111(占29位,29个1)/private static final int CAPACITY = (1 << COUNT_BITS) - 1;/以下常量是线程池的状态,状态存储在int的高3位,所以要左移29位。腾出的低29位来表示workerCount注意,这5个状态是有大小关系的。RUNNING<shutdown<stop<tidying<terminated 当需要判断多个状态时,只需要用<或="">来判断就可以了//RUNNING的含义:线程池能接受新任务,并且可以运行队列中的任务-1的二进制为32个1,移位后为:11100000000000000000000000000000/private static final int RUNNING = -1 << COUNT_BITS;/SHUTDOWN的含义:不再接受新任务,但仍可以执行队列中的任务0的二进制为32个0,移位后还是全0/private static final int SHUTDOWN = 0 << COUNT_BITS;/STOP的含义:不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000/private static final int STOP = 1 << COUNT_BITS;/TIDYING的含义:所有任务均已终止,workerCount的值为0,转到TIDYING状态的线程即将要执行terminated()钩子方法.2的二进制为00000000000000000000000000000010移位后01000000000000000000000000000000/private static final int TIDYING = 2 << COUNT_BITS;/TERMINATED的含义:terminated()方法执行结束.3的二进制为00000000000000000000000000000011移位后01100000000000000000000000000000/private static final int TERMINATED = 3 << COUNT_BITS;
各状态之间可能的转变有以下几种RUNNING -> SHUTDOWN调用了shutdown方法线程池实现了finalize方法在里面调用了shutdown方法因此shutdown可能是在finalize中被隐式调用的(RUNNING or SHUTDOWN) -> STOP调用了shutdownNow方法SHUTDOWN -> TIDYING当队列和线程池均为空的时候STOP -> TIDYING当线程池为空的时候TIDYING -> TERMINATEDterminated()钩子方法调用完毕
/传入的参数为存储runState和workerCount的int值,这个方法用于取出runState的值。~为按位取反操作,~CAPACITY值为:11100000000000000000000000000000,再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值/private static int runStateOf(int c) { return c & ~CAPACITY; }/传入的参数为存储runState和workerCount的int值,这个方法用于取出workerCount的值。因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了,保留参数的低29位,也就是workerCount的值。/private static int workerCountOf(int c) { return c & CAPACITY; }/将runState和workerCount存到同一个int中,这里的rs就是runState,是已经移位过的值,填充返回值的高3位,wc填充返回值的低29位/private static int ctlOf(int rs, int wc) { return rs | wc; }

1、创建线程池并执行任务

接下来通过状态来看线程池的运行:

完成线程的创建后,

首先通过 submit 或者 execute 方法提交,实际submit 方法也是内部调用 execute方法;

submit方法的实现是在AbstractExecutorService类里,三种重载方法类似,

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask); //<span style="font-size:12px;">内部调用 execute方法</span> return ftask;}

execute方法是在ThreadPoolExecutor类里实现

<pre name="code" class="java"> public void execute(Runnable command) {if (command == null) // 任务为null,则抛出异常 throw new NullPointerException();// 然后通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数。 int c = ctl.get();/如果当前活动的线程数小于corePoolSize,则增加一个线程来执行新传入的任务。 什么概念?也就是说当池中的线程数小于corePoolSize的时候,不管池中的线程是否有空闲的, 每次调用该方法都去增加一个线程,直到池中的数目达到corePoolSize为止。 /if (workerCountOf(c) < corePoolSize) {/ addWorker()返回值表示: 1、true 表示需要检测当前运行的线程是否小于corePoolSize 2、false 表示需要检测当前运行的线程数量是否小于maxPoolSize /if (addWorker(command, true))return; // 新线程创建成功,终止该方法的执行c = ctl.get(); // 任务添加到线程失败,取出记录着runState和workerCount 的 ctl的当前值 }/ 方法解释: isRunning(c) 当前线程池是否处于运行状态。源代码是通过判断c < SHUTDOWN 来确定返回值。 由于RUNNING才会接收新任务,且只有这个值-1才小于SHUTDOWN workQueue.offer(command) 任务添加到缓冲队列 /if (isRunning(c) && workQueue.offer(command)) { // 当前线程处于运行状态且成功添加到缓冲队列 int recheck = ctl.get();/ 如果 线程池已经处于非运行状态,则从缓冲队列中移除任务然后采用线程池指定的策略拒绝任务 如果 线程池中任务数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null /if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0) // 得到活动线程数为0 addWorker(null, false);}/ 当不满足以下两个条件时执行如下代码: 1. 当前线程池并不处于Running状态 2. 当前线程池处于Running状态,但是缓冲队列已经满了 /else if (!addWorker(command, false))reject(command); // 采用线程池指定的策略拒绝任务 }private static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean isRunning(int c) {return c < SHUTDOWN;}

当前活动的线程小于corePoolSize了,那么等于和大于corePoolSize怎么处理呢? 1> 当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程,在这里第20行的if语句已经体现出来了。这里利用了&&的特性,只有当第一个条件会真时才会去判断第二个条件,第一个条件是isRunning(),判断线程池是否处于RUNNING状态,因为只有在这个状态下才会接受新任务,否则就拒绝,如果正处于RUNNING状态,那么就加入队列,如果加入失败可能就是队列已经满了,这时候直接执行第29行。  2> 在execute()方法中,当 当前活动的线程数量 < corePoolSize 时,会执行addWorker()方法,关于addWorker(),它是用来直接新建线程用的,之所以叫addWorker而不是addThread是因为在线程池中,所有的线程都用一个Worker对象包装着,来看一下这个方法:

接下来介绍上面提到的addWorker方法,方法实现如下:

<pre name="code" class="java"> private static int runStateOf(int c) { return c & ~CAPACITY; }//使用原子的compareAndSet来替换旧值。但并不保证成功,若成功,该方法返回true;若失败,则返回false private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}private final ReentrantLock mainLock = new ReentrantLock();/ 创建并执行新线程 @param firstTask 用于指定新增的线程执行的第一个任务 @param core true表示在新增线程时会判断当前活动线程数是否少于corePoolSize, false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。 @return 是否成功新增一个线程 /private boolean addWorker(Runnable firstTask, boolean core) {retry: // 标签,写代码时避免使用标签。 for (;;) {int c = ctl.get(); // 获取记录着runState和workCount的int变量的当前值 int rs = runStateOf(c); // 取出当前线程池运行的状态值/ 这个条件代表着以下几个情景,就直接返回false说明线程创建失败: 1.rs > SHUTDOWN; 此时不再接收新任务,且所有的任务已经执行完毕2.rs = SHUTDOWN; 此时不再接收新任务,但是会执行队列中的任务,在后面的或语句中,第一个不成立,firstTask != null成立 3.rs = SHUTDOWN;此时不再接收新任务,fistTask == null,任务队列workQueue已经空了 转换成一个等价实现:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()), rs != SHUTDOWN结合前面的rs >= SHUTDOWN,表示线程池的状态已经由SHUTDOWN转为剩余的三个状态之一了,此时就要拒绝这个传入的任务;括号里的第二个条件表示状态已经为非运行状态了,却传入了一个任务,这个任务也要拒绝; 括号里的第三个条件表示线程池的状态不为RUNNING,但队列中没有任务了,就不需要新增线程了。 /if (rs >= SHUTDOWN && //如果已经调用了SHUTDOWN,池的状态改变后,这里就为true ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c); // 取出当前活动的线程数。 / 若当前活动线程数超过低29位能表示的最大值(也就是容量)时就不能再加线程了,因为再加就会影响状态的值了!若传入的参数core参数为true,则当前活动的线程数要小于corePoolSize才能创建新线程,大于或等于corePoolSize就不能再创建了; 若core参数为false,则当前活动的线程数要小于maximumPoolSize才能创建新线程,大于或等于maximumPoolSize就不能再创建了。 /if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建 if (compareAndIncrementWorkerCount(c)) // 使用CAS操作将当前活动线程数加一 break retry; // 当加一成功,则跳出大循环,进入循环体后面的真正新增线程的地方; c = ctl.get(); // Re-read ctl//若加一不成功,判断下当前状态改变没有,若改变了则重新开始外层循环的下一次迭代, // 若状态没有改变,只是加一失败,那么就继续内层循环,直到加一成功。 / 往当前活动线程数加一成功后,就会来真的新增线程了 (先加一后新增线程可以避免锁的使用,使用CAS原子操作加一后,其它线程看到的就是加一后的值, 若达到上限,其它线程就不会去创建新线程了。若先创建线程,再去加一,若不加锁, 假如一个使用无界队列的线程池,当前活动线程数为corePoolSize少一,外部线程在执行execute的时候都发现线程数不足corePoolSize,都去创建线程, 而最终只能有一个线程进入线程池,其它的都得作废,而加锁可以解决这个问题,但是降低了线程池的可伸缩性)。 /if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop }}//下面这里就是开始创建新的线程了!!! //Worker的也是Runnable的实现类 boolean workerStarted = false;boolean workerAdded = false;ThreadPoolExecutor.Worker w = null;try {final ReentrantLock mainLock = this.mainLock;/ Worker为内部类,封装了线程和任务,通过ThreadFactory创建线程,可能失败抛异常或者返回null在Worker的构造方法中,创建了一个线程对象,但这个线程是没有启动的。 在构造方法中启动线程,会导致this对象泄露,让线程看到未完整构建的对象,这个要避免。 /w = new ThreadPoolExecutor.Worker(firstTask);//因为不可以直接在Worker的构造方法中进行线程创建,所以要把它的引用赋给t方便后面进行线程创建 final Thread t = w.thread;//因为上面的两个整个过程并不是互斥的,所以创建完线程对象后再来判断下当前池的状态 / 为啥会出现t==null? Worker的构造方法是通过调用getThreadFactory().newThread(this)方法来创建线程的,而newThread方法可能会返回null(threadFactory可以通过ThreadPoolExecutor的构造方法传入,如没有传入,有个默认实现) /if (t != null) {mainLock.lock();try {//再次取出ctl的当前值,用于进行状态的检查,防止线程池的已经状态改变了 int c = ctl.get();int rs = runStateOf(c);/ 当创建线程失败要减少当前活动线程数;当池的状态非RUNNING和SHUTDOWN的时候,也需要减少当前活动线程数,并要尝试终止线程池; 当线程池的状态为非RUNNING,且有初始任务要执行的时候,因为这个状态要拒绝新进来的任务,所以这个新增的线程也没有用处了。/if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { // 当状态判断没有问题,往下执行 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException();workers.add(w); //将创建的线程添加到workers容器中(线程终止时会从里面移除) int s = workers.size(); //获取当前线程活动的数量 if (s > largestPoolSize) //判断当前线程活动的数量是否超过线程池最大的线程数量 largestPoolSize = s; //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了 workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); //开启线程 workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w); // 创建线程失败 }return workerStarted;}// 创建线程失败 private void addWorkerFailed(ThreadPoolExecutor.Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w); // 从workers容器中移除 decrementWorkerCount(); //减少一个活动的当前线程数 tryTerminate(); //尝试终止线程池 } finally {mainLock.unlock();}}

那么在创建线程的时候,线程执行的是什么的呢?

我们前面提到Worker继承的其实也是Runnable,它在创建线程的时候是以自身作为任务传进先创建的线程中的,这段比较简单,我就不一一注释了,只是给出源代码给大家看吧。

在继续其他方法之前,先说下Worker这个内部类。我们看一下每次新增一个线程后这个线程都做了些什么,显然需要看看Worker的run方法:

<pre name="code" class="java"> / Worker实现了Runnable接口,可以在后续作为Thread的构造方法参数用以创建线程。 同时,Worker还继承了AbstractQueuedSynchronizer类,只是简化每个Worker对象相关的锁的获取,在每次执行一个任务的时候,都需要持有这个锁。 在以前的ThreadPoolExecutor实现中,并没有继承AbstractQueuedSynchronizer, 而是在Worker内部声明了一个对象字段private final ReentrantLock runLock = new ReentrantLock(), 每次执行一个任务的时候,需要对runLock加锁。 /private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {......Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;//this指的是worker对象本身 this.thread = getThreadFactory().newThread(this);}//它以自身的对象作为线程任务传进去,那么它的run方法又是怎样的呢? public void run() {runWorker(this);}......}

只是简单的调用了runWorker方法,继续看runWorker

这个方法逻辑很简单。还记得前面提到的新增线程时指定第一个任务吗?若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行,直到getTask返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。getTask方法后面会详细讲解。当有任务执行时(之前通过参数传入的第一个任务或从队列中获取的任务),需要做一个状态判断。也就是clearInterruptsForTaskRun方法,来看看这个方法干了什么:(这个原来的方法就是下面if里的条件)

<pre name="code" class="java"> / 执行Worker中的任务,它的执行流程是这样的: 若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行, 直到getTask()返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。/final void runWorker(ThreadPoolExecutor.Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask; //将当前Worker中的任务取出来交给task,并释放掉w.firstTask占用的内存w.firstTask = null;w.unlock(); // allow interrupts/用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false该变量的值在processWorkerExit()会使用来判断线程是否由于异常终止/boolean completedAbruptly = true;try {//执行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程执行了多个任务while (task != null || (task = getTask()) != null) {w.lock();/这里利用了&&的短路特性,当前一个条件为true的时候才去执行后面一个条件。当当前状态小于STOP时,也就是当前状态为RUNNING时,需要清除线程的中断状态(线程池为RUNNING状态,线程却的中断状态却为true,可能在上次执行的任务里调用了类似Thread.currentThread().interrupt()的方法,因此当然不能让接下来执行的任务受之前任务的影响),如果Thread.interrupted()返回false,表示以前没有设置过中断,整个if的结果就是false;如果Thread.interrupted()返回true,那就要考虑为什么会是true了。是RUNNING状态就已经被中断了还是判断第一个条件后另外一个非池中的线程调用了shutdownNow中断了该线程?如果是后者,表示正在执行的任务需要中断,所以第三个条件判断当前池的状态是否不为RUNNING,如果不为RUNNING,那么就要重新中断该线程以维护shutdownNow方法的语义。/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted()) //这个if条件就是原来的clearInterruptsForTaskRunwt.interrupt();try {//在真正执行任务前,调用该方法。这是一个钩子方法,// 用户可以继承ThreadPoolExecutor重写beforeExecute方法来做一些事情。beforeExecute(wt, task);Throwable thrown = null;try {task.run(); //开始执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); //也是个钩子方法,同beforeExecute方法。}} finally {/随后将task变量置为null,让其再从队列里接收任务,若不置为null,就满足while的第一个条件了,结果就是这个任务被死循环执行/task = null;w.completedTasks++; // 然后将该线程完成的任务数自增w.unlock();/只有当线程终止的时候,才会将该线程执行的任务总数加到线程池的completedTaskCount中,所以completedTaskCount这个值并不是一个准确值。/}}/在最后有一个将completedAbruptly置为false的操作,如果线程能走到这里来,说明该线程在执行任务过程中没有抛出异常,也就是说该线程并不是异常结束的,而是正常结束的;如果走不到这一步,completedAbruptly的值还是初始值true,表示线程是异常结束的。/completedAbruptly = false;} finally {// 线程结束时,会调用processWorkerExit方法做一些清理和数据同步的工作:processWorkerExit(w, completedAbruptly);}}private static boolean runStateAtLeast(int c, int s) {return c >= s;}protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }

线程结束时,会调用processWorkerExit

方法做一些清理和数据同步的工作:

<pre name="code" class="java"> / 对线程的结束做一些清理和数据同步 @param w 封装线程的Worker @param completedAbruptly 表示该线程是否结束于异常/private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasnt adjusteddecrementWorkerCount(); //此时将线程数量减一final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; //统计总共完成的任务数workers.remove(w); //将该线程数从workers容器中移除} finally {mainLock.unlock();}tryTerminate(); //尝试终止线程池int c = ctl.get();/接下来的这个if块要做的事儿了。当池的状态还是RUNNING,又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了,也就是最后的addWorker操作 /if (runStateLessThan(c, STOP)) { //如果当前运行状态为RUNNING,SHUTDOWNif (!completedAbruptly) { //如果线程不是结束于异常int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否允许线程超时结束if (min == 0 && ! workQueue.isEmpty()) //如果允许把那个且队列不为空min = 1; //至少要保留一个线程来完成任务//如果当前活动的线程数大于等于最小的值// 1.不允许核心线程超时结束,则必须要使得活动线程数超过corePoolSize数才可以// 2. 允许核心线程超时结束,但是队列中有任务,必须留至少一个线程if (workerCountOf(c) >= min)return; // replacement not needed}//直接加个线程addWorker(null, false);}}private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}private static boolean runStateLessThan(int c, int s) {return c < s;}

如果线程是异常结束(被中断、任务执行本身异常等),当前活动的线程数减少一个。如果是正常结束的呢?不应该将其也减一吗?不用担心,在runWorker的while最后一次循环中的getTask方法里做掉了。 接下来将该线程执行过的任务数加到completedTaskCount中,这个在前面也提到了。然后从workers中去除该工作线程。如果该线程的中断是因为调用了shutdown、shutdownNow接口而中断的该如何处理?就是这个tryTerminate了,来看下tryTerminate

干了什么:

<pre name="code" class="java"> / 执行该方法,根据线程池状态进行 判断是否结束线程池 /final void tryTerminate() {for (;;) {int c = ctl.get();/当池的状态为SHUTDOWN且任务队列为空,需要将池的状态转变为TERMINATED; 当池的状态为STOP且池中的当前活动线程数为0,要将池的状态转换成TERMINATED。这个方法就是用来做这种状态转变的。 如果状态是RUNNING,表示线程池还正在提供服务,不需要状态变换;如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了; 若状态为SHUTDWON,但队列中还有任务,此时这些任务还需要做掉,因此池中的线程不能终止,因此,这种情况下也不需要做什么。 /if (isRunning(c) || //线程池正在运行中,自然不能结束线程池啦 runStateAtLeast(c, TIDYING) || //如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池出于SHUTDOWN状态,但是任务队列不为空,自然不能结束线程池啦 return;//如果状态为SHUTDWON但队列中已经没有任务了,这里调用了一个interruptIdleWorkers(ONLY_ONE)操作去中断一个空闲线程。 if (workerCountOf(c) != 0) { // Eligible to terminate / 调用这个方法的目的是将shutdown信号传播给其它线程。调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务, 那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN, 在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件),然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务, 所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候,这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出, 然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。 /interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {/ 当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了, 进入TIDYING状态就可以执行钩子方法terminated(), 该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变) /if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); //执行该方法,结束线程池 } finally {ctl.set(ctlOf(TERMINATED, 0));/ 当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束,即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的, 所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。 /termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS }}private static boolean runStateAtLeast(int c, int s) {return c >= s;}

继续说processWorkerExit方法中调用tryTerminate之后的代码。如果池的状态仍为RUNNING,而线程是因为执行的任务本身抛出了异常而结束或正常结束时该如何处理?这时候池的状态还是RUNNING呢!那就是接下来的这个if块要做的事儿了。当池的状态还是RUNNING,又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了,也就是最后的addWorker操作。而正常结束又有几种情况了,如果允许core线程超时,也就是allowCoreThreadTimeOut为true,那么在池中没有任务的时候,调用带有时限参数的poll方法时就可能返回null,致使线程正常退出,如果允许core线程超时,池中最小的线程数可为0,如果此时队列又有任务了,那么池中必须要有一个线程,若池中活动的线程数不为0,就不需要新增线程来替换死掉的线程,否则就要新增一个;如果不允许core线程超时,池中的线程必须达到corePoolSize个才能让多的线程退出,而不需要用新的线程替换,否则也需要新增一个线程替换这个死掉的线程。

在runWorker执行任务之前调用了w.lock操作,为什么要在执行任务的时候锁定这个每个线程都有一份的锁呢?原因在于调用了线程池shutdown后(前面说过,SHUTDOWN的含义:不再接受新任务,但仍可以执行队列中的任务),会调用interruptIdleWorkers方法去终止空闲线程,该方法会持有mainLock锁,但此时队列中可能还有很多任务,线程也可能还正在执行任务,就可能有一些线程终止不掉。此时,有些线程可能刚执行任务结束,正准备再去队列中拿任务,有些可能还正在执行任务,有些可能刚拿到一个新的任务,对于仍进入队列中拿任务的线程,最终队列中任务会被拿完,而此时拿任务的线程会发现线程池的状态为SHUTDOWN,就会立马返回一个null,返回null意味着ThreadPoolExecutor.runWorker中的循环退出了,这个线程也就自动终止了;此外拿任务并没有持有mainLock锁,所以在终止空闲线程与线程非执行任务期间(如从队列获取任务)存在竞态条件。有可能已经判断了线程池的状态仍未RUNNING,准备从queue里take任务,而在执行take之前,另一个非池中的线程可能调用了shutdown,并且执行完了interruptIdleWorkers方法(马上就会介绍这个方法),若此时队列中恰好没有任务了,若这个正要调用take的线程阻塞,就不会醒过来了,不用担心,interruptIdleWorkers已经中断了该线程,而take是可以响应中断的,再调用take后会立马抛出异常。 对于正在执行中的任务,其它线程不能直接将这个正在线程中断掉,因此除了mainLock锁,interruptIdleWorkers还需要持有线程执行任务时获取的那把锁(这也是为什么执行任务的时候需要获取那个每个线程都有的锁的原因),如果获取不成功表示线程正在执行任务。

-----------------------------------------------------------------------------------------

2、关闭线程池

看下终止空闲线程的方法实现:

private void interruptIdleWorkers() {interruptIdleWorkers(false);}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断且能立即(tryLock)获取到前面提到的那把线程任务锁时,就中断该线程。为什么需要持有mainLock?mainLock是用来保护workers变量的。

shutdown是持有mainLock的,但是runWorker的时候并没有,那么,会不会出现碰巧出现同一时刻池中所有线程都刚好执行完任务,去取任务的时候发现池的状态为SHUTDOWN,就立即返回null并终止线程,而导致队列中的剩下的任务得不到执行?这是不会出现的,来看下getTask的实现:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop }try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

若进入这个方法的工作线程是即将要终止的线程,该方法就必须返回null,有以下几种情形需要返回null:

1、当前活动线程数超过maximumPoolSize个(调用了setMaximumPoolSize的缘故);

2、线程池已经停止(STOP);

3、线程池已经关闭(SHUTDOWN)且任务队列为空;

4、工作线程获取任务超时,且满足(allowCoreThreadTimeOut || workerCount > corePoolSize)条件

先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一,这时候不管队列中有没有任务,都不用去执行了;如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了;所以这两种场景中获取任务的线程没必要存在了,这里调用了decrementWorkerCount减少活动线程数。前面在processWorkerExit中也提到,如果任务是非正常终止,processWorkerExit里要将活动线程数减一,正常的线程退出,减一是在这里做的。返回null之后,runWorker的while循环就退出了。接下来是个嵌套循环,它的目的就是上述的1和4.后面是从队列中取任务,比较简单,不多说。

// 确保允许调用发interrupt每个Worker线程的private void checkShutdownAccess() {SecurityManager security = System.getSecurityManager();if (security != null) {// 检查权限(以抛出异常的形式) security.checkPermission(shutdownPerm);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers)security.checkAccess(w.thread);} finally {mainLock.unlock();}}}/ 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。 如果线程池已经关闭,则调用没有其他作用。 该方法不会等待之前已经提交的任务执行完毕,awaitTermination方法才有这个效果。 抛出:SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")), 或者安全管理器的 checkAccess 方法拒绝访问。 /public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查终止线程池的线程是否有权限。 advanceRunState(SHUTDOWN);// 设置线程池的状态为关闭状态。 interruptIdleWorkers(); // 中断线程池中空闲的线程 onShutdown(); // 钩子函数,在ThreadPoolExecutor中没有任何动作 } finally {mainLock.unlock();}tryTerminate(); // 尝试终止线程池}/ 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。 在从此方法返回的任务队列中排空(移除)这些任务。 并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。 返回:从未开始执行的任务的列表。 抛出:SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")), 或者安全管理器的 checkAccess 方法拒绝访问。 /public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}/ 中断所有线程,即使线程还是active的。忽略所有SecurityExceptions异常。 /private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}/ 中断那些可能在等待执行任务的线程,让他们能检查是否可以terminate。 这里直接吞了SecurityException异常,防止某些线程在interrupt之后仍然处于uninterrupted状态。 @param onlyOne 如果是true,最多只中断一个Worker。 这种情况只有在tryTerminate调用的时候才会出现,表示可以termination,但是还有其他的Worker存在。 在这种情况下,最多只有一个处于等待的Worker被中断,来保证shutdown信号的繁衍传递(propagate语义), 以便能处理所有信号都处于等待状态的情况, 中断任意一个随机的线程都能保证从shutdown操作开始之后新添加的Worker最终都能退出。 为了保证最终的termination,永远只interrupt一个线程就足够了(为什么足够), 但是shutdown操作总是所有idle的workers,这样冗余的workers可以立即退出, 而不是等待一个straggler任务来完成操作。 /private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}/ Common form of interruptIdleWorkers, to avoid having to remember what the boolean argument means. */private void interruptIdleWorkers() {interruptIdleWorkers(false);}

以上,核心方法分析结束。欢迎来原文章中指出理解错漏的地方。

Java并发---- Executor并发框架--ThreadToolExecutor类详解(execute方法,关闭方法)

—————————————————————————————————————————

在学习过程如果有任何疑问,请来极乐网(http://www.dreawer.com

)提问,或者扫描下方二维码,关注极乐官方微信,在平台下方留言~

分类: 教程分享 标签: 暂无标签

评论

暂无评论数据

暂无评论数据

目录