信号量 Semaphore

简介

  Semaphore是一种基于计数的信号量,管理了一组许可。线程可以申请许可,当信号量中有许可时,线程申请成功, 拿走一个许可;没有许可时,线程阻塞等待其他线程用完了许可,归还给信号量。这个许可不是真正的许可(比如凭证), 只是一个计数,线程也不会真正使用这些许可。

  信号量初始化为1可以用作互斥锁,更常见的是称为二进制信号量,表示两种互斥状态。但和Lock有 些区别,Lock只能又获取锁的线程来释放锁,而Semaphore允许其他线程来做释放动作。

  Semaphore也支持公平和非公平策略。

一、Semaphore.Sync实现AQS

  • 使用共享模式
  • 使用AQS同步状态表示许可数量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     * 使用AQS 同步状态表示许可数量
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }
        //非公平
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                //如果确认有可申请的许可,那么通过CAS操作进行请求。
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        // 释放共享锁资源
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow  int最大值溢出
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        //减少许可
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow int最小值溢出
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        //将许可数量置为0,并返回现有的许可数量。
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

二、Semaphore.NonFair(非公平锁)实现Semaphore.Sync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

二、Semaphore.FairSync(公平锁)实现Semaphore.Sync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) //hasQueuedPredecessors就是防止插队。hasQueuedPredecessors在ReentrantLock源码分析.md具体分析过,在这里不做具体分析。
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

二、Semaphore实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;
  /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    //响应中断获得许可
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    //不响应中断获得许可
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    //尝试获得许可
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    //获得许可指定超时
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    //释放获得许可
    public void release() {
        sync.releaseShared(1);
    }
    //响应中断获得许可,指定许可数量
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    //不响应中断获得许可,指定许可数量
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
   //尝试获得许可,指定许可数量
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    //获得许可指定超时,指定许可数量
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    //释放获得许可,指定许可数量
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    //可用许可数量
    public int availablePermits() {
        return sync.getPermits();
    }
    //清楚许可数量为零
    public int drainPermits() {
        return sync.drainPermits();
    }
    //缩减许可数量
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
    //是否公平锁
    public boolean isFair() {
        return sync instanceof FairSync;
    }
    //是否有线程等待获取许可
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    //等待获取许可线程数量
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    //等待获取许可线程集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

}
    

三、Semaphore流程

  • 创建n个许可数量,当线程调用acquire方法时,许可数量减1,当线程调用release方法时,释放许可数量加1;
  • 当许可数量小于零时,进入线程加入到AQS同步队列中,等待线程release方法释放许可数量。