当前位置: 代码迷 >> 综合 >> 【JUC源码学习03】读写锁(ReentrantReadWriteLock)学习
  详细解决方案

【JUC源码学习03】读写锁(ReentrantReadWriteLock)学习

热度:46   发布时间:2023-10-27 00:12:20.0

读写锁(ReentrantReadWriteLock)学习

一、读写锁概述

1、读写锁:维护了一对锁,一个读锁,一个写锁,通过分离读锁和写锁,使得并发性能比一般的排他锁有了很大的提升。

2、过程:同一时刻,可以允许多个读线程进行访问;在写线程访问时,其他读线程和其他写线程均(其他读写操作)被阻塞。等当前写操作完成释放锁后,所有操作继续执行。

3、一般情况下,读写锁性能比排他锁性能好,因为大多数场景读是多余写的。在读多余写的情况下,读写锁能提供比排他锁更好的并发性和吞吐量。=

4、ReentrantReadWriteLock 的特性

  • 公平选择性:支持非公平(默认)和公平的获取锁的方式,吞吐量:非公平优于公平。
  • 重进入:读写线程为例:读线程在获取锁之后,能够再次获取锁;写线程在获取锁之后,能够再次获取写锁,同时也可以获取读锁。
  • 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能降级成为读锁。

1、读写锁的接口定义

ReadWriteLock接口定义了两个接口,一个读接口,一个写接口;

public interface ReadWriteLock {
    /*** Returns the lock used for reading.** @return the lock used for reading*/Lock readLock();/*** Returns the lock used for writing.** @return the lock used for writing*/Lock writeLock();
}

2、读写锁的实现分析

  • 读写状态的设计
  • 写锁的获取与释放
  • 读锁的获取与释放
  • 锁降级

2.1 读写状态的设计

1、读写状态概述

读写锁,依赖于同步器来实现 同步功能,而读写状态就是其同步器的同步状态。state

读写锁在同一个同步状态上维护多个读线程和一个写线程的状态(一个整形变量 state),使得该状态成为读写锁实现的关键。

2、如何在一个整形(int)变量上维护多种状态?

如何在一个整形(int)变量上维护多种状态,就一定需要“按位切割使用”这个变量:读写锁将变量切分为两个部分:

  • 高 16 位表示读
  • 低 16 位表示写。

3、读写状态计算公式

读写锁如何快速确定读和写各自的状态呢?答案是:通过位运算 (牛逼)

假设当前同步状态为 S,则:

  • 写状态等于:S&0x0000FFFF(将 高 16 位全部抹去)
  • 读状态等于:S>> 16 (无符号补0 右移16 位)

当写状态增加 1 时,等于S+1

当读状态增加 1 时,等于S+(1<<16),也就是S+0x00010000

二、写锁的获取与释放

写锁是一个支持重入的排他锁。

如果当前线程已经获取了写锁,则增加状态;

如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

1、tryLock( )获取写锁

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock()

		public boolean tryLock( ) {
    return sync.tryWriteLock();}

1.1 tryWriteLock()

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryWriteLock

    	final boolean tryWriteLock() {
    Thread current = Thread.currentThread();int c = getState();// 获取同步状态大小,不为 0 说明锁已被获取// 存在写锁if (c != 0) {
    int w = exclusiveCount(c);// 存在读锁或者当前获取线程不是已经获取写锁的线程if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)//static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int SHARED_SHIFT = 16;throw new Error("Maximum lock count exceeded");}if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;}

1.2 exclusiveCount(int c)

返回值=0:存在读锁

返回值≠0:存在写锁

  static int exclusiveCount(int c) {
     return c & EXCLUSIVE_MASK; }

2、 unlock()释放写锁

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock

写锁的释放与 ReentrantLock 的释放过程是类似的,每次释放锁均减少写状态,当写状态为 0 时,表示锁已经被释放。

从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

 		public void unlock() {
    sync.release(1);}

2.1 release(int arg)

 public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

2.2 tryRelease(int releases)

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease

  	protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())throw new IllegalMonitorStateException();// 每次释放锁均减少写状态,当写状态为 0 时,表示锁已经被释放int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}

