当前位置: 代码迷 >> 综合 >> 从零开始学并发三.ReentrantLock
  详细解决方案

从零开始学并发三.ReentrantLock

热度:101   发布时间:2023-11-26 12:00:59.0

ReentrantLock

  • ReentrantLock的结构
    • Node
    • lock()
      • acquire(int arg)
      • tryAcquire(int acquires)
      • acquireQueued(final Node node, int arg)
      • addWaiter(Node mode)
      • enq(final Node node)
      • shouldParkAfterFailedAcquire(p, node)
      • parkAndCheckInterrupt()
      • cancelAcquire(Node node)
      • unparkSuccessor(Node node)
    • unlock()
      • tryRelease(arg)
    • lockInterruptibly()
      • doAcquireInterruptibly(arg)
    • tryLock()
    • tryLock(long timeout, TimeUnit unit)
      • tryAcquireNanos(int arg, long nanosTimeout)
        • doAcquireNanos(int arg, long nanosTimeout)
  • 总结

ReentrantLock的结构

在这里插入图片描述在这里插入图片描述
看图可以知道 ReentrantLock 是基于内部类继承AbstractQueuedSynchronizer(AQS)来实现的
AQS 功能强大,我们后续讲的很多JUC类都是基于他实现的,我们一个一个类去讲解,讲完了来综合AQS各个方法的功能。
在这里我来讲几个AQS里面几个比较重要的属性
1.state 同步状态
2.tail 队尾
3.head 对头
讲到这三个属性,就不得不讲下AQS 中一个内部类

Node

