三流码奴的自我救赎

0%

保姆级ThreadPoolExecutor分析

池化

如果一个对象需要多次使用,并且会同时存在很多个的情况下,如果每个都去重新创建会浪费非常多的性能。
这个时候就需要池化技术来帮助减小多次重新创建的负担。
池化技术的主要目的就是为了减少大量的创建操作导致性能损耗,通过一个对象池来提供复用对象。

为什么需要线程池

线程池是经常使用的线程池化工具。
如果不使用线程池的话,每发生一个请求都需要创建一个新的线程,但如果并发请求的量非常大,而且一般情况下每个网络请求连接持续的时间都比较短,这样就会导致线程频繁的创建和销毁。这样会严重损耗性能,所以需要通过池化技术对多个线程进行封装,缓存和复用来减少或避免这种无意义的性能损耗。
使用线程池的条件:

  • 单个请求处理的时间比较短
  • 并发请求数量很大

使用线程池的好处

  1. 降低创建/销毁线程带来的性能损耗
  2. 提高任务的响应速度,无需等待线程创建
  3. 便于管理线程,防止线程过多消耗资源

Java中的线程池

Java中的线程池是通过ThreadPoolExecutor来实现的,先来看看这个类的继承关系:
ThreadPoolExecutor -> <A>AbstractExeecutorService -> <I>ExecutorService -> <I>Executor
可以看到继承链一直延伸到Executor,下面来一层一层看看每个类都定义了一些什么东西:

Executor -> Interface

Executor是一个专门用来执行Runnable的对象。
提供接口方法:

1
void execute(Runnable command);

ExecutorService -> Interface

ExecutorService是一个提供了:

  • 终止task
  • 通过Future跟踪一个或多个task
    上述两个功能接口的Executor
    ExecutorService可以被关闭,被关闭之后会拒绝接受新的task

AbstractExeecutorService -> Abstract

提供<I>ExecutorService接口的默认实现。

ThreadPoolExecutor

通过池化的Thread对象来执行task的Executor。
结合父类和接口的描述,可以认为ThreadPoolExecutor是一个:
以Runnable作为task,同时支持多个池化的task的跟踪操作的工具类。

ThreadPoolExecutor

ThreadPoolExecutor是Java并发包中提供的线程池实现类,先来看一看如何使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心池大小
maximumPoolSize, // 线程池总容量
keepAliveTime, // 在线程池空闲的情况下,非核心线程的存活时间
unit, // 时间单位
workQueue, // 任务等待队列
threadFactory, // 线程factory,用于启动新的线程
handler // 拒绝handler
);

for (int i = 1; i <= 10; i++) {
// 启动新任务
executor.execute(new Runnable());
}

// ...

executor.shutdown()

结论先行

为了便于理解构造方法中的n多参数,这里先概括地说一下ThreadPoolExecutor的大致工作流程:

  1. 通过调用execute()方法启动一个新的任务
  2. 线程池会先判断当前运行中的线程数是否已经达到corePoolSize,若核心池已满,则会将新任务抛到workQueue中等待,否则通过threadFactory启动新线程到核心池
  3. workQueue已满,新任务继续添加会触发非核心线程的创建,当核心线程非核心线程的总和达到maximumPoolSize的时候会触发拒绝策略,通过handler处理拒绝逻辑
  4. 非核心线程空闲的时候计时,若空闲时间超过keepAliveTime设定的时间时被销毁

ThreadPoolExecutor的5个状态

为了方便管理线程,ThreadPoolExecutor定义了5中运行状态:

  • RUNNING:接受并执行新任务
  • SHUTDOWN:不再接受新任务,但已经添加的任务会执行完毕
  • STOP:不再接受新任务,同时立刻终止所有未完成的任务
  • TIDYING:所有任务全被终止,worker数量为0,开始执行terminated()回调
  • TERMINATED:terminated()执行完毕

先来看看这几个状态在源码中是如何定义的:

