← 返回首页

Java并发模式:生产者消费者与线程池

📂 java ⏱ 2 min 353 words

Java并发模式:生产者消费者与线程池

概述

并发模式是解决并发问题的常用方案。本教程介绍生产者消费者和线程池模式。

1. 生产者消费者模式

import java.util.concurrent.*;

public class ProducerConsumer {
    private final BlockingQueue<String> queue;
    
    public ProducerConsumer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }
    
    public void produce(String item) throws InterruptedException {
        queue.put(item);
        System.out.println("生产: " + item);
    }
    
    public String consume() throws InterruptedException {
        String item = queue.take();
        System.out.println("消费: " + item);
        return item;
    }
    
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer(5);
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    pc.produce("Item-" + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    pc.consume();
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        producer.start();
        consumer.start();
    }
}

2. 线程池模式

import java.util.concurrent.*;

public class ThreadPoolPattern {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 执行,线程: " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("所有任务完成");
    }
}

3. 实际应用示例

异步任务处理

@Service
public class AsyncTaskService {
    private final ExecutorService executor;
    
    public CompletableFuture<String> processAsync(String task) {
        return CompletableFuture.supplyAsync(() -> {
            // 异步处理
            return "处理完成: " + task;
        }, executor);
    }
    
    public void shutdown() {
        executor.shutdown();
    }
}

数据流处理

public class DataStreamProcessor {
    private final BlockingQueue<Data> inputQueue;
    private final BlockingQueue<Data> outputQueue;
    private final ExecutorService executor;
    
    public DataStreamProcessor(int capacity, int threadCount) {
        this.inputQueue = new ArrayBlockingQueue<>(capacity);
        this.outputQueue = new ArrayBlockingQueue<>(capacity);
        this.executor = Executors.newFixedThreadPool(threadCount);
    }
    
    public void startProcessing() {
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                while (true) {
                    try {
                        Data data = inputQueue.take();
                        Data processed = process(data);
                        outputQueue.put(processed);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
    }
    
    private Data process(Data data) {
        // 处理数据
        return data;
    }
}

4. 最佳实践

  1. 选择合适的队列:有界队列或无界队列
  2. 设置合适的线程数:根据任务类型和CPU核心数
  3. 处理异常:确保线程池中的任务正确处理异常
  4. 监控线程池状态:定期检查线程池的活跃线程数
  5. 优雅关闭线程池:先调用shutdown(),再调用awaitTermination()

总结

并发模式是解决并发问题的常用方案。掌握生产者消费者和线程池模式,可以编写高效的并发程序。