当前位置: 代码迷 >> 综合 >> ReentrantReadWriteLock 深入源码解析
  详细解决方案

ReentrantReadWriteLock 深入源码解析

热度:46   发布时间:2024-01-17 01:26:06.0

??ReentrantReadWriteLock是JUC提供的读写锁,在某些应用场景下,读操作要比写操作频繁的多,此时应该尽可能利用读写之间协作,减少共享资源的竞争。

写操作 读操作
写操作 禁止 禁止
读操作 禁止 允许

??读写锁的三个重要特性:

??· 重入性:无论读锁和写锁都支持线程重入。

??· 公平性:支持公平锁和非公平锁,默认是非公平锁,非公平锁吞吐量较高。

??· 锁降级:线程在持有写锁时,可以获取读锁,然后释放写锁,最后释放读锁,这就是锁的降级。

??· 锁升级:不支持读锁到写锁的升级转换。

??演示示例:

??首先,新建两个线程类,用于模拟多线程环境下的读写线程:

package com.securitit.serialize.locks;import java.util.List;
import java.util.concurrent.locks.Lock;public class ReentrantReadLockThread extends Thread {
    // 锁实例.private Lock lock;// 模拟数据存储.private List<String> dataList;// 构造方法.public ReentrantReadLockThread(Lock lock, List<String> dataList) {
    this.lock = lock;this.dataList = dataList;}@Overridepublic void run() {
    try {
    lock.lock();System.out.println(Thread.currentThread().getName() + ":读锁获得锁.");for(String data : dataList) {
    System.out.println(Thread.currentThread().getName() + ":读锁读数据.");}Thread.sleep(2000);} catch (Exception ex) {
    ex.printStackTrace();} finally {
    System.out.println(Thread.currentThread().getName() + ":读锁释放锁.");lock.unlock();}}}
package com.securitit.serialize.locks;import java.util.List;
import java.util.concurrent.locks.Lock;public class ReentrantWriteLockThread extends Thread {
    // 锁实例.private Lock lock;// 模拟数据存储.private List<String> dataList;// 构造方法.public ReentrantWriteLockThread(Lock lock, List<String> dataList) {
    this.lock = lock;this.dataList = dataList;}@Overridepublic void run() {
    try {
    lock.lock();System.out.println(Thread.currentThread().getName() + ":写锁获得锁.");dataList.add("写锁写入数据");Thread.sleep(2000);} catch (Exception ex) {
    ex.printStackTrace();} finally {
    System.out.println(Thread.currentThread().getName() + ":写锁释放锁.");lock.unlock();}}}

??然后,在测试类中,启动五个读线程和五个写线程,同时操作数据模拟存储List:

package com.securitit.serialize.locks;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReentrantReadWriteLockTester {
    // 读写锁实例.private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();// 读锁.private static Lock readLock = readWriteLock.readLock();// 写锁.private static Lock writeLock = readWriteLock.writeLock();// 模拟数据存储.private static List<String> dataList = new ArrayList<String>();public static void main(String[] args) {
    new ReentrantReadLockThread(readLock, dataList).start();new ReentrantWriteLockThread(writeLock, dataList).start();new ReentrantReadLockThread(readLock, dataList).start();new ReentrantWriteLockThread(writeLock, dataList).start();new ReentrantReadLockThread(readLock, dataList).start();new ReentrantWriteLockThread(writeLock, dataList).start();new ReentrantReadLockThread(readLock, dataList).start();new ReentrantWriteLockThread(writeLock, dataList).start();new ReentrantReadLockThread(readLock, dataList).start();new ReentrantWriteLockThread(writeLock, dataList).start();}}

??输出结果:

