当前位置: 代码迷 >> 综合 >> Java 并发 (9) -- Semaphore 类
  详细解决方案

Java 并发 (9) -- Semaphore 类

热度:15   发布时间:2023-12-16 13:15:23.0

文章目录

  • 1. 简介
  • 2. 精讲
    • 1. 概念
    • 2. 实现
      • 1. 构造器
      • 2. acquire()
      • 3. release()
    • 3. 示例

1. 简介

  1. synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,而 Semaphore 可以指定多个线程同时访问某个资源,和 CountDownLatch 一样,其本质上是一个 “共享锁”,它通常用于限制可以访问某些资源的线程数目
  2. Semaphore 提供了 acquire() 方法来获取一个许可;获取了许可并用完之后可以通过 release() 方法来释放许可

2. 精讲

1. 概念

信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个 “共享锁”。

Semaphore 在 API 是这么介绍的:

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

下面我们就一个停车场的简单例子来阐述 Semaphore:

为了简单起见我们假设停车场仅有 5 个停车位,一开始停车场没有车辆,所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车,然后又来三辆,这个时候由于只有两个停车位,所有只能停两辆,其余一辆必须在外面候着,直到停车场有空车位,当然以后每来一辆都需要在外面候着。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆 要看选择的机制是公平还是非公平)。
.
从程序角度看,停车场就相当于信号量 Semaphore,其中许可数为 5,车辆就相当于线程。当来一辆车时,许可数就会减 1 ,当停车场没有车位了(即许可书 == 0 ),其他来的车辆需要在外面等候着。如果有一辆车开出停车场,许可数 + 1,然后放进来一辆车。
.
信号量 Semaphore 是一个非负整数( >=0 )。当一个线程想要访问某个共享资源时,它必须要先获取 Semaphore,当 Semaphore > 0 时,获取该资源并使 Semaphore – 1。如果 Semaphore 值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore 则 +1

2. 实现

1. 构造器

在这里插入图片描述
Semaphore 内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类 Sync,其中 Sync 继承 AQS

Semaphore 提供了两个构造函数:

  1. Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的 Semaphore。
  2. Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的 Semaphore。

实现如下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore 默认选择非公平锁

当信号量 Semaphore == 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态,当 =1 时表示其他线程可以获取,当 =0 时,排他,即其他线程必须要等待

2. acquire()

Semaphore 提供了 acquire() 方法来获取一个许可,即获取信号量

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

内部调用 AQS 的 acquireSharedInterruptibly(int arg),该方法以共享模式获取同步状态:

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly(int arg) 中,tryAcquireShared(int arg) 由子类来实现,对于 Semaphore 而言,如果我们选择非公平模式,则调用 NonfairSync 的 tryAcquireShared(int arg) 方法,否则调用 FairSync 的 tryAcquireShared(int arg) 方法

  1. 选择公平模式:

    protected int tryAcquireShared(int acquires) {
          for (;;) {
          //判断该线程是否位于CLH队列的列头if (hasQueuedPredecessors())return -1;//获取当前的信号量许可int available = getState();//设置“获得acquires个信号量许可之后,剩余的信号量许可数”int remaining = available - acquires;//CAS设置信号量if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
    }
    
  2. 选择非公平模式:

    对于非公平而言,因为它不需要判断当前线程是否位于 CLH 同步队列列头,所以相对而言会简单些

    protected int tryAcquireShared(int acquires) {
          return nonfairTryAcquireShared(acquires);
    }final int nonfairTryAcquireShared(int acquires) {
          for (;;) {
          int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
    }
    

3. release()

获取了许可,当用完之后就需要释放,Semaphore 提供 release() 来释放许可,即释放信号量

public void release() {
    sync.releaseShared(1);
}

内部调用 AQS 的 releaseShared(int arg)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();return true;}return false;
}

releaseShared(int arg) 调用 Semaphore 内部类 Sync 的 tryReleaseShared(int arg)

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    int current = getState();//信号量的许可数 = 当前信号许可数 + 待释放的信号许可数int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//设置可获取的信号许可数为nextif (compareAndSetState(current, next))return true;}
}

3. 示例

public class SemaphoreTest {
    static class Parking {
    //信号量private Semaphore semaphore;Parking(int count){
    semaphore = new Semaphore(count);}public void park(){
    try {
    //获取信号量semaphore.acquire();long time = (long) (Math.random() * 10);System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒..." );Thread.sleep(time);System.out.println(Thread.currentThread().getName() + "开出停车场...");} catch (InterruptedException e) {
    e.printStackTrace();} finally {
    semaphore.release();}}}static class Car extends Thread {
    Parking parking ;Car(Parking parking){
    this.parking = parking;}@Overridepublic void run() {
    parking.park();  //进入停车场}}public static void main(String[] args){
    Parking parking = new Parking(3);for(int i = 0 ; i < 5 ; i++){
    new Car(parking).start();}}
}

在这里插入图片描述

  相关解决方案