← 返回首页
🔄

Java ExecutorService详解:线程池与任务调度

📂 java ⏱ 3 min 530 words

Java ExecutorService详解:线程池与任务调度

概述

ExecutorService是Java并发框架的核心组件,它提供了线程池和任务调度功能。通过ExecutorService,可以避免频繁创建和销毁线程,提高程序性能。

1. 线程池类型

import java.util.concurrent.*;
import java.util.*;

public class ThreadPoolTypes {
    public static void main(String[] args) throws InterruptedException {
        // 1. 固定大小线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(3);
        System.out.println("固定线程池: " + fixedPool);
        
        // 2. 缓存线程池
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        System.out.println("缓存线程池: " + cachedPool);
        
        // 3. 单线程池
        ExecutorService singlePool = Executors.newSingleThreadExecutor();
        System.out.println("单线程池: " + singlePool);
        
        // 4. 调度线程池
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        System.out.println("调度线程池: " + scheduledPool);
        
        // 5. 工作窃取线程池
        ExecutorService workStealingPool = Executors.newWorkStealingPool();
        System.out.println("工作窃取线程池: " + workStealingPool);
        
        // 关闭线程池
        fixedPool.shutdown();
        cachedPool.shutdown();
        singlePool.shutdown();
        scheduledPool.shutdown();
        workStealingPool.shutdown();
    }
}

2. 自定义线程池

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CustomThreadPool {
    public static void main(String[] args) {
        // 自定义线程池参数
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,                    // 核心线程数
            10,                   // 最大线程数
            60L,                  // 空闲线程存活时间
            TimeUnit.SECONDS,     // 时间单位
            new ArrayBlockingQueue<>(20),  // 工作队列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
        );
        
        // 监控线程池状态
        System.out.println("线程池大小: " + executor.getPoolSize());
        System.out.println("活跃线程数: " + executor.getActiveCount());
        System.out.println("队列大小: " + executor.getQueue().size());
        
        // 提交任务
        for (int i = 0; i < 15; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("任务 " + taskId + " 执行,线程: " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("所有任务完成");
    }
}

3. Future和Callable

import java.util.concurrent.*;
import java.util.*;

public class FutureCallableDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 创建Callable任务
        Callable<Integer> task1 = () -> {
            System.out.println("任务1开始");
            Thread.sleep(1000);
            return 100;
        };
        
        Callable<String> task2 = () -> {
            System.out.println("任务2开始");
            Thread.sleep(500);
            return "任务2完成";
        };
        
        Callable<Double> task3 = () -> {
            System.out.println("任务3开始");
            Thread.sleep(800);
            return 3.14;
        };
        
        // 提交任务并获取Future
        Future<Integer> future1 = executor.submit(task1);
        Future<String> future2 = executor.submit(task2);
        Future<Double> future3 = executor.submit(task3);
        
        try {
            // 获取结果
            System.out.println("任务1结果: " + future1.get());
            System.out.println("任务2结果: " + future2.get());
            System.out.println("任务3结果: " + future3.get());
            
            // 带超时的获取
            // Integer result = future1.get(2, TimeUnit.SECONDS);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
}

4. CompletionService

import java.util.concurrent.*;
import java.util.*;

public class CompletionServiceDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        
        // 提交任务
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            futures.add(completionService.submit(() -> {
                Thread.sleep((long) (Math.random() * 1000));
                return taskId * 10;
            }));
        }
        
        // 获取完成的任务结果
        for (int i = 0; i < 5; i++) {
            try {
                Future<Integer> completed = completionService.take();
                System.out.println("完成的任务结果: " + completed.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        executor.shutdown();
    }
}

5. 实际应用示例

异步任务执行器

import java.util.concurrent.*;
import java.util.*;
import java.util.function.*;

public class AsyncTaskExecutor {
    private final ExecutorService executor;
    
    public AsyncTaskExecutor(int threadCount) {
        this.executor = Executors.newFixedThreadPool(threadCount);
    }
    
    public <T> CompletableFuture<T> executeAsync(Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }
    
    public <T, R> CompletableFuture<R> thenApplyAsync(
            CompletableFuture<T> future, Function<T, R> function) {
        return future.thenApplyAsync(function, executor);
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        AsyncTaskExecutor executor = new AsyncTaskExecutor(3);
        
        CompletableFuture<String> future = executor.executeAsync(() -> {
            Thread.sleep(500);
            return "Hello";
        });
        
        CompletableFuture<Integer> result = executor.thenApplyAsync(future, 
            s -> s.length());
        
        try {
            System.out.println("结果: " + result.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
}

6. 最佳实践

  1. 合理配置线程池参数:根据任务类型和系统资源配置
  2. 使用合适的拒绝策略:CallerRunsPolicy适合不允许丢失任务的场景
  3. 监控线程池状态:定期检查线程池的活跃线程数和队列大小
  4. 优雅关闭线程池:先调用shutdown(),再调用awaitTermination()
  5. 避免线程池饥饿:使用有界队列和合适的拒绝策略

总结

ExecutorService是Java并发编程的核心组件。掌握线程池的创建、配置和使用,可以高效地处理并发任务。在实际编程中,要根据需求选择合适的线程池类型和参数配置。