Thread-0:读锁获得锁.
Thread-0:读锁释放锁.
Thread-1:写锁获得锁.
Thread-1:写锁释放锁.
Thread-3:写锁获得锁.
Thread-3:写锁释放锁.
Thread-5:写锁获得锁.
Thread-5:写锁释放锁.
Thread-4:读锁获得锁.
Thread-4:读锁读数据.
Thread-4:读锁读数据.
Thread-6:读锁获得锁.
Thread-4:读锁读数据.
Thread-2:读锁获得锁.
Thread-6:读锁读数据.
Thread-2:读锁读数据.
Thread-6:读锁读数据.
Thread-2:读锁读数据.
Thread-6:读锁读数据.
Thread-2:读锁读数据.
Thread-4:读锁释放锁.
Thread-6:读锁释放锁.
Thread-2:读锁释放锁.
Thread-7:写锁获得锁.
Thread-7:写锁释放锁.
Thread-8:读锁获得锁.
Thread-8:读锁读数据.
Thread-8:读锁读数据.
Thread-8:读锁读数据.
Thread-8:读锁读数据.
Thread-8:读锁释放锁.
Thread-9:写锁获得锁.
Thread-9:写锁释放锁.

??文中最初提到的锁的其他特性,应用起来也比较简单,可以对上面的示例稍作变换即可实现重入性、公平性、锁降级。

??源码分析:

??实现基础:

??ReentrantReadWriteLock与ReentrantLock实现类似,通过内部类FairSync和NonfairSync来实现作为实现基础:

