← 返回首页
🔧

Java并发工具类详解:CountDownLatch、CyclicBarrier与Semaphore

📂 java ⏱ 3 min 501 words

Java并发工具类详解:CountDownLatch、CyclicBarrier与Semaphore

概述

Java并发包提供了多种同步工具类,用于协调线程之间的执行。本教程介绍CountDownLatch、CyclicBarrier和Semaphore的使用。

1. CountDownLatch

import java.util.concurrent.*;

/**
 * Demonstrates {@link CountDownLatch}: the main thread blocks on the latch
 * until every worker thread has counted it down once.
 */
public class CountDownLatchExample {
    /**
     * Starts five worker threads, each sleeping for a random time below one
     * second, and waits until all of them have counted the latch down.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("任务 " + taskId + " 开始");
                    // Simulated work: 0..999 ms.
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    // Count down even on failure; otherwise main would wait forever.
                    latch.countDown();
                }
            }).start();
        }

        System.out.println("等待所有任务完成...");
        latch.await();
        System.out.println("所有任务完成!");
    }
}

2. CyclicBarrier

import java.util.concurrent.*;
import java.util.*;

/**
 * Demonstrates {@link CyclicBarrier}: three threads each wait at the barrier
 * and continue only after all of them have arrived.
 */
public class CyclicBarrierExample {
    /**
     * Starts three threads that sleep for a random time below one second and
     * then meet at a shared barrier. The barrier is constructed with an action
     * that prints a message.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程到达屏障点");
        });

        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadId + " 准备");
                    // Simulated preparation work: 0..999 ms.
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
                    System.out.println("线程 " + threadId + " 到达屏障");

                    barrier.await();

                    System.out.println("线程 " + threadId + " 继续执行");
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out while waiting.
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3. Semaphore

import java.util.concurrent.*;

/**
 * Demonstrates {@link Semaphore} as a concurrency limiter: ten tasks are
 * started, but at most three hold a permit at the same time.
 */
public class SemaphoreExample {
    /**
     * Starts ten threads that each acquire a permit, hold it for one second
     * and release it again.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Limit the number of tasks running concurrently.
        Semaphore semaphore = new Semaphore(3);  // at most 3 at a time

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                // Acquire OUTSIDE the try/finally below: if acquire() is
                // interrupted no permit is held, and releasing one anyway
                // would raise the limit above 3.
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }

                try {
                    System.out.println("任务 " + taskId + " 获取许可");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    // A permit is definitely held here, so it is always returned.
                    semaphore.release();
                    System.out.println("任务 " + taskId + " 释放许可");
                }
            }).start();
        }
    }
}

4. 实际应用示例

数据库连接池

import java.util.concurrent.*;
import java.util.*;

/**
 * A minimal fixed-size connection pool used to illustrate {@link Semaphore}.
 *
 * <p>The semaphore holds one permit per idle connection, so
 * {@link #getConnection()} blocks while every connection is lent out.
 * Connections that are currently lent out are tracked so that releasing a
 * connection twice, or releasing one that was not lent out, cannot create
 * extra permits.
 */
public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> connections;
    /** Connections currently lent out; used to reject invalid releases. */
    private final Set<Connection> lent = ConcurrentHashMap.newKeySet();

    /**
     * Creates a pool pre-filled with {@code maxConnections} connections.
     *
     * @param maxConnections number of connections in the pool; must be positive
     * @throws IllegalArgumentException if {@code maxConnections} is not positive
     */
    public ConnectionPool(int maxConnections) {
        if (maxConnections <= 0) {
            throw new IllegalArgumentException(
                    "maxConnections must be positive: " + maxConnections);
        }
        this.semaphore = new Semaphore(maxConnections);
        this.connections = new ArrayBlockingQueue<>(maxConnections);

        for (int i = 0; i < maxConnections; i++) {
            connections.offer(new Connection("连接-" + i));
        }
    }

    /**
     * Borrows a connection, blocking until one is available.
     *
     * @return an idle connection, now marked as lent out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        // A held permit implies the queue is non-empty: releaseConnection()
        // always enqueues the connection before releasing its permit.
        Connection connection = connections.poll();
        lent.add(connection);
        return connection;
    }

    /**
     * Returns a borrowed connection to the pool.
     *
     * <p>Releasing a connection that is not currently lent out (for example a
     * second release of the same connection) is ignored, so the number of
     * permits never exceeds the number of idle connections.
     *
     * @param connection the connection obtained from {@link #getConnection()}
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public void releaseConnection(Connection connection) {
        Objects.requireNonNull(connection, "connection");
        if (!lent.remove(connection)) {
            return;
        }
        // Enqueue first, then release the permit, so a woken waiter always
        // finds a connection in the queue.
        connections.offer(connection);
        semaphore.release();
    }

    /** Stand-in for a real connection; only carries a display name. */
    static class Connection {
        private final String name;

        Connection(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /**
     * Runs ten tasks against a pool of three connections; each task holds its
     * connection for one second.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool(3);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                Connection conn;
                try {
                    conn = pool.getConnection();
                } catch (InterruptedException e) {
                    // Nothing was borrowed, so there is nothing to give back.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }

                try {
                    System.out.println("任务 " + taskId + " 获取连接: " + conn);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    // Return the connection even when the task is interrupted.
                    pool.releaseConnection(conn);
                    System.out.println("任务 " + taskId + " 释放连接");
                }
            }).start();
        }
    }
}

多阶段任务执行

import java.util.concurrent.*;

/**
 * Demonstrates reusing one {@link CyclicBarrier} across several phases: every
 * task calls {@code await()} on the same barrier at the end of each phase.
 */
public class MultiPhaseTask {
    /**
     * Starts three tasks that each run three phases, synchronising on a
     * shared barrier after every phase.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int phases = 3;
        int tasksPerPhase = 3;

        CyclicBarrier barrier = new CyclicBarrier(tasksPerPhase, () -> {
            System.out.println("=== 阶段完成 ===");
        });

        for (int i = 0; i < tasksPerPhase; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    for (int phase = 1; phase <= phases; phase++) {
                        System.out.println("任务 " + taskId + " 阶段 " + phase);
                        // Simulated work for this phase: 0..999 ms.
                        Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    // Another task was interrupted or timed out while waiting.
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

5. 最佳实践

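  1. CountDownLatch:用于一次性等待多个任务完成,计数归零后不能重置,countDown() 应放在 finally 中调用
  2. CyclicBarrier:用于多线程分阶段同步,屏障可重复使用,任一等待线程被中断或超时都会使其他线程收到 BrokenBarrierException
  3. Semaphore:用于控制并发数,release() 应放在 finally 中,并且只释放已经成功获取的许可
  4. Phaser:更灵活的同步工具(Java 7+),支持动态注册和注销参与线程
  5. Exchanger:用于两个线程之间成对交换数据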

总结

Java并发工具类提供了丰富的同步机制。掌握CountDownLatch、CyclicBarrier和Semaphore的使用,可以高效地协调多线程执行。在实际编程中,要根据需求选择合适的同步工具。