ConcurrentLinkedQueue源码分析

简介

  • ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。
  • ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法(CAS来实现)。

一、 ConcurrentLinkedQueue数据结构实现

1.继承Queue接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public interface Queue<E> extends Collection<E> {
    /**
     * Inserts the specified element into this queue if it is possible to do so
     * immediately without violating capacity restrictions, returning
     * <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt>
     * if no space is currently available.
     * 如果有可能立即将指定元素插入到该队列中,如果不违反容量限制,则返回< tt > true </ tt >,
     * 并抛出一个< tt >IllegalStateException< / tt >,如果没有可用空间。
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null and
     *         this queue does not permit null elements
     * @throws IllegalArgumentException if some property of this element
     *         prevents it from being added to this queue
     */
    boolean add(E e);

    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions.
     * When using a capacity-restricted queue, this method is generally
     * preferable to {@link #add}, which can fail to insert an element only
     * by throwing an exception.
     * 如果可以在不违反容量限制的情况下立即执行,则将指定的元素插入到此队列中。当使用容量限制队列时,
     * 这个方法通常优于{@ link # add},它只能通过抛出异常来插入一个元素。
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null and
     *         this queue does not permit null elements
     * @throws IllegalArgumentException if some property of this element
     *         prevents it from being added to this queue
     */
    boolean offer(E e);

    /**
     * Retrieves and removes the head of this queue.  This method differs
     * from {@link #poll poll} only in that it throws an exception if this
     * queue is empty.
     * 检索并删除该队列的头部。这个方法不同于{@ link # poll poll},它只在该队列为空时抛出异常。
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E remove();

    /**
     * Retrieves and removes the head of this queue,
     * or returns <tt>null</tt> if this queue is empty.
     * 如果此队列为空,则检索并删除此队列的头部,或返回< tt > null </ tt >。
     * @return the head of this queue, or <tt>null</tt> if this queue is empty
     */
    E poll();

    /**
     * Retrieves, but does not remove, the head of this queue.  This method
     * differs from {@link #peek peek} only in that it throws an exception
     * if this queue is empty.
     * 检索,但不删除,此队列的头部。此方法与{@ link # peek peek}不同,只有在该队列为空时才抛出异常。
     * @return the head of this queue
     * @throws NoSuchElementException if this queue is empty
     */
    E element();

    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns <tt>null</tt> if this queue is empty.
     * 如果此队列为空,则检索该队列的头部,但不删除该队列的头,或返回< tt > null </ tt >。
     * @return the head of this queue, or <tt>null</tt> if this queue is empty
     */
    E peek();
}

2.单链表结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * 构造一个新节点。使用轻松写入,因为只有在通过casNext发布后才能看到项目。
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
       //cas修改数据
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
       //更新本地线程内存中,过几纳秒刷新到主内存中
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
       //cas 更新next 对象
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

      

3.ConcurrentLinkedQueue构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;
    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * 可以在O(1)时间内到达第一个(非删除)节点(如果有)的节点
     * Invariants: 不变量:
     * - all live nodes are reachable from head via succ() 所有活节点可通过succ()从头部到达
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants: 非不变量:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * 列表上最后一个节点(即具有节点的惟一节点)的节点。可以在O(1)时间内到达
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;


    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     * 创建一个空节点赋值头节点和尾节点
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

二、 入队

  • CAS的tail节点,执行两次offer才更新,为性能提升

1. offer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {  
                // p is last node  p是最后一个节点
                if (p.casNext(null, newNode)) { //CAS next 下个节点
                    if (p != t) // hop two nodes at a time 一次跳两个节点,高并发下,这里设计不要频繁更新tail节点,为性能提升
                        casTail(t, newNode);  // Failure is OK. 说明失败,其他线程已经更改
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)   //相等情况,出队列时,正好时tail节点,操作updateHead方法,p.next指向本身自己。
                p = (t != (t = tail)) ? t : head;
            else
                //检查两跳后的尾部更新
                p = (p != t && t != (t = tail)) ? t : q; 
        }
    }

2.add 调用offer

1
2
3
    public boolean add(E e) {
        return offer(e);
    }

3.allAll

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
     * Appends all of the elements in the specified collection to the end of
     * this queue, in the order that they are returned by the specified
     * collection's iterator.  Attempts to {@code addAll} of a queue to
     * itself result in {@code IllegalArgumentException}.
     * 将指定集合中的所有元素附加到此队列的末尾,按照指定集合的迭代器返回的顺序。
     * 尝试{@代码将队列的所有}添加到它本身导致{@代码非法的参数异常}。
     * @param c the elements to be inserted into this queue
     * @return {@code true} if this queue changed as a result of the call
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     * @throws IllegalArgumentException if the collection is this queue
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            // As historically specified in AbstractQueue#addAll
            throw new IllegalArgumentException();

        // Copy c into a private chain of Nodes
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) { //集合拼接Node链表
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else {
                last.lazySetNext(newNode);
                last = newNode;
            }
        }
        if (beginningOfTheEnd == null)
            return false;

        // Atomically append the chain at the tail of this collection
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, beginningOfTheEnd)) {
                    // Successful CAS is the linearization point
                    // for all elements to be added to this queue.
                    if (!casTail(t, last)) {
                        // Try a little harder to update tail,
                        // since we may be adding many elements.
                        //尝试更难更新tail,因为我们可能添加了很多元素
                        t = tail;
                        if (last.next == null)
                            casTail(t, last);
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

三、 出队(poll)

  • CAS的head,执行两次更新,
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
   public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) { // item 不为空,CAS更新item为空
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time 一次跳两个节点 跟offer更新tail一样
                        updateHead(h, ((q = p.next) != null) ? q : p); 
                    return item;
                }
                else if ((q = p.next) == null) { 
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)  //这里相等情况,有个线程修改head,数据正好清空情况
                    continue restartFromHead; //重新迭代内部for语句
                else
                    p = q;
            }
        }
    }
    /**
     * Try to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }    

四、 读取列头数据

1.peek

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

2.element

element方法调用抽象AbstractQueue类

1
2
3
4
5
6
7
8

    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

五、 移除(remove)

  • CAS的item为空
  • 并发删除两个相邻节点有问题吗? N1->N2->N3-N4-N5
    • 并发删除N2,N3
    • N1.cas针对删除N2
    • N2.cas针对删除N3
    • N3没有删除成功
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }
   //查询头节点
    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }    

六、size 慢原因?还不精准?

  • 链表遍历,时间复杂度O(n),不慢才怪
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    public int size() {
        restartFromHead: for (;;) {
            int count = 0;
            for (Node<E> p = first(); p != null;) {
                if (p.item != null)
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return count;
        }
    }