Java AQS:并发框架的核心骨架
Java AQS:并发框架的核心骨架
概述
AQS(AbstractQueuedSynchronizer)是Java并发包的核心框架,ReentrantLock、Semaphore、CountDownLatch等同步器都是基于AQS实现的。
1. AQS核心概念
// AQS核心:
// 1. state变量:同步状态
// 2. CLH队列:等待线程队列
// 3. 独占模式 vs 共享模式
2. 独占模式
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyExclusiveLock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
3. 共享模式
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MySharedLock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared(int arg) {
if (getState() < getMaxState()) {
if (compareAndSetState(getState(), getState() + 1)) {
return 1;
}
}
return -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
while (true) {
int current = getState();
if (compareAndSetState(current, current - 1)) {
return true;
}
}
}
private int getMaxState() { return 10; }
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
4. ReentrantLock源码分析
// lock() -> sync.lock() -> NonfairSync.lock()
// compareAndSetState(0, 1) -> 如果成功,设置持有线程
// 如果state != 0 -> acquire(1) -> tryAcquire(1)
// tryAcquire(1) -> nonfairTryAcquire(1)
// 如果当前线程是持有线程 -> state+1(重入)
// 否则 -> enqueue -> 将线程加入CLH队列
// unlock() -> sync.release(1) -> tryRelease(1)
// state-1,如果state == 0 -> 完全释放 -> unparkSuccessor(head)
5. 自定义同步器
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyReentrantLock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (getState() == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
setState(getState() + acquires);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
int newState = getState() - releases;
setState(newState);
return newState == 0;
}
}
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
}
最佳实践
- 理解state变量:同步状态的核心
- 理解CLH队列:线程排队等待机制
- 区分独占和共享模式:根据场景选择
- 继承AQS:实现自定义同步器
- 使用现有同步器:优先使用ReentrantLock、Semaphore等
总结
AQS是Java并发包的核心框架,理解AQS的原理和实现,可以帮助我们更好地使用和扩展并发工具。