1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
/**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
* 基本算法是循环尝试,执行下面两个步中的,其中一个:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
* 如果显然为空或已经包含相同模式的节点,则尝试在堆栈上推送节点并等待匹配,返回该匹配;如果取消,则返回null。
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
* 如果包含一个互补模式的节点(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST)),
* 则尝试一个FULFILLING节点入栈,同时匹配等待的协同节点,两个节点同时出栈,返回匹配的元素。
* 由于其他线程执行步骤3,实际匹配和解除链接指针动作不会发生。
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
* 如果栈顶存在另外一个FULFILLING的节点,则匹配节点,并出栈。这段的代码
* 与fulfilling相同,除非没有元素返回
*/
SNode s = null; // constructed/reused as needed 根据需要构建/重用
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode 1.头节点为空或者模式相同
if (timed && nanos <= 0) { // can't wait 没有等到时间
if (h != null && h.isCancelled()) 头节点不为空并且头结点是否取消
casHead(h, h.next); // pop cancelled node 把头结点next设置为头节点
else
return null; // 头结点为空或者头结点不为取消节点就返回null
} else if (casHead(h, s = snode(s, e, h, mode))) { // CAS 头结点 snode创建或者重置节点
SNode m = awaitFulfill(s, timed, nanos); // 自旋转或者等待线程
if (m == s) { // wait was cancelled 等待取消,清理next节点
clean(s);
return null;
}
if ((h = head) != null && h.next == s) // CAS 头节点
casHead(h, s.next); // help s's fulfiller
return (mode == REQUEST) ? m.item : s.item; // REQUEST返回match.item,反之s.item
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 2.头节点模式不是FULFILLING
if (h.isCancelled()) // already cancelled 头结点取消,CAS 设置头节点
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //CAS 头结点 snode创建或者重置节点
for (;;) { // loop until matched or waiters disappear 循环直到匹配或waiters消失
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone 所有waiters都不见了
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) { //
casHead(s, mn); // pop both s and m
return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller 3.FULFILLING 模式
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone waiter 没有
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match 帮助match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
/**
* Spins/blocks until node s is matched by a fulfill operation.
* 自旋或阻塞直到节点s匹配完成操作
*
* @param s the waiting node
* @param timed true if timed wait
* @param nanos timeout value
* @return matched node, or s if cancelled
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
/*
* When a node/thread is about to block, it sets its waiter
* field and then rechecks state at least one more time
* before actually parking, thus covering race vs
* fulfiller noticing that waiter is non-null so should be
* woken.
*当一个节点/线程即将被阻塞时,它将设置其waiter字段,然后在实际parking之前至少再次检查状态,
* 从而涵盖竞争与履行者注意到waiter是非空的,所以应该被唤醒。
*
*
* When invoked by nodes that appear at the point of call
* to be at the head of the stack, calls to park are
* preceded by spins to avoid blocking when producers and
* consumers are arriving very close in time. This can
* happen enough to bother only on multiprocessors.
* 当被节点调用时,它会出现在堆栈的顶部,为了避免在生产者和消费者及时到达时,调用park进行自旋以避免阻塞。
* 这种情况只会发生在多处理器上。
*
* The order of checks for returning out of main loop
* reflects fact that interrupts have precedence over
* normal returns, which have precedence over
* timeouts. (So, on timeout, one last check for match is
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
* 从主循环返回的检查顺序反映了中断优先于正常返回(优先于超时)的事实。
* (所以,在超时之后,最后一次检查匹配是在放弃之前完成的。)
* 除了不计时的SynchronousQueue。{poll / offer}的调用不检查中断并且根本不等待,
* 所以被困在传输方法中比呼唤awaitFulfill。
*/
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
int spins = (shouldSpin(s) ? //spins 自旋转的次数
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(); // 把节点match字段设置自己
SNode m = s.match;
if (m != null) // 节点match不为空,就返回match
return m;
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime; //计算等待时间
lastTime = now;
if (nanos <= 0) { //计算出等待时间小于0,进行取消状态。
s.tryCancel();
continue;
}
}
if (spins > 0) //进行自旋转操作
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null) // 设置等待线程
s.waiter = w; // establish waiter so can park next iter
else if (!timed) // 判断是否等待超时时间,否 park 是parkNanos
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* Returns true if node s is at head or there is an active
* fulfiller.
* 如果节点在头或有活跃的Fulfilling,则返回true。
*/
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
/**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
* 在最坏的情况下,我们可能需要遍历整个栈来取消链接。如果有多个并发调用需要清理,
* 如果另一个线程已经删除了它,我们可能看不到s。但是当我们看到任何已知的节点时,我们可以停止。
* 我们使用s.next,除非它被取消,在这种情况下,我们尝试一个过去的节点。
* 我们不再进一步检查,因为我们不希望双重穿越而找到哨兵。
*/
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head 在头部吸收取消的节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes 无法嵌入节点
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
|