等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另外一个线程。前者是生产者,后者是消费者,这种模式隔离了“做什么”(What)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备良好的伸缩性,在Java语言中是如何实现类似的等待/通知机制的呢?
最简单的方式就是让消费者不断地循环检查变量是否符合预期,如下面代码所示,在while循环中设置不满足的条件,如果条件满足就退出循环,从而完成消费者的工作。
while (value != desire) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}
}doSomething();
该段代码在不满足条件时就睡眠一段时间,其目的是减少过多的无效尝试,降低对处理器资源的浪费,上述方式存在以下问题:
1、难以确保及时性,在睡眠时,基本不消耗处理器资源,但是如果睡眠过久,就不能及时发现条件的变化,也就是及时性难以保证。
2、难以降低开销,如果睡眠时间降低为1毫秒,这样消费者就能很迅速地发现条件的变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
以上两个问题,看似矛盾难以调和,但是通过Java的wait()/notify()实现的等待/通知机制就能够很好地解决这个矛盾并实现所需的功能。
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上,Object作为java中所有对象的基类,其存在的价值不言而喻,其中wait()和notify()方法的实现为多线程协作提供了保证。
等待/通知的相关方法描述如下
方法名称 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait()返回,而返回的前提是该线程获取到了对象的锁。 |
notifyAll() | 通知所有等待在该对象上的线程。 |
wait() | 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意, 调用wait()方法后,会释放对象的锁。 |
wait(long) | 超时等待一段时间,这里的参数是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回。 |
wait(long, int) | 对于超时时间更细粒度的控制,可以达到毫秒。 |
public class WaitNotifyTest {public static void main(String[] args) {Object lock = new Object();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("线程A等待获取lock锁");synchronized (lock) {try {System.out.println("线程A获取了lock锁");Thread.sleep(1000);System.out.println("线程A将要运行lock.wait()方法进行等待");lock.wait();System.out.println("线程A等待结束");} catch (InterruptedException e) {e.printStackTrace();}}}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("线程B等待获取lock锁");synchronized (lock) {System.out.println("线程B获取了lock锁");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程B将要运行lock.notify()方法进行通知");lock.notify();}}}).start();}
}
运行结果:
线程A等待获取lock锁
线程A获取了lock锁
线程B等待获取lock锁
线程A将要运行lock.wait()方法进行等待
线程B获取了lock锁
线程B将要运行lock.notify()方法进行通知
线程A等待结束
可以看到通过wait()/notify()完美实现了等待/通知机制,下面,我们来看一下它们的具体实现方式。
wait()方法实现
Object类中的wait()方法源码为
public final void wait() throws InterruptedException {wait(0);
}
wait()方法调用了它的重载方法wait(long),其声明如下
public final native void wait(long timeout) throws InterruptedException;
可以看到这是一个native方法,方法的具体实现,我们可以通过OpenJdk的源码(Object.c)来找到
static JNINativeMethod methods[] = {{"hashCode", "()I", (void *)&JVM_IHashCode},{"wait", "(J)V", (void *)&JVM_MonitorWait},{"notify", "()V", (void *)&JVM_MonitorNotify},{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
其中,JVM_MonitorWait和JVM_MonitorNotify分别对应于wait()和notify()方法,JVM_MonitorWait方法声明是在jvm.h中,如下所示
JNIEXPORT void JNICALL
JVM_MonitorWait(JNIEnv *env, jobject obj, jlong ms);
方法实现为
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))JVMWrapper("JVM_MonitorWait");Handle obj(THREAD, JNIHandles::resolve_non_null(handle));assert(obj->is_instance() || obj->is_array(), "JVM_MonitorWait must apply to an object");JavaThreadInObjectWaitState jtiows(thread, ms != 0);if (JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);}ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
可以看到JVM_MonitorWait方法最终调用了ObjectSynchronizer的wait方法。
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {if (UseBiasedLocking) {BiasedLocking::revoke_and_rebias(obj, false, THREAD);assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");}if (millis < 0) {TEVENT (wait - throw IAX) ;THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");}ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);monitor->wait(millis, true, THREAD);/* This dummy call is in place to get around dtrace bug 6254741. Oncethat's fixed we can uncomment the following line and remove the call */// DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);dtrace_waited_probe(monitor, obj, THREAD);
}
该方法首先判断了参数的合法性,然后调用ObjectSynchronizer::inflate()方法返回了一个ObjectMonitor对象,ObjectSynchronizer类中的方法大部分都是通过ObjectMonitor对象来实现的,inflate()方法声明为
// Inflate light weight monitor to heavy weight monitor
static ObjectMonitor* inflate(Thread * Self, oop obj);
可以看到,inflate()方法是将轻量级锁膨胀为重量级锁,关于轻量级锁、重量级锁以及ObjectMonitor的介绍可以看这一篇博客,最终,是通过调用ObjectMonitor的wait()方法来实现等待的,其主要代码如下
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {...// create a node to be put into the queue// Critically, after we reset() the event but prior to park(), we must check// for a pending interrupt.ObjectWaiter node(Self);node.TState = ObjectWaiter::TS_WAIT ;Self->_ParkEvent->reset() ;OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag// Enter the waiting queue, which is a circular doubly linked list in this case// but it could be a priority queue or any data structure.// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only// by the the owner of the monitor *except* in the case where park()// returns because of a timeout of interrupt. Contention is exceptionally rare// so we use a simple spin-lock instead of a heavier-weight blocking lock.Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;AddWaiter (&node) ;Thread::SpinRelease (&_WaitSetLock) ;if ((SyncFlags & 4) == 0) {_Responsible = NULL ;}intptr_t save = _recursions; // record the old recursion count_waiters++; // increment the number of waiters_recursions = 0; // set the recursion level to be 1exit (Self) ; // exit the monitorguarantee (_owner != Self, "invariant") ;...if (node._notified != 0 && _succ == Self) {node._event->unpark();}// The thread is on the WaitSet list - now park() it....
}
ObjectMonitor的wait()方法的实现主要分为以下几个步骤
1、将调用等待线程封装为ObjectWaiter类的对象node
ObjectWaiter类声明如下:
class ObjectWaiter : public StackObj {public:enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;enum Sorted { PREPEND, APPEND, SORTED } ;ObjectWaiter * volatile _next;ObjectWaiter * volatile _prev;Thread* _thread;ParkEvent * _event;volatile int _notified ;volatile TStates TState ;Sorted _Sorted ; // List placement dispositionbool _active ; // Contention monitoring is enabledpublic:ObjectWaiter(Thread* thread);void wait_reenter_begin(ObjectMonitor *mon);void wait_reenter_end(ObjectMonitor *mon);
};
ObjectWaiter对象是双向链表结构,保存了_thread(当前线程)以及当前的状态TState等数据,每个等待锁的线程都会被封装成ObjectWaiter对象。
2、通过ObjectMonitor::AddWaiter方法将node添加到_WaitSet列表中
调用此方法前后需要获取和释放_WaitSet列表的_WaitSetLock锁。从注释中可以看到,_WaitSet列表其实是一个双向循环链表。
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {assert(node != NULL, "should not dequeue NULL node");assert(node->_prev == NULL, "node already in list");assert(node->_next == NULL, "node already in list");// put node at end of queue (circular doubly linked list)if (_WaitSet == NULL) {_WaitSet = node;node->_prev = node;node->_next = node;} else {ObjectWaiter* head = _WaitSet ;ObjectWaiter* tail = head->_prev;assert(tail->_next == head, "invariant check");tail->_next = node;head->_prev = node;node->_next = head;node->_prev = tail;}
}
3、通过ObjectMonitor::exit方法释放当前的ObjectMonitor对象,这样其它竞争线程就可以获取该ObjectMonitor对象
void ATTR ObjectMonitor::exit(TRAPS) {Thread * Self = THREAD ;if (THREAD != _owner) {if (THREAD->is_lock_owned((address) _owner)) {// Transmute _owner from a BasicLock pointer to a Thread address.// We don't need to hold _mutex for this transition.// Non-null to Non-null is safe as long as all readers can// tolerate either flavor.assert (_recursions == 0, "invariant") ;_owner = THREAD ;_recursions = 0 ;OwnerIsThread = 1 ;} else {// NOTE: we need to handle unbalanced monitor enter/exit// in native code by throwing an exception.// TODO: Throw an IllegalMonitorStateException ?TEVENT (Exit - Throw IMSX) ;assert(false, "Non-balanced monitor enter/exit!");if (false) {THROW(vmSymbols::java_lang_IllegalMonitorStateException());}return;}}...
}
4、最终通过底层的park()方法挂起线程
notify()方法实现
notify()方法最终通过ObjectMonitor的void notify(TRAPS)实现。
void ObjectMonitor::notify(TRAPS) {CHECK_OWNER();if (_WaitSet == NULL) {TEVENT (Empty-Notify) ;return ;}DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);int Policy = Knob_MoveNotifyee ;Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;ObjectWaiter * iterator = DequeueWaiter() ;if (iterator != NULL) {TEVENT (Notify1 - Transfer) ;guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;guarantee (iterator->_notified == 0, "invariant") ;if (Policy != 4) {iterator->TState = ObjectWaiter::TS_ENTER ;}iterator->_notified = 1 ;ObjectWaiter * List = _EntryList ;if (List != NULL) {assert (List->_prev == NULL, "invariant") ;assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;assert (List != iterator, "invariant") ;}if (Policy == 0) { // prepend to EntryList} else if (Policy == 1) { // append to EntryList} else if (Policy == 2) { // prepend to cxq}...
}
ObjectMonitor的notify()方法的实现主要分为以下几个步骤:
1、若_WaitSet为NULL,即没有需要唤醒的线程,则直接退出。
2、通过ObjectMonitor::DequeueWaiter方法,获取_WaitSet列表中的第一个ObjectWaiter节点。
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {// dequeue the very first waiterObjectWaiter* waiter = _WaitSet;if (waiter) {DequeueSpecificWaiter(waiter);}return waiter;
}
这里需要注意的是,在jdk的notify()方法注释中说明的是随机唤醒一个线程,这里其实是第一个ObjectWaiter节点。
3、根据不同的策略,将取出来的ObjectWaiter节点,加入到_EntryList或则通过Atomic::cmpxchg_ptr指令进行自旋操作cxq
notifyAll()方法实现
lock.notifyAll()方法最终通过ObjectMonitor的void notifyAll(TRAPS)实现。
void ObjectMonitor::notifyAll(TRAPS) {...for (;;) {iterator = DequeueWaiter () ;if (iterator == NULL) break ;TEVENT (NotifyAll - Transfer1) ;++Tally ;...ObjectWaiter * List = _EntryList ;if (List != NULL) {assert (List->_prev == NULL, "invariant") ;assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;assert (List != iterator, "invariant") ;}if (Policy == 0) { // prepend to EntryList} else if (Policy == 1) { // append to EntryList} else if (Policy == 2) { // prepend to cxq}}
}
该方法和notify()方法比较类似,不同的是,notifyAll()通过for循环取出_WaitSet的ObjectWaiter节点,并根据不同策略,加入到_EntryList或则进行自旋操作。
总结
通过上述的分析,可以发现,wait()方法会释放所占有的ObjectMonitor对象,而notify()和notifyAll()并不会释放所占有的ObjectMonitor对象,它们的主要工作是将相应的线程从_WaitSet转移到_EntryList中,然后等待竞争获取锁。
其实真正释放ObjectMonitor对象的时间点是在执行monitorexit指令,一旦释放ObjectMonitor对象后,_EntryList中ObjectWaiter节点所保存的线程就可以竞争ObjectMonitor对象进行加锁操作了。
参考资料
周志明:《深入理解Java虚拟机》
方腾飞:《Java并发编程的艺术》