Skip to content

Seamphore (信号量)

约 5106 字大约 17 分钟

2025-11-05

Semaphore概述

在 Java 中,Semaphore是一个并发工具类,位于 java.util.concurrent包中,用于控制同时访问某一资源或临界区的线程数量。

它维护了一个许可(permit)的计数器:

  • 当线程调用 acquire()时,如果还有可用的许可,则许可数减一,线程继续执行;
  • 如果没有可用的许可,线程将被阻塞,直到有其他线程调用 release()释放许可;
  • 线程完成任务后,应调用 release()将许可归还,以供其他线程使用。

Semaphore基本使用方法

创建Semaphore

// 参数 permits 表示初始可用的许可数量
Semaphore semaphore = new Semaphore(3); // 最多允许 3 个线程同时访问

如果需要Semaphore在许可不可用的时候时非阻塞地返回失败,可以使用带布尔参数的构造函数:

 // false 表示非公平锁,true 表示公平锁
Semaphore semaphore = new Semaphore(3, false);

公平性(fairness):设置为 true时,Semaphore 会按照线程请求的顺序分配许可(先到先得),避免线程饥饿,但可能降低吞吐量。

使用 Semaphore 控制并发(示例代码)

下面是一个完整的 Java 示例,模拟 多个线程争抢有限资源(比如数据库连接、API 调用额度等)的情况:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    // 创建一个 Semaphore,初始有 3 个许可
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 模拟 10 个线程尝试访问受限资源
        for (int i = 1; i <= 10; i++) {
            Thread thread = new Thread(new Worker(i), "Thread-" + i);
            thread.start();
        }
    }

    // 工作线程
    static class Worker implements Runnable {
        private int id;

        public Worker(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 正在尝试获取资源...");
            try {
                // 请求一个许可,如果没有就阻塞
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " 获取到资源,正在执行任务...");
                
                // 模拟任务执行
                Thread.sleep(2000);

                System.out.println(Thread.currentThread().getName() + " 执行完毕,释放资源");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放许可
                semaphore.release();
            }
        }
    }
}

Semaphore源码分析

Semaphore也有两种模式,公平锁和非公平锁的模式,依旧是从两个方面来进行分析。

非公平模式

创建Semaphore

在创建的Semaphore的时候使用下面的方式创建的Semaphore都是非公平模式的:

// 参数 permits 表示初始可用的许可数量
Semaphore semaphore = new Semaphore(3); 
// false 表示非公平锁,true 表示公平锁
Semaphore semaphore = new Semaphore(3, false);

那么分别来看下这两个方法对应的构造方法:

  	public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
	
	public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

在构造方法中公平模式和非公平模式的区别在与创建的是FairSync类型的实例还是NonfairSync类型的实例。

在ReentrantLock也有FairSync和NonfairSync两个内部类,这里也是两个内部类,它们是属于Semaphore的内部类,需要与ReentrantLock中的内部类区分开。

接着来看下非公平模式下NonfairSync的构造方法:

    NonfairSync(int permits) {
       super(permits);
   	}
	
	
    Sync(int permits) {
        // 将共享变量的state的设置为需要的许可数量
        setState(permits);
    }

NofairSync的构造方法最终调用的是它的父类的Sync的构造方法,而Sync是AbstractQueuedSynchronizer的子类。

所以在非公平模式下创建Semaphore就是创建一个AQS类的实例,然后将state的值设置为了需要的许可数量。

tryAcquire()

tryAcquire()的意思就是尝试获取一个许可。它一共有如下几种重载实现:

// 尝试获取一个许可
public boolean tryAcquire();
// 尝试获取一个许可,并设置一个超时时间
public boolean tryAcquire(long timeout, TimeUnit unit)
// 尝试获取多个许可
public boolean tryAcquire(int permits)
// 尝试获取多个许可,并设置一个超时时间
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

