【注意】最后更新于 July 22, 2017,文中内容可能已过时,请谨慎使用。
ConcurrentLinkedQueue源码分析
简介
- ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。
- ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法(CAS来实现)。
一、 ConcurrentLinkedQueue数据结构实现
1.继承Queue接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
public interface Queue<E> extends Collection<E> {
/**
* Inserts the specified element into this queue if it is possible to do so
* immediately without violating capacity restrictions, returning
* <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt>
* if no space is currently available.
* 如果有可能立即将指定元素插入到该队列中,如果不违反容量限制,则返回< tt > true </ tt >,
* 并抛出一个< tt >IllegalStateException< / tt >,如果没有可用空间。
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions.
* When using a capacity-restricted queue, this method is generally
* preferable to {@link #add}, which can fail to insert an element only
* by throwing an exception.
* 如果可以在不违反容量限制的情况下立即执行,则将指定的元素插入到此队列中。当使用容量限制队列时,
* 这个方法通常优于{@ link # add},它只能通过抛出异常来插入一个元素。
* @param e the element to add
* @return <tt>true</tt> if the element was added to this queue, else
* <tt>false</tt>
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
boolean offer(E e);
/**
* Retrieves and removes the head of this queue. This method differs
* from {@link #poll poll} only in that it throws an exception if this
* queue is empty.
* 检索并删除该队列的头部。这个方法不同于{@ link # poll poll},它只在该队列为空时抛出异常。
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
E remove();
/**
* Retrieves and removes the head of this queue,
* or returns <tt>null</tt> if this queue is empty.
* 如果此队列为空,则检索并删除此队列的头部,或返回< tt > null </ tt >。
* @return the head of this queue, or <tt>null</tt> if this queue is empty
*/
E poll();
/**
* Retrieves, but does not remove, the head of this queue. This method
* differs from {@link #peek peek} only in that it throws an exception
* if this queue is empty.
* 检索,但不删除,此队列的头部。此方法与{@ link # peek peek}不同,只有在该队列为空时才抛出异常。
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
E element();
/**
* Retrieves, but does not remove, the head of this queue,
* or returns <tt>null</tt> if this queue is empty.
* 如果此队列为空,则检索该队列的头部,但不删除该队列的头,或返回< tt > null </ tt >。
* @return the head of this queue, or <tt>null</tt> if this queue is empty
*/
E peek();
}
|
2.单链表结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* 构造一个新节点。使用轻松写入,因为只有在通过casNext发布后才能看到项目。
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
//cas修改数据
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//更新本地线程内存中,过几纳秒刷新到主内存中
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//cas 更新next 对象
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
|
3.ConcurrentLinkedQueue构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static final long serialVersionUID = 196745693267521676L;
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* 可以在O(1)时间内到达第一个(非删除)节点(如果有)的节点
* Invariants: 不变量:
* - all live nodes are reachable from head via succ() 所有活节点可通过succ()从头部到达
* - head != null
* - (tmp = head).next != tmp || tmp != head
* Non-invariants: 非不变量:
* - head.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
private transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* 列表上最后一个节点(即具有节点的惟一节点)的节点。可以在O(1)时间内到达
* Invariants:
* - the last node is always reachable from tail via succ()
* - tail != null
* Non-invariants:
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
*/
private transient volatile Node<E> tail;
/**
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
* 创建一个空节点赋值头节点和尾节点
*/
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
|
二、 入队
- CAS的tail节点,执行两次offer才更新,为性能提升
1. offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node p是最后一个节点
if (p.casNext(null, newNode)) { //CAS next 下个节点
if (p != t) // hop two nodes at a time 一次跳两个节点,高并发下,这里设计不要频繁更新tail节点,为性能提升
casTail(t, newNode); // Failure is OK. 说明失败,其他线程已经更改
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) //相等情况,出队列时,正好时tail节点,操作updateHead方法,p.next指向本身自己。
p = (t != (t = tail)) ? t : head;
else
//检查两跳后的尾部更新
p = (p != t && t != (t = tail)) ? t : q;
}
}
|
2.add 调用offer
1
2
3
|
public boolean add(E e) {
return offer(e);
}
|
3.allAll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
/**
* Appends all of the elements in the specified collection to the end of
* this queue, in the order that they are returned by the specified
* collection's iterator. Attempts to {@code addAll} of a queue to
* itself result in {@code IllegalArgumentException}.
* 将指定集合中的所有元素附加到此队列的末尾,按照指定集合的迭代器返回的顺序。
* 尝试{@代码将队列的所有}添加到它本身导致{@代码非法的参数异常}。
* @param c the elements to be inserted into this queue
* @return {@code true} if this queue changed as a result of the call
* @throws NullPointerException if the specified collection or any
* of its elements are null
* @throws IllegalArgumentException if the collection is this queue
*/
public boolean addAll(Collection<? extends E> c) {
if (c == this)
// As historically specified in AbstractQueue#addAll
throw new IllegalArgumentException();
// Copy c into a private chain of Nodes
Node<E> beginningOfTheEnd = null, last = null;
for (E e : c) { //集合拼接Node链表
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode;
else {
last.lazySetNext(newNode);
last = newNode;
}
}
if (beginningOfTheEnd == null)
return false;
// Atomically append the chain at the tail of this collection
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, beginningOfTheEnd)) {
// Successful CAS is the linearization point
// for all elements to be added to this queue.
if (!casTail(t, last)) {
// Try a little harder to update tail,
// since we may be adding many elements.
//尝试更难更新tail,因为我们可能添加了很多元素
t = tail;
if (last.next == null)
casTail(t, last);
}
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
|
三、 出队(poll)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) { // item 不为空,CAS更新item为空
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time 一次跳两个节点 跟offer更新tail一样
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q) //这里相等情况,有个线程修改head,数据正好清空情况
continue restartFromHead; //重新迭代内部for语句
else
p = q;
}
}
}
/**
* Try to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
|
四、 读取列头数据
1.peek
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
|
2.element
element方法调用抽象AbstractQueue类
1
2
3
4
5
6
7
8
|
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
|
五、 移除(remove)
- CAS的item为空
- 并发删除两个相邻节点有问题吗?
N1->N2->N3-N4-N5
- 并发删除N2,N3
- N1.cas针对删除N2
- N2.cas针对删除N3
- N3没有删除成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public boolean remove(Object o) {
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) {
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
return true;
}
pred = p;
}
return false;
}
//查询头节点
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
|
六、size 慢原因?还不精准?
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public int size() {
restartFromHead: for (;;) {
int count = 0;
for (Node<E> p = first(); p != null;) {
if (p.item != null)
if (++count == Integer.MAX_VALUE)
break; // @see Collection.size()
if (p == (p = p.next))
continue restartFromHead;
}
return count;
}
}
|