JUC - 线程池
# JUC - 线程池
# 1 什么是线程池
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的 ThreadPoolExecutor 类。
当然,使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
# 2 线程池解决了什么问题
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 复用已创建的线程:频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 控制并发的数量:对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 对线程统一管理:系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia
“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:
- 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
- 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
- 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。
在了解完“是什么”和“为什么”之后,下面我们来一起深入一下线程池的内部实现原理。
# 3 线程池总体设计和实现
# 3.1 总体设计
# 3.1.1 相关类
- Executor:提供了一种思想,将任务提交和任务执行解耦。用户只要提供 Runnable 任务对象,即将任务运行逻辑提交给 Executor 执行器,由 Executor 执行器完成线程的调度和任务的执行
- ExecutorService:扩展了接口 Executor 的功能,(1)提供管理线程池的方法,如提交、关闭等;(2)可以为一个或多个异步任务生成 Feature 方法
- AbstractExecutorService:上层接口的抽象类,实现任务执行过程中通用逻辑,使下层只关注与任务的具体执行
- ThreadPoolExecutor:线程池核心实现类,维护线程的生命状态以及执行并行任务i
- ScheduledExecutorService:扩展了接口 ExecutorService 功能,提供定时执行任务接口
- ScheduledThreadPoolExecutor:继承了 ThreadPoolExecutor,并实现定时执行任务
- Executors: 线程池的构造工厂类
runState 状态,保存在前3位;(2)workerCount 当前有效线程数量,保存在后29位;在实际中通常需要同时获取状态和线程数量进行决策,将它们保存在一个变量中可以无需担心一致性问题。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
类中提供了快速获取 runState 和 workerCount 值的方法,使用位运算符提高效率:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
ThreadPoolExecutor 的几种状态:
- RUNNING:可以接收新的任务,也可以接收已经在阻塞队列中的任务(存在空闲核心线程)
- SHUTDOWN:不接收新的任务,但可以接收已经在阻塞队列中的任务(不存在空闲核心线程)
- STOP:不接收新的任务,也不接收已经在阻塞队列中的任务,可以中断正在执行的任务
- TIDYING:所有任务都已被终止,workerCount 为0,即将执行 terminated() 方法
- TERMINATED:已经执行完成 terminated() 方法
ThreadPoolExecutor 生命周期中 runState 几种状态的流转过程:
# 3.3 任务执行机制
# 3.3.1 任务调度
ThreadPoolExecutor 类中初始化线程池的几个配置参数:
corePoolSize:核心线程数,保持活跃状态的线程数(未设置allowCoreThreadTimeOut),最小值为0
maximumPoolSize:最大线程数,包括核心线程数和非核心线程数
keepAliveTime:允许线程空闲的时间,若超过该时间线程将被销毁
allowCoreThreadTimeOut:允许核心线程在超过空闲时间后销毁
ThreadPoolExecutor 核心逻辑 execute() 方法,该方法完成的工作是:检查当前线程池的运行状态、运行线程数、运行策略,以决定是直接申请线程执行,还是添加到阻塞队列中,还是直接拒绝任务。其执行过程如下:
- 检查线程池运行状态,仅当 RUNNING 状态时才执行逻辑,否则直接拒绝
- 当 workCount 小于 corePoolSize,则创建并启动新线程来执行第一个任务
- 当 workCount 大于等于 corePoolSize,且阻塞队列未满,则将任务添加到阻塞队列中
- 当 workCount 大于等于 corePoolSize,且阻塞队列已满,且 workCount 小于 maximumPoolSize,则创建并启动新线程并执行该任务
- workCount 大于等于 maximumPoolSize,则采用拒绝策略执行该任务,默认的拒绝策略是抛出异常
- 当 workCount 大于 corePoolSize,在线程等待 keepAliveTime 时间后依然没有可执行的任务,则线程数量将收缩到核心线程数量
根据实际业务场景,在初始化线程池是可以通过配置改变线程池默认的执行过程,如:
- 声明线程池后,立即调用 preStartAllCoreThreads() 方法,直接创建所有核心线程
- allowCoreThreadTimeOut 设置为 true,使核心线程在经过 keepAliveTime 时间后也能被销毁
# 3.3.1 任务缓冲
线程池的本质是对线程和任务的管理,通过阻塞队列缓存任务实现二者解耦。一个线程往阻塞队列中添加任务,一个线程从阻塞队列中取出任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
不同的队列可以实现不同的任务存储策略:
# 3.3.2 任务申请
任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
帮助线程从阻塞队列中获取任务,这部分逻辑由 getTask() 方法实现,执行流程如下:
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。
# 3.3.3 任务拒绝
任务拒绝是线程池的保护机制,当阻塞队列已满,且当前线程数大于等于最大线程数时,表示线程池已处于满负荷状态。此时就会触发任务拒绝策略,保护线程池。可以通过实现拒绝策略接口自定义拒绝策略,也可以使用自带的几种拒绝策略:
# 3.4 工作线程 Worker(复用)
# 3.4.1 Worker 线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看一下它的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
2
3
4
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
# 3.4.2 Worker 线程增加
增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:
# 3.4.3 Workder 线程回收
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
2
3
4
5
6
7
线程回收的工作是在processWorkerExit方法完成的。
事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
# 3.4.4 Workder 线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
1.while循环不断地通过getTask()方法获取任务。 2.getTask()方法从阻塞队列中取任务。 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。 4.执行任务。 5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
执行流程如下图所示:
# 4 如何创建线程池
# 4.1 通过 Executors 创建
Executors 类中提供的几个静态方法来创建线程池。
# newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
1
2
3
4
5CacheThreadPool
的运行流程如下:- 提交任务进线程池。
- 因为 corePoolSize 为 0 的关系,不创建核心线程,线程池最大为 Integer.MAX_VALUE。
- 尝试将任务添加到 SynchronousQueue 队列。
- 如果 SynchronousQueue 入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从 SynchronousQueue 拉取任务并在当前线程执行。
- 如果 SynchronousQueue 已有任务在等待,入列操作将会阻塞。
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。
# newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
1
2
3
4
5核心线程数量和总线程数量相等,都是传入的参数 nThreads,所以只能创建核心线程,不能创建非核心线程。因为 LinkedBlockingQueue 的默认大小是 Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
与CachedThreadPool的区别:
- 因为 corePoolSize == maximumPoolSize ,所以 FixedThreadPool 只会创建核心线程。 而CachedThreadPool 因为 corePoolSize=0,所以只会创建非核心线程。
- 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool 会在 60s 后收回。
- 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool 占用资源更多。
- 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool 是因为阻塞队列可以很大(最大为 Integer 最大值),故几乎不会触发拒绝策略;CachedThreadPool 是因为线程池很大(最大为 Integer 最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
1
2
3
4
5
6有且仅有一个核心线程(corePoolSize == maximumPoolSize=1),使用了 LinkedBlockingQueue(容量很大),所以不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
# newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
1
2
3
4
5
6
7
8
9
10四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用Executors类中的线程池,而是通过
ThreadPoolExecutor
的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到 Integer.MAX_VALUE)时,其实是可以使用 JDK 提供的这几个接口的,它能让我们的代码具有更强的可读性。
# 4.2 通过 new ThreadPoolExecutor 创建
示例:
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
2
3
4
5
6
7
8
自定义线程设置 corePoolSize=2,maximumPoolSize=5,workQueue 大小为3,最多线程池有8个线程,再来线程便会启动拒绝策略,拒绝策略 DiscardOldestPolicy,所以会抛弃队列中等待最久的任务
# 5 线程池可以应用于哪些业务场景
为了最大程度利用CPU的多核性能,并行运算的能力是不可或缺的。
场景1:快速响应用户请求
描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。
分析:从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
场景2:快速处理批量任务
描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。
分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
# 6 常见的业务问题
线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考
场景1:使用 Exectors 中 newFixedThreadPool方法 或 newCachedThreadPool 方法 创建线程池
其中 newFixedThreadPool方法源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2
3
4
5
可见其使用了 LinkedBlockingQueue 阻塞队列,该队列的最大长度为 Integer.MAX_VALUE,可以认为是无界的。虽然最大线程数是固定的,但当某个时间段需执行大量任务,且任务的执行时间较长,那么阻塞队列就会出现大量堆积任务最后可能撑爆内存造成 OOM;
而 newCachedThreadPool 方法源码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2
3
4
5
可见使用了 SynchronousQueue 阻塞队列,该阻塞队列不存储元素,先选择空闲线程处理任务,如果没有则创建新的线程处理任务,且最大线程数量为 Integer.MAX_VALUE,当某段时间需要执行大量任务是,就会创建大量的线程,而每个线程都需要暂用内存,最后可能造成 OOM;
合理建议:
不建议使用 Executors 类来创建线程池,应该使用 new ThreadPoolExecutor() 的方式,根据实际业务需求自定义核心线程数、最大线程数、阻塞队列类型、拒绝策略来创建线程池。通常定义有限的线程数以及有界阻塞队列。
定义合适的线程池名称,在出现线程数量暴增、线程死锁、线程占用大量 CPU 资源等情况下,可以通过线程名快速抓取线程栈来分析问题。
使用适当的监控手段观察线程池状态,以便快速发现问题,如线程数量激增或大量任务阻塞等情况,如打印线程池当前状态日志或实现UI界面管理并提供报警机制。例如:
private void printStats(ThreadPoolExecutor threadPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("========================="); log.info("Pool Size: {}", threadPool.getPoolSize()); log.info("Active Threads: {}", threadPool.getActiveCount()); log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount()); log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size()); log.info("========================="); }, 0, 1, TimeUnit.SECONDS); }
1
2
3
4
5
6
7
8
9
10
11
12确保线程池是复用的,每次都 new 一个线程池可能比不使用线程池还糟糕。在使用他人创建线程池的方法时要先阅读源码后再根据情况创建线程池。
根据业务情况复用线程池,对于执行比较慢、数量不大的IO任务考虑更多的是线程数量;对于吞吐量比较多的计算型任务,线程处理不宜过多(一般考虑CPU核数或两倍),通过大的阻塞队列来缓冲。
场景2:XX页面展示接口产生大量调用降级,数量级在几十到上百
该服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件,示意图如下:
场景3:XX业务提供的服务执行时间过长,作为上游服务整体超时,大量下游服务调用失败
该服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。示意图如下:
业务中要使用线程池,而使用不当又会导致故障,那么我们怎样才能更好地使用线程池呢?
# 6 动态化线程池
美团开源:基于配置中心的轻量级动态线程池,内置监控告警功能,可通过SPI自定义扩展实现 (opens new window)