✅ 总览:是否基于 AQS?
✅ 1️⃣ CountDownLatch 源码结构(基于 AQS)
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 设置初始计数器
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 只有 state==0 才允许继续
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
int next = c - 1;
if (compareAndSetState(c, next)) {
return next == 0;
}
}
}
}
private final Sync sync;
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 共享获取
}
public void countDown() {
sync.releaseShared(1); // 共享释放
}
}
✅ 特点:
await() 方法:线程阻塞,直到 state == 0;
countDown() 方法:线程调用后减一,直到减为 0 唤醒所有线程;
使用 AQS 的共享锁模式,线程是一次性通过的(不可重用)。
✅ 2️⃣ Semaphore 源码结构(基于 AQS)
public class Semaphore {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 设置初始许可数
}
protected final int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || !compareAndSetState(available, remaining)) {
return -1;
}
return 1;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (compareAndSetState(current, next)) return true;
}
}
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
}
✅ 特点:
基于 AQS 的共享模式;
state 表示当前可用的许可数;
acquire() 获取许可(失败阻塞),release() 释放许可;
非公平 / 公平都可支持(通过构造器选择 FairSync 或 NonfairSync);
✅ 3️⃣ CyclicBarrier 源码结构(未使用 AQS)
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties; // 参与线程数量
private Generation generation = new Generation();
private int count;
public int await() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int index = --count;
if (index == 0) {
// 所有线程到达,重置并唤醒所有线程
nextGeneration();
return 0;
}
for (;;) {
trip.await(); // 阻塞等待
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
trip.signalAll(); // 唤醒所有等待线程
count = parties;
}
}
✅ 特点:
使用 ReentrantLock + Condition 实现线程协调;
每个线程执行 await() 会阻塞,直到最后一个线程到达屏障;
到达后统一唤醒所有线程,可重用机制;
无 AQS 依赖,逻辑更加灵活复杂。
✅ 四、源码机制总结
头脑风暴:
☑ 如何基于 AQS 自定义一个锁或同步器?
☑ Semaphore 如何实现公平 vs 非公平策略?
评论区