同步工具类图
- 闭锁相关:CountDownLatch
- 信号量相关:Semaphore
- 栅栏相关:CyclicBarrier 、Exchanger
- 线程池相关:Executors
什么是同步工具类
同步工具类可以是任何一种对象,只要它能够根据自身的状态来协调线程之间的控制流。阻塞队列、信号量、栅栏、闭锁等都是同步工具类。
闭锁
闭锁是一种工具同步类,可以延迟线程进度直到其达到终止状态。闭锁的作用相当于一道门,在闭锁到达结束状态之前,这扇门一直是关闭的,没有线程可以通过;当闭锁结束时,这扇门会打开所有线程可以通过。当闭锁达到结束状态打开门时,将不会再改变其状态,即门不会再次关闭。
闭锁就像是景点大门一样,在大门打开之前,任何人都不能进入景点;在大门打开之后,任何人都可以进入景点。
闭锁的应用场景:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
- 确保某个服务在其依赖的所有其他服务都已经启动后才启动;
- 等待直到某个操作的所有参与者都就绪再继续执行。
CountDownLatch
CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况下使用,它可以使一个或多个线程等待一组事件的发生。闭锁状态包括一个计数器,计数器被初始化为一个正数,表示需要等待的事件数量。countDown()方法递减计数器,表示一个事件已经发生了。await()方法等待计数器为0,这表示所有事件已经发生。如果计数器非0,那么await会一直阻塞,直到计数器为0,或者等待中的线程中断,或者等待超时。
public class TestHarness{public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{final CountDownLanch startGate = new CountDownLanch(1); //起始门,初始化为1final CountDownLanch endGate = new CountDownLanch(nThreads); //结束门,初始化为线程的数量for(int i=0; i<nThreads; i++){Thread t = new Thread(){public void run(){try{startGate.await(); //n个线程都会阻塞在这里,直到startGate门打开try{task.run();}finally{engGate.countDown(); //每个线程都在执行结束时将endGate门的计数器减一}}catch (InterruptedException ignored){}}};t.start(); //启动线程,但会阻塞在startGate门}//使用nanoTime()提供准确的计时long start = System.nanoTime();startGate.countDown(); //将StartGate门打开,n个线程同时执行endGate.await(); //阻塞直到n个线程都执行完成,计数器变为0long end = System.nanoTime();return end-start;}
}
上面的TestHarness类中使用闭锁,确保n个线程同时开始执行。这在测试n个线程并发执行某个任务所需要的时间是很有用。如果不适用闭锁,先启动的线程必将领先后启动的线程。
信号量Semaphore
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时指定某个特定操作的数量。信号量用来解决同步问题而不是用来解决死锁问题。
Semaphore中管理着一组虚拟的许可(premit),许可的初始数量可通过构造函数来制定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后释放许可。如果没有许可将被阻塞。
public class BoundedHashSet<T>{private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound){this.set = Collection.synchronizedSet(New HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException{sem.acquire(); //获取许可boolean wasAdded = false;try{wasAdded = set.add(o);return wasAdded}finally{if(!wasAdded) //如果add没有添加元素,立即释放许可sem.release(); }}public boolean remove(Object o){boolean wasRemoved = set.remove(o);if(wasRemoved)sem.release(); return wasRemoved;}
}
栅栏
栅栏类似与闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,栅栏用于等待其他线程。
栅栏就像是几个人决定在某个景点集合,要求所有人都到景点门口会面之后一起进入景点。
常见的栅栏有两种形式:CyclicBarrier和Exchanger。
CyclicBarrier:
CyclicBarrier可以使一定数量的参与方反复在栅栏处聚集,它在并发的迭代算法中非常有用:这种算法通常将一个问题拆分为一系列相对独立的子问题。当线程到达栅栏处时,将调用await()方法阻塞,直到所有线程都到达栅栏位置。然后栅栏打开,所有线程都被放行,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么认为栅栏被打破了,所有阻塞的await都将终止并抛出BrokenBarrierException。
Exchanger:
Exechanger是一种两方(Two-Party)栅栏,各方在栅栏位置交换数据。当两方执行不对称操作时Exechanger非常有用,例如当一个线程向缓冲区写数据,另一个线程从缓冲区读数据。这些线程可以使用Exechanger来汇合,并将满的缓冲区和空的缓冲区交换。