一、介绍
Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。它的用法如下:
public class Service {private Semaphore semaphore = new Semaphore(1);//只能通过一个线程public void testMethod() {try {semaphore.acquire();//获取许可 permitSystem.out.println(Thread.currentThread().getName()+"begin timer:"+System.currentTimeMillis());Thread.sleep(5000);System.out.println(Thread.currentThread().getName()+"end timer="+System.currentTimeMillis());semaphore.release();//释放持有许可} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
-
-
二、方法摘要
所有方法 接口方法 具体的方法 Modifier and Type Method and Description voidacquire()从该信号量获取许可证,阻止直到可用,或线程为 interrupted 。
voidacquire(int permits)从该信号量获取给定数量的许可证,阻止直到所有可用,否则线程为 interrupted 。
voidacquireUninterruptibly()从这个信号灯获取许可证,阻止一个可用的。
voidacquireUninterruptibly(int permits)从该信号量获取给定数量的许可证,阻止直到所有可用。
intavailablePermits()返回此信号量中当前可用的许可数。
intdrainPermits()获取并返回所有可立即获得的许可证。
protected Collection<Thread>getQueuedThreads()返回一个包含可能正在等待获取的线程的集合。
intgetQueueLength()返回等待获取的线程数的估计。
booleanhasQueuedThreads()查询任何线程是否等待获取。
booleanisFair()如果此信号量的公平设置为真,则返回
true。protected voidreducePermits(int reduction)缩小可用许可证的数量。
voidrelease()释放许可证,将其返回到信号量。
voidrelease(int permits)释放给定数量的许可证,将其返回到信号量。
StringtoString()返回一个标识此信号量的字符串及其状态。
booleantryAcquire()从这个信号量获得许可证,只有在调用时可以使用该许可证。
booleantryAcquire(int permits)从这个信号量获取给定数量的许可证,只有在调用时全部可用。
booleantryAcquire(int permits, long timeout, TimeUnit unit)从该信号量获取给定数量的许可证,如果在给定的等待时间内全部可用,并且当前线程尚未 interrupted 。
booleantryAcquire(long timeout, TimeUnit unit)如果在给定的等待时间内可用,并且当前线程尚未 到达 interrupted,则从该信号量获取许可。
-
-
-
三、构造方法详细信息
-
Semaphore
public Semaphore(int permits)
创建一个
Semaphore与给定数量的许可证和非公平公平设置。参数
permits-permits的初始许可证。 该值可能为负数,在这种情况下,必须在任何获取被授予之前发布释放。
-
Semaphore
public Semaphore(int permits,boolean fair)
创建一个
Semaphore与给定数量的许可证和给定的公平设置。参数
permits-permits的初始许可证。 该值可能为负数,在这种情况下,必须在任何获取被授予之前发布释放。fair-true如果这个信号量将保证首先在竞争中首先授予许可证,否则false
-
-
四、用Semaphore实现生产者消费者模式
Service 处理业务逻辑:
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class RepastService {volatile private Semaphore setSemaphore = new Semaphore(10);//厨师volatile private Semaphore getSemaphore = new Semaphore(10);//顾客volatile private ReentrantLock lock = new ReentrantLock(); //锁volatile private Condition setCondition = lock.newCondition();volatile private Condition getCondition = lock.newCondition();volatile private Object[] producePosition = new Object[4];//只有四个餐盘private boolean isEmpty() {boolean isEmpty = true;for (int i = 0; i< producePosition.length;i++) {if (producePosition[i]!=null) {isEmpty=false;break;}}if(isEmpty==true) {return true;}else {return false;} }private boolean isFull() {boolean isFull = true;for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]==null) {isFull = false;break;}}return isFull;}public void set() {try {setSemaphore.acquire();lock.lock();while(isFull()) {//生产者在等待setCondition.await();}for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]==null) {producePosition[i] = "数据";System.out.println(Thread.currentThread().getName()+" 生产了 "+producePosition[i]);break;}}getCondition.signalAll();lock.unlock();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {setSemaphore.release();}}public void get() {try {getSemaphore.acquire();lock.lock();while (isEmpty()) {getCondition.await();}for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]!=null) {System.out.println(Thread.currentThread().getName()+" 消费了 "+ producePosition[i]);producePosition[i]=null;break;}} setCondition.signalAll();lock.unlock();}catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {getSemaphore.release();}}
}
ThreadCus 消费者:
public class ThreadCus extends Thread {private RepastService service ;public ThreadCus(RepastService service) {super();this.service = service;}public void run() {service.get();}
}
ThreadPro 生产者:
public class ThreadPro extends Thread {private RepastService service ;public ThreadPro(RepastService service) {super();this.service = service;}public void run() {service.set();}
}
测试类:
public class Runtest {public static void main(String[] args) throws InterruptedException {RepastService service = new RepastService();ThreadPro[] arrayP = new ThreadPro[60];ThreadCus[] arrayC = new ThreadCus[60];for (int i = 0; i < arrayP.length; i++) {arrayP[i] = new ThreadPro(service);arrayC[i] = new ThreadCus(service);}Thread.sleep(2000);for (int i = 0; i < arrayP.length; i++) {arrayP[i].start();arrayC[i].start();}}
}