??FairSync:

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 序列化版本号.private static final long serialVersionUID = 6317671515068378041L;// 高16位为读锁,低16位为写锁.static final int SHARED_SHIFT   = 16;// 读锁单位.static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 读锁最大数量.static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 写锁最大数量.static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 读锁,即共享锁的持有者数量.static int sharedCount(int c)    {
     return c >>> SHARED_SHIFT; }// 写锁,即独占锁的持有者数量.static int exclusiveCount(int c) {
     return c & EXCLUSIVE_MASK; }// 计数实现.static final class HoldCounter {
    int count = 0;final long tid = getThreadId(Thread.currentThread());}// ThreadLocal默认实现.static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
    return new HoldCounter();}}// 读锁持有者数量.private transient ThreadLocalHoldCounter readHolds;// 缓存持有者数量.private transient HoldCounter cachedHoldCounter;// 第一个读锁持有者.private transient Thread firstReader = null;// 第一个读锁持有者数量.private transient int firstReaderHoldCount;Sync() {
    readHolds = new ThreadLocalHoldCounter();setState(getState()); // ensures visibility of readHolds}// 读锁持有者应该阻塞.abstract boolean readerShouldBlock();// 写锁持有者应该阻塞.abstract boolean writerShouldBlock();// 释放锁.protected final boolean tryRelease(int releases) {
    // 是否持有独占锁.if (!isHeldExclusively())throw new IllegalMonitorStateException();// 计算重入次数.int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}// 尝试获取独占锁.protected final boolean tryAcquire(int acquires) {
    // 取得当前线程.Thread current = Thread.currentThread();int c = getState();// 独占锁数量.int w = exclusiveCount(c);if (c != 0) {
    // 条件翻译:独占锁数量为0 || 独占锁持有者非当前线程.if (w == 0 || current != getExclusiveOwnerThread())return false;// 独占锁总数超过上限.if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquiresetState(c + acquires);return true;}if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;// 设置独占锁持有者.setExclusiveOwnerThread(current);return true;}// 释放共享锁.protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();// 当前线程是第一个读锁持有者.if (firstReader == current) {
    // assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {
    // 设置cachedHoldCounter.HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {
    readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {
    int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}}// 尝试解锁读锁时,当前线程没有持有读锁.private IllegalMonitorStateException unmatchedUnlockException() {
    return new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread");}// 获取共享锁.protected final int tryAcquireShared(int unused) {
    // 取得当前线程.Thread current = Thread.currentThread();int c = getState();// 锁被当前线程以外的其他线程持有.if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 计算共享数量.int r = sharedCount(c);// 读锁是否阻塞.// 共享数量是否在上限之内.// 尝试以CAS方式设置锁状态.if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {
    // 设置firstReader、firstReaderHoldCount的值.// 设置cachedHoldCounter的值.// 将HoldCounter加入readHolds.if (r == 0) {
    firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {
    firstReaderHoldCount++;} else {
    HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullTryAcquireShared(current);}// 获取读锁,即共享锁.final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;for (;;) {
    int c = getState();// 锁被当前线程以外的其他线程持有,则返回失败.if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)return -1;} else // 读锁是否阻塞.if (readerShouldBlock()) {
    // firstReader是当前线程.if (firstReader == current) {
    // assert firstReaderHoldCount > 0;} else {
    // 初始化HoldCounter.if (rh == null) {
    rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {
    rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}// 共享锁数量是否在上限之内.if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 尝试以CAS方式设置锁状态.if (compareAndSetState(c, c + SHARED_UNIT)) {
    // 设置firstReader、firstReaderHoldCount的值.// 设置cachedHoldCounter的值.// 将HoldCounter加入readHolds.if (sharedCount(c) == 0) {
    firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {
    firstReaderHoldCount++;} else {
    if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}// 尝试获取写锁.final boolean tryWriteLock() {
    Thread current = Thread.currentThread();int c = getState();if (c != 0) {
    int w = exclusiveCount(c);// 若锁被当前线程外的其他线程持有,则获取失败.if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}// 尝试以CAS方式设置锁状态.if (!compareAndSetState(c, c + 1))return false;// 设置当前线程为锁持有者.setExclusiveOwnerThread(current);return true;}// 尝试获取读锁.final boolean tryReadLock() {
    Thread current = Thread.currentThread();for (;;) {
    int c = getState();// 若锁被当前线程以外的线程持有,则获取失败.if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return false;// 计算共享数量.int r = sharedCount(c);if (r == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 尝试以CAS方式设置锁状态.if (compareAndSetState(c, c + SHARED_UNIT)) {
    // 设置firstReader、firstReaderHoldCount的值.// 设置cachedHoldCounter的值.// 将HoldCounter加入readHolds.if (r == 0) {
    firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {
    firstReaderHoldCount++;} else {
    HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return true;}}}// 是否独占锁持有者.protected final boolean isHeldExclusively() {
    // 当前线程是否独占锁持有者.return getExclusiveOwnerThread() == Thread.currentThread();}// 获取Condition.final ConditionObject newCondition() {
    return new ConditionObject();}// 获取持有者.final Thread getOwner() {
    // Must read state before owner to ensure memory consistencyreturn ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());}// 获取读锁持有者数量.final int getReadLockCount() {
    return sharedCount(getState());}// 当前线程是否写锁持有者.final boolean isWriteLocked() {
    return exclusiveCount(getState()) != 0;}// 获取写锁持有者数量.final int getWriteHoldCount() {
    // 若是独占锁持有者,则返回独占锁数量,否则返回0.return isHeldExclusively() ? exclusiveCount(getState()) : 0;}// 获取读锁持有者数量.final int getReadHoldCount() {
    // 若为0,返回0.if (getReadLockCount() == 0)return 0;// 如果firstReader是当前线程,则返回firstReaderHoldCount.Thread current = Thread.currentThread();if (firstReader == current)return firstReaderHoldCount;// 若缓存持有者为当前线程,则返回.HoldCounter rh = cachedHoldCounter;if (rh != null && rh.tid == getThreadId(current))return rh.count;// 到读锁集合获取数量,并返回.int count = readHolds.get().count;if (count == 0) readHolds.remove();return count;}// 反序列化.private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();readHolds = new ThreadLocalHoldCounter();setState(0); // reset to unlocked state}// 获取锁状态.final int getCount() {
     return getState(); }
}

??NonFairSync:

static final class NonfairSync extends Sync {
    // 序列化版本号.private static final long serialVersionUID = -8159625535654395037L;// 写锁是否阻塞.final boolean writerShouldBlock() {
    return false; // writers can always barge}// 读锁是否阻塞.final boolean readerShouldBlock() {
    // return apparentlyFirstQueuedIsExclusive();}
}

