线程池
概述
池化技术在各种场景的应用下已经屡见不鲜,线程池当然也属于是池化技术的一种。只要聊到池化技术,它自然主要目的就是为了减少每次获取资源的消耗,提高资源的利用率。
线程池介绍
线程池按照名称理解就是管理一系列的线程的资源池,它负责生产和销毁线程。
使用线程池的好处:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就立即执行;
- 提高线程的看额管理型:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控;
线程池的工作机制
首先我们需要明确线程池的目的是什么,线程池的目的就是为了提供线程的利用率,由线程来负责线程的创建和销毁。所以我们需要着重关注的就是,线程池中线程的创建时机和销毁时机,了解清除这两点之后就可以对线程池的工作机制更加的清晰。
线程池的参数介绍
关于线程池的参数,可以直接查看ThreadPoolExcutor类的构造函数就可以了:
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}其中核心的参数有:
corePoolSize:核心线程maxiumPoolSize:最大线程数keepAliveTime:线程的最大空闲时间timeunit:线程最大空闲时间的单位threadFactory:线程工厂,用来创建线程,一般默认即可rejectExecutionHandler:拒绝策略wokerQueue:线程的工作队列
线程池提交任务的方式
一般而言,我们利用线程池执行任务的时候会用到submit()和execute()两个方法。
submit()方法
submit()方法一共有三种形式,它是ExecutorService接口中的方法,在ThreadPoolExecutor中并没有重写这个方法,所以它的具体实现是在AbstractExecutorService类中:
// 提交一个Callable任务,可以返回结果并可以抛出异常任务
<T> Future<T> submit(Callable<T> task);
// 提交一个Runnable任务,无返回值,使用future.get()的时候返回null
Future<?> submit(Runnable task);
// 提交一个Runable任务,并指定一个返回值
<T> Future<T> submit(Runnable task, T result);execute()方法
execute()方法就比较简单了,它是Executor类的接口,并且由ThreadPoolExecutor类实现了。它只有一种形式:
public void execute(Runnable command);submit()与execute()的区别
- execute()方法是Executor接口中的方法,而submit()方法是ExecutorService接口中的方法;
- execute()方法提交一个额没有返回的Runnable任务给线程池执行,而submit()方法支持提交没有返回值的Runnable接口和有返回的Callable接口;
- execute()方法不能获取任务的执行清洁过,而submit()方法可以通过返回的值来判断任务的执行结果;
- execute()方法不能获取任务的状态,但是submit()方法可以通过Future.isDone()或Future.isCancelled()等方法来获取任务的状态;
- execute方法不能取消任务,但是submit()方法可以通过Future.cancel()方法来取消任务执行;
- execute()方法在任务中抛出的异常会被线程池捕获,通常不会传递到调用方,只能通过线程池的异常处理器来处理;而submit()方法在任务中抛出异常,可以通过Future.get()捕获ExecutionException,从而获取任务抛出的原始异常;
关于为什么submit()方法抛出的异常可以通过Future.get()方法抛出,可以看下FutureTask中的run()方法,它在执行run()方法的时候如果出现了一场,就会通过setException()方法将异常设置到outcome属性中。最后,在我们调用get()方法的时候,会调用到report()方法来计算要返回的值:
- 如果任务是正常完成的,就返回对应的返回值
- 如果任务是被取消的,就抛出CancellationException异常
- 除此之外,抛出outcome收集到的异常信息
所以submit()方法收集异常不是这个方法原因,是因为它执行的不是FutureTask.run()方法,是在FutureTask中的特殊处理而已。
线程池中线程的创建时机
前面提到对于submit()方法而言,其最终执行任务的方法还是execute()方法,所以只需要研究execute()方法就可以了解线程池创建线程的时机。在聊这个之前,需要先了解在线程池的一个状态表示原理。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE 的值为 32
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;首先ctl是用来标识线程池状态的,它的高3位表示线程池的状态,低29位表示线程池中的线程数。
补码是计算机中普遍用来表示负数的方法,正数的补码于原码相同,负数的补码是反码加1。这里演示下RUNNING状态的值的计算过程:
首先 -1 的而进制使用补码表示:11111111 11111111 11111111 11111111
然后左移29位,低位补0:11100000 00000000 00000000 00000000
因为是负数,所以需要求反码再加1得到原码,所以取反码为:00011111 11111111 11111111 11111111
得到反码后加1得到原码的绝对值:00100000 00000000 00000000 00000000
所以 -1 左移 29 位的结果就是: -1 \times 2^{29} = 536870912 (与该数学计算方式等价)
根据上面的内容,我们来分析下每个状态下ctl的值:
|线程池状态|二进制表示|数值表示|特点| |-|-|-|-| |RUNNING|11100000 00000000 00000000 00000000|-536870912|线程池处于正常运行状态 此时可以接受新的任务 可以处理队列中的任务| |SHUTDOWN|00000000 00000000 00000000 00000000|0|调用shutdown()方法后进入该状态 此时不再接受新的任务 继续处理队列中的任务| |STOP|00100000 00000000 00000000 00000000|536870912|调用shutdownNow()方法后进入该状态 此时不再接受新的任务 不在处理队列中的任务 中断所有在进行的任务| |TIDYING|01000000 00000000 00000000 00000000|1073741824|过渡状态 工作线程数量为0 工作队列为空 调用terminateed()钩子方法终止线程池| |TERMINATED|01100000 00000000 00000000 00000000|1610612736|线程池完全终止状态 线程池生成周期结束|
因此线程池中的状态流转过程为:
flowchart LR
RUNNING -- shutdown() --> SHUTDOWN --> TIDYING --> TERMINATED
RUNNING -- shutdownNow() --> STOP --> TIDYING所以我们现在来看ctl变量的初始化:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 将RUNNING状态的值与0进行或运算,任何数字与0进行或运算都等于它本身,所以ctl的默认值为:-536870912
private static int ctlOf(int rs, int wc) { return rs | wc; }在已知条件:ctl的初始值为-536870912的时候,来分析下execute()方法的代码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl -- 线程池中的线程计数器,它的类型是AtomicInteger
int c = ctl.get();
// 判断当前线程池中工作线程(计算ctl的低29位的值进行计算)是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 小于核心线程,直接创建一个线程(当wokrQueue为空的时候,addWorker()返回fasle,返回false不代表线程没有创建, 仍然是创建了的)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前线程池中工作线程数是大于corePoolSize的,那么就会进入这个分支,首先会判断线程池是否还处于RUNNING状态,如果还处于RUNNING状态,就将任务保存在工作队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程不再RUNNING状态了,就删除这个任务,并且直接执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 此时线程池处于RUNNING状态,并且workQueue还没有满载,如果此时线程池中没有存活的线程,就创建一个(兜底策略,防止异常导线程池内的线程都销毁掉了,保证核心线程数)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当workQueue中已经满了,workQueue.offer()方法就会返回false,就进入了这个分支
else if (!addWorker(command, false))
// 触发它的拒绝策略
reject(command);
}根据不同的情况下来分下execute()方法:
假如是第一个任务提交到线程池,ctl的值为-536870912,此时workerCountOf()方法的返回值为0,它是小于corePoolSize的值。所以就会走到addWorke()方法(暂且不去谈addWorker()方法的具体作用,先粗略理解为新增一个线程并更新ctl的值);
假如是第二个任务提交到线程池,ctl的值为-536870911,此时wokerCountOf()方法的返回值为1,它是小于corePoolSize的值。
所以就会走到addWorker()方法;
假如是第十个任务提交到线程池,ctl的值为-536870902,此时wokerCountOf()方法的返回值10 ,它是等于corePoolSiz的值。
所以此时会根据线程池的来判断(暂且都认为线程池一直处于RUNNING状态),然后就会将任务保存在workQueue中;
假如workQueue的容量为10,那么在第21个任务提交到线程池(不一定是20个,因为有可能前面的线程已经执行完了,暂且理解为任务耗时无限长,投递21个任务的时候,前面的10个任务都没有被处理完),ctl的值为-536870902,此时wokerCountOf()方法的返回值10,它是等于corePoolSize的值,所以仍然尝试向workQueue中投递任务,但是因为workQueue已经满了,它就会走到addWorker()方法。
假如workQueue的容量为10 ,那么在第X(X远大于21)个任务的时候,addWoker()方法就会返回false,此时就会进入reject()方法,执行拒绝策略;
在execute()方法中有一个非常核心的方法,也是线程池中非常核心的方法,它就是addWoker()方法,所以还需要彻底理解addWorker()方法逻辑才可以。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 定义了一个死循环,每次都获取当前线程池的最新状态
for (int c = ctl.get();;) {
// 判断线程池的状态,判断逻辑就是 c>= SHUTDOWN 并且 c >= STOP,其实这里是要区别当前线城市SHUTDOWN还是STOP状态
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
// 如果是SHUTDOWN、STOP、workQueue为空,task为null的四种状态时直接返回false
return false;
// 当线程池处于RUNNING状态的时候,会进入下面的死循环
for (;;) {
// 首先判断当前线程的数量,如果是核心线程的话,当前工作线程与核心线程数的关系,如果不是,就判断与最大线程数的关系
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 当core为true,表示核心线程创建,如果当前工作线程数大于核心线程数,就返回false,就会将任务存放到workQueue
// 当core为false,表示非核心线程创建,如果当前工作线程数大于最大线程数,就返回false,就会执行拒绝策略
return false;
// 当满足创建线程的条件后,就会通过CAS的方式来更新ctl的值,然后跳出这个循环,如果CAS失败的话,重新获取ctl的值
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 判断线程池的状态,CAS失败的话 由下一次循环来再次创建线程
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// 直到能跳出死循环,否则会一直在循环中进行判断,跳出循环后就证明有创建线程的能力
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池的状态
int c = ctl.get();
// 检查下线程池的状态是不是正常的,期间外部有没有调用过shutdown()方法
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程添加到线程集合中
workers.add(w);
// 标记当前线程为添加状态为true
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// 如果线程成功添加到线程组中的时候,workAdded的值为true
if (workerAdded) {
// 启动这个线程,并标记当前线程的启动状态为true
t.start();
workerStarted = true;
}
}
} finally {
// 如果当前线程启动失败,则执行失败的任务
if (! workerStarted)
addWorkerFailed(w);
}
// 返回当前线程的启动状态
return workerStarted;
}addWoker()方法的具体作用,就是创建一个新的工作线程。所以总结下来,线程池中线程创建的正常流程是:

当线程中的线程创建成功后,在addWorker()方法中会启动这个线程,实际就上去执行Woker.run()方法,就是线程中线程具体要干的事情。
线程池中线程的销毁时机
在线程池中的线程都会被封装成一个Woker类型的对象,Woker类是ThreadPoolExecutor私有内部类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// 执行任务的具体线程
final Thread thread;
// 用户提交的任务信息
Runnable firstTask;
// 记录完成人数的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// ...省略后续的方法
}从Worker的方法中可以看到,创建Worker对象的时候它会从ThreadFactory中创建一个新的对象。因为Worker继承了Runnable接口,所以中调用Woker#thread.start()方法的时候就会调用Worker#run()方法,最终执行具体的逻辑的是ThreadPoolExecutor#runWorker()方法。
final void runWorker(Worker w) {
// 过去到当前执行这个方法的当前线程
Thread wt = Thread.currentThread();
// 获取到用户提交的任务
Runnable task = w.firstTask;
// 将Worker内的任务清空
w.firstTask = null;
// 解锁(思考:这里为什么需要解锁呢?)
w.unlock();
// 标识用户提交任务的执行状态,true表示执行异常或被中断;false表示执行正常
boolean completedAbruptly = true;
try {
// task == null
// 说明第一个任务线程已经执行完了,这是第二次来执行这个方法了
// 所以当他第二次来的时候就会执行getTask()方法
while (task != null || (task = getTask()) != null) {
// 加锁(每个Worker自己就是一把锁)
w.lock();
// runStateAtLeast(ctl.get(), STOP):线程池当前不为STOP状态
// Thread.interrupted():如果当前被标记为中断,返回true,并清除中断标志位
// wt.isInterrupted():当前工作线程是否被标记为中断,如果被标记为中断了返回true
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 这里主要是在非STOP状态下,可以响应中断
wt.interrupt();
try {
// 钩子函数,支持用户继承ThreadPoolExecutor,然后在用户任务执行前执行某些动作
beforeExecute(wt, task);
try {
// 实际来执行用户的任务,如果任务没有抛出异常就会在while中进行下一次循环
task.run();
// 钩子函数,支持用户支撑ThreadPoolExecutor,然后在用户任务执行后执行某些动作
afterExecute(task, null);
} catch (Throwable ex) {
// 钩子函数,支持用户支撑ThreadPoolExecutor,然后在用户任务执行后执行某些动作
afterExecute(task, ex);
// 这里需要注意下,如果用户提交的任务在执行的期间出现了异常,这里会将异常抛出到上一层方法
throw ex;
}
} finally {
// 如果用户提交的任务出现了一场,就会执行到这里
task = null;
// 记录用户完成任务的数的计数器还是自增加1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 如果用户提交的任务没有出现异常,就是将任务的标志位设置为false
completedAbruptly = false;
} finally {
// 无论是否出现了异常,都会执行这个方法,区别是completedAbruptly值不同
processWorkerExit(w, completedAbruptly);
}
}对于runWorker()中的方法逻辑还是比较好理解的,主要任务还是执行用户提交的任务,然后围绕着用户的任务执行的成功还是失败进行不同的处理:
- 如果用户任务执行成功,就会进入下一次循环。再次进入循环的时候
task!=null的条件就会是false,所以会进入getTask()方法来获取的一个任务,这个getTask()方法主要就是从workQueue中获取一个任务。 - 如果用户任务执行失败了,就不会进入下一次循环,而是执行完两个finally代码块,最终会去执行 processWorkerExit()方法;
getTask()方法
按照上面的分析,当用户的任务正常执行后就会进入getTask()方法来从workQueue中获取一个新的任务。
private Runnable getTask() {
// 标记最后一次拉取任务是否超时了,默认值为false
boolean timedOut = false;
// 通过一个死循环来不断从队列中拉取任务(防止多个线程来同时来拉取的时候有的线程获取不到)
for (;;) {
// 获取到当前线程池的状态
int c = ctl.get();
// runStateAtLeast(c, SHUTDOWN):线程池的状态不为SHUTDOWN的时候为true
// (runStateAtLeast(c, STOP):线程池的状态不为STOP的时候为true
// workQueue.isEmpty():工作队列为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// 工作线程的计数器减1
decrementWorkerCount();
// 返回null值的时候就会导致,runWorker()方法跳出循环,跳出循环后就会执行processWorkerExit()方法
return null;
}
// 计算现在的工作线程的数量
int wc = workerCountOf(c);
// allowCoreThreadTimeOut这个是线程池的一个配置
// true - 表示超过空闲时间后,核心线程也会被销毁
// false - 表示超过空闲时间后,核心线程不会被销毁,默认值,通常也是推荐这个值
// wc > corePoolSize:判断当前线程数是否大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize:判断当前线程数是否大于最大线程数
// timed:allowCoreThreadTimeOut为false的时候,当前线程大于核心线程数就是true,反之就是false
// timedOut:第一次进入的时候它的值为false,所以第一次进入的(timed && timedOut)的值为false
// wc > 1 || workQueue.isEmpty():当前工作线程大于1或者工作队列为空
// 进入该判断后就会将当前工作线程数减一,然后返回null,所以runWorker()方法跳出循环,进入processWorkerExit()方法
// 进入该判断的情况就会有:
// 1.当前线程数大于最大线程数(在创建线程的时候一般不会超过最大线程,但是线程池支持动态调整参数)
// 2.在allowCoreThreadTimeOut为false的时候,只要workQueue为空就会进入
// 3.timeOut=false:表示获取任务没有超时,反之表示获取任务超时
// 3.1 此时如果当前线程数小于核心线程数:timed && timedOut 值为false
// 3.2 此时如果当前线程数大于核心线程数:timed && timedOut 值才会跟timedOut有关系
// 3.2.1 如果此时线程从workQueue中获取到的任务不为null,timedOut的值仍然为false,则不会进入分支
// 3.2.2 如果此时线程从workQueue中获取到的任务为null,timedOut的值就会为true,就会进入该分支
// 这里的其实还有个精妙之处,当从workQueue中获取任务null(超时了),但是不会直接去销毁掉这个线程,而是再次去
// 判断这个workQueue是否为空,确认为空,才会返回null
// 体现了线程资源的宝贵,在销毁的时候要确认的确是必须要销毁的才会去销毁,否则就会让再次去workQueue中获取任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 来到这里,说明有以下几种情况:
// 1. 当前线程数 < 最大线程数
// 2. workQueue不为空
try {
// 这里就是从阻塞队列中获取用户提交的任务,这里获取是比较有意思的
// 当timed=true的时候表示:当前线程 > 核心线程数,所以采取的是带有时间的阻塞获取
// 当timed=false的时候表示:当前线程 <= 核心线程数,所以采取的不带时间的无限阻塞获取
Runnable r = timed ?
// 在当前线程数 > 核心线程数的条件下,如果超过了空闲时间仍然没有获取到任务,就会返回null
// 当poll()方法返回null的时候,就会进入下一次循环
// 这里返回null,说明workQueue是空的,就会在上面的判断中进入 return null的分支
// 当进入return null的分支后,就会在runWorker()方法中跳出循环,进入processWorkerExit()方法
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 在当前线程数 <= 核心线程数的条件下,它会无限期的阻塞式的获取这个线程
workQueue.take();
// 当获取到任务后,就跳出死循环,返回到runWorker()方法中
// 它就会再次执行用户提交的任务,执行完成就会进入下一次循环,它会再次进入getTask()方法
if (r != null)
return r;
// 如果从workQueue中获取到的任务是null,才会执行。所以只有是执行有等待时间的poll()方法的时候才会到这里
// 说明当前线程数 > 核心线程数,并且阻塞空闲时间后,也没有获取到任务,设置timedOut为true
// 当将timedOut设置为true之后,就会在下一次循环的时候进入判断
// 进入判断后,就会return null,然后runWorker()方法就会跳出循环,进入processWorkerExit()方法
timedOut = true;
} catch (InterruptedException retry) {
// 当线程都在阻塞的获取阻塞队列中的任务时,如果此时线程被中断,就会抛出InterruptedException
// 进入这里之后timedOut的值就会被设置为false,主要影响的就是上面的判断
timedOut = false;
}
}
}在getTask()方法中,主要是有留意返回值,因为它的主体逻辑是一个死循环,在没有返回值的时候当前线程会一直在循环里面。对于返回值,它有两种情况:
返回了一个workQueue中一个task
这种情况很好理解了,就是正常的获取到了任务,然后返回到runWorker()方法中执行用户提交的任务;
返回了一个null
这种情况大体上来讲,就是需要去销毁线程了,它通过返回一个null,然后在runWoker()方法中就会跳出循环,进入processWorkerExit()方法。按照上面的分析,其实就是根据
(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())判断来决定的。看这个判断条件,它主要分为两个部分:
(wc > maximumPoolSize || (timed && timedOut))对于wc > maximumPoolSize,对于一般而言,线程池中的线程数是不会大于最大线程数的。只有在动态调整线程池参数的时候,才会出现maximumPoolSize从20变成10,此时工作线程大于maximumPoolSize的情况;所以对于这个判断条件而言,最主要还是看,timed 和 timedOut之间的关系。
timed是受到了 allowCoreThreadTimeOut 和 当前工作线程数的影响。allowCoreThreadTimeOut这个参数可以直接就当成false,我的理解是:线程是很宝贵的,销毁和创建的成本就很大,线程的核心思想就是为了提高资源的利用率,设置ture会导致核心线程也会被销毁,违背了线程池的核心思想。所以timed参数最大的影响就是当前线程数和核心线程数的关系。
如果当前线程数大于核心线程数,timed的值就是true,反之就是false
对于timedOut而言,它的前提就是当前线程数已经大于核心线程数,所以才会进入这个判断,此时如果当前线程从workQueue中获取任务超时了,说明队列中的任务已经处理完了,然后就会timedOut的值就是false,反之就是true;
(wc > 1 || workQueue.isEmpty())这个一部分的判断是建立在前面为true的时候才会进入,也就是当前线程从workQueue中获取任务超时的时候,才会再次判断队列是否为空。这里再次判断的是为了防止,在获取超时后到判断这之间队列中又被投递了任务,所以再次确认下。符合线程池的核心思想吧,就是尽所能减少线程的创建和消耗,提高线程的利用率。
其实对于getTask()方法而言,它其实最常见的返回null的情况就是,当工作线程大于核心线程数的时候,有的线程从workQueue中获取任务超时了,就会返回null,这样它就会从runWorker()方法的循环中跳出来,执行processWorkerExit()方法。
processWorkerExit()方法
进入processWorkerExite()方法一共有两种可能:
- 用户的任务执行的时候出现了异常,此时completedAbruptly的值为true
- getTask()方法返回为null值,此时completedAbruptly的值为false;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果用户任务执行出现了异常,completedAbruptly为true
if (completedAbruptly)
// 当前线程计数器减一
decrementWorkerCount();
// 获取锁,加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程的已完成任务数进行汇总
completedTaskCount += w.completedTasks;
// 销毁当前线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试去关闭线程池(因为线程池关闭时也会调用这个方法)
tryTerminate();
// 获取当前的工作线程数
int c = ctl.get();
// runStateLessThan(c, STOP):当前线程池不是STOP状态,也就是RUNNNING或者SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果是从workQueue中获取任务超时了,completedAbruptly的值就是false,就会进入分支
if (!completedAbruptly) {
// allowCoreThreadTimeOut通常都是false,所以允许核心线程销毁就会0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// min == 0 表示允许核心线程也被销毁
// !workQueue.isEmpty() 表示工作队列不为空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前线程数比最小线程数还大,直接return
// 直接return的结果,就是当前工作线程减一了,线程被销毁了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 有两种可能:
// 1.用户任务执行出现了异常,这个时候因为当前线程已经被销毁了,所以需要重新创建一个线程来补充
// 2.当前线程获取任务超时了,但是当前线程池的工作线程数量是小于最小线程数的,所以继续创建一个线程
addWorker(null, false);
}
}processprocessWorkerExite()方法的核心作用就是销毁线程,销毁线程后是否还需要补充线程则有两种情况:
- 如果当前线程池是RUNNING或者SHUTDOWN状态
- 当前线程执行用户任务出现了异常,就创建一个新的线程来进行补充;
- 当前线程数小于最小线程数,就创建一个新的线程来进行补充;
最小线程数:如果allowCoreThreadTimeOut的值为true,workQueue中还有任务,最小线程数为1;workQueue中没有任务,最小线程数为0;如果allowCoreThreadTimeOut的值为false,无论workQueue中是否还有任务,最小线程数都等于corePoolSize
总结
以corePoolSize=10,maximumPoolSize=20,workQueue=10为例子,则线程池的工作流程:
- 创建线程池:此时线程池的状态为RUNNING,此时线程数为0,workQueue的大小为0;
- 投递第1个任务:此时线程池的状态为RUNNING,此时线程数为1,workQueue的大小为0;
- 投递第11个任务:此时线程池的状态为RUNNING,此时线程数为10,workQueue的大小为1;
- 投递第21个任务(假设投递第21个任务的时候,最先投递的10个任务还没有执行完):此时线程池的状态为RUNNING,此时线程数为11,workQueue的大小为10;
- 投递第31个任务(假设投递第31个任务的时候,最先投递30个任务还没有执行完):此时线程池的状态为RUNNING,此时线程数为20,workQueue的大小为10,此时会执行拒绝策略;
线程池的终止
线程池中的终止有两个方法,一个是shutdown(),一个是shutdownNow(),他们之间的区别是:
- shutdown():关闭线程池,不再接受新任务,但会继续执行已提交的任务(包含执行中的和workQueue中的任务);
- shutdownNow():立即尝试停止所有任务,包括正在执行中和队列中等待的;
shutdown()是一种更温和的关闭线程池的方式,也是最推荐使用的关闭线程池的方式。使用shutdownNow()会导致执行中的任务停止,但是执行中的任务执行到哪一步是不确定的,会给系统带来更多的不稳定性,例如导致数据不一致了。所以带实际开发的工作中,真需要关闭线程池的时候,也是推荐使用shutdown()方法来关闭线程池的。
但是秉持着学习的态度,我们仍然需要学习shutdownNow()中的一些内容,就分别来讲下shutdown()和shutdownNow()方法中的一些源码分析吧!
shutdown()方法
首先来分析下更为常用的shutdown()方法,它的源码如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进行安全性检查,确保当前调用线程有权限关闭线程池和中断工作线程
checkShutdownAccess();
// 设置线程池的状态的为SHUTDOWN
advanceRunState(SHUTDOWN);
// 遍历所有的工作线程,然后将他们的中断标志位设置为true
interruptIdleWorkers();
// 钩子函数,支持自定义实现ThreadPoolExecutor的时候在线程池关闭的时候执行相关操作
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试去关闭线程池
tryTerminate();
}在上面的shutdown()方法中,主要执行了如下几个操作:
- 检查当前线程是否有操作所有线程的权限, 如果没有,就直接抛出SecurityException;
- 修改当前线程池的状态为SHUTDOWN,就是修改标志位ctl的值为0,其中自旋的CAS操作来进行修改;
- 遍历所有的工作线程,将所有工作线程的中断标志位设置为true;
- 如果线程的onShutdown()方法被实现了,就执行这个钩子函数;
- 尝试去关闭线程池;
其实上面最核心的三步就是:修改线程池的状态、设置工作线程的中断标志位以及尝试去关闭线程池。
修改线程池的状态
// 这里传进来的是SHUTDOWN,也就是0,这个方法的作用就是不断的通过CAS的方式来讲ctl的值设置为0
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}设置工作线程的中断标志位
interruptIdleWorkers()方法最后会去调用interruptIdleWorkers()方法,且onlyOne的值为false。
// 这里 onlyOne的值为false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 循环遍历当前所有的工作线程
for (Worker w : workers) {
Thread t = w.thread;
// t.isInterrupted():判断当前线程是否被设置了中断
// w.tryLock():尝试去或者这个目标线程的操作权利,来中断这个线程
if (!t.isInterrupted() && w.tryLock()) {
try {
// 如果这个线程的中断标志位为false,就尝试去获取目标线程的操作权
// ,如果获取到了就将目标线程的中断标志位设置为true
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// onlyOne的值为false,所以它会遍历完所有的线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}尝试去关闭线程池
final void tryTerminate() {
// 死循环
for (;;) {
// 获取当前线程池的状态
int c = ctl.get();
// isRunning(c):判断当前线程池的是否是RUNNING状态,这里显然不是,因为前面将ctl的值修改为了SHUTDOWN
// runStateAtLeast(c, TIDYING):判断当前线程是否是TIDYING状态,这里是true,因为SHUTDOWN < TIDYING
// 因为上面是true了,所以后面的判断不走的,后面的判断在shutdownNow()方法中会进行判断的
// 当所有的工作线程被终止的时候,runStateAtLeast(c, TIDYING)的值就是false,接着会继续判断
// 此时线程池的状态为TERMINATED,runStateLessThan(c, STOP)的值就是false
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// 如果当前工作线程的数量不为0,就会尝试去将一个工作线程的中断标志位设置为true
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 如果当前工作线程的数量为0的时候
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池的状态设置为TIDYING,cltOf是或,任务数字与0或运算都是它本身
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行中的方法(在ThreadPoolExecutor中是空实现,支持子类来实现)
terminated();
} finally {
// 最后将线程池中的状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有调用了awaitTermination()方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}根据上面的分析,shutDown()方法其实就是将线程池的状态修改为SHUTDOWN状态,然后将所有工作线程的中断标志位都设置为true。
当我们调用了shutDown()方法的时候,如果此时还有任务投递到线程池中,我们以线程池为SHUTDOWN的状态下来分析下execute()方法和addWoker()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数量小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
// 这里addWorker()返回的结果是false
if (addWorker(command, true))
return;
c = ctl.get();
}
// 这里c的值为0,isRunning()的值已经是false了,所以后续的判断也不会走了
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// addWokrer()方法返回的就是false,所以就会走到拒绝策略上
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 此时线程池的状态为SHUTDOWN,因此
// runStateAtLeast(c, SHUTDOWN)的值为true
// (runStateAtLeast(c, STOP)的值为false
// firstTask != null的值为true
// 所以此时无论工作队列的是否有任务,这个判断的结果都是true,所以addWorker()方法就结束了,直接返回false
if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
return false;
// ...省略后续的代码了
}
}可以看到:当调用线程池shutdown()方法的时候,再向线程池中投递任务时候就会执行拒绝策略。
所以现在的线程池不会再接受新的任务了,那么我们只需要看当前工作线程的执行,其实就是runWoker()方法。其实观察下,上面的runWorker()方法的代码就会看到里面并没有对SHUTDOWN状态下的线程池做特殊的处理,因此SHUTDOWN状态下的线程池,它仍然会正常的执行runWorker()方法。
shutdownNow()
shutdownNow()就没有那么温和了,其体现主要就是在runWorker()方法中。当然,仍然是从头开始看:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查当前线程是否有关闭线程池或者中断线程池的权限
checkShutdownAccess();
// CAS自循环,将线程池的状态设置为STOP状态
advanceRunState(STOP);
// 循环将所有的线程的中断标志位都设置为true
interruptWorkers();
// 删除掉工作队列中还没有执行的任务信息
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}然后就是依然需要查看execute()方法和addWorker()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// addWorker()的值返回为false
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning()的值为false,此时也不会像队列中放入任务了
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// addWorkker()的值为false,所以会走向拒绝策略
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 现在线程池的状态为STOP,runStateAtLeast(c, SHUTDOWN)的值为true
// runStateAtLeast(c, STOP)的值为true
// firstTask != null的值为true
// workQueue.isEmpty()因为前面清除掉了,所以这里的值也是true
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 。。。省略无用的代码
}
}在这里shutdown()方法和shutdownNow()方法的区别在于:
- shutddown()方法是将线程池设置为 SHUTDOWN 状态;
- shutdownNow()方法是将线程池设置为 STOP 状态,然后清空工作队列中的任务;
其实shutdown()和shutdownNow()方法的根本区别在于runWorker()方法中:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 线程池的状态已经被设置成了STOP
// runStateAtLeast(ctl.get(), STOP)的值就是true
// 如果当前线程的中断标志位已经设置成了true,Thread.interrupted()会返回true,并将中断标志位设置为false
// !wt.isInterrupted()此时中断标志位false,所以最终的作用就是在这里再次给线程的中断标志位设置为true
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 假如调用shutdownNow()方法的时候,当前线程正在执行到了这里,那么它还是会完整的执行完用户的任务的
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}在调用shutdownNow()方法的时候有两种可能:
- 当前工作线程正常执行,处于Running状态;该状态下的线程会执行完用户提交的任务,然后再次调用getTask()方法的时候返回null,最终会跳出循环,然后执行processWorkerExit()方法销毁当前线程。
- 当前工作线程正在执行
workQueue.take()或者workQueue.poll()方法,处于阻塞状态;对于处于阻塞状态的线程,当执行wt.interrupt()方法的时候会抛出InterruptException异常,此时就会跳出循环,然后执行processWorkerExit()方法销毁当前线程。
BlockingQueue的poll()方法和take()方法都是响应中断的,所以在调用interrupt()方法的时候就会抛出InterruptException异常;
总结
在调用shutdownNow()方法后会拒绝用户新提交的任务,并且清空工作队列中的任务,然后设置将所有的线程中断标志位设置为true。此时,正在执行的线程仍然会执行完用户提交的任务后被销毁;被阻塞的线程会抛出InterruptException,然后被销毁。所以在调用shutdownNow()方法的时候,我们需要额外的处理当前还没有执行完的任务,还是更加推荐采用shutdown()方法。
线程池中的锁机制
在ThreadPoolExecutor中为了保证线程安全、控制任务提交与执行的正确性,使用了多种同步机制,其中最核心的就是锁(Lock)和同步器(AQS),尤其是ReentrantLock。
线程中主要用到的锁
mainLock
在ThreadPoolExecutor中,最核心的锁就是mainLock,它是一个ReentrantLock类型的对象,用于保护线程池状态变更以及工作线程集合workers的并发访问。此外
使用场景举例:
- 添加Woker:当有新的任务提交且需要创建工作线程时,会调用addWorker()方法,该方法负责创建并启动一个新的Worker(工作线程)。在方法的开始和关键部分,都使用了mainLock来保证线程,这里主要保证的是对共享资源workers(工作线程集合)的访问,以确保线程池状态变更的可见性和原子性;
- shutdown()和shutdownNow()方法:在关闭线程池的方法中,都会使用mainLock来保证线程池安全地修改状态和遍历workers;
Woker内部锁
每个Worker都是一个内部类,它本身继承了AQS并实现了简单的不可重入锁,同时也作为Runnable被线程执行。
Worker内部通过继承AQS来实现简单的互斥锁,目的是保证一个任务在执行期间不会被中断(除非线程池要关闭)。Worker本身也是一个Runnable,它持有一个Thread,并在执行任务的时候加锁,防止任务执行过程中被其它的逻辑干扰;
使用场景举例:
在runWorker()方法中,也就是真正执行任务的循环中,会先调用w.lock(),目的是:
- 防止在任务执行过程中线程被意外地中断,比如在shutdownNow时,只中断空闲线程;
- 只有在任务执行时才允许中断当前线程(通过
w.unlock()后变得可中断)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断(初始状态是锁住的,unlock 后可以被中断)
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁,主要是为了防止在任务执行期间被 shutdownNow 中断
// 如果线程池正在停止,确保线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (...) { ... }
finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 释放锁
}
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}理解Worker为什么要继承AQS的原因主要是为了控制响应中断。关键点在于:
- Worker 初始化时,调用
setState(-1),表示锁是 “已锁定” 状态(不可获取),并且此时不允许中断该线程。 - 在
runWorker()最开始,会调用w.unlock(),也就是release(1),表示 Worker 现在可以被打断(即允许中断),但前提是线程池要进入 STOP 或更高状态。 - 但在执行每个具体任务之前,
runWorker()会再次调用w.lock(),也就是 加锁,表示“我正在执行任务,此时不允许中断我”。
ctl变量(AtomicInteger)
线程中非常核心的原子变量,用于同时保存线程池状态和有效线程数。通过CAS操作来保证对线程池状态和数量的原子更新,避免使用锁带来的性能开销。