2.3 unparkSuccessor(Node node)

释放锁后,激活下同步队列中的下一个线程进行锁的获取

  private void unparkSuccessor(Node node) {
    /** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {
    s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

三、读锁的获取与释放

**读锁是一个支持可重入的共享锁。**它能够被多个线程获取,在没有其他写线程访问(或者写入状态为 0)时,读锁总会被成功的获取,而所做的也只是线程安全的增加读状态。

如果当前线程在获取读锁时,写锁已经被其他线程获取,则进入等待状态。

1、lock()获取读锁

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()

Acquires the read lock.Acquires the read lock if the write lock is not held by another thread and returns immediately. If the write lock is held by another thread then the current thread becomes disabled for thread scheduling purposes and lies dormant until the read lock has been acquired.

	 public void lock() {
    sync.acquireShared(1);}

1.1 acquireShared(int arg)

		public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}

1.2 tryAcquireShared(int unused)

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared

如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态;

如果当前线程获取了写锁,或者写锁未被获取,则当前线程(线程安全,依靠 CAS 保证)增加读状态,成功获取锁。

			protected final int tryAcquireShared(int unused) {
    /** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*/Thread current = Thread.currentThread();int c = getState();// 存在写锁if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {
    if (r == 0) {
    firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {
    firstReaderHoldCount++;} else {
    HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullxzzTryAcquireShared(current);}

1.3 fullTryAcquireShared(Thread current)

		final int fullTryAcquireShared(Thread current) {
    /** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh = null;// 自旋for (;;) {
    int c = getState();// 存在写锁if (exclusiveCount(c) != 0) {
    // 且写锁获取的线程不是当前线程,则退出自旋if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {
    // Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {
    // assert firstReaderHoldCount > 0;} else {
    if (rh == null) {
    rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {
    rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {
    if (sharedCount(c) == 0) {
    // 第一次获取读锁firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {
    // 支持读锁可重入firstReaderHoldCount++;} else {
    if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}

1.4 doAcquireShared(int arg)

 private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
    boolean interrupted = false;for (;;) {
    final Node p = node.predecessor();if (p == head) {
    int r = tryAcquireShared(arg);if (r >= 0) {
    setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {
    if (failed)cancelAcquire(node);}}

2、unlock()读锁的释放

		public void unlock() {
    sync.releaseShared(1);}

2.1 releaseShared(int arg)

   public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();return true;}return false;}

2.2 tryReleaseShared(int unused)

读锁释放条件:同步状态为 0

读锁的每次释放(线程安全,可能有多个线程同时释放锁)均减少读状态,减少的值是(1<<16)。

		protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();if (firstReader == current) {
    // assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {
    HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {
    readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {
    int c = getState();// static final int SHARED_UNIT = (1 << SHARED_SHIFT);// 即 nextc = c - 1<<16;int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}

四、锁降级和升级

1、锁降级

1、锁降级的定义

锁降级:指的是写锁降级为读锁。 面向的是同一个线程而言。

过程:把持住当前拥有的写锁,再获取读锁,随后释放先前拥有的写锁的过程。(同一个线程,先获取写锁,再获取读锁,最后释放写锁

注意:先获取写锁,然后将写锁释放,最后再获取读锁,这种分阶段完成的过程不能称之为锁降级。

2、锁降级的目的

保证数据的可见性。

如果当前线程不获取读锁而是直接释放写锁,假设此刻另外一个线程(记作线程 T)获取了写锁,并修改了数据,那么当前线程无法感知到线程 T 的数据更新;

如果当前线程获取读锁,即遵循锁降级的步骤,则写线程 T 将会被阻塞,直到当前线程使用数据并释放读锁之后,线程 T 才能获取写锁并进行数据更新。

2、锁升级

1、锁升级的定义

锁升级:指的是读锁升级为写锁面向的是同一个线程而言。

过程:同一个线程,先获取读锁,再获取写锁,最后释放读锁的过程。

2、锁升级的目的:

保证数据可见性。

如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

ReentrantReadWriteLock 不支持锁升级。