线程池是一组已经初始化并等待执行任务的线程集合,它能够有效地管理和复用线程资源,提升程序的性能和稳定性

创建

常用的线程池有两种:ThreadPoolExecutorThreadPoolTaskExecutor。前者适用于低层次并发编程,开发者完全控制线程池的行为。后者通常在Spring框架中使用,不需要复杂的线程池管理,仅需简单、可靠的线程池实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ThreadPoolExecutor executor = new ThreadPoolExecutor(  
// 核心线程数,线程池中最少的线程数
10,
// 最大线程数,线程池中允许的最大线程数
20,
// 空闲线程存活时间,超过核心线程数的线程在空闲时会被销毁,直到线程数量等于核心线程数
30,
// 存活时间的单位
TimeUnit.SECONDS,
// 阻塞队列,用于保存等待执行的任务
new LinkedBlockingQueue<>(1024),
// 线程工厂,用于创建线程
new ThreadFactoryBuilder().setNameFormat(threadPoolName + "%d").build(),
// 拒绝策略,当线程池和队列都满了,并且无法继续创建线程时,使用 CallerRunsPolicy 策略
new ThreadPoolExecutor.CallerRunsPolicy());

场景用法

场景用法一:执行异步任务

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {  
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3, // 核心线程数
5, // 最大线程数
60, // 空闲线程的存活时间
TimeUnit.SECONDS, // 存活时间的单位
new LinkedBlockingQueue<>());
// 执行异步任务
executor.execute(() -> {
System.out.println("此处执行异步任务,异步任务与主线程不冲突");
});
}

场景用法二:多个异步任务执行完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static void main(String[] args) throws InterruptedException {  
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3, // 核心线程数
5, // 最大线程数
60, // 空闲线程的存活时间
TimeUnit.SECONDS, // 存活时间的单位
new LinkedBlockingQueue<>());
// 任务数量
int taskCount = 5;
// 创建 CountDownLatch 实例,计数器的初始值为任务数量
CountDownLatch latch = new CountDownLatch(taskCount);
// 提交多个异步任务给线程执行
for (int i = 0; i < taskCount; i++) {
int taskId = i;
executor.submit(() -> {
try {
System.out.println("开始执行任务,任务 taskId 为: " + taskId);
// 模拟任务执行,此线程延迟可替换为真实业务方法
Thread.sleep(1000);
System.out.println("任务执行完,任务 taskId 为:" + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 每次任务执行完成后,计数器减一
latch.countDown();
}
});
}
// 等待所有任务执行完成
latch.await();
// 所有任务执行完成,进入其他逻辑
System.out.println("所有任务执行完成");
// 关闭线程池
executor.shutdown();
}

场景用法三:集合异步任务工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**  
* 异步处理所有集合列表
*/
@GetMapping("/handleList")
public void handleList() throws Exception {
List<String> list = Arrays.asList("无人机", "机巢", "航线", "虚拟飞行");
Consumer<List<String>> consumer = new Consumer<List<String>>() {
@Override
public void accept(List<String> list) {
for (String name : list) {
System.out.println(name);
}
}
};
asyncRun(list, consumer);
}

/**
* 异步处理所有列表元素
*/
private <T> void asyncRun(List<String> list, Consumer<List<String>> consumer) {
// 使用 CompletableFuture 异步执行每个列表元素
List<CompletableFuture<Void>> compatibles = list.stream().map(item -> CompletableFuture.runAsync(() -> consumer.accept(Collections.singletonList(item)), executor)).collect(Collectors.toList());
awaitAllowOfFutureFinish(compatibles);
}

/**
* 等待所有元素完结
*/
private void awaitAllowOfFutureFinish(List<CompletableFuture<Void>> compatibles) {
try {
if (CollectionUtil.isNotEmpty(compatibles)) {
// 使用allOf方法,接收需要等待的多个异步任务行程的数组,在传入的所有异步任务都处理完成时返回一个新的 CompletableFuture // get方法阻塞返回的线程,直到所有异步任务执行完成
CompletableFuture.allOf(compatibles.toArray(new CompletableFuture[0])).get();
}
} catch (InterruptedException | ExecutionException e) {
// 在等待过程中发生异常时记录错误日志
System.out.println("异步线程处理失败" + e.getMessage());
}
}

场景用法四:集合异步任务工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void handleSync() throws Exception {  
// 链式调用异步任务,让异步任务变得有序执行
CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行成功1");
return true;
// thenCompose 接收前一个 CompletableFuture 的结果,将其结果作为参数传递给下一个任务,并返回一个新的 CompletableFuture }).thenCompose((flag) -> {
if (flag) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行成功2");
return CompletableFuture.completedFuture(null);
}).thenCompose(ignored -> {
System.out.println("异步任务执行成功3");
return CompletableFuture.completedFuture(null);
}).thenCompose(ignored -> {
System.out.println("异步任务执行成功4");
return CompletableFuture.completedFuture(null);
});
} else {
System.out.println("异步任务执行条件失败1");
return CompletableFuture.completedFuture(null);
}
// 前一个异步任务完后执行的某个操作,但是不需要它的返回值
}).thenRun(() -> {
System.out.println("异步任务执行终点");
});
}