线程池

直接 new Thread().start() 在生产环境是个反模式——线程创建/销毁开销大、无法控制并发数、无任务队列、无异常处理、无法复用。线程池 解决了这些问题:复用线程、控制并发、缓冲任务、统一管理。

如果上一章的并发集合是”数据结构层的并发工具”,那线程池就是”执行层的并发工具”——它把”开线程”变成”提交任务”,让你从”线程调度”中解放出来,专注业务逻辑。线程池是 Java 并发工程化的核心,也是面试必考。

一、Executor 框架

JUC 的 Executor 框架是一套分层 API:

Executor (顶层接口,execute(Runnable))
  └── ExecutorService (扩展,submit/shutdown/awaitTermination)
        └── ThreadPoolExecutor (核心实现)
        └── ScheduledThreadPoolExecutor (定时任务)
        └── ForkJoinPool (分治)

最常用的接口是 ExecutorService

ExecutorService pool = Executors.newFixedThreadPool(4);
Future<Integer> f = pool.submit(() -> 1 + 1);   // 提交 Callable
pool.execute(() -> System.out.println("hi"));   // 提交 Runnable
pool.shutdown();   // 不接受新任务,等已提交的完成
pool.shutdownNow(); // 尝试中断所有任务

生产环境不要用 Executors 工厂方法——后面会讲为什么。生产代码应该直接 new ThreadPoolExecutor(...)

二、ThreadPoolExecutor 七大参数

ThreadPoolExecutor 的完整构造函数:

public ThreadPoolExecutor(
    int corePoolSize,                      // 1. 核心线程数
    int maximumPoolSize,                   // 2. 最大线程数
    long keepAliveTime,                    // 3. 空闲存活时间
    TimeUnit unit,                         // 4. 时间单位
    BlockingQueue<Runnable> workQueue,     // 5. 任务队列
    ThreadFactory threadFactory,           // 6. 线程工厂
    RejectedExecutionHandler handler       // 7. 拒绝策略
) { ... }

2.1 七参数详解

1. corePoolSize(核心线程数)

  • 即使空闲也不会被回收的线程数(除非设了 allowCoreThreadTimeOut(true))。
  • 是线程池的”基本盘”。

2. maximumPoolSize(最大线程数)

  • 线程池能创建的最大线程数。
  • 当队列满了,才会创建超过 corePoolSize 的线程(直到 max)。

3. keepAliveTime + 4. unit(空闲存活时间)

  • 超过 core 的线程空闲多久后回收。
  • 默认只对非核心线程生效;allowCoreThreadTimeOut(true) 后核心线程也回收。

5. workQueue(任务队列)

  • 存放等待执行的任务。
  • 常用:ArrayBlockingQueue(有界)、LinkedBlockingQueue(默认无界,慎用)、SynchronousQueue(直接交付)、PriorityBlockingQueue(优先级)。

6. threadFactory(线程工厂)

  • 创建新线程的工厂,可自定义线程名、是否守护、优先级等。
  • 强烈建议自定义——默认的 pool-1-thread-1 命名在生产排查时是噩梦。

7. handler(拒绝策略)

  • 队列满了且达到最大线程数时,新任务怎么办。

2.2 线程池工作流程(关键!)

提交任务的执行顺序:

1. 当前线程数 < corePoolSize?→ 创建核心线程执行任务
2. 否则,队列没满?→ 入队等待
3. 否则,线程数 < maximumPoolSize?→ 创建非核心线程执行
4. 否则,执行拒绝策略

注意顺序:先核心线程,再队列,最后非核心线程——很多人误以为”先填满队列再扩容”,错了,是先填核心,再填队列,队列满才扩容到 max。

举例:core=2,max=4,queue=10。提交 13 个任务:

  • 任务 1-2:创建核心线程,立即执行。
  • 任务 3-12:入队等待(10 个)。
  • 任务 13:队列满,扩容到非核心线程(线程 3)执行。
  • 任务 14:扩容到线程 4(达 max)。
  • 任务 15+:拒绝。

三、四种拒绝策略

当队列满且达到 max 线程数时,触发拒绝策略:

