Java Executor框架详解
什么是Executor框架
Executor框架是JDK 5引入的线程管理框架,它将任务提交与任务执行分离,提供了灵活的线程池实现。
创建线程池
FixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "由线程" +
Thread.currentThread().getName() + "执行");
});
}
executor.shutdown();
}
}
CachedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "由线程" +
Thread.currentThread().getName() + "执行");
});
}
executor.shutdown();
}
}
SingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "由线程" +
Thread.currentThread().getName() + "执行");
});
}
executor.shutdown();
}
}
ScheduledThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(() -> {
System.out.println("延迟2秒执行的任务");
}, 2, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(() -> {
System.out.println("周期性任务: " + System.currentTimeMillis());
}, 0, 1, TimeUnit.SECONDS);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
自定义线程池
import java.util.concurrent.*;
public class CustomThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
4,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 8; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "由线程" +
Thread.currentThread().getName() + "执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列大小: " + executor.getQueue().size());
executor.shutdown();
}
}
Callable和Future
import java.util.concurrent.*;
public class CallableFutureDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<Integer> task = () -> {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
};
Future<Integer> future1 = executor.submit(task);
Future<Integer> future2 = executor.submit(task);
try {
System.out.println("任务1结果: " + future1.get());
System.out.println("任务2结果: " + future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
invokeAll和invokeAny
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class InvokeAllDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i;
tasks.add(() -> {
Thread.sleep((long) (Math.random() * 1000));
return "任务" + taskId + "结果";
});
}
try {
List<Future<String>> futures = executor.invokeAll(tasks);
for (Future<String> future : futures) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
线程池监控
import java.util.concurrent.*;
public class ThreadPoolMonitor {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(500);
System.out.println("线程池大小: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
}
线程池最佳实践
- 根据任务类型选择合适的线程池
- 避免使用Executors创建无界队列线程池
- 合理配置线程池参数
- 使用shutdownNow优雅关闭线程池
- 监控线程池运行状态
总结
Executor框架简化了线程管理和任务调度。掌握线程池的创建、配置和监控,能帮助你构建高效的并发应用程序。