【源码】JUC —— LinkedBlockingQueue 浅析
- 前言
- LinkedBlockingQueue
-
- Node
- 属性、构造方法
- add(E e) & offer(E e)
- put(E e) & offer(E e, long timeout, TimeUnit unit)
- enqueue(Node node)
- signalNotEmpty()
- remove() & poll()
- take() & poll(long timeout, TimeUnit unit)
- dequeue()
- signalNotFull()
- 其他方法
- 总结
前言
LinkedBlockingQueue,基于 链表 实现的 容量有限 的 BlockingQueue
LinkedBlockingQueue
与其他实现略有不同的是,LinkedBlockingQueue 分别定义了 takeLock
取锁 和 putLock
放锁,这使得 put
操作和 take
操作可以并行
put
操作在队列为空时插入第一个元素,将唤醒take
线程(因为take
会在队列为空时阻塞),在队列未满时,put
操作不会阻塞且会唤醒所有已经阻塞的其他put
线程,当队列满了后阻塞之后的put
线程take
操作在队列满时取出第一个元素,将唤醒put
线程(因为put
会在队列满时阻塞),在队列未空时,take
操作不会阻塞且会唤醒所有已经阻塞的其他take
线程,当队列空了后阻塞之后的take
线程
Node
static class Node<E> {
E item;Node<E> next;Node(E x) {
item = x; }}
链表节点内部类 Node
属性、构造方法
// 容量private final int capacity;// 元素个数private final AtomicInteger count = new AtomicInteger();// 头节点transient Node<E> head;// 尾节点private transient Node<E> last;// “取”锁private final ReentrantLock takeLock = new ReentrantLock();// “取”锁对应条件队列private final Condition notEmpty = takeLock.newCondition();// “拿”锁private final ReentrantLock putLock = new ReentrantLock();// “拿”锁对应条件队列private final Condition notFull = putLock.newCondition();// 默认容量 Integer.MAX_VALUEpublic LinkedBlockingQueue() {
this(Integer.MAX_VALUE);}// 自定义容量public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}// 指定集合public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock();try {
int n = 0;for (E e : c) {
if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {
putLock.unlock();}}
默认容量为 Integer.MAX_VALUE
add(E e) & offer(E e)
------------------ AbstractQueue ------------------public boolean add(E e) {
if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}public boolean offer(E e) {
// 不能为空if (e == null) throw new NullPointerException();// 元素个数final AtomicInteger count = this.count;// 满了则返回 falseif (count.get() == capacity)return false;final int c;final Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {
if (count.get() == capacity)return false;// 插入enqueue(node);c = count.getAndIncrement();// 队列未满时会唤醒 put 线程if (c + 1 < capacity)notFull.signal();} finally {
putLock.unlock();}// 如果是插入了第一个元素,则唤醒 take 线程if (c == 0)signalNotEmpty();return true;}
在队列未满时,会唤醒阻塞的 put
线程。在队列为空时的第一次插入会唤醒阻塞的 take
线程,因为在这之前 take
线程会被阻塞,队列满时插入失败,返回 false
put(E e) & offer(E e, long timeout, TimeUnit unit)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();final int c;final Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {
while (count.get() == capacity) {
// 队列满时阻塞notFull.await();}enqueue(node);c = count.getAndIncrement(); if (c + 1 < capacity)notFull.signal();} finally {
putLock.unlock();}if (c == 0)signalNotEmpty();}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);final int c;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {
while (count.get() == capacity) {
if (nanos <= 0L)return false;// 阻塞指定时间nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {
putLock.unlock();}if (c == 0)signalNotEmpty();return true;}
在队列未满时,会唤醒阻塞的 put
线程。在队列为空时的第一次插入会唤醒阻塞的 take
线程,因为在这之前 take
线程会被阻塞,队列满时 阻塞(指定时间)
enqueue(Node node)
private void enqueue(Node<E> node) {
last = last.next = node;}
从尾部入队,last
指向 node
,也就是说,head
可以视为一个 dummy node
,因此取元素我们应该取 head.next
signalNotEmpty()
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
// 唤醒一个 take 线程notEmpty.signal();} finally {
takeLock.unlock();}}
当空队列插入第一个元素时,唤醒一个 take
线程
remove() & poll()
------------------ AbstractQueue ------------------public E remove() {
E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}public E poll() {
// 元素个数final AtomicInteger count = this.count;// 为空时返回 nullif (count.get() == 0)return null;final E x;final int c;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
if (count.get() == 0)return null;// 取出x = dequeue();c = count.getAndDecrement();// 队列中还有元素则继续唤醒 take 线程if (c > 1)notEmpty.signal();} finally {
takeLock.unlock();}// 队列满时取出第一个元素时唤醒 put 线程if (c == capacity)signalNotFull();return x;}
在队列未空时,会唤醒阻塞的 take
线程。在队列满时的第一次取出会唤醒阻塞的 put
线程,因为在这之前 put
线程会被阻塞,队列空时取出失败,返回 null
take() & poll(long timeout, TimeUnit unit)
public E take() throws InterruptedException {
final E x;final int c;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {
while (count.get() == 0) {
// 队列为空时阻塞notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {
takeLock.unlock();}if (c == capacity)signalNotFull();return x;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final E x;final int c;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {
while (count.get() == 0) {
if (nanos <= 0L)return null;// 队列为空时阻塞指定时间nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {
takeLock.unlock();}if (c == capacity)signalNotFull();return x;}
在队列未空时,会唤醒阻塞的 take
线程。在队列满时的第一次取出会唤醒阻塞的 put
线程,因为在这之前 put
线程会被阻塞,队列空时 阻塞(指定时间)
dequeue()
private E dequeue() {
/*** 返回 head.next 的元素* 并更新 head */Node<E> h = head;Node<E> first = h.next;h.next = h; head = first;E x = first.item;first.item = null;return x;}
因为 head
是一个 dummy node
,此处相当于返回了 head.next.item
,并将新 head
的 item
置为 null
signalNotFull()
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;putLock.lock();try {
// 唤醒一个 put 线程notFull.signal();} finally {
putLock.unlock();}}
当满队列被取出第一个元素时,唤醒一个 put
线程
其他方法
同样地,LinkedBlockingQueue 还实现了其他方法比如 size()
remainingCapacity()
等不再一一解读
总结
BlockingQueue 的经典实现,基于双锁因而效率更高,十分实用