1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

可以看到变量这些状态都是通过一个叫ctl(control?)的成员来定义的。

  • ctl是一个AtomicInteger类型的成员,这里顺便提一下AtomicInteger,其实就是通过 volatile关键字使其value对所有线程可见。然后可以看到通过ctlOf()方法对ctl进行初始化,其实就是把两个参数做了或运算

  • COUNT_BITS用来记录当前线程池中的Workers,每一位记录一个。我们知道在Java中Integer类型是32位的,所以INTEGER.SIZE方法的取值就是32,可以知道COUNT_BITS就代表ctl成员中有29位用于记录Worker数量,也就是最多纪录2^29Worker

  • CAPACITY其实就是一共可以容纳多少Worker

    0000 0000 0000 0000 0000 0000 0000 0001 -> 1

    0010 0000 0000 0000 0000 0000 0000 0000 -> 1 << COUNT_BITS

    0001 1111 1111 1111 1111 1111 1111 1111 -> (1 << COUNT_BITS) - 1

    可以发现最终得到的结果就是29位1,也就是线程池的最大容量,用ctl的低29位来记录

  • RUNNING用来表示线程池的运行状态。我们知道负数在计算机中是通过补码的形式参与运算的所以-1在内存中其实是如下表示:

    1000 0000 0000 0000 0000 0000 0000 0001 -> 原码

    1111 1111 1111 1111 1111 1111 1111 1110 -> 反码

    1111 1111 1111 1111 1111 1111 1111 1111 -> 补码

    1110 0000 0000 0000 0000 0000 0000 0000 -> -1 << COUNT_BITS 补码

    这个时候我们可以看出,其实是通过ctl前三位来表示线程池的运行状态,RUNNING的状态对应111(补码)**<0**

  • SHUTDOWN用来表示线程池的正在终止状态,对应000**>0**

  • STOP用来表示线程池的立刻终止状态,对应001**>0**

  • TIDYING用来表示线程池的整理状态,对应002**>0**

  • TERMINATED用来表示线程池的已终止状态,对应003**>0**

添加任务 -> execute()

我们知道当需要将一个任务抛给线程池执行的时候需要调用execute(Runnable)方法,下面来看一看这个方法里都做了些什么事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void execute(Runnable command) {
// 对Runnable进行空检查
if (command == null)
throw new NullPointerException();

// 从ctl中取得value
int c = ctl.get();
// 检查当前运行中的任务数是否小于核心池容量
if (workerCountOf(c) < corePoolSize) {
// 若核心池有余量则尝试将新任务添加至核心池
if (addWorker(command, true))
// 若向核心池添加任务成功则直接返回
return;
// 向核心池添加任务失败,获取新的ctl状态
c = ctl.get();
}
// 检查线程池是否处于RUNNING状态,若满足条件则尝试向workQueue中添加新任务
if (isRunning(c) && workQueue.offer(command)) {
// 向workQueue中添加任务完毕,ctl状态可能发生改变,重新获取新的ctl状态
int recheck = ctl.get();
// case1,若线程池不处于RUNNING状态,则移除当前任务
if (!isRunning(recheck) && remove(command))
// 二次检测未通过,触发拒绝策略
reject(command);
// case2,若workerCount为0,则添加一个空worker
// 目前能想到的有两个情况会进这个case
// 1.构造时corePoolSize可以设置为0,这个情况下第一个task被提交的时候,上面的核心池检测不会通过,所以
// 会直接进入想workQueue中添加task的流程,到这个case的时候task已经被添加到workQueue里面了,如果
// 要直接通过Worker构造执行task的话需要把已经添加进workQueue的task移除掉,为了避免这种不必要的操作,
// 直接添加一个空Worker,他会自己从workQueue中取出任务执行
// 2.若核心池中有且仅有一个线程,并且线程出错导致退出,这个时候会变为上面的第1种情况,解决方法是一样的
else if (workerCountOf(recheck) == 0)
// 添加空Worker,从workQueue中取出任务执行
addWorker(null, false);
}
// 线程池不处于RUNNING状态,或workQueue已满,尝试启动非核心线程执行任务
else if (!addWorker(command, false))
// 启动非核心线程失败,出发拒绝策略
reject(command);
}