??ReadLock:

public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;// Constructor.protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;}// 获得锁,public void lock() {
    sync.acquireShared(1);}// 带中断的获取锁.public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);}// 尝试获得锁.public boolean tryLock() {
    return sync.tryReadLock();}// 尝试指定等待时间的获取锁.public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}// 尝试释放当前共享锁.public void unlock() {
    sync.releaseShared(1);}// 读锁不支持Condition.public Condition newCondition() {
    throw new UnsupportedOperationException();}
}

??WriteLock:

public static class WriteLock implements Lock, java.io.Serializable {
    // 序列化版本号.private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;// Constructor.protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;}// 获得锁.public void lock() {
    sync.acquire(1);}// 带中断的获取锁.public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);}// 尝试获取锁.public boolean tryLock( ) {
    return sync.tryWriteLock();}// 尝试指定等待时间的获取锁.public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));}// 释放锁.public void unlock() {
    sync.release(1);}// 获得锁的Condition.public Condition newCondition() {
    return sync.newCondition();}// toString().public String toString() {
    Thread o = sync.getOwner();return super.toString() + ((o == null) ?"[Unlocked]" :"[Locked by thread " + o.getName() + "]");}// 是否被当前线程持有.public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();}// 获得持有者数量.public int getHoldCount() {
    return sync.getWriteHoldCount();}
}

??上面的四个内部类是ReentrantReadWriteLock实现的主要基础,首先Sync继承自AQS,AQS本身对独占锁和共享锁进行了抽象和实现,其实独占锁和共享锁对应的就是写锁和读锁。ReadLock和WriteLock实现了Lock接口,具体实现使用内部类Sync已经封装好的关于独占锁和共享锁的实现。

??基本方法:

// 是否公平锁.
public final boolean isFair() {
    return sync instanceof FairSync;
}
// 获得锁的持有者.
protected Thread getOwner() {
    return sync.getOwner();
}
// 获得读锁的数量.
public int getReadLockCount() {
    return sync.getReadLockCount();
}
// 写锁是否已被持有.
public boolean isWriteLocked() {
    return sync.isWriteLocked();
}
// 当前线程是否获得写锁.
public boolean isWriteLockedByCurrentThread() {
    return sync.isHeldExclusively();
}
// 获得写锁持有者数量.
public int getWriteHoldCount() {
    return sync.getWriteHoldCount();
}
// 获得读锁持有者数量.
public int getReadHoldCount() {
    return sync.getReadHoldCount();
}
// 获得写锁持有者集合.
protected Collection<Thread> getQueuedWriterThreads() {
    return sync.getExclusiveQueuedThreads();
}
// 获得读锁持有者集合.
protected Collection<Thread> getQueuedReaderThreads() {
    return sync.getSharedQueuedThreads();
}
// 同步队列中是否线程.
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}
// 同步队列中是否有指定线程.
public final boolean hasQueuedThread(Thread thread) {
    return sync.isQueued(thread);
}
// 同步队列长度.
public final int getQueueLength() {
    return sync.getQueueLength();
}
// 同步队列的所有线程集合.
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}
// 是否有指定Condition的等待者.
public boolean hasWaiters(Condition condition) {
    if (condition == null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException("not owner");return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 指定Condition的等待者数量.
public int getWaitQueueLength(Condition condition) {
    if (condition == null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException("not owner");return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

??上面是提供的针对同步队列的一些信息获取,可以通过这些方法进行同步队列监控,以便更好的处理业务逻辑。

??总结:

??在多线程高并发的情况下,如何合理减少资源的竞争是提高并发效率的根本,从Java提供的语言级的synchronized、到JUC的ReentrantLock重入锁、再到读写分离的ReentrantReadWriteLock,或者基于CAS的其他同步工具,都是在减少锁的使用或减少锁的作用空间或时间,以此来削减独占锁(也叫互斥锁、排它锁)带来的负面影响。

??注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。

??如果有哪里有不明白或不清楚的内容,欢迎留言哦!