一:工作原理
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。
synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源
Semaphore 有两种模式,公平模式和非公平模式。
- 公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO;
- 非公平模式: 抢占式的。
Semaphore 对应的两个构造方法如下:
/*** Creates a {@code Semaphore} with the given number of* permits and nonfair fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** Creates a {@code Semaphore} with the given number of* permits and the given fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.* @param fair {@code true} if this semaphore will guarantee* first-in first-out granting of permits under contention,* else {@code false}*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式
Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。
这里就看一下NonfairSync的构造方法:
NonfairSync(int permits) {super(permits);}
可以看到直接调用了父类的构造方法,Sync的构造方法如下:
Sync(int permits) {setState(permits);}
可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
获取许可
先从获取一个许可看起,并且先看非公平模式下的实现。首先看acquire方法,acquire方法有几个重载,但主要是下面这个方法
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}
从上面可以看到,调用了Sync的acquireSharedInterruptibly方法,该方法在父类AQS中,如下:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//如果线程被中断了,抛出异常if (Thread.interrupted())throw new InterruptedException();//获取许可失败,将线程加入到等待队列中if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
该方法调用了父类中的nonfairTyAcquireShared方法,如下:
final int nonfairTryAcquireShared(int acquires) {for (;;) {// 获取剩余许可int available = getState();// 计算给完这次许可后剩余的个数int remaining = available - acquires;//如果许可不够或者可以将许可数量重置的话,返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
从上面可以看到,只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。看完了非公平的获取,再看下公平的获取,代码如下:
protected int tryAcquireShared(int acquires) {for (;;) {//如果前面有线程再等待,直接返回-1if (hasQueuedPredecessors())return -1;//后面与非公平一样int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
从上面可以看到,FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
看完了获取许可后,再看一下释放许可。
释放许可
释放许可也有几个重载方法,但都会调用下面这个带参数的方法,
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}
releaseShared方法在AQS中,如下:
public final boolean releaseShared(int arg) {//如果改变许可数量成功if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
AQS子类实现共享模式的类需要实现tryReleaseShared类来判断是否释放成功,实现如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {// 获取当前许可数量int current = getState();// 计算回收后的数量int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS改变许可数量成功,返回trueif (compareAndSetState(current, next))return true;}}
从上面可以看到,一旦CAS改变许可数量成功,那么就会调用doReleaseShared()方法释放阻塞的线程。
减小许可数量
Semaphore还有减小许可数量的方法,该方法可以用于用于当资源用完不能再用时,这时就可以减小许可证。代码如下:
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}
可以看到,委托给了Sync,Sync的reducePermits方法如下:
final void reducePermits(int reductions) {for (;;) {//得到当前剩余许可数量int current = getState();//得到减完之后的许可数量int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//如果CAS改变成功if (compareAndSetState(current, next))return;}}
从上面可以看到,就是CAS改变AQS中的state变量,因为该变量代表许可证的数量。
获取剩余许可数量
Semaphore还可以一次将剩余的许可数量全部取走,该方法是drain方法,如下:
/*** Acquires and returns all permits that are immediately available.** @return the number of permits acquired*/public int drainPermits() {return sync.drainPermits();}
Sync的实现如下:
final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
可以看到,就是CAS将许可数量置为0。
二:Semaphore实例
package com.github.springbootdemo.demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;/*** @Description: 需要一次性拿一个许可的情况*/
public class SemaphoreExample {//请求的数量private static final int threatCount = 550;public static void main(String[] args) {//创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给的太少的话,你会发现执行的很慢)ExecutorService threadPool = Executors.newFixedThreadPool(500);//一次只能允许执行的线程数量final Semaphore semaphore = new Semaphore(20);for(int i=0;i<threatCount;i++){final int threadNum = i;threadPool.execute(()->{ //Lambda 表达式的运用try {semaphore.acquire(); // 获取一个许可,所以可运行线程数量为20/1=20test(threadNum);semaphore.release(); //释放一个许可}catch (Exception e){e.printStackTrace();}});}//关闭线程池threadPool.shutdown();System.out.println("finish");}public static void test(int threadnum) throws Exception{Thread.sleep(1000); //模拟请求的耗时时间System.out.println("threadnum:" + threadnum);Thread.sleep(1000); //模拟请求的耗时时间}}
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:
semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 获取5个许可,所以可运行线程数量为20/5=4
运行结果:
threadnum:31
threadnum:515
threadnum:516
threadnum:519
threadnum:523
threadnum:32
threadnum:526
threadnum:532
threadnum:12
threadnum:35
threadnum:33
threadnum:524
threadnum:527
threadnum:525
threadnum:533
threadnum:30