当前位置: 代码迷 >> 综合 >> 【JUC源码】LinkedBlockingQueue
  详细解决方案

【JUC源码】LinkedBlockingQueue

热度:67   发布时间:2024-02-23 17:30:53.0

1.结构

LinkedBlockingQueue 继承关系,核心成员变量及主要构造函数:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
    // 链表的节点static class Node<E> {
    E item;// 当前元素的下一个,为空表示当前节点是最后一个Node<E> next;Node(E x) {
     item = x; }}// 链表的最大容量,默认是 Integer.MAX_VALUEprivate final int capacity;// 链表已有元素多少,使用 AtomicInteger,所以是线程安全的private final AtomicInteger count = new AtomicInteger();// 链表头// 注;head一般指向的都是一个哨兵节点(数据为null)。// 1.目的:辅助出队,即在 take/poll/remove 需要将头结点删除(dequeue)时发挥作用。// 2.注意:这个哨兵节点是随着每次出队都变化的。具体请看后面dequeue方法注释transient Node<E> head;// 链表尾private transient Node<E> last;//--------------------------------锁---------------------------------------// take 时的锁private final ReentrantLock takeLock = new ReentrantLock();// take 的条件队列,其实很容易理解:出队时队列不能为空private final Condition notEmpty = takeLock.newCondition();// put 时的锁// 注:设计两把锁的目的,主要为了 take 和 put 可以同时进行。ArrayBlockingQueue只有一把锁。private final ReentrantLock putLock = new ReentrantLock();// put 的条件队列,其实很容易理解:入队时队列不能是满的private final Condition notFull = putLock.newCondition();// 迭代器。LinkedBlockingQueue 实现了自己的迭代器private class Itr implements Iterator<E> {
    }//--------------------------------构造器--------------------------------------// 构造函数一:空参构造。不指定容量时,默认 Integer 的最大值public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);}// 构造函数二:指定链表容量大小。public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 使链表头尾相等,表示当前链表为空,并创建出第一个哨兵节点last = head = new Node<E>(null);}// 构造函数三:已有集合数据进行初始化public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {
    int n = 0;for (E e : c) {
    // 集合内的元素不能为空if (e == null)throw new NullPointerException();// capacity 代表链表的大小,在这里是 Integer 的最大值// 如果集合类的大小大于 Integer 的最大值,就会报错// 其实这个判断完全可以放在 for 循环外面,这样可以减少 Integer 的最大值次循环(最坏情况)if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {
    putLock.unlock();}}
}
  • 链表的作用是为了保存当前节点,节点中的数据可以是任意东西,是一个泛型,比如说队列被应用到线程池时,节点就是线程(thread),比如队列被应用到消息队列中,节点就是消息,节点的含义主要看队列被使用的场景
  • 锁有 take 锁和 put 锁,是为了保证队列操作时的线程安全,设计两种锁,是为了 take 和 put 两种操作可以同时进行,互不影响。如果只有一把锁put,take同时只能进行一个
  • 初始化时,容量大小是不会影响性能的,只影响在后面的使用,因为初始化队列太小,容易导致没有放多少就会报队列已满的错误
  • 在对给定集合数据进行初始化时,我们不反对在每次 for 循环的时候,都去检查当前链表的大小是否超过容量,但我们希望在 for 循环开始之前就做一步这样的工作。举个列子,给定集合大小是 1 w,链表大小是 9k,按照现在代码实现,只能在 for 循环 9k 次时才能发现,原来给定集合的大小已经大于链表大小了,导致 9k 次循环都是在浪费资源,还不如在 for 循环之前就 check 一次,如果 1w > 9k,直接报错即可

2.方法解析&api

队列的主要方法无非就三个:入队、出队、队首。所以接下来我们就从这三个方法入手,来看看 LinkedBlockingQueue 的具体实现。这里的内容可以对比着 ArrayBlockingQueue 的源码进行理解,具体参考【JUC源码】ArrayBlockingQueue源码分析,两者的实现思路大同小异。

2.1 入队

put():满时阻塞

  1. 首先是一些准备操作,比如新建node,获取到 put 锁和计数器 count
  2. 对 putLock 加锁,所以后面的新增数据是线程安全的
  3. 新增数据分为两步:
    • 如果队列满了,当前线程会被加入条件队列中阻塞
    • 将新node简单的追加到链表的尾部
  4. 新增数据成功后,在适当时机,会唤醒阻塞的 put 和 take 线程,保证了唤起的时机不被浪费
    • 队列不满,唤起 put 的等待线程
    • 在 put 之前队列为空会,唤醒 take 的等待线程