策略行为适用
AbortPolicy(默认)RejectedExecutionException默认,发现问题立即暴露
CallerRunsPolicy由提交任务的线程执行该任务不丢任务,反压生产者
DiscardPolicy静默丢弃新任务容忍丢任务(如日志)
DiscardOldestPolicy丢弃队列最老的任务,重试提交只关心最新任务

推荐

  • AbortPolicy——默认,发现问题早,配合监控告警。
  • CallerRunsPolicy——常见于”不能丢任务”场景,让调用者自己跑——天然反压(生产者被阻塞,不再提交)。
  • DiscardPolicy/DiscardOldestPolicy 要慎用——任务被悄悄丢,问题难发现。

也可以实现 RejectedExecutionHandler 自定义策略——如持久化到磁盘后续处理。

四、Executors 工厂方法的陷阱

Executors 提供几个工厂方法,但生产环境不推荐

方法问题
newFixedThreadPool(n)LinkedBlockingQueue(默认无界),队列堆积 OOM
newSingleThreadExecutor()同上,无界队列 OOM
newCachedThreadPool()maxPoolSize = Integer.MAX_VALUE,无限创建线程 OOM
newScheduledThreadPool(n)DelayedWorkQueue(无界),任务堆积 OOM

阿里 Java 开发手册明令禁止用 Executors 创建线程池,必须用 new ThreadPoolExecutor——原因就是上面这些”无界”风险。

正确做法

ThreadPoolExecutor pool = new ThreadPoolExecutor(
    4,                                    // core
    8,                                    // max
    60L, TimeUnit.SECONDS,                // keepAlive
    new ArrayBlockingQueue<>(100),        // 有界队列!
    new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").build(),  // 命名
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

ThreadFactoryBuilder 来自 Guava。没有 Guava 时可手写 ThreadFactory

ThreadFactory factory = new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger();
    public Thread newThread(Runnable r) {
        return new Thread(r, "biz-pool-" + counter.getAndIncrement());
    }
};

五、合理配置线程池大小

线程池大小不是拍脑袋——要根据任务类型算。

5.1 CPU 密集型

任务主要是计算——CPU 一直在跑。

  • 线程数 = N + 1(N = CPU 核数)
  • 多一个线程是为了在偶尔的页面错误/系统调用时不浪费 CPU。
  • 太多线程反而上下文切换开销大。

5.2 IO 密集型

任务主要是等 IO(网络、磁盘、数据库)——CPU 大量空闲。

  • 线程数 = 2NN × (1 + 等待时间/计算时间)
  • 等待时间越长,可设越多线程——CPU 在等待时切换到别的线程。

5.3 混合型

既有计算又有等待——拆成两个线程池分别配置。

int cpuN = Runtime.getRuntime().availableProcessors();
// CPU 密集型
ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
    cpuN + 1, cpuN + 1, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000));
// IO 密集型
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
    2 * cpuN, 2 * cpuN, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000));

5.4 实践建议

  • 核心线程数 = 平时负载——避免频繁创建销毁。
  • 最大线程数 = 峰值负载——应对突发。
  • 队列大小 = 能容忍的延迟——队列越大延迟越高。
  • 监控——getActiveCount/getQueue().size()/getCompletedTaskCount()

六、ForkJoinPool 与分治

ForkJoinPool 是分治任务专用线程池——核心是工作窃取(Work-Stealing)

6.1 工作窃取

每个线程有自己的双端队列(Deque)存放任务。线程从自己队列的头部取任务(LIFO);当自己队列空了,从别的线程队列的尾部偷任务(FIFO)。

线程 A 的队列: [T1, T2, T3] → 从头取 T1
线程 B 的队列: [] → 偷 A 的 T3(从尾偷)

好处:减少线程间争用(每个线程主要操作自己的队列),提高 CPU 利用率。

6.2 ForkJoinTask

ForkJoinTask 是分治任务的抽象——RecursiveTask(有返回值)/RecursiveAction(无返回值)。

经典例子——并行求和:

class SumTask extends RecursiveTask<Long> {
    private final long[] arr;
    private final int start, end;
    private static final int THRESHOLD = 10000;

