Java并发编程学习——Java并发编程之美学习笔记四

原子变量操作类

在JUC并发包中有很多原子变量类,比如AtomicInteger,AtomicLong和AtomicBoolean等。他们原理类似,这里讲解AtomicLong类,AtomicLong是原子性递增和递减类,其内部使用Unsafe实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;

// 获取unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 声明偏移量
private static final long valueOffset;

// 判断JVM是否支持long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

private static native boolean VMSupportsCS8();

static {
try {
// 获取value在Atomica中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 实际的变量值
private volatile long value;

// 初始化
public AtomicLong(long initialValue) {
value = initialValue;
}

public AtomicLong() {
}

// 获取
public final long get() {
return value;
}

// 设置
public final void set(long newValue) {
value = newValue;
}

public final void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}

// 获取并设置long值,返回原来long的值
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}

/**
* 原子性设置值为被给予的更新值如果当前值与参数中到的expect期望值相等的话
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

// 和上面方法一样 但是可能会失败而且不提供保证 一般很少代替上面的方法
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

// 原子性增加1
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

// 原子性减1
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}

// 原子性增加指定的值
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}

// 增加1且返回更新后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

// 原子性减1且返回更新后的值
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}

// 原子性更新循坏使用CAS 导致线程竞争
public final long getAndUpdate(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return prev;
}

public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}

// 传入函数 自定义算法
public final long getAndAccumulate(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return next;
}

public String toString() {
return Long.toString(get());
}

public int intValue() {
return (int)get();
}

public long longValue() {
return get();
}

public float floatValue() {
return (float)get();
}

public double doubleValue() {
return (double)get();
}

}

在这里AtomicLong类可以直接获取Unsafe是因为它本身是在rt.jar下面的是通过Bootstrap类加载器加载的。

上面的原子操作类都是使用的CAS非阻塞算法,性能更好,但是在高并发的情况下Atomicxxx还存放着性能问题(会导致线程一直竞争CAS,导致大量资源浪费),在JDK1.8之后提供了高并发下的LongAdder类。

JDK8新增的原子操作类LongAdder

在AtomicLong中,在高并发的情况下大量线程会同时竞争更新同一个原子变量,这样会导致大量线程失败通过无线循环不断进行自旋尝试CAS的操作,这回白白浪费CPU资源。

因为AtomicLong是多个线程竞争同一个原子变量,而LongAdder中则将原子变量和线程一一对应,比如设置一个Cell数组,其中的元素对应着每个线程(通过一定的算法实现),然后最后获取值的时候将base(基础值)和cell数组中的元素值相加。

LongAdder维护了一个延迟初始化的原子性更新数组(默认情况为null)和一个基础值base。由于cell占用的内存相对较大所以是在需要的时候创建,即惰性加载

因为cell是一个数组,数组中的元素内存地址是连续的,这就很容易导致伪共享的问题,在LongAdder中使用了@Contented注解来避免。

LongAdder代码分析

我们围绕着6个问题去分析源码

  1. LongAdder的结构是怎样的 答:继承了Striped64里面有一个base cell数组和cellBusy自旋操作标志

  2. 当前线程应该访问Cell数组里面的哪一个Cell元素 答:获取当前线程的探针(作为每个线程对应哪一个cell的算法基础),根据当前线程的随机数ThreadLocalRandomProbe和cell元素个数计算当前要访问的cell元素的下标,如果发现对应下标的元素为空则新增一个Cell元素到数组中并在之前将cellBusy设置为1防止其他线程竞争

  3. 如何初始化Cell数组 答:懒加载 但需要操作的时候进行初始化操作

  4. Cell数组如何进行扩容 答:当CPU个数大于cell元素进行扩容 这时会多个CPU(线程)争抢一个cell元素产生冲突

  5. 线程访问分配的Cell元素后有冲突后应该如何处理 答:进行扩容操作

  6. 如何保证线程操作被分配的Cell元素的原子性 答:使用volatile保证内存可见性,使用cas操作保证操作原子性

    LongAdder

    我们可以看到LongAdder继承Striped64,而Striped64中维持着base,cellBusy,cell三个变量。

    base是用来计算LongAdder的真实值的(base和cell元素相加),cellBusy是用来实现自旋锁的,状态值只有0和1,当创建Cell元素,扩容Cell数组或者初始化Cell数组的时候使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。

    我们来看一下Cell的构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@sun.misc.Contended static final class Cell {
// 维持着一个long,保证内存可见性声明为volatile
volatile long value;
Cell(long x) { value = x; }
// 进行cas操作,是更新时原子性的
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

我们来看一下LongAdder源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// 继承了Striped64
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;

public LongAdder() {
}

// 增加指定的值 主要操作
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 如果数组为空且cas失败会进入
// 或者数组不为空并且没有cas操作的时候进入
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 当数组为空直接进入 这里就是数组为空且刚刚cas失败进入
// 或者c数组不为空但再次cas失败则进入调用longAccumulate
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

// 增加1
public void increment() {
add(1L);
}
// 减少1
public void decrement() {
add(-1L);
}

// 计算所有的值即获取真正的value
// 没有做加锁操作 所以并不是原子性的 会存在并发问题
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

// 重置所有元素为0
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}

// 计算并重置
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}

public String toString() {
return Long.toString(sum());
}

public long longValue() {
return sum();
}

public int intValue() {
return (int)sum();
}

public float floatValue() {
return (float)sum();
}

public double doubleValue() {
return (double)sum();
}


private static class SerializationProxy implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;

/**
* The current value returned by sum().
* @serial
*/
private final long value;

