gofor's blog

  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

Striped64 学习

发表于 2020-01-08 更新于 2020-01-10 分类于 技术 阅读次数: Valine:

Striped64 学习

很长一段时间,不知道Striped64这个类的存在,当然,还是由于对并发包不了解的缘故。在大概了解了一下之后,才知道,原来这是一个并发计数组件,这个类很有意思,在这里学习总结一下。

简单介绍

Striped64是在java8中新增的用于支持累加器的并发组件(和LongAdder、DoubleAdder一起增加的一个抽象内部类),它支持在并发环境下计数,其设计思想是避免线程激烈竞争时引起的开销和异常,通过分散竞争的方式(允许不超过CPU核心数个线程同时执行)实现最大效率。
在Striped64内部维护了3个重要操作变量,分别是:cells(Cell类型的数组)、base(基础计数器)、cellsBusy(Cell数组状态值)。Cell对象内部维护了一个计数值value,用来记录线程局部计数值。

计数核心过程为:

  • 根据当前线程来计算一个哈希值,根据算法(hashCode & (length - 1))取模定位到该线程被分散到的Cell数组中的位置;
  • 如果Cell数组还没有被创建,那么就去获取cellBusy这个状态值,如果获取成功,则初始化Cell数组,初始容量为2,初始化完成之后将x(计数增值)包装到一个Cell,哈希计算之后分散到相应的index上。如果获取cellBusy失败,那么会试图将x(计数增值)累计到base上,更新失败会重试(自旋)直到成功。
  • 如果Cell数组已经被初始化过了,那么就根据线程的哈希值分散到一个Cell数组元素上,获取这个位置上的Cell并且赋值给一个变量,如果该值为null,说明该位置还没有被初始化,那么在竞争cellBusy变量成功后初始化,如果不为null,说明该位置上的值是当前线程的,更新该计数值。
  • 如果Cell数组的大小已经最大了(大于等于CPU的数量),那么就需要重新计算哈希,来重新分散当前线程到另外一个Cell位置上再走一遍该方法的逻辑,否则就需要对Cell数组进行扩容(2倍大小于原数组),然后将原来的计数内容迁移过去。由于Cell里面保存的是计数值,所以扩容后没有必要做其他处理,直接根据index将旧的Cell数组内容复制到新的Cell数组中。
  • 最后将base值和cells中各个Cell的value值进行累加,即为最终计数结果。

源码分析

核心变量
1
2
3
4
5
6
7
8
9
10
11
/** CPU核心数,用来控制Cells数组大小 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** Cell数组,当非空时,大小为2的倍数 */
transient volatile Cell[] cells;

/** 基础计数值,用于没有线程竞争时或初始化Cell数组失败时,通过CAS更新值 */
transient volatile long base;

/** 自旋锁,当新建Cell数组或对Cell数组扩容时,通过CAS加锁 */
transient volatile int cellsBusy;
内部类Cell

