当前位置: 代码迷 >> 综合 >> Java Concurrent--同步工具类
  详细解决方案

Java Concurrent--同步工具类

热度:64   发布时间:2023-09-21 21:03:11.0

同步工具类图

  • 闭锁相关:CountDownLatch
  • 信号量相关:Semaphore
  • 栅栏相关:CyclicBarrier 、Exchanger
  • 线程池相关:Executors

什么是同步工具类

同步工具类可以是任何一种对象,只要它能够根据自身的状态来协调线程之间的控制流。阻塞队列、信号量、栅栏、闭锁等都是同步工具类。

闭锁

闭锁是一种工具同步类,可以延迟线程进度直到其达到终止状态。闭锁的作用相当于一道门,在闭锁到达结束状态之前,这扇门一直是关闭的,没有线程可以通过;当闭锁结束时,这扇门会打开所有线程可以通过。当闭锁达到结束状态打开门时,将不会再改变其状态,即门不会再次关闭。

闭锁就像是景点大门一样,在大门打开之前,任何人都不能进入景点;在大门打开之后,任何人都可以进入景点。

闭锁的应用场景:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  • 确保某个服务在其依赖的所有其他服务都已经启动后才启动;
  • 等待直到某个操作的所有参与者都就绪再继续执行。

CountDownLatch

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况下使用,它可以使一个或多个线程等待一组事件的发生。闭锁状态包括一个计数器,计数器被初始化为一个正数,表示需要等待的事件数量。countDown()方法递减计数器,表示一个事件已经发生了。await()方法等待计数器为0,这表示所有事件已经发生。如果计数器非0,那么await会一直阻塞,直到计数器为0,或者等待中的线程中断,或者等待超时。

public class TestHarness{public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{final CountDownLanch startGate = new CountDownLanch(1);  //起始门,初始化为1final CountDownLanch endGate = new CountDownLanch(nThreads);  //结束门,初始化为线程的数量for(int i=0; i<nThreads; i++){Thread t = new Thread(){public void run(){try{startGate.await();    //n个线程都会阻塞在这里,直到startGate门打开try{task.run();}finally{engGate.countDown();    //每个线程都在执行结束时将endGate门的计数器减一}}catch (InterruptedException ignored){}}};t.start();    //启动线程,但会阻塞在startGate门}//使用nanoTime()提供准确的计时long start = System.nanoTime();startGate.countDown();    //将StartGate门打开,n个线程同时执行endGate.await();    //阻塞直到n个线程都执行完成,计数器变为0long end = System.nanoTime();return end-start;}
}

上面的TestHarness类中使用闭锁,确保n个线程同时开始执行。这在测试n个线程并发执行某个任务所需要的时间是很有用。如果不适用闭锁,先启动的线程必将领先后启动的线程。

信号量Semaphore

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时指定某个特定操作的数量。信号量用来解决同步问题而不是用来解决死锁问题。

Semaphore中管理着一组虚拟的许可(premit),许可的初始数量可通过构造函数来制定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后释放许可。如果没有许可将被阻塞。

public class BoundedHashSet<T>{private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound){this.set = Collection.synchronizedSet(New HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException{sem.acquire();    //获取许可boolean wasAdded = false;try{wasAdded = set.add(o);return wasAdded}finally{if(!wasAdded)    //如果add没有添加元素,立即释放许可sem.release();    }}public boolean remove(Object o){boolean wasRemoved = set.remove(o);if(wasRemoved)sem.release(); return wasRemoved;}
}

栅栏

栅栏类似与闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,栅栏用于等待其他线程。

栅栏就像是几个人决定在某个景点集合,要求所有人都到景点门口会面之后一起进入景点。

常见的栅栏有两种形式:CyclicBarrier和Exchanger。

CyclicBarrier:

CyclicBarrier可以使一定数量的参与方反复在栅栏处聚集,它在并发的迭代算法中非常有用:这种算法通常将一个问题拆分为一系列相对独立的子问题。当线程到达栅栏处时,将调用await()方法阻塞,直到所有线程都到达栅栏位置。然后栅栏打开,所有线程都被放行,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么认为栅栏被打破了,所有阻塞的await都将终止并抛出BrokenBarrierException。

Exchanger:

Exechanger是一种两方(Two-Party)栅栏,各方在栅栏位置交换数据。当两方执行不对称操作时Exechanger非常有用,例如当一个线程向缓冲区写数据,另一个线程从缓冲区读数据。这些线程可以使用Exechanger来汇合,并将满的缓冲区和空的缓冲区交换。

  相关解决方案