我们看到这个方法里一共做了两个事情:

  1. 若当前运行中的线程小于核心池容量,则在核心池中启动新的线程执行任务
  2. 核心池已满,则尝试将新任务添加至workQueue,若不成功则触发拒绝策略
    • 先对线程池的运行状态进行检查,若处于RUNNING状态则向workQueue中尝试添加新的任务。若添加失败,则尝试向非核心池添加新的任务
    • 在向workQueue中添加新任务的过程中,线程池的运行状态又可能会发生变化,所以要进行二次检查。若此次检查未通过,则根据不同情况选择触发拒绝策略,或者向核心池添加新任务

Worker

WorkerRunnable的封装,通过workQueue我们不难猜到Worker其实就是用来消费任务的工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 没什么用,为了消除警告
private static final long serialVersionUID = 6138294804551838833L;

// 持有一个Thread,这个就是Worker工作做的线程
final Thread thread;

// 持有一个Runnable,这个就是Worker在自己的Thread需要完成的任务
// 当任务被完成时如果有新的任务进来,会再次给这个成员赋值,从而复用Thread,减小资源开销
Runnable firstTask;

// 记录已经完成了的任务总数
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // AQS的方法,初始化同步状态,不多做解释
// 将第一个任务赋值给firstTask
this.firstTask = firstTask;
/** Worker会绑定一个自己需要运行在的线程,在Worker创建的时候通过ThreadFactory创建
* 一个新的线程,Thread类需要一个Runnable类型的参数,同时因为Worker也实现了Runnable接口,
* 所以这一步其实是完成了Worker和Thread的·双向绑定·
* 注:这里ThreadFactory是一个接口,默认实现类是Excutors.DefaultThreadFactory:
* private static class DefaultThreadFactory implements ThreadFactory {
* ...
* public Thread newThread(Runnable r) {
* Thread t = new Thread(group, r,
* namePrefix + threadNumber.getAndIncrement(),
* 0);
* ...
* return t;
* }
* }
* 可以看到在这里直接将Worker传给了Thread,至此Thread和Worker建立了一对一的关系
*/

this.thread = getThreadFactory().newThread(this);
}

// 实现Runnable的方法
public void run() {
// 在这里就可以看到,在Worker的run()方法中其实执行了ThreadPoolExecutor.runWorker(Worker)方法
// 而ThreadPoolExecutor就可以通过Worker拿到其中的Runnable,从而将task管理转移到了线程池中
runWorker(this);
}

// AQS相关方法,不多做解释
...
}

通过源码可以看出,Worker继承自AQS,并且实现了Runnable接口 。

有关AQS又是一套庞大的知识体系,这里就不展开讲了。只需要知道AQS是一种提供了原子式管理同步状态阻塞唤醒线程功能以及队列模型的框架。需要了解的话可以参考美团讲解AQS的文章

从源码可以看出,Worker的宗旨就是充当不同线程线程池中间的桥梁,或者可以理解为以继承Runnable的形式创建了一个线程容器,进而将任务线程解耦,便于在不同的任务中复用线程。

添加Worker -> addWorker()

在上面的execute(Runnable)方法中我们注意到,任务是通过addWorker(Runnable, Boolean)方法添加的,不难猜到Worker对象就是在这个方法中创建的。

在这个方法中主要做了一些线程池状态相关的检查工作,如果符合条件则向线程池的对应区域添加Worker并启动。

来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// 通过一个HashSet保存所有Worker
private final HashSet<Worker> workers = new HashSet<>();