关于注解@sun.misc.Contended:来避免伪共享。原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。
什么是伪共享:当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行(缓存行是2的整数幂个连续字节,一般为32-256个字节,最常见的缓存行大小是64个字节),就会无意中影响彼此的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 该类是基于AtomicLong的变体,仅支持原始访问和CAS
@sun.misc.Contended static final class Cell {
// 真实计数值
volatile long value;
Cell(long x) { value = x; }
// 执行CAS操作,期望值和实际值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// value在内存中的位置
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
xorshift 伪随机数生成函数
1
2
3
4
5
6
7
8
9
10
// 同一个线程根据上一次的hashcode通过xorshift函数再次生成随机数后,
// 更新到threadLocalRandomProbe中,并返回随机数作为hashcode
// 由于包限制,这段代码是从ThreadLocalRandom拷贝过来的
static final int advanceProbe(int probe) {
probe ^= probe << 13;
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
longAccumulate方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// x 元素
// fn 更新函数,如果是add可以为null,这个约定避免了longadder中定义额外的变量或者函数
// wasUncontended 如果CAS在调用之前失败了,这个值为false
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 获取当前线程的probe值,如果为0,则需要初始化该线程的probe值
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// 以上代码通过ThreadLocalRandom伪随机数生成当前线程的哈希值
// 碰撞标记,如果上一个slot不为空置为true
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells不为空,进入cell操作逻辑
if ((as = cells) != null && (n = as.length) > 0) {
// 根据(n - 1) & h])取模得到Index,若该Index位置为空准备创建Cell对象
if ((a = as[(n - 1) & h]) == null) {
// 判断锁状态
if (cellsBusy == 0) { // Try to attach new Cell
// 在该Index位置上创建一个封装了x的Cell对象
Cell r = new Cell(x); // Optimistically create
// 再次判断锁状态,同时获取锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 对数组取模,判断该Index位置是否为空
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 数组中新增Cell对象,并更新创建状态
rs[j] = r;
created = true;
}
} finally {
// 释放cellsBusy锁
cellsBusy = 0;
}
// 创建成功跳出,否则重试
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// (n - 1) & h])位置上已经有一个Cell对象,不需要再初始化了
// CAS操作已经失败了,出现了竞争
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 尝试修改a(a为Index位置对应的Cell)上的计数值
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// cells大小大于等于CPU核心数,或者cells已经被更新
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 置反
else if (!collide)
collide = true;
// 再次判断锁状态,同时获取锁,并对数组扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// cell数组没有被修改过
if (cells == as) { // Expand table unless stale
// 创建一个新的cell数组,大小为原来的2倍
Cell[] rs = new Cell[n << 1];
// 将旧数组中的cell对象迁移到新的数组
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 更新cell数组
cells = rs;
}
} finally {
// 是否cellsBusy锁
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新获取哈希值
h = advanceProbe(h);
}
// cells为空,cellsBusy锁空闲,获取锁并初始化cells
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// cells未发生变化(始终为空)
if (cells == as) {
// cells默认大小为2
Cell[] rs = new Cell[2];
// 将x封装到cell中,将cell更新到数组第(h&1)个位置
rs[h & 1] = new Cell(x);
cells = rs;
// 初始化成功
init = true;
}
} finally {
// 释放cellsBusy锁
cellsBusy = 0;
}
if (init)
break;
}
// cells为空,获取cellsBusy锁失败,通过CAS方式更新base值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

doubleAccumulate方法是从longAccumulate方法拷贝的,除了类型部分,代码逻辑一致,拷贝的目的是为了减少由于复用引起的类型转换开销。
java8中,LongAdder和DoubleAdder类都实现了Striped64类,且分别使用到了longAccumulate方法和doubleAccumulate方法。在使用上,由于LongAdder用到了Striped64类似分散竞争的思想,因此在效率上要比AtomicLong类更高。
以下为LongAdder中,主要方法的实现,DoubleAdder中的实现类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 将cells赋值给临时变量as
// cells不为空或者CAS更新base值失败时
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 当前数组为空或者有空闲位置时,或
// 当数组不为空,且当前线程所在数组中位置不为空时,判断为线程自己,
// 尝试CAS更新当前位置的计数值,更新失败时
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 使用Striped64中方法,自旋直至更新成功
longAccumulate(x, null, uncontended);
}
}

public long sum() {
// 获取到cell数组
Cell[] as = cells; Cell a;
// 拿到base计数值
long sum = base;
// cell数组不为空
if (as != null) {
// 循环获取数组中不为空的cell的值value,进行累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
// 返回最终结果
return sum;
}

Striped64的设计思路在java8的ConcurrentHashMap中size的大小更新也有体现,使用类似的方式实现。

  • 本文作者: gofor
  • 本文链接: https://acehjm.github.io/2020/01/08/Striped64学习/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
# java # concurrent
SSH快速远程连接配置
LongAdder浅析
gofor

gofor

Programming technology
14 日志
4 分类
14 标签
RSS
GitHub StackOverflow
Creative Commons
© 2016 – 2020 gofor
由 Hexo 强力驱动 v3.9.0
|
主题 – NexT.Pisces v7.3.0
0%