// 把e新增到队列的尾部。如果有可以新增的空间的话,直接新增成功,否则当前线程陷入等待
public void put(E e) throws InterruptedException {
    // 要添加的元素 e 为空,抛出异常if (e == null) throw new NullPointerException();// 预先设置 c 为 -1,约定负数为新增失败int c = -1;Node<E> node = new Node<E>(e); // 创建新nodefinal ReentrantLock putLock = this.putLock; // 获取put的锁final AtomicInteger count = this.count; // 获取到队列中元素的个数putLock.lockInterruptibly(); // 加锁,设置可中断锁try {
    // !!!队列如果是满的,就将当前线程加入到notFull的条件队列中,然后进入阻塞状态// 等待某个线程[新增成功 || 拿走队列元素 || 删除元素]后,有机会被唤醒进入同步队列// 注:这里的while循环有double-check的意思,即防止线程已经被调度执行了,但前一刻有另一个线程put/offer/add成功while (count.get() == capacity) {
    // await 让出CPU,休眠,释放锁notFull.await();}// 队列没有满,直接新增到队列的尾部enqueue(node);// conut+1。这里是原子操作新增,getAndIncrement 返回的是旧值,所以 c 是比真实的 count 小 1 的c = count.getAndIncrement();// 如果链表现在的大小 小于链表的容量,说明队列未满。可以尝试唤醒一个 put 的等待线程 if (c + 1 < capacity)notFull.signal();} finally {
    putLock.unlock(); // 释放锁}// c==0,代表了队列之前空,现在刚刚新增了一个的情况。所以会尝试唤醒一个take的等待线程if (c == 0)signalNotEmpty();
}

offer():满时返回false

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;// !!!先判断是否满,满则返回falseif (count.get() == capacity)return false;// 剩下逻辑同 put int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {
    if (count.get() < capacity) {
    enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {
    putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}

add():队满抛异常

在 LinkedBlockingQueue 中并没有直接实现 add 方法,它是在父类 AbstractQueue 中,逻辑很简单就是简单调用 offer方法
在这里插入图片描述

enqueue()

  • 入队,把新元素放到队尾
  • add,offer,put都要调用此方法
private void enqueue(Node<E> node) {
    // 1.连接:last.next = node// 2.后移:last = nodelast = last.next = node;
}

2.2 出队

take():队空阻塞

队列空则阻塞,方法的原理与 put 相似

  1. 先上锁,所以后面的出队操作线程安全
  2. 取数据时分为两步:
    1. 若队列为空,当前线程会被加入 notEmpty 条件队列阻塞
    2. 调用dequeue,头删
  3. 出队完成后,看能否唤醒阻塞的线程
    • 队列不为空,唤醒 take 的等待线程
    • 在 take 之前满队列满,唤醒 put 的等待线程
public E take() throws InterruptedException {
    E x;// 默认负数,代表失败int c = -1;final AtomicInteger count = this.count; // count 代表当前链表数据的真实大小final ReentrantLock takeLock = this.takeLock; // 获取take的锁takeLock.lockInterruptibly(); // 加锁,设置为可中断try {
    // !!!队列如果是空的,就将当前线程加入到notEmpty条件队列中,然后进入阻塞状态// 等待某个线程拿向队列放入元素后,有机会被唤醒进入同步队列// 注:这里的while循环有double-check的意思,即防止线程已经被调度执行了,但前一刻有另一个线程take/poll/remove成功了,所以他又要进入条件队列notFull中阻塞等待。while (count.get() == 0) {
    notEmpty.await();}// 非空队列,从队列的头部拿一个出来x = dequeue();// count-1,这里是原子操作,getAndDecrement 返回的值是旧值(c 比真实的 count 大1)c = count.getAndDecrement();// 如果队列里面有值,从 take 的等待线程里面唤醒一个。if (c > 1)notEmpty.signal();} finally {
    takeLock.unlock(); // 释放锁}// 如果队列空闲还剩下一个,尝试从 put 的等待线程中唤醒一个if (c == capacity)signalNotFull();return x;
}

poll():队空返回null

public E poll() {
    final AtomicInteger count = this.count;// !!!队列为空返回nullif (count.get() == 0)return null;E x = null;// 下面逻辑同 putint c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {
    if (count.get() > 0) {
    x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {
    takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

remove():队空抛异常

同上面的 add,remove 方法在 LinkedBlockingQueue 也没有具体实现,也是在父类 AbstractQueue 中,也是直节调用 poll
在这里插入图片描述

dequeue()

  • 队头中取数据,链表头删。这里就体现出了哨兵节点的作用,具体请看下面注释。
  • remove,poll,take 都会调用此方法
private E dequeue() {
    Node<E> h = head; // 获取头结点h(哨兵节点)Node<E> first = h.next; // 获取第一个元素节点firsth.next = h; // help GChead = first; // 将头结点置为first。到这步相当于已经删除了之前的哨兵节点E x = first.item; // 保存第一元素节点first的数据xfirst.item = null;// 将first的数据删除,变为新的哨兵节点return x;
}

2.3 获取队首:peek

  • 查看并不删除元素,如果队列为空,返回 null
  • 注意,虽然删除,但在读取的时候也要拿到 take 锁,避免头结点被删除
public E peek() {
    // count 代表队列实际大小,队列为空,直接返回 nullif (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;// 上take锁,避免读取时被删除takeLock.lock();try {
    // 拿到队列头Node<E> first = head.next;// 判断队列头是否为空,并返回if (first == null)return null;elsereturn first.item;} finally {
    takeLock.unlock(); // 释放锁}
}