private boolean addWorker(Runnable firstTask, boolean core) {
// 添加标记
retry:
for (; ; ) {
int c = ctl.get(); // 获ctl
int rs = runStateOf(c); // 通过ctl获取线程池运行状态
/**
* 这个case条件比较多可以分两部分来看:
* 1. 因为RUNNING状态是负数表示,所以当runningStatus>=SHUTDOWN的时候,都不能再添加新的Worker了
* 2. 排除特殊情况,根据线程池运行状态的定义,SHUTDOWN的时候会继续将已经添加到workQueue的任务执行完,
* 所以在workQueue不为空的情况下,需要一个firstTask为null的Worker去取出任务执行完毕。所以在这个
* case下不能直接return false
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
// 获取workerCount
int wc = workerCountOf(c);
// 如果worker总数大于线程池容量则直接return false
if (wc >= CAPACITY ||
// 这里分两个case
// 1.如果添加的是核心线程并且核心池已满则添加失败
// 2.如果添加的是非核心线程并且线程池总容量已满则添加失败
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// ctl尝试+1
if (compareAndIncrementWorkerCount(c))
// 成功则跳出标签循环
break retry;
// ctl+1失败,重新读取ctl值
c = ctl.get();
// 状态发生更新,回到标签处重新开始循环
if (runStateOf(c) != rs)
continue retry;
}
}

// 通过上面的分析可以看出,addWorker()方法到目前为止仅仅完成了对workerCount的+1操作,下面才真正进入到任务启动的流程

// 两个标志位
// 标志worker是否已经准备就绪
boolean workerStarted = false;
// 标志worker是否已经被添加
boolean workerAdded = false;

Worker w = null;
try {
// 这里真正创建了新的Worker
w = new Worker(firstTask);
// 上面Worker类的解释中已经提到,Worker实例化的时候会和一个新线程进行双向绑定,
// 所以这个时候w.thread拿到的就是Worker实际依赖的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 锁住!因为涉及到Worker的添加,也涉及到poolSize的校验
mainLock.lock();
try {
// 这个时候重新获取ctl的状态
int rs = runStateOf(ctl.get());
// 根据上面的分析,在所有状态中只有RUNNING是负的,也就是说这里的判断就是在校验线程池的状态是否为RUNNING
if (rs < SHUTDOWN ||
// 因为SHUTDOWN状态的情况下还需要吧workQueue中未完成的任务完成
// 所以当添加的是空task的Worker时时符合条件的
(rs == SHUTDOWN && firstTask == null)) {
// 若该Worrker对应的线程已经存在则是预料之外的情况,因为一个Worker是和一个线程双向绑定的
if (t.isAlive())
// 直接抛出异常
throw new IllegalThreadStateException();
// 向HashSet中添加当前Worker
workers.add(w);
int s = workers.size();
// 记录线程池压力最大时候的容量
if (s > largestPoolSize)
largestPoolSize = s;
// 更新flag
workerAdded = true;
}
} finally {
// 释放!
mainLock.unlock();
}
// 如果Worker已经被成功添加到HashSet中,则直接启动Worker对应的线程
if (workerAdded) {
/**
* 这里去要特别注意,可以看到添加Worker的结果是直接启动了Worker绑定的线程,
* 而通过前面对于Worker类的分析可以知道Worker类本身实现了Runnable接口,
* 在Worker中创建Thread时注入的Runnable其实是Worker自己,所以在这里start()线程
* 的结果是调用了Worker的run()方法。
*
* 前面分析过Worker的run()方法内部其实是调用了ThreadPoolExecutor.runWorker(),
* 所以在这一步同时完成了线程切换,因为runWorker()方法是由Worker.thread调用的
* 下面研究runWorker()方法的时候就可以看到,虽然方法属于ThreadPoolExecutor,但其实
* Thread.currentThread()取得的是Worker绑定的Thread
*/
t.start();
// 更新flag
workerStarted = true;
}
}
} finally {
// 排除异常情况,若没有添加成功则去执行addWorkerFailed的回调
if (!workerStarted)
addWorkerFailed(w);
}
// 将Worker最终是否已经启动的flag返回
return workerStarted;
}

