线程池是一组已经初始化并等待执行任务的线程集合,它能够有效地管理和复用线程资源,提升程序的性能和稳定性
创建
常用的线程池有两种:ThreadPoolExecutor
和 ThreadPoolTaskExecutor
。前者适用于低层次并发编程,开发者完全控制线程池的行为。后者通常在Spring框架中使用,不需要复杂的线程池管理,仅需简单、可靠的线程池实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat(threadPoolName + "%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
|
场景用法
场景用法一:执行异步任务
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 3, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); executor.execute(() -> { System.out.println("此处执行异步任务,异步任务与主线程不冲突"); }); }
|
场景用法二:多个异步任务执行完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor( 3, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); int taskCount = 5; CountDownLatch latch = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { int taskId = i; executor.submit(() -> { try { System.out.println("开始执行任务,任务 taskId 为: " + taskId); Thread.sleep(1000); System.out.println("任务执行完,任务 taskId 为:" + taskId); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }); } latch.await(); System.out.println("所有任务执行完成"); executor.shutdown(); }
|
场景用法三:集合异步任务工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@GetMapping("/handleList") public void handleList() throws Exception { List<String> list = Arrays.asList("无人机", "机巢", "航线", "虚拟飞行"); Consumer<List<String>> consumer = new Consumer<List<String>>() { @Override public void accept(List<String> list) { for (String name : list) { System.out.println(name); } } }; asyncRun(list, consumer); }
private <T> void asyncRun(List<String> list, Consumer<List<String>> consumer) { List<CompletableFuture<Void>> compatibles = list.stream().map(item -> CompletableFuture.runAsync(() -> consumer.accept(Collections.singletonList(item)), executor)).collect(Collectors.toList()); awaitAllowOfFutureFinish(compatibles); }
private void awaitAllowOfFutureFinish(List<CompletableFuture<Void>> compatibles) { try { if (CollectionUtil.isNotEmpty(compatibles)) { CompletableFuture.allOf(compatibles.toArray(new CompletableFuture[0])).get(); } } catch (InterruptedException | ExecutionException e) { System.out.println("异步线程处理失败" + e.getMessage()); } }
|
场景用法四:集合异步任务工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void handleSync() throws Exception { CompletableFuture.supplyAsync(() -> { System.out.println("异步任务执行成功1"); return true; if (flag) { return CompletableFuture.supplyAsync(() -> { System.out.println("异步任务执行成功2"); return CompletableFuture.completedFuture(null); }).thenCompose(ignored -> { System.out.println("异步任务执行成功3"); return CompletableFuture.completedFuture(null); }).thenCompose(ignored -> { System.out.println("异步任务执行成功4"); return CompletableFuture.completedFuture(null); }); } else { System.out.println("异步任务执行条件失败1"); return CompletableFuture.completedFuture(null); } }).thenRun(() -> { System.out.println("异步任务执行终点"); }); }
|