侧边栏壁纸
博主头像
ProSayJ 博主等级

Talk is cheap. Show me the code.

  • 累计撰写 72 篇文章
  • 累计创建 24 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

CountDownLatch、CyclicBarrier、Semaphore的实现原理

YangJian
2025-07-14 / 0 评论 / 0 点赞 / 9 阅读 / 0 字

✅ 总览:是否基于 AQS?

类名

是否基于 AQS

类型(独占 / 共享)

特点简述

CountDownLatch

✅ 是

共享锁(共享模式)

等待计数器归零,所有线程通过

Semaphore

✅ 是

共享锁(共享模式)

控制许可证数量,允许多个线程并发访问

CyclicBarrier

❌ 否

自定义同步机制

使用 ReentrantLock + Condition 实现


✅ 1️⃣ CountDownLatch 源码结构(基于 AQS)

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // 设置初始计数器
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1; // 只有 state==0 才允许继续
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;
                }
            }
        }
    }

    private final Sync sync;

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // 共享获取
    }

    public void countDown() {
        sync.releaseShared(1); // 共享释放
    }
}

✅ 特点:

  • await() 方法:线程阻塞,直到 state == 0;

  • countDown() 方法:线程调用后减一,直到减为 0 唤醒所有线程;

  • 使用 AQS 的共享锁模式,线程是一次性通过的(不可重用)。


✅ 2️⃣ Semaphore 源码结构(基于 AQS)

public class Semaphore {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // 设置初始许可数
        }

        protected final int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || !compareAndSetState(available, remaining)) {
                    return -1;
                }
                return 1;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (compareAndSetState(current, next)) return true;
            }
        }
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void release() {
        sync.releaseShared(1);
    }
}

✅ 特点:

  • 基于 AQS 的共享模式

  • state 表示当前可用的许可数;

  • acquire() 获取许可(失败阻塞),release() 释放许可;

  • 非公平 / 公平都可支持(通过构造器选择 FairSync 或 NonfairSync);


✅ 3️⃣ CyclicBarrier 源码结构(未使用 AQS)

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties; // 参与线程数量
    private Generation generation = new Generation();

    private int count;

    public int await() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int index = --count;
            if (index == 0) {
                // 所有线程到达,重置并唤醒所有线程
                nextGeneration();
                return 0;
            }

            for (;;) {
                trip.await(); // 阻塞等待
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        trip.signalAll(); // 唤醒所有等待线程
        count = parties;
    }
}

✅ 特点:

  • 使用 ReentrantLock + Condition 实现线程协调;

  • 每个线程执行 await() 会阻塞,直到最后一个线程到达屏障;

  • 到达后统一唤醒所有线程,可重用机制

  • 无 AQS 依赖,逻辑更加灵活复杂。


✅ 四、源码机制总结

对比项

CountDownLatch

Semaphore

CyclicBarrier

是否用到 AQS

✅ 是

✅ 是

❌ 否

使用的锁类型

共享锁(AQS)

共享锁(AQS)

显式锁:ReentrantLock + Condition

适用场景

等待一组事件完成

限制并发资源访问

多线程同步启动(栅栏)

是否可重用

❌ 否

✅ 是

✅ 是


头脑风暴:

  • ☑ 如何基于 AQS 自定义一个锁或同步器?

  • ☑ Semaphore 如何实现公平 vs 非公平策略?

0

评论区