可以看出addWorker()方法一共只干了三个事情:

  1. 检查ctl状态,判断是否可以添加Worker
  2. 加锁后再次检查ctl状态,若仍然符合添加Worker的条件则尝试添加,并记录线程池最大压力的容量
  3. 尝试启动Worker的线程,通过对线程传入Worker并在Workerrun()方法中调用ThreadPoolExecutor.runWorker()来实现在runWorker()方法中的线程切换

Worker执行任务 -> runWorker()

ThreadPoolExecutor.runWorker(Worker)方法才是真正针对不同Worker去执行任务的方法,负责不断从workQueue取出任务并执行。在执行任务的过程中还需要兼顾线程池整体的运行状态。

下面来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
final void runWorker(Worker w) {
// 根据前面的解释可以知道,runWorker方法是在不同的线程中通过Worker调用的,
// 所以在这里通过Thread.currentThread()拿到的线程wt就是Worker实际运行的线程
Thread wt = Thread.currentThread();
// 从Worker中取出task
Runnable task = w.firstTask;
// 在这里清空Worker的task
// 结合上一步不难发现,这个过程其实就是线程的复用
// Worker和一个线程双向绑定,Worker其实就是真正要执行的task的容器,需要复用Worker和Thread
// 替换Worker中的firstTask就行了
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;

// 上面完成了Worker的重置,并从Worker取出了task,下面开始真正去执行任务
try {
/**
* 这里分为两个case:
* case 1: Worker的task不为空,则可以直接尝试执行task
* case 2: Worker的Task为空,这个时候就需要尝试从workQueue中取出task
*
* 这里有了第二个case就可以从workQueue循环取出task执行
*/
while (task != null || (task = getTask()) != null) {
// 锁住!
w.lock();
// 在这个case中判断是不是需要允许线程中断
// case 1: 前面已经分析过,当线程池的状态是STOP的时候,并不会将所有未完成的任务执行完毕,所以
// 只要是状态>=STOP的情况就允许线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
/**
* case 2: 这里需要特别注意interrupted()这个方法,这个方法在判断是否标记中断的时候会同时
* 清除掉中断flag,所以这个case存在的意义其实就是防止其他流程中设置的中断对当前流程
* 产生干扰,可以看到后面的两个条件分别重新校验了线程池状态,并且校验了Worker的线程
* 中断状态是否已被清除
*/
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 允许线程中断
wt.interrupt();
try {
// 默认是空方法,子类可重写
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 在这里才真正执行了execute(Runnable)方法设置进来的task
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 收尾处理
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 当Worker从循环中退出的时候执行,负责清理工作
// 因为有可能当前任务完成之后就没有后续任务了,所以在这里会通过tryTerminate()去尝试关闭线程池
processWorkerExit(w, completedAbruptly);
}
}

关闭线程池 -> tryTerminate()

这个方法意义明确:尝试关闭线程池。

直接来看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// RUNNING状态直接return
if (isRunning(c) ||
// TIDYING之后的状态只有TERMINATED,这个时候也不需要再终止了,直接return
runStateAtLeast(c, TIDYING) ||
// SHUTDOWN状态但是还有未完成的任务,需要把任务完成之后再终止,直接return
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 经过上一个if的过滤,到这里只会是SHUTDOWN且workQueue已空,或者STOP状态,所以直接中断掉所有Worker
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态设置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 真正执行terminate
terminated();
} finally {
// 最终将线程池状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}

总结

上面分析了线程池的大致工作流程,其设计宗旨就是通过一层Worker Runnable Runnable的包装之后传给线程,在线程中调用类方法拿到Worker Runnable引用,从而替换掉真正的task Runnable实现Thread复用。进而避免频繁创建和销毁线程,节省资源开销。