← 返回首页

Java Executor框架详解

📂 java ⏱ 3 min 524 words

什么是Executor框架

Executor框架是JDK 5引入的线程管理框架,它将任务提交与任务执行分离,提供了灵活的线程池实现。

创建线程池

FixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates a fixed-size thread pool: five print tasks are handed to a pool
 * created with {@code Executors.newFixedThreadPool(3)}.
 */
public class FixedThreadPoolDemo {
    /**
     * Submits tasks 0 through 4 to the pool and then requests an orderly shutdown.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        int next = 0;
        while (next < 5) {
            pool.submit(announce(next));
            next++;
        }

        // Orderly shutdown: no new tasks are accepted, submitted ones still run.
        pool.shutdown();
    }

    /**
     * Builds the task that prints its id together with the name of the thread running it.
     *
     * @param taskId the id embedded in the printed line
     * @return the printing task
     */
    private static Runnable announce(int taskId) {
        return () -> {
            String worker = Thread.currentThread().getName();
            System.out.println("任务" + taskId + "由线程" + worker + "执行");
        };
    }
}

CachedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates a cached thread pool: ten print tasks are submitted to a pool
 * obtained from {@code Executors.newCachedThreadPool()}.
 */
public class CachedThreadPoolDemo {
    /**
     * Submits tasks 0 through 9 and then requests an orderly shutdown.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final int taskCount = 10;
        ExecutorService workers = Executors.newCachedThreadPool();

        for (int n = 0; n < taskCount; n++) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int taskId = n;
            Runnable job = () -> System.out.println(
                "任务" + taskId + "由线程" + Thread.currentThread().getName() + "执行");
            workers.submit(job);
        }

        // Orderly shutdown: no new tasks are accepted, submitted ones still run.
        workers.shutdown();
    }
}

SingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates a single-thread executor: five print tasks are submitted to an
 * executor obtained from {@code Executors.newSingleThreadExecutor()}.
 */
public class SingleThreadDemo {
    /**
     * Submits tasks 0 through 4 and then requests an orderly shutdown.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ExecutorService serial = Executors.newSingleThreadExecutor();

        for (int n = 0; n != 5; ++n) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int taskId = n;
            serial.submit(() -> report(taskId));
        }

        // Orderly shutdown: no new tasks are accepted, submitted ones still run.
        serial.shutdown();
    }

    /**
     * Prints one line naming the task and the thread that is executing it.
     *
     * @param taskId the id embedded in the printed line
     */
    private static void report(int taskId) {
        StringBuilder line = new StringBuilder("任务");
        line.append(taskId)
            .append("由线程")
            .append(Thread.currentThread().getName())
            .append("执行");
        System.out.println(line.toString());
    }
}

ScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates one-shot delayed execution and fixed-rate periodic execution
 * on a {@link ScheduledExecutorService}.
 */
public class ScheduledThreadPoolDemo {
    /**
     * Schedules one task to run once after two seconds and another to run every
     * second, lets them run for five seconds, then shuts the scheduler down.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status
     * is restored and the scheduler is stopped immediately, so that pending
     * scheduled tasks do not keep the JVM alive after a cancellation request.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // One-shot task: runs once, two seconds from now.
        executor.schedule(() -> {
            System.out.println("延迟2秒执行的任务");
        }, 2, TimeUnit.SECONDS);

        // Periodic task: first run immediately, then once per second.
        executor.scheduleAtFixedRate(() -> {
            System.out.println("周期性任务: " + System.currentTimeMillis());
        }, 0, 1, TimeUnit.SECONDS);

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Thread.sleep cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return;
        }

        executor.shutdown();
    }
}

自定义线程池

import java.util.concurrent.*;

/**
 * Demonstrates building a {@link ThreadPoolExecutor} by hand instead of going
 * through the {@code Executors} factory methods.
 */
public class CustomThreadPoolDemo {
    /**
     * Creates a pool with 2 core threads, at most 4 threads, a 60-second keep-alive,
     * a bounded queue of capacity 10 and the caller-runs rejection policy; submits
     * eight one-second tasks; prints the active-thread count and queue size; then
     * requests an orderly shutdown.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                                          // corePoolSize
            4,                                          // maximumPoolSize
            60L,                                        // keepAliveTime
            TimeUnit.SECONDS,                           // unit of keepAliveTime
            new ArrayBlockingQueue<>(10),               // bounded work queue
            new ThreadPoolExecutor.CallerRunsPolicy()   // rejected tasks run on the submitting thread
        );

        for (int i = 0; i < 8; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "由线程" +
                    Thread.currentThread().getName() + "执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Thread.sleep cleared the flag when it threw; restore it instead of
                    // swallowing the interruption.
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Both values are snapshots taken while tasks are still running.
        System.out.println("活跃线程数: " + executor.getActiveCount());
        System.out.println("队列大小: " + executor.getQueue().size());

        executor.shutdown();
    }
}

Callable和Future

import java.util.concurrent.*;

/**
 * Demonstrates submitting a {@link Callable} and reading its result through a
 * {@link Future}.
 */
public class CallableFutureDemo {
    /**
     * Submits the same summing task twice to a two-thread pool and prints both results.
     *
     * <p>If the calling thread is interrupted while waiting for a result, its
     * interrupt status is restored. The pool is shut down on every exit path.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        try {
            // Sums the integers 1 through 100.
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= 100; i++) {
                    sum += i;
                }
                return sum;
            };

            Future<Integer> future1 = executor.submit(task);
            Future<Integer> future2 = executor.submit(task);

            // get() blocks until the corresponding task has finished.
            System.out.println("任务1结果: " + future1.get());
            System.out.println("任务2结果: " + future2.get());
        } catch (InterruptedException e) {
            // get() cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Runs even if an unchecked exception escapes, so the workers never outlive main().
            executor.shutdown();
        }
    }
}

invokeAll和invokeAny

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Demonstrates running a batch of {@link Callable} tasks with
 * {@link ExecutorService#invokeAll(java.util.Collection)}.
 */
public class InvokeAllDemo {
    /**
     * Builds five tasks that each sleep for a random time below one second and
     * return a label, runs them all on a three-thread pool, and prints the
     * results in submission order.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status
     * is restored. The pool is shut down on every exit path.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                tasks.add(() -> {
                    // ThreadLocalRandom avoids contention on the shared generator behind Math.random().
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
                    return "任务" + taskId + "结果";
                });
            }

            // invokeAll blocks until every task has completed.
            List<Future<String>> futures = executor.invokeAll(tasks);
            for (Future<String> future : futures) {
                System.out.println(future.get());
            }
        } catch (InterruptedException e) {
            // The blocking call cleared the flag when it threw; put it back for the caller.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Runs even if an unchecked exception escapes, so the workers never outlive main().
            executor.shutdown();
        }
    }
}

线程池监控

import java.util.concurrent.*;

/**
 * Demonstrates reading runtime statistics from a {@link ThreadPoolExecutor}.
 */
public class ThreadPoolMonitor {
    /**
     * Submits five one-second tasks to a pool with 2 core threads, at most 4 threads
     * and a queue of capacity 10, prints pool statistics after half a second, then
     * shuts the pool down and waits up to five seconds for it to terminate.
     *
     * <p>The pool is always stopped before this method returns or throws: if the
     * wait times out, or the calling thread is interrupted, the remaining tasks
     * are cancelled so that the pool's threads cannot keep the JVM alive.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the calling thread is interrupted while
     *     sleeping or while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        try {
            for (int i = 0; i < 5; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Thread.sleep cleared the flag when it threw; restore it instead of
                        // swallowing the interruption.
                        Thread.currentThread().interrupt();
                    }
                });
            }

            // Give the workers time to pick up tasks so the numbers below are non-trivial.
            Thread.sleep(500);
            System.out.println("线程池大小: " + executor.getPoolSize());
            System.out.println("活跃线程数: " + executor.getActiveCount());
            System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
            System.out.println("队列大小: " + executor.getQueue().size());

            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } finally {
            // No-op once the pool has terminated; otherwise (timeout, interrupt, or any
            // other exception) cancels what is left so no worker thread is leaked.
            executor.shutdownNow();
        }
    }
}

线程池最佳实践

  1. 根据任务类型选择合适的线程池
  2. 避免使用Executors工厂方法创建线程池(newFixedThreadPool的任务队列无界,newCachedThreadPool的线程数无界),应通过ThreadPoolExecutor显式指定有界队列和拒绝策略
  3. 合理配置线程池参数
  4. 使用shutdown配合awaitTermination优雅关闭线程池,超时后再调用shutdownNow强制关闭
  5. 监控线程池运行状态

总结

Executor框架简化了线程管理和任务调度。掌握线程池的创建、配置和监控,能帮助你构建高效的并发应用程序。