SerializationProxy(LongAdder a) {
value = a.sum();
}

/**
* Return a {@code LongAdder} object with initial state
* held by this proxy.
*
* @return a {@code LongAdder} object with initial state
* held by this proxy.
*/
private Object readResolve() {
LongAdder a = new LongAdder();
a.base = value;
return a;
}
}


private Object writeReplace() {
return new SerializationProxy(this);
}


private void readObject(java.io.ObjectInputStream s)
throws java.io.InvalidObjectException {
throw new java.io.InvalidObjectException("Proxy required");
}

}

我们来看一下longAccumulate函数源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
    final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 获取当前线程的探针(作为每个线程对应哪一个cell的算法基础)
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 如果cell不为空
if ((as = cells) != null && (n = as.length) > 0) {
// 根据当前线程的随机数ThreadLocalRandomProbe和cell元素个数计算当前要访问的cell元素的下标,如果发现对应下标的元素为空则新增一个Cell元素到数组中并在之前将cellBusy设置为1防止其他线程竞争
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 如果cas已经知道失败则重置标志
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 进行cas操作成功直接返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cell元素个数大于CPU个数会产生冲突 冲突则进行扩容操作
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
// 移位增加两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 扩容的时候进行cas操作避免其他线程进行扩容或者更新操作完成后设置cellBusy为0
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 如果为空则进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 初始化
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
```

## LongAccumulator类原理探究

LongAccumulator比LongAdder更加强大,原因在于LongAdder中只有累加操作,而LongAccumulator中是自定义函数来实现的。我们来看一下LongAccumulator的accumulate函数和LongAdder中的add方法的区别

```java
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
// 这里传入了计算的值和一个函数
longAccumulate(x, function, uncontended);
}
}

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 传入了值和一个null 我们可以猜测 传入null的时候给我们默认进行增加操作了
longAccumulate(x, null, uncontended);
}
}

我们继续回顾一下刚刚上面的longAccumulate方法源码中最后一个else if

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 很明显 当fn为null的时候使用了v + x操作 不然调用该fn的applyAsLong
// 我们可以继续猜测这里面进行了自定义
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;

// 这就是一个函数接口
@FunctionalInterface
public interface LongBinaryOperator {

/**
* Applies this operator to the given operands.
*
* @param left the first operand
* @param right the second operand
* @return the operator result
*/
long applyAsLong(long left, long right);
}
-------------本文结束感谢阅读-------------