当前位置: 代码迷 >> 综合 >> Java基础- Semaphore,CountDownLatch,CyclicBarrier
  详细解决方案

Java基础- Semaphore,CountDownLatch,CyclicBarrier

热度:43   发布时间:2023-12-15 12:58:24.0

Semaphore(信号量)

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

public class SemaphoreExample1 {
    // 请求的数量private static final int threadCount = 550;public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)ExecutorService threadPool = Executors.newFixedThreadPool(300);// 一次只能允许执行的线程数量。final Semaphore semaphore = new Semaphore(20);for (int i = 0; i < threadCount; i++) {
    final int threadnum = i;threadPool.execute(() -> {
    // Lambda 表达式的运用try {
    semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20test(threadnum);semaphore.release();// 释放一个许可} catch (InterruptedException e) {
    // TODO Auto-generated catch blocke.printStackTrace();}});}threadPool.shutdown();System.out.println("finish");}public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作System.out.println("threadnum:" + threadnum);Thread.sleep(1000);// 模拟请求的耗时操作}
}

执行 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:

semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 释放5个许可

除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。
Semaphore 有两种模式,公平模式和非公平模式:

 public Semaphore(int permits) {
    sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 与 CountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量。

CountDownLatch (倒计时器)

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

CountDownLatch 的两种典型用法
  1. 某一线程在开始运行前等待 n 个线程执行完毕。
    将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性。
    注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
CountDownLatch 的使用示例
public class CountDownLatchExample1 {
    // 请求的数量private static final int threadCount = 550;public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)ExecutorService threadPool = Executors.newFixedThreadPool(300);final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {
    final int threadnum = i;threadPool.execute(() -> {
    // Lambda 表达式的运用try {
    test(threadnum);} catch (InterruptedException e) {
    // TODO Auto-generated catch blocke.printStackTrace();} finally {
    countDownLatch.countDown();// 表示一个请求已经被完成}});}countDownLatch.await();threadPool.shutdown();System.out.println("finish");}public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作System.out.println("threadnum:" + threadnum);Thread.sleep(1000);// 模拟请求的耗时操作}
}
  • 与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
  • 其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。

CountDownLatch 的不足
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

CyclicBarrier(循环栅栏)

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

再来看一下它的构造函数:

public CyclicBarrier(int parties) {
    this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}
  • 其中,parties 就代表了要拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

public class CyclicBarrierExample2 {
    // 请求的数量private static final int threadCount = 550;// 需要同步的线程数量private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);public static void main(String[] args) throws InterruptedException {
    // 创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(10);for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;Thread.sleep(1000);threadPool.execute(() -> {
    try {
    test(threadNum);} catch (InterruptedException e) {
    // TODO Auto-generated catch blocke.printStackTrace();} catch (BrokenBarrierException e) {
    // TODO Auto-generated catch blocke.printStackTrace();}});}threadPool.shutdown();}public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");try {
    /**等待60秒,保证子线程完全执行结束*/cyclicBarrier.await(60, TimeUnit.SECONDS);} catch (Exception e) {
    System.out.println("-----CyclicBarrierException------");}System.out.println("threadnum:" + threadnum + "is finish");}}

运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await() 方法之后的方法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。示例代码如下:

public class CyclicBarrierExample3 {
    // 请求的数量private static final int threadCount = 550;// 需要同步的线程数量private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    System.out.println("------当线程数达到之后,优先执行------");});public static void main(String[] args) throws InterruptedException {
    // 创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(10);for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;Thread.sleep(1000);threadPool.execute(() -> {
    try {
    test(threadNum);} catch (InterruptedException e) {
    // TODO Auto-generated catch blocke.printStackTrace();} catch (BrokenBarrierException e) {
    // TODO Auto-generated catch blocke.printStackTrace();}});}threadPool.shutdown();}public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");cyclicBarrier.await();System.out.println("threadnum:" + threadnum + "is finish");}}

运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
CyclicBarrier 源码分析

当调用 CyclicBarrier 对象await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);} catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen}
}

dowait(false, 0L):

 // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。private int count;/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {
    final ReentrantLock lock = this.lock;// 锁住lock.lock();try {
    final Generation g = generation;if (g.broken)throw new BrokenBarrierException();// 如果线程中断了,抛出异常if (Thread.interrupted()) {
    breakBarrier();throw new InterruptedException();}// cout减1int index = --count;// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件if (index == 0) {
      // trippedboolean ranAction = false;try {
    final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 将 count 重置为 parties 属性的初始化值// 唤醒之前等待的线程// 下一波执行开始nextGeneration();return 0;} finally {
    if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {
    try {
    if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
    breakBarrier();throw ie;} else {
    // We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {
    breakBarrier();throw new TimeoutException();}}} finally {
    lock.unlock();}}

总结:CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务并且重置count。

CyclicBarrier 和 CountDownLatch 的区别
  1. CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用
  2. CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
  相关解决方案