Appearance
第26课:线程池
🎯 学习目标
- 理解线程池的概念和优势
- 掌握 Executor 框架
- 掌握 ThreadPoolExecutor 参数
- 理解线程池的拒绝策略
- 掌握常见线程池类型
📖 一、为什么需要线程池?
不使用线程池的问题
java
// 每次都创建新线程
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 执行任务
}).start();
}
// 问题:
// 1. 创建和销毁线程开销大
// 2. 无法控制线程数量
// 3. 可能耗尽系统资源使用线程池的优势
- ✅ 降低资源消耗(复用线程)
- ✅ 提高响应速度(无需创建线程)
- ✅ 提高线程的可管理性
- ✅ 提供更多功能(定时、周期执行)
📖 二、Executor 框架
1. 核心接口
Executor (接口)
└── ExecutorService (接口)
└── ThreadPoolExecutor (实现类)
└── ScheduledThreadPoolExecutor (实现类)2. 基本使用
java
import java.util.concurrent.*;
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
executor.execute(() -> {
System.out.println("Task 1");
});
// 提交有返回值的任务
Future<Integer> future = executor.submit(() -> {
return 42;
});
Integer result = future.get(); // 获取结果(阻塞)
// 关闭线程池
executor.shutdown();📖 三、常见线程池类型
1. FixedThreadPool(固定大小)
java
ExecutorService executor = Executors.newFixedThreadPool(5);
// 特点:
// - 固定线程数
// - 无界队列
// 适用场景:负载均衡的服务器2. CachedThreadPool(缓存线程池)
java
ExecutorService executor = Executors.newCachedThreadPool();
// 特点:
// - 线程数无上限
// - 空闲线程60秒后回收
// 适用场景:大量短期异步任务3. SingleThreadExecutor(单线程)
java
ExecutorService executor = Executors.newSingleThreadExecutor();
// 特点:
// - 只有一个线程
// - 任务按顺序执行
// 适用场景:需要保证顺序的任务4. ScheduledThreadPool(定时任务)
java
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
// 延迟执行
executor.schedule(() -> {
System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);
// 周期执行(固定延迟)
executor.scheduleWithFixedDelay(() -> {
System.out.println("每隔5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);
// 周期执行(固定速率)
executor.scheduleAtFixedRate(() -> {
System.out.println("每5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);📖 四、ThreadPoolExecutor 详解
1. 核心参数
java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize:核心线程数
10, // maximumPoolSize:最大线程数
60, // keepAliveTime:空闲线程存活时间
TimeUnit.SECONDS, // unit:时间单位
new LinkedBlockingQueue<>(100), // workQueue:任务队列
Executors.defaultThreadFactory(), // threadFactory:线程工厂
new ThreadPoolExecutor.AbortPolicy() // handler:拒绝策略
);2. 执行流程
1. 线程数 < corePoolSize
→ 创建新线程执行任务
2. 线程数 >= corePoolSize
→ 任务放入队列
3. 队列满 && 线程数 < maximumPoolSize
→ 创建新线程执行任务
4. 队列满 && 线程数 >= maximumPoolSize
→ 执行拒绝策略3. 拒绝策略
java
// 1. AbortPolicy(默认):抛出异常
new ThreadPoolExecutor.AbortPolicy();
// 2. CallerRunsPolicy:调用者线程执行
new ThreadPoolExecutor.CallerRunsPolicy();
// 3. DiscardPolicy:丢弃任务,不抛异常
new ThreadPoolExecutor.DiscardPolicy();
// 4. DiscardOldestPolicy:丢弃队列最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy();
// 5. 自定义策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任务被拒绝: " + r);
}
}📖 五、submit vs execute
java
ExecutorService executor = Executors.newFixedThreadPool(5);
// execute:无返回值,不能抛出异常
executor.execute(() -> {
System.out.println("execute");
});
// submit:有返回值,可以抛出异常
Future<Integer> future = executor.submit(() -> {
if (条件) {
throw new Exception("错误");
}
return 42;
});
try {
Integer result = future.get(); // 获取结果或异常
} catch (ExecutionException e) {
e.printStackTrace();
}📖 六、Future 和 Callable
1. Callable
java
// Callable 有返回值,可以抛出异常
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 42;
}
};
Future<Integer> future = executor.submit(task);2. Future
java
Future<Integer> future = executor.submit(() -> {
Thread.sleep(2000);
return 42;
});
// 检查是否完成
boolean done = future.isDone();
// 取消任务
boolean cancelled = future.cancel(true);
// 获取结果(阻塞)
Integer result = future.get();
// 获取结果(超时)
Integer result = future.get(1, TimeUnit.SECONDS);3. CompletableFuture(JDK 8+)
java
// 异步执行
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 42;
});
// 链式调用
future.thenApply(result -> result * 2)
.thenAccept(result -> System.out.println(result))
.thenRun(() -> System.out.println("完成"));
// 组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);
System.out.println(combined.get()); // 3📖 七、线程池监控
java
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 当前线程数
int poolSize = executor.getPoolSize();
// 活动线程数
int activeCount = executor.getActiveCount();
// 完成任务数
long completedTaskCount = executor.getCompletedTaskCount();
// 总任务数
long taskCount = executor.getTaskCount();
// 队列中的任务数
int queueSize = executor.getQueue().size();💡 最佳实践
1. 不推荐使用 Executors
java
// ❌ 不推荐:无界队列可能导致 OOM
ExecutorService executor = Executors.newFixedThreadPool(5);
// ✅ 推荐:手动创建,明确队列大小
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy()
);2. 合理设置线程数
java
// CPU 密集型任务:线程数 = CPU核心数 + 1
int cpuCount = Runtime.getRuntime().availableProcessors();
int threadCount = cpuCount + 1;
// IO 密集型任务:线程数 = CPU核心数 * 2
int threadCount = cpuCount * 2;3. 优雅关闭
java
// 关闭线程池
executor.shutdown(); // 不接受新任务,等待已提交任务完成
// 等待终止
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}4. 异常处理
java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.err.println("任务执行异常: " + t);
}
}
};⚠️ 常见陷阱
陷阱1:使用无界队列
Executors.newFixedThreadPool 使用无界队列,任务堆积时可能 OOM。
陷阱2:线程池没有命名
默认线程名无法定位业务来源。生产线程池应使用自定义 ThreadFactory。
陷阱3:submit 异常被 Future 包住
submit 的任务异常不会直接打印,只有调用 future.get() 时才看到。
陷阱4:shutdownNow 误以为能杀死线程
shutdownNow 只是中断正在执行的任务。任务如果不响应中断,仍可能继续运行。
陷阱5:把不同类型任务混用一个线程池
CPU 密集、IO 密集、定时任务、阻塞任务混在一起,会互相拖累。
🆚 Java vs C 对比
| 特性 | C/pthread | Java 线程池 |
|---|---|---|
| 线程复用 | 手写队列和 worker | ThreadPoolExecutor |
| 任务提交 | 自定义函数指针/队列 | Runnable / Callable |
| 拒绝策略 | 手写 | RejectedExecutionHandler |
| 定时任务 | timer/事件循环 | ScheduledThreadPoolExecutor |
对 C 程序员来说,ThreadPoolExecutor 是“任务队列 + worker 线程 + 拒绝策略 + 生命周期管理”的标准实现。
📖 八、生产线程池模板
java
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setName("order-worker-" + t.getId());
t.setUncaughtExceptionHandler((thread, ex) ->
System.err.println(thread.getName() + " failed: " + ex.getMessage()));
return t;
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
8,
16,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
factory,
new ThreadPoolExecutor.CallerRunsPolicy()
);模板要点:
text
明确核心线程数。
明确最大线程数。
使用有界队列。
线程命名。
选择可解释的拒绝策略。
暴露监控指标。📊 九、线程池监控指标
生产环境至少关注:
text
corePoolSize。
maximumPoolSize。
poolSize。
activeCount。
queueSize。
completedTaskCount。
rejectedCount。
taskCost。如果出现:
text
activeCount 长期等于 maximumPoolSize。
queueSize 持续上涨。
拒绝任务增加。
任务耗时变长。说明线程池已经成为瓶颈,应该排查任务耗时、下游依赖、队列容量和线程数配置。
✅ 掌握标准
学完本课后,应能做到:
text
能解释线程池为什么能复用线程。
能说明 ThreadPoolExecutor 七个参数。
能描述任务提交后的执行流程。
能解释四种拒绝策略。
能区分 execute 和 submit。
能正确关闭线程池。
能避免 Executors 无界队列风险。
能根据任务类型拆分线程池。
能设计带命名、队列容量和监控的生产线程池。线程池不是“把任务丢进去就完了”。它是系统容量控制的一部分,必须有边界、监控和拒绝策略。
📝 练习
完成 练习/Ex26_ThreadPool.java:
- 创建和使用线程池
- ThreadPoolExecutor 参数调优
- Future 使用
- CompletableFuture
- 线程池监控
- 综合:并发下载器
🎓 下一步
- 第27课:并发容器 - ConcurrentHashMap、CopyOnWriteArrayList