当前位置: 代码迷 >> 综合 >> 并发包源码解读——ConcurrentLinkedQueue
  详细解决方案

并发包源码解读——ConcurrentLinkedQueue

热度:7   发布时间:2024-02-26 13:23:38.0

并发包源码解读——ConcurrentLinkedQueue


? 先说点题外话,安利一波看jdk源码的好处,如果你不想看,略过也可

? 看源码真的对咱们程序员很有帮助。大家可能觉得看源码就是为了精通内部机制、然后会用,或者就是为了面试。但其实不是的,就我自身而言,我刚接触源码,看ArrayList的时候也是非常非常的慢,很多地方不知道大神写的什么意思,他们写这块逻辑的时候是怎么考虑的,可能得边看边存疑,然后回头再看一遍又一遍,又枯燥又没有成就感,哪有自己写一个页面或者一个后台功能来的爽,像极了当年高考做英语阅读的时候,因为看过的东西太少。但当你慢慢地啃下了第一篇源码,脑子里有了一个大致的原理,再看第二篇,你会发现有很多逻辑跟其他源码一样,能看懂大部分,又能学到一些新东西,再看第三篇、第四篇,良性循环,脑子里的知识越来越多,其他的新知识学起来也会更快

? 虽然第一步很难很难,但是只要你迈出了这一步,后面的道路肯定会越来越自信的

下面我们进入正题,ConcurrrentLinkedQueue是一个支持并发的高性能排队队列,我们以一个简单的例子开始

ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();queue.offer("");queue.poll();

1、构造函数

很简单,在构造函数中初始化了首尾节点

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);}

节点的构造函数就是初始化节点的值

所以ConcurrentLinkedQueue初始化了值为空的首尾节点

private static class Node<E> {
    volatile E item;volatile Node<E> next;Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {
    throw new Error(e);}}}

2、入队——offer

public boolean offer(E e) {
    //初始化一个值为e的节点checkNotNull(e);final Node<E> newNode = new Node<E>(e);//这是一个尾节点的自旋,for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;//如果尾节点没有后继,说明没有并发入队,就把尾节点的后继指向当前入队的eif (q == null) {
    if (p.casNext(null, newNode)) {
    //p原来是尾节点,上次自旋失败后p变成了他的后继,说明后继变了但是尾节点还没更新//那就先把两个后继连起来,然后再更新尾节点if (p != t) casTail(t, newNode);  return true;}}//这其实是出队时造成的现象,说明当前节点已经出队了,出队的头节点的后继都会指向自己//虽然尾节点出队了,但是并不表示尾节点出队后,其他节点没有入队,所以只能从头开始找else if (p == q)p = (t != (t = tail)) ? t : head;else//如果其他线程还没来的及改尾节点,把p赋值成他的后继q,然后重新尝试入队//如果尾节点已经被改了,那就给p重新赋值尾节点p = (p != t && t != (t = tail)) ? t : q;}}

3、出队——poll

public E poll() {
    //这又是一个自旋restartFromHead:for (;;) {
    for (Node<E> h = head, p = h, q;;) {
    E item = p.item;//cas出队,把头节点的值对象置为nullif (item != null && p.casItem(item, null)) {
    //p原来是头节点,上一次自旋失败后变成了头节点的后继(第二个节点)//头节点已经变了,要更新头节点,如果头节点还有后继就更新成后继,没有后继正好p的值对象item也是nullif (p != h) updateHead(h, ((q = p.next) != null) ? q : p);return item;}//cas失败,说明并发出队改变了头节点//如果头节点没有后继,说明队列里没有节点了,首尾节点相同是个值对象为null的节点,那就更新头节点为pelse if ((q = p.next) == null) {
    updateHead(h, p);return null;}//(先看下面的else,因为这是第一次自旋进不来的)如果头节点的后继就是他自己,说明该重新取头节点了else if (p == q)continue restartFromHead;//把头节点p置为他的后继q,然后在下一次循环时再尝试一次出队(下次尝试出队的就是后继q,因为并发出队,头节点已经被别的线程出队了),如果还没出队就跳到外层循环,重新取头节点elsep = q;}}}

这个updateHead需要注意一下

他的作用是更新头节点,但包含的操作不仅仅是cas,还有个lasySetNext方法用来将老的头节点的后继指向自己,这样对并发入队出队同一个节点有区分作用

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))h.lazySetNext(h);}
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}