Java Concurrency Utilities Explained: CountDownLatch and CyclicBarrier
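Overview
The java.util.concurrent package provides several synchronizer classes for coordinating the execution of threads. This tutorial covers CountDownLatch, CyclicBarrier, and Semaphore.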
1. CountDownLatch
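import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        // The latch opens once countDown() has been called threadCount times.
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("Task " + taskId + " started");
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("Task " + taskId + " finished");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it.
                    Thread.currentThread().interrupt();
                } finally {
                    // Count down in finally so the main thread is never left waiting.
                    latch.countDown();
                }
            }).start();
        }

        System.out.println("Waiting for all tasks to finish...");
        latch.await(); // blocks until the count reaches zero
        System.out.println("All tasks finished!");
    }
}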
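The example above blocks in latch.await() for as long as the workers take. Where an unbounded wait is undesirable, CountDownLatch also offers await(timeout, unit), which returns false if the count has not reached zero in time. The following is a minimal sketch of that variant; the class name LatchTimeoutSketch and the helper awaitAll are hypothetical and not part of the original tutorial, and Thread.sleep stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical class, for illustration only.
class LatchTimeoutSketch {

    // Starts `workers` threads that each sleep for workMillis, then waits at
    // most timeoutMillis. Returns true only if every worker finished in time.
    static boolean awaitAll(int workers, long workMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(workMillis); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
            t.setDaemon(true); // do not keep the JVM alive after a timeout
            t.start();
        }
        // Returns false if the count did not reach zero within the timeout.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fast workers finished: " + awaitAll(3, 50, 2000));
        System.out.println("slow workers finished: " + awaitAll(3, 2000, 50));
    }
}
```

A false return only means the caller stopped waiting; the workers keep running, so real code would typically also cancel or interrupt them.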
2. CyclicBarrier
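import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("All threads reached the barrier");
        });

        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " preparing");
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("Thread " + threadId + " reached the barrier");
                    barrier.await(); // blocks until all threadCount threads have arrived
                    System.out.println("Thread " + threadId + " continuing");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another waiting thread was interrupted or timed out.
                    e.printStackTrace();
                }
            }).start();
        }
    }
}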
3. Semaphore
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        // At most 3 tasks may hold a permit at the same time.
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    // Acquire outside the inner try so that an interrupted
                    // acquire() never reaches the release() below.
                    semaphore.acquire();
                    try {
                        System.out.println("Task " + taskId + " acquired a permit");
                        Thread.sleep(1000);
                    } finally {
                        System.out.println("Task " + taskId + " releasing its permit");
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
4. Practical Examples
Database connection pool
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> connections;

    public ConnectionPool(int maxConnections) {
        this.semaphore = new Semaphore(maxConnections);
        this.connections = new ArrayBlockingQueue<>(maxConnections);
        for (int i = 0; i < maxConnections; i++) {
            connections.offer(new Connection("Connection-" + i));
        }
    }

    // Blocks until a connection is free. Each permit corresponds to one pooled
    // connection, so poll() finds a connection as long as every caller returns
    // what it borrowed exactly once.
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return connections.poll();
    }

    // Return the connection to the queue before releasing the permit, so a
    // waiting thread cannot acquire a permit and find the queue empty.
    public void releaseConnection(Connection connection) {
        connections.offer(connection);
        semaphore.release();
    }

    // Placeholder for a real database connection.
    static class Connection {
        private final String name;

        Connection(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    public static void main(String[] args) {
        ConnectionPool pool = new ConnectionPool(3);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    Connection conn = pool.getConnection();
                    try {
                        System.out.println("Task " + taskId + " got connection: " + conn);
                        Thread.sleep(1000);
                    } finally {
                        // Always return the connection, even if the work is interrupted.
                        pool.releaseConnection(conn);
                        System.out.println("Task " + taskId + " released its connection");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
Multi-phase task execution
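import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class MultiPhaseTask {
    public static void main(String[] args) {
        int phases = 3;
        int tasksPerPhase = 3;
        // The barrier resets after each trip, so one instance serves every phase.
        CyclicBarrier barrier = new CyclicBarrier(tasksPerPhase, () -> {
            System.out.println("=== Phase complete ===");
        });

        for (int i = 0; i < tasksPerPhase; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    for (int phase = 1; phase <= phases; phase++) {
                        System.out.println("Task " + taskId + " phase " + phase);
                        Thread.sleep((long) (Math.random() * 1000));
                        // No task starts the next phase until all have finished this one.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another task was interrupted or timed out at the barrier.
                    e.printStackTrace();
                }
            }).start();
        }
    }
}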
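5. Best Practices
- CountDownLatch: wait once for a set of tasks to finish; the count cannot be reset
- CyclicBarrier: synchronize a fixed group of threads at the end of each phase; the barrier resets after every trip
- Semaphore: limit how many threads run a section concurrently
- Phaser: a more flexible synchronizer (Java 7+)
- Exchanger: swap data between two threads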
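The list names Phaser as a more flexible alternative without showing it. As a rough illustration, the multi-phase pattern from section 4 could be written with a Phaser as sketched below. The class name PhaserSketch and the method runPhases are hypothetical, and the per-phase work is left as a comment. Note that Phaser numbers its phases from 0.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

// Hypothetical class, for illustration only.
class PhaserSketch {

    // Runs `parties` worker threads through `phases` phases and returns the
    // phase numbers in the order onAdvance reported them.
    static List<Integer> runPhases(int parties, int phases) throws InterruptedException {
        List<Integer> completed = new CopyOnWriteArrayList<>();

        // onAdvance runs once per phase, when the last registered party arrives.
        Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                completed.add(phase);
                // Returning true terminates the phaser after the final phase.
                return phase >= phases - 1 || registeredParties == 0;
            }
        };

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    // ... the work for phase p would go here ...
                    phaser.arriveAndAwaitAdvance(); // wait for the other parties
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed phases: " + runPhases(3, 3));
    }
}
```

Unlike CyclicBarrier, a Phaser lets parties register and deregister while it is in use (register() and arriveAndDeregister()), which is the main reason to prefer it when the number of participants changes between phases.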
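Summary
The Java concurrency utilities offer a rich set of synchronization mechanisms. Knowing how to use CountDownLatch, CyclicBarrier, and Semaphore makes it possible to coordinate multithreaded execution efficiently. In practice, choose the synchronizer that matches the requirement at hand.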