    SumTask(long[] arr, int start, int end) {
        this.arr = arr; this.start = start; this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += arr[i];
            return sum;
        }
        int mid = (start + end) >>> 1;
        SumTask left = new SumTask(arr, start, mid);
        SumTask right = new SumTask(arr, mid, end);
        left.fork();          // 异步执行左半
        long rightResult = right.compute();   // 同步执行右半
        long leftResult = left.join();        // 等左半结果
        return leftResult + rightResult;
    }
}

ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new SumTask(arr, 0, arr.length));

6.3 parallelStream

parallelStream 底层就是 ForkJoinPool.commonPool()——默认并行度 = CPU 核数 - 1。

long sum = Arrays.stream(arr).parallel().sum();

注意

  • parallelStream 适合”数据量大 + 计算重”——数据少时拆分开销超过收益。
  • 不要在 parallelStream 里做阻塞 IO——会饿死公共池。
  • 共享池是 JVM 全局的——一个任务慢拖累所有 parallelStream 用户。

七、实战:自定义线程池 + 监控

下面这个例子展示完整的线程池定义、工作流程验证、四种拒绝策略对比、ForkJoinPool 分治求和。

Java · 在线运行

观察重点

  • 工作流程:core=2 时,任务 1、2 创建核心线程;任务 3、4 入队;任务 5、6 触发扩容到 max=4;任务 7 触发 AbortPolicy。
  • CallerRunsPolicy:让提交线程(main)自己执行任务——反压生产者,无任务丢失。
  • DiscardPolicy:任务被悄悄丢弃。
  • DiscardOldestPolicy:丢队列最老的任务,新任务入队。
  • ForkJoinPool:分治求和可能比串行快(核数多时加速明显),但小数据量反而慢——拆分开销。
  • parallelStream:底层用 ForkJoinPool.commonPool(),并行度默认 CPU 核数 - 1。

八、线程池的优雅关闭

shutdown()shutdownNow() 的区别:

方法行为
shutdown()不接受新任务,已提交的(队列里的+在跑的)继续完成
shutdownNow()尝试中断正在跑的任务,返回未执行的队列任务

优雅关闭模式

pool.shutdown();                          // 不接新任务
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    List<Runnable> dropped = pool.shutdownNow();   // 超时强制关
    log.warn("线程池强制关闭,丢弃 {} 个任务", dropped.size());
}

九、本章小结

概念核心要点
七参数core/max/keepAlive/unit/queue/factory/handler
工作流程core → queue → max → 拒绝
AbortPolicy默认,抛异常
CallerRunsPolicy调用者执行,反压
DiscardPolicy丢新任务
DiscardOldestPolicy丢最老任务
Executors 陷阱无界队列/无限线程,OOM 风险,禁止使用
CPU 密集型N+1
IO 密集型2N 或 N×(1+等待/计算)
ForkJoinPool工作窃取,分治专用
parallelStream底层用 ForkJoinPool.commonPool

记忆口诀

  • 七参数别忘:core、max、keepAlive、unit、queue、factory、handler。
  • 流程顺序:核心→队列→最大→拒绝(不是”队列先满才扩容”)。
  • Executors 别用——无界队列 OOM、无限线程 OOM,用 new ThreadPoolExecutor
  • CPU 密集 N+1,IO 密集 2N——根据任务类型算线程数。
  • CallerRunsPolicy 反压——不丢任务,让生产者慢下来。
  • ForkJoinPool 适合分治——递归拆分 + 工作窃取。
  • parallelStream 别阻塞 IO——会饿死公共池。

结语:线程池是工程化的开始

线程池是把”开线程”变成”提交任务”的关键——它让你专注业务,把线程生命周期交给框架。掌握 ThreadPoolExecutor 七参数、工作流程、拒绝策略、合理配置大小,是写出生产级并发代码的入门门槛。

ExecutorService.submit() 返回的 Future.get()阻塞的——你必须等结果出来才能继续,无法”完成后回调”。这在异步编程场景下很笨重。下一章讲 CompletableFuture——Java 的”Promise”,让异步任务可以链式编排、组合、异常处理——这是现代 Java 异步编程的核心。