Skip to content

第26课:线程池

🎯 学习目标

  • 理解线程池的概念和优势
  • 掌握 Executor 框架
  • 掌握 ThreadPoolExecutor 参数
  • 理解线程池的拒绝策略
  • 掌握常见线程池类型

📖 一、为什么需要线程池?

不使用线程池的问题

java
// 每次都创建新线程
for (int i = 0; i < 1000; i++) {
    new Thread(() -> {
        // 执行任务
    }).start();
}

// 问题:
// 1. 创建和销毁线程开销大
// 2. 无法控制线程数量
// 3. 可能耗尽系统资源

使用线程池的优势

  • ✅ 降低资源消耗(复用线程)
  • ✅ 提高响应速度(无需创建线程)
  • ✅ 提高线程的可管理性
  • ✅ 提供更多功能(定时、周期执行)

📖 二、Executor 框架

1. 核心接口

Executor (接口)
└── ExecutorService (接口)
    └── ThreadPoolExecutor (实现类)
    └── ScheduledThreadPoolExecutor (实现类)

2. 基本使用

java
import java.util.concurrent.*;

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 提交任务
executor.execute(() -> {
    System.out.println("Task 1");
});

// 提交有返回值的任务
Future<Integer> future = executor.submit(() -> {
    return 42;
});
Integer result = future.get();  // 获取结果(阻塞)

// 关闭线程池
executor.shutdown();

📖 三、常见线程池类型

1. FixedThreadPool(固定大小)

java
ExecutorService executor = Executors.newFixedThreadPool(5);

// 特点:
// - 固定线程数
// - 无界队列
// 适用场景:负载均衡的服务器

2. CachedThreadPool(缓存线程池)

java
ExecutorService executor = Executors.newCachedThreadPool();

// 特点:
// - 线程数无上限
// - 空闲线程60秒后回收
// 适用场景:大量短期异步任务

3. SingleThreadExecutor(单线程)

java
ExecutorService executor = Executors.newSingleThreadExecutor();

// 特点:
// - 只有一个线程
// - 任务按顺序执行
// 适用场景:需要保证顺序的任务

4. ScheduledThreadPool(定时任务)

java
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

// 延迟执行
executor.schedule(() -> {
    System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);

// 周期执行(固定延迟)
executor.scheduleWithFixedDelay(() -> {
    System.out.println("每隔5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);

// 周期执行(固定速率)
executor.scheduleAtFixedRate(() -> {
    System.out.println("每5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);

📖 四、ThreadPoolExecutor 详解

1. 核心参数

java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // corePoolSize:核心线程数
    10,                     // maximumPoolSize:最大线程数
    60,                     // keepAliveTime:空闲线程存活时间
    TimeUnit.SECONDS,       // unit:时间单位
    new LinkedBlockingQueue<>(100),  // workQueue:任务队列
    Executors.defaultThreadFactory(), // threadFactory:线程工厂
    new ThreadPoolExecutor.AbortPolicy()  // handler:拒绝策略
);

2. 执行流程

1. 线程数 < corePoolSize
   → 创建新线程执行任务

2. 线程数 >= corePoolSize
   → 任务放入队列

3. 队列满 && 线程数 < maximumPoolSize
   → 创建新线程执行任务

4. 队列满 && 线程数 >= maximumPoolSize
   → 执行拒绝策略

3. 拒绝策略

java
// 1. AbortPolicy(默认):抛出异常
new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy:调用者线程执行
new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy:丢弃任务,不抛异常
new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy:丢弃队列最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy();

// 5. 自定义策略
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("任务被拒绝: " + r);
    }
}

📖 五、submit vs execute

java
ExecutorService executor = Executors.newFixedThreadPool(5);

// execute:无返回值,不能抛出异常
executor.execute(() -> {
    System.out.println("execute");
});

// submit:有返回值,可以抛出异常
Future<Integer> future = executor.submit(() -> {
    if (条件) {
        throw new Exception("错误");
    }
    return 42;
});

try {
    Integer result = future.get();  // 获取结果或异常
} catch (ExecutionException e) {
    e.printStackTrace();
}

📖 六、Future 和 Callable

1. Callable

java
// Callable 有返回值,可以抛出异常
Callable<Integer> task = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 42;
    }
};

Future<Integer> future = executor.submit(task);

2. Future

java
Future<Integer> future = executor.submit(() -> {
    Thread.sleep(2000);
    return 42;
});

// 检查是否完成
boolean done = future.isDone();

// 取消任务
boolean cancelled = future.cancel(true);

// 获取结果(阻塞)
Integer result = future.get();

// 获取结果(超时)
Integer result = future.get(1, TimeUnit.SECONDS);

3. CompletableFuture(JDK 8+)

java
// 异步执行
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 42;
});

