池化
如果一个对象需要多次使用,并且会同时存在很多个的情况下,如果每个都去重新创建会浪费非常多的性能。
这个时候就需要池化技术来帮助减小多次重新创建的负担。
池化技术的主要目的就是为了减少大量
的创建操作导致性能损耗,通过一个对象池
来提供复用对象。
为什么需要线程池
线程池是经常使用的线程池化工具。
如果不使用线程池的话,每发生一个请求都需要创建一个新的线程,但如果并发请求的量非常大,而且一般情况下每个网络请求连接持续的时间都比较短,这样就会导致线程频繁的创建和销毁。这样会严重损耗性能,所以需要通过池化技术对多个线程进行封装,缓存和复用来减少或避免这种无意义的性能损耗。
使用线程池的条件:
- 单个请求处理的时间比较短
- 并发请求数量很大
使用线程池的好处
- 降低创建/销毁线程带来的性能损耗
- 提高任务的响应速度,无需等待线程创建
- 便于管理线程,防止线程过多消耗资源
Java中的线程池
Java中的线程池是通过ThreadPoolExecutor来实现的,先来看看这个类的继承关系:ThreadPoolExecutor -> <A>AbstractExeecutorService -> <I>ExecutorService -> <I>Executor
可以看到继承链一直延伸到Executor
,下面来一层一层看看每个类都定义了一些什么东西:
Executor -> Interface
Executor是一个专门用来执行Runnable
的对象。
提供接口方法:
1 | void execute(Runnable command); |
ExecutorService -> Interface
ExecutorService是一个提供了:
- 终止task
- 通过
Future
跟踪一个或多个task
上述两个功能接口的Executor
ExecutorService可以被关闭,被关闭之后会拒绝接受新的task
AbstractExeecutorService -> Abstract
提供<I>ExecutorService
接口的默认实现。
ThreadPoolExecutor
通过池化的Thread对象来执行task的Executor。
结合父类和接口的描述,可以认为ThreadPoolExecutor是一个:
以Runnable作为task,同时支持多个池化的task的跟踪操作的工具类。
ThreadPoolExecutor
ThreadPoolExecutor是Java并发包中提供的线程池实现类,先来看一看如何使用:
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor( |
结论先行
为了便于理解构造方法中的n多参数,这里先概括地说一下ThreadPoolExecutor的大致工作流程:
- 通过调用
execute()
方法启动一个新的任务 - 线程池会先判断当前运行中的线程数是否已经达到
corePoolSize
,若核心池已满,则会将新任务抛到workQueue
中等待,否则通过threadFactory
启动新线程到核心池中 - 若
workQueue
已满,新任务继续添加会触发非核心线程的创建,当核心线程与非核心线程的总和达到maximumPoolSize
的时候会触发拒绝策略,通过handler
处理拒绝逻辑 - 当非核心线程空闲的时候计时,若空闲时间超过
keepAliveTime
设定的时间时被销毁
ThreadPoolExecutor的5个状态
为了方便管理线程,ThreadPoolExecutor
定义了5中运行状态:
- RUNNING:接受并执行新任务
- SHUTDOWN:不再接受新任务,但已经添加的任务会执行完毕
- STOP:不再接受新任务,同时立刻终止所有未完成的任务
- TIDYING:所有任务全被终止,worker数量为0,开始执行
terminated()
回调 - TERMINATED:
terminated()
执行完毕
先来看看这几个状态在源码中是如何定义的:
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
可以看到变量这些状态都是通过一个叫ctl
(control?)的成员来定义的。
ctl
是一个AtomicInteger
类型的成员,这里顺便提一下AtomicInteger
,其实就是通过volatile
关键字使其value
对所有线程可见。然后可以看到通过ctlOf()
方法对ctl
进行初始化,其实就是把两个参数做了或运算。COUNT_BITS
用来记录当前线程池中的Workers
,每一位记录一个。我们知道在Java中Integer类型是32位的,所以INTEGER.SIZE
方法的取值就是32
,可以知道COUNT_BITS
就代表ctl
成员中有29位用于记录Worker
数量,也就是最多纪录2^29
个Worker
。CAPACITY
其实就是一共可以容纳多少Worker
:0000 0000 0000 0000 0000 0000 0000 0001 -> 1
0010 0000 0000 0000 0000 0000 0000 0000 -> 1 << COUNT_BITS
0001 1111 1111 1111 1111 1111 1111 1111 -> (1 << COUNT_BITS) - 1
可以发现最终得到的结果就是29位1,也就是线程池的最大容量,用
ctl
的低29位来记录RUNNING
用来表示线程池的运行状态。我们知道负数在计算机中是通过补码的形式参与运算的所以-1
在内存中其实是如下表示:1000 0000 0000 0000 0000 0000 0000 0001 -> 原码
1111 1111 1111 1111 1111 1111 1111 1110 -> 反码
1111 1111 1111 1111 1111 1111 1111 1111 -> 补码
1110 0000 0000 0000 0000 0000 0000 0000 -> -1 << COUNT_BITS 补码
这个时候我们可以看出,其实是通过
ctl
的前三位来表示线程池的运行状态,RUNNING
的状态对应111(补码)
**<0**SHUTDOWN
用来表示线程池的正在终止状态,对应000
**>0**STOP
用来表示线程池的立刻终止状态,对应001
**>0**TIDYING
用来表示线程池的整理状态,对应002
**>0**TERMINATED用来表示线程池的已终止状态,对应
003
**>0**
添加任务 -> execute()
我们知道当需要将一个任务抛给线程池执行的时候需要调用execute(Runnable)
方法,下面来看一看这个方法里都做了些什么事情:
1 | public void execute(Runnable command) { |
我们看到这个方法里一共做了两个事情:
- 若当前运行中的线程小于核心池容量,则在核心池中启动新的线程执行任务
- 若核心池已满,则尝试将新任务添加至
workQueue
,若不成功则触发拒绝策略- 先对线程池的运行状态进行检查,若处于
RUNNING
状态则向workQueue
中尝试添加新的任务。若添加失败,则尝试向非核心池添加新的任务 - 在向
workQueue
中添加新任务的过程中,线程池的运行状态又可能会发生变化,所以要进行二次检查。若此次检查未通过,则根据不同情况选择触发拒绝策略,或者向核心池添加新任务
- 先对线程池的运行状态进行检查,若处于
Worker
Worker
对Runnable
的封装,通过workQueue
我们不难猜到Worker
其实就是用来消费任务的工具。
1 | private final class Worker |
通过源码可以看出,Worker
继承自AQS
,并且实现了Runnable
接口 。
有关AQS又是一套庞大的知识体系,这里就不展开讲了。只需要知道AQS
是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的框架。需要了解的话可以参考美团讲解AQS的文章 。
从源码可以看出,Worker的宗旨就是充当不同线程与线程池中间的桥梁,或者可以理解为以继承Runnable
的形式创建了一个线程容器,进而将任务与线程解耦,便于在不同的任务中复用线程。
添加Worker -> addWorker()
在上面的execute(Runnable)
方法中我们注意到,任务是通过addWorker(Runnable, Boolean)
方法添加的,不难猜到Worker
对象就是在这个方法中创建的。
在这个方法中主要做了一些线程池状态相关的检查工作,如果符合条件则向线程池的对应区域添加Worker
并启动。
来看看源码:
1 | // 通过一个HashSet保存所有Worker |
可以看出addWorker()
方法一共只干了三个事情:
- 检查
ctl
状态,判断是否可以添加Worker
- 加锁后再次检查
ctl
状态,若仍然符合添加Worker
的条件则尝试添加,并记录线程池最大压力的容量 - 尝试启动
Worker
的线程,通过对线程传入Worker
并在Worker
的run()
方法中调用ThreadPoolExecutor.runWorker()
来实现在runWorker()
方法中的线程切换
Worker执行任务 -> runWorker()
ThreadPoolExecutor.runWorker(Worker)
方法才是真正针对不同Worker
去执行任务的方法,负责不断从workQueue
取出任务并执行。在执行任务的过程中还需要兼顾线程池整体的运行状态。
下面来看看源码:
1 | final void runWorker(Worker w) { |
关闭线程池 -> tryTerminate()
这个方法意义明确:尝试关闭线程池。
直接来看源码:
1 | final void tryTerminate() { |
总结
上面分析了线程池的大致工作流程,其设计宗旨就是通过一层Worker Runnable
对 Runnable
的包装之后传给线程,在线程中调用类方法拿到Worker Runnable
引用,从而替换掉真正的task Runnable
实现Thread复用。进而避免频繁创建和销毁线程,节省资源开销。