← 返回首页

Java AQS:并发框架的核心骨架

📂 java ⏱ 2 min 343 words

Java AQS:并发框架的核心骨架

概述

AQS(AbstractQueuedSynchronizer)是Java并发包的核心框架,ReentrantLock、Semaphore、CountDownLatch等同步器都是基于AQS实现的。

1. AQS核心概念

// AQS核心:
// 1. state变量:同步状态
// 2. CLH队列:等待线程队列
// 3. 独占模式 vs 共享模式

2. 独占模式

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyExclusiveLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}

3. 共享模式

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MySharedLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            if (getState() < getMaxState()) {
                if (compareAndSetState(getState(), getState() + 1)) {
                    return 1;
                }
            }
            return -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            while (true) {
                int current = getState();
                if (compareAndSetState(current, current - 1)) {
                    return true;
                }
            }
        }

        private int getMaxState() { return 10; }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }
}

4. ReentrantLock源码分析

// lock() -> sync.lock() -> NonfairSync.lock()
// compareAndSetState(0, 1) -> 如果成功,设置持有线程
// 如果state != 0 -> acquire(1) -> tryAcquire(1)
// tryAcquire(1) -> nonfairTryAcquire(1)
// 如果当前线程是持有线程 -> state+1(重入)
// 否则 -> enqueue -> 将线程加入CLH队列

// unlock() -> sync.release(1) -> tryRelease(1)
// state-1,如果state == 0 -> 完全释放 -> unparkSuccessor(head)

5. 自定义同步器

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyReentrantLock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            if (getState() == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                setState(getState() + acquires);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int newState = getState() - releases;
            setState(newState);
            return newState == 0;
        }
    }

    public void lock() { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}

最佳实践

  1. 理解state变量:同步状态的核心
  2. 理解CLH队列:线程排队等待机制
  3. 区分独占和共享模式:根据场景选择
  4. 继承AQS:实现自定义同步器
  5. 使用现有同步器:优先使用ReentrantLock、Semaphore等

总结

AQS是Java并发包的核心框架,理解AQS的原理和实现,可以帮助我们更好地使用和扩展并发工具。