// 链式调用
future.thenApply(result -> result * 2)
      .thenAccept(result -> System.out.println(result))
      .thenRun(() -> System.out.println("完成"));

// 组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);

CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);
System.out.println(combined.get());  // 3

📖 七、线程池监控

java
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

// 当前线程数
int poolSize = executor.getPoolSize();

// 活动线程数
int activeCount = executor.getActiveCount();

// 完成任务数
long completedTaskCount = executor.getCompletedTaskCount();

// 总任务数
long taskCount = executor.getTaskCount();

// 队列中的任务数
int queueSize = executor.getQueue().size();

💡 最佳实践

1. 不推荐使用 Executors

java
// ❌ 不推荐:无界队列可能导致 OOM
ExecutorService executor = Executors.newFixedThreadPool(5);

// ✅ 推荐:手动创建,明确队列大小
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),  // 有界队列
    new ThreadPoolExecutor.CallerRunsPolicy()
);

2. 合理设置线程数

java
// CPU 密集型任务:线程数 = CPU核心数 + 1
int cpuCount = Runtime.getRuntime().availableProcessors();
int threadCount = cpuCount + 1;

// IO 密集型任务:线程数 = CPU核心数 * 2
int threadCount = cpuCount * 2;

3. 优雅关闭

java
// 关闭线程池
executor.shutdown();  // 不接受新任务,等待已提交任务完成

// 等待终止
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
    executor.shutdownNow();  // 强制关闭
}

4. 异常处理

java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100)
) {
    @Override
protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            System.err.println("任务执行异常: " + t);
        }
    }
};

⚠️ 常见陷阱

陷阱1:使用无界队列

Executors.newFixedThreadPool 使用无界队列,任务堆积时可能 OOM。

陷阱2:线程池没有命名

默认线程名无法定位业务来源。生产线程池应使用自定义 ThreadFactory。

陷阱3:submit 异常被 Future 包住

submit 的任务异常不会直接打印,只有调用 future.get() 时才看到。

陷阱4:shutdownNow 误以为能杀死线程

shutdownNow 只是中断正在执行的任务。任务如果不响应中断,仍可能继续运行。

陷阱5:把不同类型任务混用一个线程池

CPU 密集、IO 密集、定时任务、阻塞任务混在一起,会互相拖累。


🆚 Java vs C 对比

特性C/pthreadJava 线程池
线程复用手写队列和 workerThreadPoolExecutor
任务提交自定义函数指针/队列Runnable / Callable
拒绝策略手写RejectedExecutionHandler
定时任务timer/事件循环ScheduledThreadPoolExecutor

对 C 程序员来说,ThreadPoolExecutor 是“任务队列 + worker 线程 + 拒绝策略 + 生命周期管理”的标准实现。


📖 八、生产线程池模板

java
ThreadFactory factory = r -> {
    Thread t = new Thread(r);
    t.setName("order-worker-" + t.getId());
    t.setUncaughtExceptionHandler((thread, ex) ->
        System.err.println(thread.getName() + " failed: " + ex.getMessage()));
    return t;
};

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    8,
    16,
    60,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    factory,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

模板要点:

text
明确核心线程数。
明确最大线程数。
使用有界队列。
线程命名。
选择可解释的拒绝策略。
暴露监控指标。

📊 九、线程池监控指标

生产环境至少关注:

text
corePoolSize。
maximumPoolSize。
poolSize。
activeCount。
queueSize。
completedTaskCount。
rejectedCount。
taskCost。

如果出现:

text
activeCount 长期等于 maximumPoolSize。
queueSize 持续上涨。
拒绝任务增加。
任务耗时变长。

说明线程池已经成为瓶颈,应该排查任务耗时、下游依赖、队列容量和线程数配置。


✅ 掌握标准

学完本课后,应能做到:

text
能解释线程池为什么能复用线程。
能说明 ThreadPoolExecutor 七个参数。
能描述任务提交后的执行流程。
能解释四种拒绝策略。
能区分 execute 和 submit。
能正确关闭线程池。
能避免 Executors 无界队列风险。
能根据任务类型拆分线程池。
能设计带命名、队列容量和监控的生产线程池。

线程池不是“把任务丢进去就完了”。它是系统容量控制的一部分,必须有边界、监控和拒绝策略。


📝 练习

完成 练习/Ex26_ThreadPool.java

  1. 创建和使用线程池
  2. ThreadPoolExecutor 参数调优
  3. Future 使用
  4. CompletableFuture
  5. 线程池监控
  6. 综合:并发下载器

🎓 下一步

  • 第27课:并发容器 - ConcurrentHashMap、CopyOnWriteArrayList