Java ExecutorService详解:线程池与任务调度
Java ExecutorService详解:线程池与任务调度
概述
ExecutorService是Java并发框架的核心组件,它提供了线程池和任务调度功能。通过ExecutorService,可以避免频繁创建和销毁线程,提高程序性能。
1. 线程池类型
import java.util.concurrent.*;
import java.util.*;
public class ThreadPoolTypes {
public static void main(String[] args) throws InterruptedException {
// 1. 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
System.out.println("固定线程池: " + fixedPool);
// 2. 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
System.out.println("缓存线程池: " + cachedPool);
// 3. 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
System.out.println("单线程池: " + singlePool);
// 4. 调度线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
System.out.println("调度线程池: " + scheduledPool);
// 5. 工作窃取线程池
ExecutorService workStealingPool = Executors.newWorkStealingPool();
System.out.println("工作窃取线程池: " + workStealingPool);
// 关闭线程池
fixedPool.shutdown();
cachedPool.shutdown();
singlePool.shutdown();
scheduledPool.shutdown();
workStealingPool.shutdown();
}
}
2. 自定义线程池
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class CustomThreadPool {
public static void main(String[] args) {
// 自定义线程池参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(20), // 工作队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 监控线程池状态
System.out.println("线程池大小: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列大小: " + executor.getQueue().size());
// 提交任务
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("任务 " + taskId + " 执行,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有任务完成");
}
}
3. Future和Callable
import java.util.concurrent.*;
import java.util.*;
public class FutureCallableDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建Callable任务
Callable<Integer> task1 = () -> {
System.out.println("任务1开始");
Thread.sleep(1000);
return 100;
};
Callable<String> task2 = () -> {
System.out.println("任务2开始");
Thread.sleep(500);
return "任务2完成";
};
Callable<Double> task3 = () -> {
System.out.println("任务3开始");
Thread.sleep(800);
return 3.14;
};
// 提交任务并获取Future
Future<Integer> future1 = executor.submit(task1);
Future<String> future2 = executor.submit(task2);
Future<Double> future3 = executor.submit(task3);
try {
// 获取结果
System.out.println("任务1结果: " + future1.get());
System.out.println("任务2结果: " + future2.get());
System.out.println("任务3结果: " + future3.get());
// 带超时的获取
// Integer result = future1.get(2, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
4. CompletionService
import java.util.concurrent.*;
import java.util.*;
public class CompletionServiceDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
// 提交任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i;
futures.add(completionService.submit(() -> {
Thread.sleep((long) (Math.random() * 1000));
return taskId * 10;
}));
}
// 获取完成的任务结果
for (int i = 0; i < 5; i++) {
try {
Future<Integer> completed = completionService.take();
System.out.println("完成的任务结果: " + completed.get());
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
5. 实际应用示例
异步任务执行器
import java.util.concurrent.*;
import java.util.*;
import java.util.function.*;
public class AsyncTaskExecutor {
private final ExecutorService executor;
public AsyncTaskExecutor(int threadCount) {
this.executor = Executors.newFixedThreadPool(threadCount);
}
public <T> CompletableFuture<T> executeAsync(Callable<T> task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, executor);
}
public <T, R> CompletableFuture<R> thenApplyAsync(
CompletableFuture<T> future, Function<T, R> function) {
return future.thenApplyAsync(function, executor);
}
public void shutdown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
AsyncTaskExecutor executor = new AsyncTaskExecutor(3);
CompletableFuture<String> future = executor.executeAsync(() -> {
Thread.sleep(500);
return "Hello";
});
CompletableFuture<Integer> result = executor.thenApplyAsync(future,
s -> s.length());
try {
System.out.println("结果: " + result.get());
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
6. 最佳实践
- 合理配置线程池参数:根据任务类型和系统资源配置
- 使用合适的拒绝策略:CallerRunsPolicy适合不允许丢失任务的场景
- 监控线程池状态:定期检查线程池的活跃线程数和队列大小
- 优雅关闭线程池:先调用shutdown(),再调用awaitTermination()
- 避免线程池饥饿:使用有界队列和合适的拒绝策略
总结
ExecutorService是Java并发编程的核心组件。掌握线程池的创建、配置和使用,可以高效地处理并发任务。在实际编程中,要根据需求选择合适的线程池类型和参数配置。