当前位置: 代码迷 >> 综合 >> 【源码】JUC —— LinkedBlockingQueue 浅析
  详细解决方案

【源码】JUC —— LinkedBlockingQueue 浅析

热度:51   发布时间:2024-02-20 06:51:05.0

【源码】JUC —— LinkedBlockingQueue 浅析

  • 前言
  • LinkedBlockingQueue
    • Node
    • 属性、构造方法
    • add(E e) & offer(E e)
    • put(E e) & offer(E e, long timeout, TimeUnit unit)
    • enqueue(Node node)
    • signalNotEmpty()
    • remove() & poll()
    • take() & poll(long timeout, TimeUnit unit)
    • dequeue()
    • signalNotFull()
    • 其他方法
  • 总结

前言

LinkedBlockingQueue,基于 链表 实现的 容量有限BlockingQueue

LinkedBlockingQueue

与其他实现略有不同的是,LinkedBlockingQueue 分别定义了 takeLock 取锁 和 putLock 放锁,这使得 put 操作和 take 操作可以并行

  • put 操作在队列为空时插入第一个元素,将唤醒 take线程(因为 take 会在队列为空时阻塞),在队列未满时,put 操作不会阻塞且会唤醒所有已经阻塞的其他 put 线程,当队列满了后阻塞之后的 put 线程
  • take 操作在队列满时取出第一个元素,将唤醒 put线程(因为 put 会在队列满时阻塞),在队列未空时,take 操作不会阻塞且会唤醒所有已经阻塞的其他 take 线程,当队列空了后阻塞之后的 take 线程

Node

	static class Node<E> {
    E item;Node<E> next;Node(E x) {
     item = x; }}

链表节点内部类 Node

属性、构造方法

	// 容量private final int capacity;// 元素个数private final AtomicInteger count = new AtomicInteger();// 头节点transient Node<E> head;// 尾节点private transient Node<E> last;// “取”锁private final ReentrantLock takeLock = new ReentrantLock();// “取”锁对应条件队列private final Condition notEmpty = takeLock.newCondition();// “拿”锁private final ReentrantLock putLock = new ReentrantLock();// “拿”锁对应条件队列private final Condition notFull = putLock.newCondition();// 默认容量 Integer.MAX_VALUEpublic LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);}// 自定义容量public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}// 指定集合public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock();try {
    int n = 0;for (E e : c) {
    if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {
    putLock.unlock();}}

默认容量为 Integer.MAX_VALUE

add(E e) & offer(E e)

    ------------------ AbstractQueue ------------------public boolean add(E e) {
    if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}public boolean offer(E e) {
    // 不能为空if (e == null) throw new NullPointerException();// 元素个数final AtomicInteger count = this.count;// 满了则返回 falseif (count.get() == capacity)return false;final int c;final Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {
    if (count.get() == capacity)return false;// 插入enqueue(node);c = count.getAndIncrement();// 队列未满时会唤醒 put 线程if (c + 1 < capacity)notFull.signal();} finally {
    putLock.unlock();}// 如果是插入了第一个元素,则唤醒 take 线程if (c == 0)signalNotEmpty();return true;}

在队列未满时,会唤醒阻塞的 put 线程。在队列为空时的第一次插入会唤醒阻塞的 take 线程,因为在这之前 take 线程会被阻塞,队列满时插入失败,返回 false

put(E e) & offer(E e, long timeout, TimeUnit unit)

	public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();final int c;final Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {
    while (count.get() == capacity) {
    // 队列满时阻塞notFull.await();}enqueue(node);c = count.getAndIncrement();         if (c + 1 < capacity)notFull.signal();} finally {
    putLock.unlock();}if (c == 0)signalNotEmpty();}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
    if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);final int c;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {
    while (count.get() == capacity) {
    if (nanos <= 0L)return false;// 阻塞指定时间nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {
    putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

在队列未满时,会唤醒阻塞的 put 线程。在队列为空时的第一次插入会唤醒阻塞的 take 线程,因为在这之前 take 线程会被阻塞,队列满时 阻塞(指定时间)

enqueue(Node node)

	private void enqueue(Node<E> node) {
    last = last.next = node;}

从尾部入队,last 指向 node,也就是说,head 可以视为一个 dummy node,因此取元素我们应该取 head.next

signalNotEmpty()

	private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
    // 唤醒一个 take 线程notEmpty.signal();} finally {
    takeLock.unlock();}}

当空队列插入第一个元素时,唤醒一个 take 线程

remove() & poll()

	------------------ AbstractQueue ------------------public E remove() {
    E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}public E poll() {
    // 元素个数final AtomicInteger count = this.count;// 为空时返回 nullif (count.get() == 0)return null;final E x;final int c;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
    if (count.get() == 0)return null;// 取出x = dequeue();c = count.getAndDecrement();// 队列中还有元素则继续唤醒 take 线程if (c > 1)notEmpty.signal();} finally {
    takeLock.unlock();}// 队列满时取出第一个元素时唤醒 put 线程if (c == capacity)signalNotFull();return x;}

在队列未空时,会唤醒阻塞的 take 线程。在队列满时的第一次取出会唤醒阻塞的 put 线程,因为在这之前 put 线程会被阻塞,队列空时取出失败,返回 null

take() & poll(long timeout, TimeUnit unit)

	public E take() throws InterruptedException {
    final E x;final int c;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {
    while (count.get() == 0) {
    // 队列为空时阻塞notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {
    takeLock.unlock();}if (c == capacity)signalNotFull();return x;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final E x;final int c;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {
    while (count.get() == 0) {
    if (nanos <= 0L)return null;// 队列为空时阻塞指定时间nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {
    takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

在队列未空时,会唤醒阻塞的 take 线程。在队列满时的第一次取出会唤醒阻塞的 put 线程,因为在这之前 put 线程会被阻塞,队列空时 阻塞(指定时间)

dequeue()

	private E dequeue() {
    /*** 返回 head.next 的元素* 并更新 head */Node<E> h = head;Node<E> first = h.next;h.next = h; head = first;E x = first.item;first.item = null;return x;}

因为 head 是一个 dummy node,此处相当于返回了 head.next.item,并将新 headitem 置为 null

signalNotFull()

	private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;putLock.lock();try {
    // 唤醒一个 put 线程notFull.signal();} finally {
    putLock.unlock();}}

当满队列被取出第一个元素时,唤醒一个 put 线程

其他方法

同样地,LinkedBlockingQueue 还实现了其他方法比如 size() remainingCapacity() 等不再一一解读

总结

BlockingQueue 的经典实现,基于双锁因而效率更高,十分实用