线程池
直接 new Thread().start() 在生产环境是个反模式——线程创建/销毁开销大、无法控制并发数、无任务队列、无异常处理、无法复用。线程池 解决了这些问题:复用线程、控制并发、缓冲任务、统一管理。
如果上一章的并发集合是”数据结构层的并发工具”,那线程池就是”执行层的并发工具”——它把”开线程”变成”提交任务”,让你从”线程调度”中解放出来,专注业务逻辑。线程池是 Java 并发工程化的核心,也是面试必考。
一、Executor 框架
JUC 的 Executor 框架是一套分层 API:
Executor (顶层接口,execute(Runnable))
└── ExecutorService (扩展,submit/shutdown/awaitTermination)
└── ThreadPoolExecutor (核心实现)
└── ScheduledThreadPoolExecutor (定时任务)
└── ForkJoinPool (分治)
最常用的接口是 ExecutorService:
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<Integer> f = pool.submit(() -> 1 + 1); // 提交 Callable
pool.execute(() -> System.out.println("hi")); // 提交 Runnable
pool.shutdown(); // 不接受新任务,等已提交的完成
pool.shutdownNow(); // 尝试中断所有任务
但生产环境不要用 Executors 工厂方法——后面会讲为什么。生产代码应该直接 new ThreadPoolExecutor(...)。
二、ThreadPoolExecutor 七大参数
ThreadPoolExecutor 的完整构造函数:
public ThreadPoolExecutor(
int corePoolSize, // 1. 核心线程数
int maximumPoolSize, // 2. 最大线程数
long keepAliveTime, // 3. 空闲存活时间
TimeUnit unit, // 4. 时间单位
BlockingQueue<Runnable> workQueue, // 5. 任务队列
ThreadFactory threadFactory, // 6. 线程工厂
RejectedExecutionHandler handler // 7. 拒绝策略
) { ... }
2.1 七参数详解
1. corePoolSize(核心线程数):
- 即使空闲也不会被回收的线程数(除非设了
allowCoreThreadTimeOut(true))。 - 是线程池的”基本盘”。
2. maximumPoolSize(最大线程数):
- 线程池能创建的最大线程数。
- 当队列满了,才会创建超过 corePoolSize 的线程(直到 max)。
3. keepAliveTime + 4. unit(空闲存活时间):
- 超过 core 的线程空闲多久后回收。
- 默认只对非核心线程生效;
allowCoreThreadTimeOut(true)后核心线程也回收。
5. workQueue(任务队列):
- 存放等待执行的任务。
- 常用:
ArrayBlockingQueue(有界)、LinkedBlockingQueue(默认无界,慎用)、SynchronousQueue(直接交付)、PriorityBlockingQueue(优先级)。
6. threadFactory(线程工厂):
- 创建新线程的工厂,可自定义线程名、是否守护、优先级等。
- 强烈建议自定义——默认的
pool-1-thread-1命名在生产排查时是噩梦。
7. handler(拒绝策略):
- 队列满了且达到最大线程数时,新任务怎么办。
2.2 线程池工作流程(关键!)
提交任务的执行顺序:
1. 当前线程数 < corePoolSize?→ 创建核心线程执行任务
2. 否则,队列没满?→ 入队等待
3. 否则,线程数 < maximumPoolSize?→ 创建非核心线程执行
4. 否则,执行拒绝策略
注意顺序:先核心线程,再队列,最后非核心线程——很多人误以为”先填满队列再扩容”,错了,是先填核心,再填队列,队列满才扩容到 max。
举例:core=2,max=4,queue=10。提交 13 个任务:
- 任务 1-2:创建核心线程,立即执行。
- 任务 3-12:入队等待(10 个)。
- 任务 13:队列满,扩容到非核心线程(线程 3)执行。
- 任务 14:扩容到线程 4(达 max)。
- 任务 15+:拒绝。
三、四种拒绝策略
当队列满且达到 max 线程数时,触发拒绝策略:
| 策略 | 行为 | 适用 |
|---|---|---|
AbortPolicy(默认) | 抛 RejectedExecutionException | 默认,发现问题立即暴露 |
CallerRunsPolicy | 由提交任务的线程执行该任务 | 不丢任务,反压生产者 |
DiscardPolicy | 静默丢弃新任务 | 容忍丢任务(如日志) |
DiscardOldestPolicy | 丢弃队列最老的任务,重试提交 | 只关心最新任务 |
推荐:
AbortPolicy——默认,发现问题早,配合监控告警。CallerRunsPolicy——常见于”不能丢任务”场景,让调用者自己跑——天然反压(生产者被阻塞,不再提交)。DiscardPolicy/DiscardOldestPolicy要慎用——任务被悄悄丢,问题难发现。
也可以实现 RejectedExecutionHandler 自定义策略——如持久化到磁盘后续处理。
四、Executors 工厂方法的陷阱
Executors 提供几个工厂方法,但生产环境不推荐:
| 方法 | 问题 |
|---|---|
newFixedThreadPool(n) | 用 LinkedBlockingQueue(默认无界),队列堆积 OOM |
newSingleThreadExecutor() | 同上,无界队列 OOM |
newCachedThreadPool() | maxPoolSize = Integer.MAX_VALUE,无限创建线程 OOM |
newScheduledThreadPool(n) | 用 DelayedWorkQueue(无界),任务堆积 OOM |
阿里 Java 开发手册明令禁止用 Executors 创建线程池,必须用 new ThreadPoolExecutor——原因就是上面这些”无界”风险。
正确做法:
ThreadPoolExecutor pool = new ThreadPoolExecutor(
4, // core
8, // max
60L, TimeUnit.SECONDS, // keepAlive
new ArrayBlockingQueue<>(100), // 有界队列!
new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").build(), // 命名
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
ThreadFactoryBuilder来自 Guava。没有 Guava 时可手写ThreadFactory:
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
public Thread newThread(Runnable r) {
return new Thread(r, "biz-pool-" + counter.getAndIncrement());
}
};
五、合理配置线程池大小
线程池大小不是拍脑袋——要根据任务类型算。
5.1 CPU 密集型
任务主要是计算——CPU 一直在跑。
- 线程数 = N + 1(N = CPU 核数)
- 多一个线程是为了在偶尔的页面错误/系统调用时不浪费 CPU。
- 太多线程反而上下文切换开销大。
5.2 IO 密集型
任务主要是等 IO(网络、磁盘、数据库)——CPU 大量空闲。
- 线程数 = 2N 或 N × (1 + 等待时间/计算时间)
- 等待时间越长,可设越多线程——CPU 在等待时切换到别的线程。
5.3 混合型
既有计算又有等待——拆成两个线程池分别配置。
int cpuN = Runtime.getRuntime().availableProcessors();
// CPU 密集型
ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
cpuN + 1, cpuN + 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000));
// IO 密集型
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
2 * cpuN, 2 * cpuN, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000));
5.4 实践建议
- 核心线程数 = 平时负载——避免频繁创建销毁。
- 最大线程数 = 峰值负载——应对突发。
- 队列大小 = 能容忍的延迟——队列越大延迟越高。
- 监控——
getActiveCount/getQueue().size()/getCompletedTaskCount()。
六、ForkJoinPool 与分治
ForkJoinPool 是分治任务专用线程池——核心是工作窃取(Work-Stealing)。
6.1 工作窃取
每个线程有自己的双端队列(Deque)存放任务。线程从自己队列的头部取任务(LIFO);当自己队列空了,从别的线程队列的尾部偷任务(FIFO)。
线程 A 的队列: [T1, T2, T3] → 从头取 T1
线程 B 的队列: [] → 偷 A 的 T3(从尾偷)
好处:减少线程间争用(每个线程主要操作自己的队列),提高 CPU 利用率。
6.2 ForkJoinTask
ForkJoinTask 是分治任务的抽象——RecursiveTask(有返回值)/RecursiveAction(无返回值)。
经典例子——并行求和:
class SumTask extends RecursiveTask<Long> {
private final long[] arr;
private final int start, end;
private static final int THRESHOLD = 10000;
SumTask(long[] arr, int start, int end) {
this.arr = arr; this.start = start; this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) sum += arr[i];
return sum;
}
int mid = (start + end) >>> 1;
SumTask left = new SumTask(arr, start, mid);
SumTask right = new SumTask(arr, mid, end);
left.fork(); // 异步执行左半
long rightResult = right.compute(); // 同步执行右半
long leftResult = left.join(); // 等左半结果
return leftResult + rightResult;
}
}
ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new SumTask(arr, 0, arr.length));
6.3 parallelStream
parallelStream 底层就是 ForkJoinPool.commonPool()——默认并行度 = CPU 核数 - 1。
long sum = Arrays.stream(arr).parallel().sum();
注意:
parallelStream适合”数据量大 + 计算重”——数据少时拆分开销超过收益。- 不要在 parallelStream 里做阻塞 IO——会饿死公共池。
- 共享池是 JVM 全局的——一个任务慢拖累所有 parallelStream 用户。
七、实战:自定义线程池 + 监控
下面这个例子展示完整的线程池定义、工作流程验证、四种拒绝策略对比、ForkJoinPool 分治求和。
观察重点:
- 工作流程:core=2 时,任务 1、2 创建核心线程;任务 3、4 入队;任务 5、6 触发扩容到 max=4;任务 7 触发 AbortPolicy。
- CallerRunsPolicy:让提交线程(main)自己执行任务——反压生产者,无任务丢失。
- DiscardPolicy:任务被悄悄丢弃。
- DiscardOldestPolicy:丢队列最老的任务,新任务入队。
- ForkJoinPool:分治求和可能比串行快(核数多时加速明显),但小数据量反而慢——拆分开销。
- parallelStream:底层用
ForkJoinPool.commonPool(),并行度默认 CPU 核数 - 1。
八、线程池的优雅关闭
shutdown() 和 shutdownNow() 的区别:
| 方法 | 行为 |
|---|---|
shutdown() | 不接受新任务,已提交的(队列里的+在跑的)继续完成 |
shutdownNow() | 尝试中断正在跑的任务,返回未执行的队列任务 |
优雅关闭模式:
pool.shutdown(); // 不接新任务
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
List<Runnable> dropped = pool.shutdownNow(); // 超时强制关
log.warn("线程池强制关闭,丢弃 {} 个任务", dropped.size());
}
九、本章小结
| 概念 | 核心要点 |
|---|---|
| 七参数 | core/max/keepAlive/unit/queue/factory/handler |
| 工作流程 | core → queue → max → 拒绝 |
| AbortPolicy | 默认,抛异常 |
| CallerRunsPolicy | 调用者执行,反压 |
| DiscardPolicy | 丢新任务 |
| DiscardOldestPolicy | 丢最老任务 |
| Executors 陷阱 | 无界队列/无限线程,OOM 风险,禁止使用 |
| CPU 密集型 | N+1 |
| IO 密集型 | 2N 或 N×(1+等待/计算) |
| ForkJoinPool | 工作窃取,分治专用 |
| parallelStream | 底层用 ForkJoinPool.commonPool |
记忆口诀:
- 七参数别忘:core、max、keepAlive、unit、queue、factory、handler。
- 流程顺序:核心→队列→最大→拒绝(不是”队列先满才扩容”)。
- Executors 别用——无界队列 OOM、无限线程 OOM,用
new ThreadPoolExecutor。 - CPU 密集 N+1,IO 密集 2N——根据任务类型算线程数。
- CallerRunsPolicy 反压——不丢任务,让生产者慢下来。
- ForkJoinPool 适合分治——递归拆分 + 工作窃取。
- parallelStream 别阻塞 IO——会饿死公共池。
结语:线程池是工程化的开始
线程池是把”开线程”变成”提交任务”的关键——它让你专注业务,把线程生命周期交给框架。掌握 ThreadPoolExecutor 七参数、工作流程、拒绝策略、合理配置大小,是写出生产级并发代码的入门门槛。
但 ExecutorService.submit() 返回的 Future.get() 是阻塞的——你必须等结果出来才能继续,无法”完成后回调”。这在异步编程场景下很笨重。下一章讲 CompletableFuture——Java 的”Promise”,让异步任务可以链式编排、组合、异常处理——这是现代 Java 异步编程的核心。