ConcurrentLinkedQueue的实现

今天我们介绍一下ConcurrentLinkedQueue的内部实现。从名字就可以看出来,其内部使用链表实现。下面介绍一下它的结构:

基本结构

单单从类图看来,结构不算复杂,有两个重要的属性就是head和tail。

方法操作

CAS操作

Node节点里面有几个CAS的操作,介绍一下:

        // CAS设置Node的值,cmp 期望值、val 目标值
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        // CAS设置next的节点,val 目标值
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        // CAS设置next的节点,cmp 期望值、val 目标值
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

offer()

offer()操作就是入队操作,即在队未添加节点,并且在这一过程中移动head和tail指针。其过程如下所示:

从上图可以看出,网队列中添加元素的时候,tail指针并不是一直在队列末端的,而是隔一个Node改变一次位置。下面看一下代码实现:

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        // 无锁操作
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p是最后一个节点
                if (p.casNext(null, newNode)) {
                    if (p != t) // 每2次更新tail指针位置
                        casTail(t, newNode);
                    return true;
                }
                // CAS竞争失败,再次尝试
            }
            else if (p == q)
                // 当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。
                p = (t != (t = tail)) ? t : head;
            else
                // 取下一个Node或者最后一个Node
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

从上面代码可以看出,先从一个空队列开始添加节点。添加第1个节点时,执行到if(q == null)这个模块,然后插入第一个Node,但是此时并没有更新tail指针。

添加第2个节点时,q为第1个节点,所以进入到第3个判断。此时p取的是下一个节点q 或者最后一个节点t。然后再次循环到if(q == null) ,此时插入节点后,执行了tail更新。

程序执行的过程中,还有一种情况是:当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。如下图所示:

​​​​​​​

注意:下面解释一下这一行代码:

p = (t != (t = tail)) ? t : head;

这里面只有多线程的情况下才会出现:t != (t = tail),程序首先会获取 !=号左边的t值,然后计算右边的值,此时如果tail被其他线程修改了。就有可能造成!=的发生,从而程序“打赌”直接取了t(也就是刚刚更新的tail)。

poll()

poll()操作是从队列头部取出Node,然后更新head指针的位置。下面看一下其具体的流程:

从上面流程图可以看出,当队列取出第1个Node时,head位置更新了,当取出第2个Node时,并没有更新head。这个变更和入队时tail的变更类似。下面看一下具体代码实现:

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // 设置Node的item为null,此时Node并未移除
                    if (p != h) // 2次更新head,移除Node
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {// 队列中无Node时
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)// 出现哨兵节点,全部重新开始
                    continue restartFromHead;
                else
                    p = q; //空Node时,向后移动
            }
        }
    }

从上面的代码,设置一个Node的场景。然后从队列中poll()元素2次,第1次poll()时,会进入到第4个判断,然后p向后移动。再次循环时,进入到第1个判断,此时出队,并且更新head指针。

然后执行第2个poll()时,直接进入到第2个判断,然后更新head指针,返回null。

在多线程操作的时候,会进入到第三个判断。此时Node的next会指向自己,因此会重新遍历链表,节点会出现一下情况:

参考:《java高并发程序编程》