node是一个CLH队列 双向链表。

 static final class Node {
    /** Marker to indicate a node is waiting in shared mode */// 共享Nodestatic final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */// 取消static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking *///通知static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition *///条件static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/// 广播static final int PROPAGATE = -3;// 等待状态volatile int waitStatus;//上个节点volatile Node prev;// 下个节点volatile Node next;/*** The thread that enqueued this node. Initialized on* construction and nulled out after use.*/// 持有线程volatile Thread thread;}

类其实没啥好讲的,我开始来讲方法把,就是大概了解下这个类的一些内部属性

lock()

// 这里基于FairSync(公平锁来讲)public void lock() {
    sync.lock();}final void lock() {
    acquire(1);}public final void acquire(int arg) {
    // 尝试去拿锁,如果拿到锁,直接返回,//没拿到锁开始将线程入队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果入队的时候,线程被中断了,线程自身也发起中断selfInterrupt();}

acquire(int arg)

   public final void acquire(int arg) {
    // 尝试去拿锁,如果拿到锁,直接返回,//没拿到锁开始将线程入队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果入队的时候,线程被中断了,线程自身也发起中断selfInterrupt();}

tryAcquire(int acquires)

protected final boolean tryAcquire(int acquires) {
    // 拿到当前线程final Thread current = Thread.currentThread();// 拿到同步状态int c = getState();// 为0 说明当前没有线程持有锁if (c == 0) {
    // 判断当前队列是否有节点在排队,并通过CAS尝试拿锁// 拿到就直接返回if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);return true;}}// 如果当前线程重入else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

acquireQueued(final Node node, int arg)

 final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;try {
    boolean interrupted = false;for (;;) {
    //拿当前节点的上一个节点final Node p = node.predecessor();// 如果上个节点是对头,当前线程再去尝试拿一次锁if (p == head && tryAcquire(arg)) {
    // 拿到后将当前节点设为对头//将thread和prev置空setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {
    if (failed)cancelAcquire(node);}}

addWaiter(Node mode)

private Node addWaiter(Node mode) {
    // 初始化节点Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 拿到当前队尾节点,第一次进来时是初始化状态,所以队尾节点肯定为null// 后面再有节点进来,将后面的节点和队尾节点连接起来if (pred != null) {
    node.prev = pred;if (compareAndSetTail(pred, node)) {
    pred.next = node;return node;}}// 初始化队列enq(node);return node;

enq(final Node node)

  private Node enq(final Node node) {
    for (;;) {
    Node t = tail;// 当队尾为null,初始化队列//if (t == null) {
     // Must initialize//初始化对头和队尾if (compareAndSetHead(new Node()))tail = head;} else {
    // 如果再初始化的过程已经有另外的线程将队列初始化了,//当前线程直接入队即可node.prev = t;if (compareAndSetTail(t, node)) {
    t.next = node;return t;}}}}

shouldParkAfterFailedAcquire(p, node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;// 拿到节点状态,如果是可唤醒状态直接返回// 当这个节点已经是可唤醒状态,说明他之前已经至少进行过一次自旋了// 这时候他应该要被park住if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 节点状态大于0属于不正常节点,递归找到最后一个正常节点if (ws > 0) {
    /** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {
    node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {
    /** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/// CAS 将当前节点改为可唤醒状态compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

parkAndCheckInterrupt()

  private final boolean parkAndCheckInterrupt() {
    // 挂其线程LockSupport.park(this);// 查看此时线程是否被中断了return Thread.interrupted();}

cancelAcquire(Node node)

 private void cancelAcquire(Node node) {
    // Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessors// 跳过状态不正常的predecessorsNode pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.// 这里去拿prev的下个节点,是为了防止再多线程情况下,node已经被删除了Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.// 如果node现在是队尾,将队尾换成prev//并且将prev的next节点置空if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);} else {
    // If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;// 1.pred 不是对头// 2.pred是可唤醒的 // 3.prev有线程持有// 上述三个条件满足,将node节点排除,直接把pred节点连接到node 的next节点if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
    Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {
    // 1. 如果pred 是对头,那说明node节点本来是下一个唤醒的节点,但是因为他是cannel状态//必须要找到node后面的节点来唤醒,此时的node的waitStatus 是CANCELLED状态==1// 从队尾开始找到一个最接近node一个可唤醒的节点,并唤醒他//2.pred的状态是不可唤醒的,说明unparkSuccessor(node);}node.next = node; // help GC}}

unparkSuccessor(Node node)

  private void unparkSuccessor(Node node) {
    /** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;// 删除状态,将节点恢复为重新入队模式if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 找到最接近node的一个可唤醒或者可入队的节点Node s = node.next;if (s == null || s.waitStatus > 0) {
    s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果存在的话 唤醒节点if (s != null)LockSupport.unpark(s.thread);}

入队逻辑光看代码可能很难理解,下面我们结合一张图来讲
第一个线程入队时,此时,因为队列为空,需要初始化队列,所以对头节点waitStatus为0,且Thread为null,入队的第一个线程要挂再head节点后面,且再第一次循环后将前驱节点waitStatus改为-1
在这里插入图片描述

在这里插入图片描述

unlock()

  public void unlock() {
    sync.release(1);}

 public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;// 拿到头节点,如果头节点的状态不是持有锁的状态,就将head节点的下一个等待唤醒的节点唤醒if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

tryRelease(arg)

   protected final boolean tryRelease(int releases) {
    // 拿到最新的同步状态int c = getState() - releases;// 判断现在释放锁的线程是不是持有锁的线程,不是就抛错if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 当同步状态值为0 的时候 就说明此时没有先吃持有锁if (c == 0) {
    free = true;setExclusiveOwnerThread(null);}// 重置同步状态setState(c);return free;}

了解完上锁和解锁连的完整代码后,我们可以根据上述代码逻辑填充出自己的流程图,以便加深理解
在这里插入图片描述
上述图 完整描述了,线程入队出队的所有流程。

在ReentrantLock lock拿锁的时候 有以下几种情况
1.只有一个线程一次拿锁,直接拿锁成功,当前state==1
2.只有一个线程多次拿锁,直接拿锁成功,当前state等于拿锁的次数(重入
3.多个线程拿锁,除了拿到锁的线程,其他锁全部入队排队。

讲了FairSync(公平锁)上锁和解锁的流程那NonfairSync(非公平锁)上锁解锁流程是怎么样的呢

我们可以把两种上锁解锁代码拿出来比较,发现两种锁只有在上锁(lock)的时候有所不同

不同点一:

FairSync

    final void lock() {
    acquire(1);}

NonfairSync

 final void lock() {
    if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}

可以发现非公平锁在上锁的时候 会先去尝试拿一次锁,拿锁成功就直接上锁返回。
不同点二
FairSync

      protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
    if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

NonfairSync

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);}final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
在调用tryAcquire方法时,公平锁会调用hasQueuedPredecessors() 方法判断CLH队列中是否有节点排队,
非公平锁会直接去抢锁。

lockInterruptibly()

   public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);}public final void acquireInterruptibly(int arg)throws InterruptedException {
    //判断当前线程是否被中断,并清除中断标记if (Thread.interrupted())throw new InterruptedException();//尝试拿锁if (!tryAcquire(arg))//拿锁不成功调用doAcquireInterruptibly(arg);}

doAcquireInterruptibly(arg)

private void doAcquireInterruptibly(int arg)throws InterruptedException {
    //新增一个节点final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {
    for (;;) {
    // 拿前一个节点是头节点时,尝试拿锁final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {
    setHead(node);p.next = null; // help GCfailed = false;return;}// 将队列park,并检验线程是否被中断,被终端的话 直接抛异常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}}

看完lockInterruptibly()整个源码逻辑,大概就能理解这个方法的作用了,他在没被中断的时候和其他节点一样,正常排队,一旦被中断,他并不是只是标记一个中断标识,而是直接抛出检查异常,会阻碍线程正常运行。

这个方法的作用就可以用来做一些取消线程的操作。

tryLock()

 public boolean tryLock() {
    return sync.nonfairTryAcquire(1);}final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

可以看到tryLock()的调用逻辑,他只会去抢一次锁,抢到锁就直接占用,没抢到就直接返回,不入队。

这个方法的作用就是起到一个忽略作用,又或者定时任务防止重复执行。

tryLock(long timeout, TimeUnit unit)

   public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));}

tryAcquireNanos(int arg, long nanosTimeout)

 public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
    // 如果被中断直接抛异常if (Thread.interrupted())throw new InterruptedException();// 尝试拿锁,拿不到就去有限时间的等待return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}

doAcquireNanos(int arg, long nanosTimeout)

 private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
    if (nanosTimeout <= 0L)return false;// 取到最大等待时间final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {
    for (;;) {
    // 判断前节点是否为头节点,是的话,尝试入队final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {
    setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();// 判断是否超过最大等待时间if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)// park住nanosTimeout 的时间LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}}

这个方法也能抛出中断异常,且它能进行有限时间的park。

总结

到现在为止,基本上ReentrantLock方法基本上都讲解了一遍,还有一些其他的定义可能在别的同步类里面有用到,当我们讲到相关类再来讲。
大家在读我的文档时最好能一边看,一边自己动手试试,这样效果更好,因为每个人对ReentrantLock的理解都不同。只有自己理解出来的东西才是自己的。

最后希望大家能看完之后觉得还不错的话 帮忙点个赞,加个关注,这样我才能更有动力为大家奉献一些更好的文章。
Thanks!!

  相关解决方案