ReentrantLock
- ReentrantLock的结构
-
- Node
- lock()
-
- acquire(int arg)
- tryAcquire(int acquires)
- acquireQueued(final Node node, int arg)
- addWaiter(Node mode)
- enq(final Node node)
- shouldParkAfterFailedAcquire(p, node)
- parkAndCheckInterrupt()
- cancelAcquire(Node node)
- unparkSuccessor(Node node)
- unlock()
-
- tryRelease(arg)
- lockInterruptibly()
-
- doAcquireInterruptibly(arg)
- tryLock()
- tryLock(long timeout, TimeUnit unit)
-
- tryAcquireNanos(int arg, long nanosTimeout)
-
- doAcquireNanos(int arg, long nanosTimeout)
- 总结
ReentrantLock的结构
看图可以知道 ReentrantLock 是基于内部类继承AbstractQueuedSynchronizer(AQS)来实现的
AQS 功能强大,我们后续讲的很多JUC类都是基于他实现的,我们一个一个类去讲解,讲完了来综合AQS各个方法的功能。
在这里我来讲几个AQS里面几个比较重要的属性
1.state 同步状态
2.tail 队尾
3.head 对头
讲到这三个属性,就不得不讲下AQS 中一个内部类
Node
node是一个CLH队列 双向链表。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */// 共享Nodestatic final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */// 取消static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking *///通知static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition *///条件static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/// 广播static final int PROPAGATE = -3;// 等待状态volatile int waitStatus;//上个节点volatile Node prev;// 下个节点volatile Node next;/*** The thread that enqueued this node. Initialized on* construction and nulled out after use.*/// 持有线程volatile Thread thread;}
类其实没啥好讲的,我开始来讲方法把,就是大概了解下这个类的一些内部属性
lock()
// 这里基于FairSync(公平锁来讲)public void lock() {
sync.lock();}final void lock() {
acquire(1);}public final void acquire(int arg) {
// 尝试去拿锁,如果拿到锁,直接返回,//没拿到锁开始将线程入队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果入队的时候,线程被中断了,线程自身也发起中断selfInterrupt();}
acquire(int arg)
public final void acquire(int arg) {
// 尝试去拿锁,如果拿到锁,直接返回,//没拿到锁开始将线程入队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果入队的时候,线程被中断了,线程自身也发起中断selfInterrupt();}
tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程final Thread current = Thread.currentThread();// 拿到同步状态int c = getState();// 为0 说明当前没有线程持有锁if (c == 0) {
// 判断当前队列是否有节点在排队,并通过CAS尝试拿锁// 拿到就直接返回if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;}}// 如果当前线程重入else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;try {
boolean interrupted = false;for (;;) {
//拿当前节点的上一个节点final Node p = node.predecessor();// 如果上个节点是对头,当前线程再去尝试拿一次锁if (p == head && tryAcquire(arg)) {
// 拿到后将当前节点设为对头//将thread和prev置空setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {
if (failed)cancelAcquire(node);}}
addWaiter(Node mode)
private Node addWaiter(Node mode) {
// 初始化节点Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 拿到当前队尾节点,第一次进来时是初始化状态,所以队尾节点肯定为null// 后面再有节点进来,将后面的节点和队尾节点连接起来if (pred != null) {
node.prev = pred;if (compareAndSetTail(pred, node)) {
pred.next = node;return node;}}// 初始化队列enq(node);return node;
enq(final Node node)
private Node enq(final Node node) {
for (;;) {
Node t = tail;// 当队尾为null,初始化队列//if (t == null) {
// Must initialize//初始化对头和队尾if (compareAndSetHead(new Node()))tail = head;} else {
// 如果再初始化的过程已经有另外的线程将队列初始化了,//当前线程直接入队即可node.prev = t;if (compareAndSetTail(t, node)) {
t.next = node;return t;}}}}
shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// 拿到节点状态,如果是可唤醒状态直接返回// 当这个节点已经是可唤醒状态,说明他之前已经至少进行过一次自旋了// 这时候他应该要被park住if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 节点状态大于0属于不正常节点,递归找到最后一个正常节点if (ws > 0) {
/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {
node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {
/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/// CAS 将当前节点改为可唤醒状态compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 挂其线程LockSupport.park(this);// 查看此时线程是否被中断了return Thread.interrupted();}
cancelAcquire(Node node)
private void cancelAcquire(Node node) {
// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessors// 跳过状态不正常的predecessorsNode pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.// 这里去拿prev的下个节点,是为了防止再多线程情况下,node已经被删除了Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.// 如果node现在是队尾,将队尾换成prev//并且将prev的next节点置空if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);} else {
// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;// 1.pred 不是对头// 2.pred是可唤醒的 // 3.prev有线程持有// 上述三个条件满足,将node节点排除,直接把pred节点连接到node 的next节点if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {
// 1. 如果pred 是对头,那说明node节点本来是下一个唤醒的节点,但是因为他是cannel状态//必须要找到node后面的节点来唤醒,此时的node的waitStatus 是CANCELLED状态==1// 从队尾开始找到一个最接近node一个可唤醒的节点,并唤醒他//2.pred的状态是不可唤醒的,说明unparkSuccessor(node);}node.next = node; // help GC}}
unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;// 删除状态,将节点恢复为重新入队模式if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 找到最接近node的一个可唤醒或者可入队的节点Node s = node.next;if (s == null || s.waitStatus > 0) {
s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果存在的话 唤醒节点if (s != null)LockSupport.unpark(s.thread);}
入队逻辑光看代码可能很难理解,下面我们结合一张图来讲
第一个线程入队时,此时,因为队列为空,需要初始化队列,所以对头节点waitStatus为0,且Thread为null,入队的第一个线程要挂再head节点后面,且再第一次循环后将前驱节点waitStatus改为-1
unlock()
public void unlock() {
sync.release(1);}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;// 拿到头节点,如果头节点的状态不是持有锁的状态,就将head节点的下一个等待唤醒的节点唤醒if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
tryRelease(arg)
protected final boolean tryRelease(int releases) {
// 拿到最新的同步状态int c = getState() - releases;// 判断现在释放锁的线程是不是持有锁的线程,不是就抛错if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 当同步状态值为0 的时候 就说明此时没有先吃持有锁if (c == 0) {
free = true;setExclusiveOwnerThread(null);}// 重置同步状态setState(c);return free;}
了解完上锁和解锁连的完整代码后,我们可以根据上述代码逻辑填充出自己的流程图,以便加深理解
上述图 完整描述了,线程入队出队的所有流程。
在ReentrantLock lock拿锁的时候 有以下几种情况
1.只有一个线程一次拿锁,直接拿锁成功,当前state==1
2.只有一个线程多次拿锁,直接拿锁成功,当前state等于拿锁的次数(重入)
3.多个线程拿锁,除了拿到锁的线程,其他锁全部入队排队。
讲了FairSync(公平锁)上锁和解锁的流程那NonfairSync(非公平锁)上锁解锁流程是怎么样的呢
我们可以把两种上锁解锁代码拿出来比较,发现两种锁只有在上锁(lock)的时候有所不同
不同点一:
FairSync
final void lock() {
acquire(1);}
NonfairSync
final void lock() {
if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
可以发现非公平锁在上锁的时候 会先去尝试拿一次锁,拿锁成功就直接上锁返回。
不同点二
FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);}final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
在调用tryAcquire方法时,公平锁会调用hasQueuedPredecessors() 方法判断CLH队列中是否有节点排队,
非公平锁会直接去抢锁。
lockInterruptibly()
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);}public final void acquireInterruptibly(int arg)throws InterruptedException {
//判断当前线程是否被中断,并清除中断标记if (Thread.interrupted())throw new InterruptedException();//尝试拿锁if (!tryAcquire(arg))//拿锁不成功调用doAcquireInterruptibly(arg);}
doAcquireInterruptibly(arg)
private void doAcquireInterruptibly(int arg)throws InterruptedException {
//新增一个节点final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {
for (;;) {
// 拿前一个节点是头节点时,尝试拿锁final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {
setHead(node);p.next = null; // help GCfailed = false;return;}// 将队列park,并检验线程是否被中断,被终端的话 直接抛异常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
if (failed)cancelAcquire(node);}}
看完lockInterruptibly()整个源码逻辑,大概就能理解这个方法的作用了,他在没被中断的时候和其他节点一样,正常排队,一旦被中断,他并不是只是标记一个中断标识,而是直接抛出检查异常,会阻碍线程正常运行。
这个方法的作用就可以用来做一些取消线程的操作。
tryLock()
public boolean tryLock() {
return sync.nonfairTryAcquire(1);}final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();int c = getState();if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
可以看到tryLock()的调用逻辑,他只会去抢一次锁,抢到锁就直接占用,没抢到就直接返回,不入队。
这个方法的作用就是起到一个忽略作用,又或者定时任务防止重复执行。
tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));}
tryAcquireNanos(int arg, long nanosTimeout)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
// 如果被中断直接抛异常if (Thread.interrupted())throw new InterruptedException();// 尝试拿锁,拿不到就去有限时间的等待return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}
doAcquireNanos(int arg, long nanosTimeout)
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
if (nanosTimeout <= 0L)return false;// 取到最大等待时间final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {
for (;;) {
// 判断前节点是否为头节点,是的话,尝试入队final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {
setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();// 判断是否超过最大等待时间if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)// park住nanosTimeout 的时间LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {
if (failed)cancelAcquire(node);}}
这个方法也能抛出中断异常,且它能进行有限时间的park。
总结
到现在为止,基本上ReentrantLock方法基本上都讲解了一遍,还有一些其他的定义可能在别的同步类里面有用到,当我们讲到相关类再来讲。
大家在读我的文档时最好能一边看,一边自己动手试试,这样效果更好,因为每个人对ReentrantLock的理解都不同。只有自己理解出来的东西才是自己的。
最后希望大家能看完之后觉得还不错的话 帮忙点个赞,加个关注,这样我才能更有动力为大家奉献一些更好的文章。
Thanks!!