【注意】最后更新于 September 2, 2018,文中内容可能已过时,请谨慎使用。
Striped64源码分析
Striped64博客
jdk 1.8.0.90 分析
简介
- Striped64是jdk1.8提供的用于支持如Long累加器,Double累加器这样机制的基础类。
- Striped64的设计核心思路就是通过内部的分散计算来避免竞争(比如多线程CAS操作时的竞争)。
- Striped64内部包含一个基础值(base)和一个单元哈希表(Cell)。没有竞争的情况下,要累加的数会累加到这个基础值上;如果有竞争的话,
会将要累加的数累加到单元哈希表中的某个单元里面。所以整个Striped64的值包括基础值和单元哈希表中所有单元的值的总和。
一、Striped64说明
数据 striping
根据维基百科的这段说明:
In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that
consecutive segments are stored on different physical storage devices.
Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading
segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method
for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage,
network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.
数据 striping 就是把逻辑上连续的数据分为多个段,使这一序列的段存储在不同的物理设备上。通过把段分散到多个设备上可以增加访问并发性,从而提升总体
的吞吐量。
设计思路
这个类维护一个延迟初始的、原子地更新值的表,加上额外的 “base” 字段。表的大小是 2 的幂。索引使用每线程的哈希码来masked。这个的几乎所有声明都是
包私有的,通过子类直接访问。
表的条目是 Cell 类,一个填充过(通过 sun.misc.Contended
)的 AtomicLong 的变体,用于减少缓存竞争。填充对于多数 Atomics 是过度杀伤的,因为它
们一般不规则地分布在内存里,因此彼此间不会有太多冲突。但存在于数组的原子对象将倾向于彼此相邻地放置,因此将通常共享缓存行(对性能有巨大的副作用),
在没有这个防备下。
部分地,因为Cell相对比较大,我们避免创建它们直到需要时。当没有竞争时,所有的更新都作用到 base 字段。根据第一次竞争(更新 base 的 CAS 失败),
表被初始化为大小 2。表的大小根据更多的竞争加倍,直到大于或等于CPU数量的最小的 2 的幂。表的槽在它们需要之前保持空。
一个单独的自旋锁(“cellsBusy”)用于初始化和resize表,还有用新的Cell填充槽。不需要阻塞锁,当锁不可得,线程尝试其他槽(或 base)。在这些重试中,
会增加竞争和减少本地性,这仍然好于其他选择。
通过 ThreadLocalRandom 维护线程中threadLocalRandomProbe字段,作为每线程的哈希码。我们让它们为 0 来保持未初始化直到它们在槽 0 竞争。然后初始化它们为通常不会互相
冲突的值。当执行更新操作时,竞争和/或表冲突通过失败了的 CAS 来指示。根据冲突,如果表的大小小于容量,它的大小加倍,除非有些线程持有了锁。如果一
个哈希后的槽是空的,且锁可得,创建新的Cell。否则,如果槽存在,重试CAS。重试通过 “重散列,double hashing” 来继续,使用一个次要的哈希算法
(Marsaglia XorShift)来尝试找到一个自由槽位。
表的大小是有上限的,因为,当线程数多于CPU数时,假如每个线程绑定到一个CPU上,存在一个完美的哈希函数映射线程到槽上,消除了冲突。当我们到达容量,
我们随机改变碰撞线程的哈希码搜索这个映射。因为搜索是随机的,冲突只能通过CAS失败来知道,收敛convergence(聚合) 是慢的,因为线程通常不会一直绑定到CPU上,
可能根本不会发生。然而,尽管有这些限制,在这些案例下观察到的竞争频率显著地低。
当哈希到特定 Cell 的线程终止后,Cell 可能变为空闲的,表加倍后导致没有线程哈希到扩展的 Cell 也会出现这种情况。我们不尝试去检测或移除这些 Cell,
在实例长期运行的假设下,观察到的竞争水平将重现,所以 Cell 将最终被再次需要。对于短期存活的实例,这没关系。
设计思路小结
- striping和缓存行填充:通过把类数据 striping 为 64bit 的片段,使数据成为缓存行友好的,减少CAS竞争。
- 分解表示:对于一个数字 5,可以分解为一序列数的和:
2 + 3
,这个数字加 1 也等价于它的分解序列中的任一 数字加 1:5 + 1 = 2 + (3 + 1)
。
- 通过把分解序列存放在表里面,表的条目都是填充后的 Cell;限制表的大小为 2 的幂,则可以用掩码来实现索引;同时把表的大小限制为大于等于CPU数量的最小的 2 的幂。
- 当表的条目上出现竞争时,在到达容量前表扩容一倍,通过增加条目来减少竞争。
二、Striped64数据结构
1.Cell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS. AtomicLong的填充变体只支持原始访问和CAS。
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
* JVM内部函数注意:如果提供的话,可以在这里使用CAS的只发布形式。
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
|
注解 @sun.misc.Contended
来自动实现缓存行填充,让Java编译器和JRE运行时来决定如何填充。本质上是一个填充了的、提供了CAS更新的volatile变量。
2.内部属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/** Number of CPUS, to place bound on table size */
//cpu的数量,确定cells数量大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
* 表的cells。当非空时,大小是2的幂。
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* base 值,在没有竞争时使用,也作为表初始化竞争时的一个后备。
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 自旋锁,在 resizing 和/或 创建Cell时使用。
*/
transient volatile int cellsBusy;
|
3.内部公用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
/**
* CASes the base field.
* CAS base字段
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
/**
* CASes the cellsBusy field from 0 to 1 to acquire lock.
* CAS cellsBusy 字段 0到1获取锁。
*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
* 返回当前线程的探测值。由于封装限制,从ThreadLocalRandom复制。
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
|
三、longAccumulate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
* 处理涉及初始化,调整大小,创建新单元格和/或争用的更新案例。请参阅上面的解释。这种方法受到乐观重试代码通常的非模块化问题的困扰,依靠重新检查的读取组。
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) { //获取当前线程probe值,probe为空时, ThreadLocalRandom.current()初始化当前线程probe值。
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;//这次相当于再次计算了当前线程probe值,所以设置未竞争标记为true。
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells 表不为空情况
if ((a = as[(n - 1) & h]) == null) { // (n - 1) & h 查找cells下标值。为空,添加值
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { //再一次做判断
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail CAS已经知道会失败,重新计算当前线程probe值
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // cas cells下标值。
break;
else if (n >= NCPU || cells != as) // cas cells下标值失败,判断一下cells大小大于cpu数量,大于,不能在扩容cells大小。
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) { //扩容cells大小,以2倍扩容
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // n << 1 2倍扩容
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 重新计算当前线程probe值,用的哈希算法(Marsaglia XorShift)。
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells为空情况,初始化cells表
boolean init = false;
try { // Initialize table 初始化cells数组值,默认数组大小2
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) //在竞争情况下,已经初始化cells表,cas,base字段值
break; // Fall back on using base
}
}
|
三、doubleAccumulate
doubleAccumulate跟longAccumulate逻辑是一样,我具体不做分析,唯一区别是先是Double.doubleToRawLongBits(x)转换为long值做longAccumulate操作。
在累加获取值时Double.longBitsToDouble转换为double在做累加。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
/**
* Same as longAccumulate, but injecting long/double conversions
* in too many places to sensibly merge with long version, given
* the low-overhead requirements of this class. So must instead be
* maintained by copy/paste/adapt.
*/
final void doubleAccumulate(double x, DoubleBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(Double.doubleToRawLongBits(x));
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base,
((fn == null) ?
Double.doubleToRawLongBits
(Double.longBitsToDouble(v) + x) :
Double.doubleToRawLongBits
(fn.applyAsDouble
(Double.longBitsToDouble(v), x)))))
break; // Fall back on using base
}
}
|
1.Double.doubleToRawLongBits
doubleToLongBits方法根据 IEEE 754 浮点双精度格式 (“double format”) 位布局,返回指定浮点值的表示形式。
第 63 位(掩码 0x8000000000000000L 选定的位)表示浮点数的符号,第62~52位(掩码 0x7ff0000000000000L 选定的位)表示指数,
第51~0位(掩码 0x000fffffffffffffL 选定的位)表示浮点数的有效数字(有时也称为尾数)。如果参数是正无穷大,则结果为 0x7ff0000000000000L;如果参数是负无穷大,
则结果为 0xfff0000000000000L;如果参数是 NaN,则结果为 0x7ff8000000000000L。
在所有情况下,结果都是一个 long 整数,将其赋予 longBitsToDouble(long) 方法将生成一个与 doubleToLongBits 的参数相同的浮点值(所有 NaN 值被压缩成一个"规范"NaN 值时除外)。
2.Double.longBitsToDouble
longBitsToDouble方法返回对应于给定位表示形式的 double 值。根据 IEEE 754 浮点"双精度格式"位布局,参数被视为浮点值表示形式。
如果参数是 0x7ff0000000000000L,则结果为正无穷大;如果参数是 0xfff0000000000000L,则结果为负无穷大;如果参数值在 0x7ff0000000000001L 到 0x7fffffffffffffffL
之间或者在 0xfff0000000000001L 到 0xffffffffffffffffL 之间,则结果为 NaN。Java提供的任何IEEE 754浮点操作都不能区分具有不同位模式的两个同类型NaN值,不同的NaN值只
能使用Double.doubleToRawLongBits方法区分。在所有其他情况下,设 s、e和m为可以通过以下参数计算的3个值。
int s = ((bits » 63) == 0) ? 1 : -1;
int e = (int)((bits » 52) & 0x7ffL);
long m = (e == 0) ? (bits & 0xfffffffffffffL) « 1 : (bits & 0xfffffffffffffL) | 0x10000000000000L;
那么浮点结果等于算术表达式 s·m·2e-1075 的值。
注意,此方法不能返回与 long 参数具有完全相同位模式的 double NaN。IEEE 754 区分了两种 NaN:quiet NaN和signaling NaN,这两种 NaN 之间的差别在中通常是不可见的。对 signaling NaN
进行的算术运算将它们转换为具有不同(但通常类似)位模式的 quiet NaN,但是在某些处理器上,只复制 signaling NaN 也执行这种转换。特别是在复制 signaling NaN以将其返回给调用方法时,
可能会执行这种转换。因此,longBitsToDouble可能无法返回具有signaling NaN 位模式的double值。对于某些long值,
doubleToRawLongBits(longBitsToDouble (start)) 可能不 等于 start。此外,尽管所有 NaN 位模式(不管是 quiet NaN 还是 signaling NaN)都必须在上面提到的 NaN 范围内,
但表示 signaling NaN 的特定位模式与平台有关。
五、总结
- 用分段锁机制,解决多个CAS竞争问题,更好提高性能
- IEEE754 这个博客讲的很棒,值得学习