我们还是自从最基本的tryAcquire()方法分析:

	public boolean tryAcquire() {
        // nonfairTryAcquireShared()方法用于修改state为剩余许可数量,并返回剩余许可数量
        // 剩余许可数量大于等于0表示获取锁成功,反之就表示获取锁失败了
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

它最终调用的Sync类的nonfairTryAcquireShared()方法:

 final int nonfairTryAcquireShared(int acquires) {
     // 死循环
     for (;;) {
         // 获取当前许可数量
         int available = getState();
         // 将可用的许可数量减去本次需要获取的许可数量得到最终剩余的许可数量
         int remaining = available - acquires;
         // 如果剩余许可数量小于0,就直接返回剩余许可的数量
         // 如果剩余许可数量大于0,就尝试通过CAS的方式将state值修改为新的剩余许可数量
         // CAS操作失败了,就说明有线程并发修改了,反之就代表获取许可成功了
         if (remaining < 0 ||
             compareAndSetState(available, remaining))
             return remaining;
     }
 }

如下图所示非公平模式下tryAcquire()方法的执行流程如下所示:

image-20251105224818264
image-20251105224818264

acquire()

acquire()是阻塞式的获取一个许可,它也有几种重载实现:

// 阻塞式获取一个许可
public void acquire()
// 阻塞式获取指定数量的许可
public void acquire(int permits)

一样是从最基本的acquire()方法开始分析:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

同样调用的是Sync的acquireSharedInterruptibly()方法:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
    // 在获取许可之前先判断下当前线程的中断标志位
    if (Thread.interrupted())
        // 如果当前线程被中断了,就抛出中断异常
        throw new InterruptedException();
    // 再次尝试修改state的值,并返回剩余可用许可数量
    // 返回结果小于0表示当前许可已经用完了,当前线程就需要加入到同步队列
    // 返回结果大于0表示获取到了当前许可
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在acquireSharedInterruptibly()方法会响应用户线程的中断,如果中断标志位为true就会抛出InterruptedException异常。在该方法中还会再次调用Sync#nonfairTryAcquireShared()方法再次尝试修改state的值。如果获取许可失败的话,就会执行doAcquireSharedInterruptibly()方法来将当前线程加入到同步队列中。

doAcquireSharedInterruptibly()方法

doAcquireSharedInterruptibly()方法是在用户线程获取许可失败时候执行,它的作用是将当前线程加入到同步队列。

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
    /**
     * 将当前节点构建成一个Node,此时Node节点的结构为:
     * Node.SHARED的值是一个空节点:Node SHARED = new Node();
     * Node{nextWaiter=SHARED,thread=Thread.currentThread()}
     */
    final Node node = addWaiter(Node.SHARED);
    // 记录操作结果,默认表示操作失败
    boolean failed = true;
    try {
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 当前节点的前驱节点就是头节点
            if (p == head) {
                // 再次尝试修改state属性的值并返回剩余可用许可数量
                int r = tryAcquireShared(arg);
                // 剩余许可数量大于0,表示修改state属性成功了,当前线程获取到了许可数量
                if (r >= 0) {
                    // 设置当前节点为头节点
                    setHeadAndPropagate(node, r);
                    // 将原本的头节点设置为null,方便被GC回收掉
                    p.next = null;
                    // 将操作结果设置为false,表示操作成功
                    failed = false;
                    return;
                }
            }
            /**
             * shouldParkAfterFailedAcquire():方法的作用是判断前驱节点的waitStatus
             * 通过CAS的方式将前驱节点的waitStatus修改为SINGAL
             * 如果前驱节点的waitStatus是SIGNAL就返回true,反之就返回false
             * 如果前驱接单的waitStatus是CANCELLED,就从当前节点往前遍历,找到第一个
             * waitStatus<=0的节点,然后修改它的next指针为当前节点,也就是将当前节点
             * 挂载到队列中最后一个waitStatus<=0的节点后面
             * parkAndCheckInterrupt():方法的作用是暂停当前线程;当被唤醒的时候返回
             * 线程的中断标志位,并清除掉中断标志位
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 如果线程在park期间被其它线程中断了,就会抛出InterruptedException
        // 此时failed的值就是true,就会进入cancelAcquire()方法来取消获取
        if (failed)
            cancelAcquire(node);
    }
}

在doAcquireSharedInterruptibly()方法中的主要的目的就是将它的前驱节点的waitStatus设置SIGNAL并暂停当前线程,然后在线程被唤醒后可以再次去争抢许可。如下图所示,就是doAcquireSharedInterruptibly()方法的执行逻辑。

image-20251105234922623
image-20251105234922623

还有一个地方需要额外的注意的就是线程唤醒的两种场景:

  • 如果当前线程被park后有其它的线程中断了当前线程,就会抛出InterruptedException,此时就会去执行
  • 如果当前线程是被它的前驱节点唤醒的,就会返回当前线程的中断标志位,如果中断标志位的值为true,依然会抛出InterruptedException异常。

上面两种场景下都会执行到cancelAcquire()方法,它是负责处理上面两种的场景的:

private void cancelAcquire(Node node) {
    // 如果当前节点为null的话,就不再处理了
    if (node == null)
        return;
	// 将当前节点的thread属性设置为null
    node.thread = null;

    // 清理同步队列中CANCELLED状态节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 运行到这里的时候,pred是找到的第一个不是CANCELLED的节点
    // 当前节点的前驱指针指向pred
	// predNext表示的pred的原始后继节点
    /**
     *  ----             --------
     * |pred| --next--> |predNext|
     *  ----             --------
     *  ----             ----
     * |pred| <--prev-- |node|
     *  ----             ----
     */
    Node predNext = pred.next;

    // 将当前节点的状态设置为CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 如果当前节点就是尾节点,就CAS的方式将当前节点的前驱节点设置为新的尾节点
    // 如果CAS的操作成功了,就pred节点的后继节点设置为null
    // 等于是当前节点从同步队列中删除掉了
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        /**
         * 如果当前节点的前驱节点不是头节点并且它的状态是SIGNAL
         * 如果当前节点的前驱节点不是头节点并且它的状态不是CANCELLED,就通过
         * CAS的方式修改前驱节点的状态为SIGNAL
         * pred.thread != null这个条件是为了解决多线程并发的时候找到的前驱节点
         * 已经被其它线程修改了
         */
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
            // 获取当前节点的后继接单
            Node next = node.next;
            // 当前节点的后继节点不为null,并且后继节点的waitStatus不为CANCELLED
            if (next != null && next.waitStatus <= 0)
                // 将当前节点的前驱节点的后继指针设置为当前节点的后继节点
                compareAndSetNext(pred, predNext, next);
        } else {
            /**
             * 场景1:如果当前节点的前驱节点就是头节点的话,就说明当前节点是有被唤醒的机会的
             * 场景2:当前节点的前驱节点不是头接点,前驱节点的状态也不是CANCELLED,但是
             * 通过CAS去修改前驱节点的状态为SINGAL的时候失败了,此时对于前驱节点的状态
             * 就不清楚了,对于这种不确定的状态就去唤醒一下,给次机会
             * 尝试去唤醒这个节点
             */
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

在看到的这里的时候其实有点疑惑:为啥pred.next = node.next,但是并没有 next.prev = pred。

一般来说,双向链表意味着如果修改了某个节点的next,也应该相应地修改下一个节点的prev,以保持双一致性。但是在AQS的实现中,这种双向链表并不是在所有地方都被严格维护的,特别是在取消节点以及出队操作中存在优化和权衡

还有一个比较重要的原因是当需要唤醒下一个节点的时候的,通常都是通过next指针来找到要唤醒的节点的,而不会通过prev指针来进行。因此,这里采取的值修改了next指针,如果要修改prev指针的话还会带来额外的竞争开销。

公平模式

创建Semaphore

创建公平模式的Semaphore的方式有:

// false 表示非公平锁,true 表示公平锁
Semaphore semaphore = new Semaphore(3, true);

它对应的构造方法就是:

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

所以它最终调用的是FairSync的构造方法:

Sync(int permits) {
    setState(permits);
}

所以非公平锁与公平锁在创建的时候没有什么区别,都是设置state属性的值为需要的许可数量。

tryAcquire()

tryAcquire()方法最终也是调用到了Sync的nonfairTryAcquireShared()方法,这里跟非公平锁模式也是没啥区别的。

acquire()

对于acquire()方法而言,它在公平锁模式下与非公平锁模式是有区别的:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 这里的tryAcquireShared()方法是由子类实现的,返回剩余的可用许可数量
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared()方法是AQS中的模板方法,它需要由子类实现。对于公平锁模式,它执行的就是Sync#nonfairTryAcquireShared()方法,而公平锁模式下,它执行的FairSync#tryAcquireShared()方法。

protected int tryAcquireShared(int acquires) {
    for (;;) {
        /**
         * 主要是判断同步队列中是否有等待的线程
         * 只有同步队列已经被初始化,且当前线程就是头节点的下一个线程,
         * 才会返回false,否则都是返回true
         */
        if (hasQueuedPredecessors()) return -1;
        // CAS的方式来修改state属性的值
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

所以看到这公平锁和非公平锁它他们之间的区别就在于获取锁的时候是否需要先判断同步队列中是否有处于等待状态的状态,如果有的话就会老实排队。只有在当前线程就是头节点的后继节点的时候才会尝试去获取锁。

非公平锁与公平锁的区别

非公平模式和公平模式的区别就是acquire()方法的区别,如果是公平模式会先判断等待队列里面是否还有等待的线程,如果有就会老实去排队。而非公平锁模式下的,都是会去尝试修改state属性的值的。

但是其实在tryAcquire()方法是没啥区别的。

释放锁

release()

release()方法的作用就是释放锁资源,它有以下几种重载模式:

public void release();
public void release(int permits);

依旧是从release()方法开始看起:

public void release() {
    sync.releaseShared(1);
}

这里调用的就是AbstractQueuedSynchronizer#releaseShared()方法,所以也没有公平不公平的说法。

public final boolean releaseShared(int arg) {
    // 尝试释放令牌(令牌不发生溢出的话都是返回true)
    if (tryReleaseShared(arg)) {
        // 修改同步队列
        doReleaseShared();
        return true;
    }
    return false;
}

这里的tryReleaseShared()是AQS的模板方法,它由子类实现,这里执行的Sync#tryReleaseShared()方法:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取到当前的令牌数量
        int current = getState();
        // 计算剩余的令牌数量
        int next = current + releases;
        // 防止令牌溢出
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS修改剩余令牌的数量
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared()方法就是修改state的值,而doReleaseShared()是就是修改同步队列的结构了。

private void doReleaseShared() {
    for (;;) {
        // 获取同步队列的头节点
        Node h = head;
        // 头节点不为null并且头节点不等于尾节点(表示同步中有等待的线程)
        if (h != null && h != tail) {
            // 获取头节点的状态
            int ws = h.waitStatus;
            // 头节点的状态为SINGAL
            if (ws == Node.SIGNAL) {
                // 将头节点的状态设置为0,如果CAS修改失败了,就在此进入再次修改
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
                // CAS修改成功了,就唤醒当前处于同步队列中的等待的线程
                unparkSuccessor(h);
            } 
            // 这里同步队列已经被初始化了,所以head节点的状态只有三种情况:
            // 0 - 初始状态:后继节点还没来得及修改head节点的waitStatus的值为-1
            // -3 - 共享状态:其它在释放的时候已经修改了head的
            else if (ws == 0 &&
                !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;  
        }
        // 如果头节点在此期间没有发生变化,就退出循环,反之就重新来一次
        if (h == head) 
            break;
    }
}

doReleaseShared()方法有两种情况:

  • head节点的状态就是SINGAL,那么就会执行unparkSuccessor()方法
  • head节点的状态就是0,那么会通过CAS的方式将它修改为PROPAGATE,最后再次循环式发现head节点没有发生变化就会跳出循环;
  • head节点的状态就是PROPAGATE,那么在处理期间head节点没有发生变化,最终就会跳出循环;

unparkSuccessor()方法的作用就是为了唤醒对应的节点内的线程:

private void unparkSuccessor(Node node) {
	// 获取当前节点的等待状态
    int ws = node.waitStatus;
    // 如果当前节点的状态小于0,SINGAL或者是PROPAGATE
    if (ws < 0)
        // 通过CAS的方式将状态设置为0
        compareAndSetWaitStatus(node, ws, 0);
	// 获取当前节点的下一个节点
    Node s = node.next;
    // 如果下一个节点为null,或状态是CANCELLED
    if (s == null || s.waitStatus > 0) {
        // 将头节点的后继节点置为null
        s = null;
        // 找出排在最前面的不是CANCELLED状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果找到了不为CANCELLED的节点
    if (s != null)
        // 唤醒对应的节点
        LockSupport.unpark(s.thread);
}

这里唤醒s.thread后,s.thread就会去竞争锁,回到了doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法。

private void setHeadAndPropagate(Node node, int propagate) {
    // 获取同步队列的头节点
    Node h = head; 
    // 将当前节点设置成同步队列的头节点,同时将thread属性设置为null
    setHead(node);
    // 这里的这么多判断就是为了判断同步队列里面是否还有等待的线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 如果当前节点的后继节点不为null,并且它也是共享模式接单
        Node s = node.next;
        if (s == null || s.isShared())
            // 再次调用释放锁的方法,再次释放一个
            doReleaseShared();
    }
}

所以按照顺序来看的话,Semaphore释放许可的流程可以概括为下面这张图:

image-20251106213455466
image-20251106213455466

在Semaphore中有几个点需要额外的注意下的:

在Semaphore中不检查释放的许可大于初始的许可

这个是Semaphore的设计思路,它并不是令牌桶的,它不需要限定许可的数量,它的主要设计目标就是获取许可的线程减少state的值,释放许可的线程增加state的值。当获取许可的线程获取不到足够的许可时就进入同步队列等待。

在Semaphore中释放锁的时候需要将头节点的状态修改为0

Semaphore是共享模式的,所以存在就是同时有2个线程释放许可。当有2个线程同时来释放许可的时候,会利用CAS的方式去修改头节点的状态,将它从-1(SIGNAL)修改为0(初始状态)。只有第一个CAS修改成功的线程,才具备唤醒头节点的后继节点;第二个线程的CAS操作肯定是失败的,就会进入下面的判断,通过CAS的方式将节点的状态修改为-3(PROPAGATE)。

如果线程CAS成功后,之前获取到的副本h节点,肯定跟现在的头节点是不同的,所以会再次进入循环。进入循环后此时head节点的waitStatus的值就是-3,然后就不会再去CAS修改head节点,所以最终h == head为true,最后break出循环结束释放锁。

这里需要与ReentrantLock区别开来,因为ReentrantLock是独占锁。只有获取到锁的线程才能释放锁,所以不会有多个线程来同时释放锁。所以在ReentrantLock中释放锁的时候它不会将头节点的状态从SIGNAL修改为0。

这篇文章需要对着AQS的源码分析来看,其实AQS的核心内容它是独占锁和共享锁模式。ReentrantLock和基于独占锁实现的,而Semaphore就是基于共享锁实现的。