Java并发编程学习——Java并发编程之美学习笔记六

LockSupport工具类

LockSupport是JDk rt.jar下面的一个工具类,主要作用是 挂起和唤醒线程 ,该工具类是 创建锁和其他同步类的基础

LockSupport

LockSupport与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport的方法的线程是不持有许可证的。LockSupport是使用Unsafe类实现的。

LockSupport中的 void park() 方法

如果调用park()的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.park()时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起

如果其他线程调用 unpark(Thread thread) 并将该线程作为参数传入, 此时因为调用 park()方法而被阻塞的线程会返回,另外如果其他线程调用了阻塞线程的Interrupt()方法, 设置了中断标志或者线程被虚假唤醒,则阻塞线程也会返回,需要注意的是返回的时候不会抛出InterruptedException异常。

LockSupport中的 void unpark(Thread thread) 方法

当一个线程调用unpark方法,其中的参数线程如果没有持有与LockSupport类相关的许可证,那么会立即持有, 如果线程在之前已经调用了park()方法而被阻塞,那么该线程会立即返回,如果没有调用park()方法,那么调用unpark()之后调用park()会直接返回(因为通过unpark()获取了许可证)

LockSupport中的 void parkNanos(long nanos) 方法

和park()方法差不多,只不过如果调用线程没有许可证,那么会被阻塞指定时间而返回。

LockSupport中的 park(Object blocker) 方法

1
2
3
4
5
6
7
8
9
10
public static void park(Object blocker) {
// 获取当前线程
Thread t = Thread.currentThread();
// 设置Blocker
setBlocker(t, blocker);
// 挂起线程
UNSAFE.park(false, 0L);
// 线程被激活后清楚blocker
setBlocker(t, null);
}

Thread类中有一个volatile Object parkBlocker用来存放传递的blocker,也就是把blocker变量存放到了线程本地。一般使用LockSupport.park(this),在打印线程堆栈的时候就可以获得相关阻塞类的信息了。

LockSupport中的 void parkNanos(Object blocker, long nanos)

比上面方法多一个超时时间。

LockSupport中的 void parkUntil(Object blocker, long deadline)

多设置一个最终时间

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FIFOMutex {

private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

public void lock() {
// 判断是否被中断
boolean wasInterrupted = false;
// 获取当前线程并加入队列中
Thread current = Thread.currentThread();
waiters.add(current);

// 只有队首能获取锁
while (waiters.peek() != current || locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) {
wasInterrupted = true;
}
}
// 如果当前线程是队首并且没有其他线程抢占这个锁那么移除
waiters.remove();
// 如果刚刚被中断则相应中断
if (wasInterrupted) {
current.interrupt();
}
}

public void unlock() {
// 解锁的时候将布尔设置为false
locked.set(false);
// 激活等待中的队首线程
LockSupport.unpark(waiters.peek());
}

}

抽象同步队列 AQS 概述

AbstractQueuedSynchronizer 抽象同步队列简称 AQS。它是实现同步器的基本组件, 并发包中锁的底层就是使用AQS实现的。

AQS

我们大致能看到AQS是一个FIFO的双向队列,其内部通过headtail记录头尾节点,队列中元素类型为Node,其中Node中的thread变量用来存放AQS队列中的线程。Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起进入AQS队列的,EXCLUSIVE用来标记该线程是获取独占资源时被阻塞挂起进入AQS队列的。wiatStatus用来记录当前线程的等待状态,可以为CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在条件队列里等待),PROPAGATE(释放共享资源的时候需要通知其他节点);prev用来记录当前节点的前驱节点,next记录当前节点的后继节点。

在AQS中维持一个单一的状态信息state,可以通过getState,setState,compareAndSetState函数来操作, 对于 ReentrantLock 的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁 ReentrantReadWriteLock 来说,state的高16位表示读状态,低16位用来表示获取到写锁的线程的可重入次数。对于 semaphore 来说,state用来表示当前可用信号的个数,对于 CountDownlatch 来说,state用来表示计数器当前的值

AQS有一个内部类 ConditionObject ,用来结合锁实现线程同步。ConditionObject可以直接访问AQS的内部变量,比如state状态值和AQS队列,ConditionObject是一个条件变量,每一个条件变量对应着一个条件队列(单向链表队列)。其用来存放调用条件变量的await()方法后被阻塞的线程,条件队列的头尾元素分别为firstWaiter和lastWaiter。

对于AQS来说, 线程同步的关键在于对state的操作 。独占方式下获取和释放资源使用的方法是acquire(int), void acquireInterruptibly(int), boolean release(int)。共享方式:acquireShared(int),acquireSharedInterruptibly(int),boolean releaseShared(int)方法。

共享方式指的是能有多个线程获取资源(只要符合要求),独占指的是只能有一个线程获取资源。

独占方式获取和释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 在调用acquire获取的时候首先会调用tryAcquire来设置当前状态值
// 如果失败则会调用acquireQueued将其标志为EXCLUSIVE并放入AQS队列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 我们可以看到tryAcquire是需要子类去实现的
// 就比如ReentrantLock实现了对应的tryAcquire,使得state代表独占可重入
// 当state=0的时候代表没有线程占用,为大于等于1代表有线程占用并且state为可重入次数
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 在释放的时候首先会调用tryRelease()方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
// 同样的tryRelease也需要子类实现去怎么设置state的值
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

共享方式获取和释放

对于共享方式和独占方式基本差不多,只不过在获取资源失败的时候线程会被标记为SHARED并放入AQS队列。还需要注意的是对于acquire系列的方法有一些是带Interruptibly的,这些带有Interruptibly的需要对中断进行相应,即其他线程中断了次挂起线程后,这个线程需要抛出InterruptedException然后返回。

条件变量的支持

对于 synchronized 来说只能支持一个共享变量的notify或wait方法, 而 AQS的锁能对应多个条件变量 ,和synchronized一样在调用条件变量的await和signal方法之前必须先获取条件变量对应的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 创建一个ReentrantLock独占可重入锁
ReentrantLock lock = new ReentrantLock();
// 通过锁创建一个条件变量
Condition codition = lock.newCondition();
// 获取锁
lock.lock();
try {
begin wait...
// 进入条件变量对应的阻塞队列
condition.await();
// 被唤醒后
end wait...
} catch (Excepetion e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock()
}

// 获取锁
lock.lock();
try {
begin signal...
// 唤醒条件变量对应的阻塞队列中的线程
condition.signal();
end signal...
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}

在其中有一个 lock.newCondition() 这个其实是new了一个在AQS中内部声明的一个ConditionObject对象,可以上去查看刚刚的类图。这个ConditionObject是AQS的一个内部类,每个条件变量维护了一个条件队列(我们可以看到ConditionObject中有firstWaiter lastWaiter等)用来存放调用相应condition的await()方法而被阻塞的线程。

我们来看condition的await方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建节点并插入尾部
Node node = addConditionWaiter();
// 释放当前线程获取的锁
// 将状态存入
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 调用lockSupport的park挂起线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 插入条件队列尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 释放当前线程获取的锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

我们再来看看signal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队列中的队头节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 转移节点到AQS队列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 进入AQS队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws,Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}

总结一下: 当多个线程调用lock.lock()方法的时候,未竞争到锁的线程会被转换成一个NOde节点插入AQS队列中,如果获取到了锁的线程(因为某种条件不满足)调用了await()方法的时候,会被转换成Node节点并插入相应的Condition条件队列中并且释放锁,这个时候未获取到锁的线程(在AQS队列中)就可以重新竞争锁了,如果另外一个线程调用条件变量的signal()或者signalAll()方法,会把条件变量队列中的一个或全部Node移动到AQS阻塞队列里

独占锁 ReentrantLock 的原理

类图结构

ReentrantLock

1
2
3
4
5
6
7
// 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

其中 ReentrantLock 中有一个内部类Sync 这个类继承了AQS,并且扩展了公平锁(FairSync)和非公平锁(NonFairSync)

在Sync中AQS的state状态值表示 线程的可重入次数

获取锁

  1. void lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 在ReentrantLock中的lock方法调用了sync的lock
public void lock() {
sync.lock();
}
// 在Sync中是一各抽象方法是子类去实现
abstract void lock();
// 公平锁实现
final void lock() {
acquire(1);
}
// 非公平锁实现
final void lock() {
// CAS设置状态值
// 如果成功则设置锁的持有者为当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 不然调用AQS的acquire
else
acquire(1);
}
// 再来回顾一下AQS的acquire方法
public final void acquire(int arg) {
// 首先尝试调用tryAcquire
// 不成功则加入AQS队列并设置标记为EXCLUSIVE(抢占独占资源而被阻塞的标志)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 我们知道AQS并没有实现具体的tryAcquire方法 需要子类去实现
// 非公平锁实现的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 非公平锁
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前状态值
int c = getState();
// 如果状态值为0 那么没有线程获取锁
if (c == 0) {
// 如果CAS设置成功
if (compareAndSetState(0, acquires)) {
// 设置锁持有者为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果状态值不为0且当前线程为锁的持有者 那么将状态值加上参数acquires
// 可重入锁的关键
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 数值校验
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平锁的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取装填值
int c = getState();
if (c == 0) {
// 注意这里增加了一个hasQueuedPredecessors()校验
// 根据字面意思就是判断是否有前驱节点
// 如果没有那么执行和刚刚一样
// 如果有且不是锁持有者就直接退出竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平锁实现的关键
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 验证当前队列是否只有一个元素或者当前线程是否是队列第一个元素(是否在哨兵节点的后面)
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

我们来回顾一下非公平机制和公平机制到底是怎么实现的。

比如这个时候有两个线程竞争ReentrantLock 线程A和线程B并且线程C已经获取了ReentrantLock,当线程A进入lock()方法然后进入acquire()方法再进入nonfairTryAcquire()方法(非公平策略)会因为这个锁已经被其他线程占有而被阻塞进入AQS队列,线程B也同时竞争锁,进入进入lock()方法然后进入acquire()方法再进入nonfairTryAcquire()方法(非公平策略),而这个时候恰恰线程C释放了锁,那么线程B就会直接判断state值发现为0然后线程B就抢占了。我们可以发现,线程B是后来的但是先获得了锁,这就是非公平的体现。

而公平策略中会在抢占锁之前判断前面有没有线程在等待,如果有那么该线程直接放弃竞争。所以公平锁的资源浪费会比非公平高。

  1. void lockInterruptibly() 方法

    和lock()方法类似,但是它会对中断进行响应,就是当前线程在调用该方法的时候,其他线程如果调用了当前线程的interrupt()方法,该线程会抛出InterruptException异常然后返回。

释放锁

  1. void unlock() 方法

    尝试释放锁,如果当前线程持有锁则将AQS的state值减一,如果减下来为0那么该线程会释放锁,否则仅仅减一。如果当前线程没有持有锁而调用该方法会抛出IllegalMonitorStateException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 首先尝试
if (tryRelease(arg)) {
// 如果释放成功那么唤醒下一个
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 将状态值减去参数
int c = getState() - releases;
// 如果没有获得锁的线程调用 抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 设置free标志为false 后面要返回释放是否成功的
boolean free = false;
// 如果减去后为0那么就释放
if (c == 0) {
free = true;
// 设置锁的持有者为空
setExclusiveOwnerThread(null);
}
// 设置标志
setState(c);
// 返回释放是否成功
return free;
}

读写锁 ReentrantReadWriteLock 原理

读写锁类图结构

ReentrantReadWriteLock

读写锁内部维护了一个ReadLock和WriteLock,他们 依赖Sync来实现具体功能, 而Sync继承AQS并且提供了公平和非公平锁的实现。

AQS中只维护了一个state变量,而读写锁中通过将state一分为二控制读状态和写状态。它巧妙的 将state的高16位表示读状态(也就是读锁的次数),低16位表示写状态也就是写锁的可重入次数

1
2
3
4
5
6
7
8
9
10
11
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
// 返回读锁线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 返回写锁可重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

其中 firstReader 用来获取第一个获取到读锁的线程, firstReaderHoldCount 则记录第一个获取到读锁的线程获取读锁的可重入次数, cachedHoldCounter 用来记录最后一个获取读锁的线程获取读锁的可重入次数。

1
2
3
4
5
6
// 内部类 里面维护了count和线程id
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

readHolds 是 ThreadLocal变量,用来存放出去第一个获取读锁线程外的其他线程获取读锁的可重入次数, ThreadLocalHoldCounter 继承了ThreadLocal。

1
2
3
4
5
6
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

写锁的获取与释放

写锁通过WriteLock来实现。

  1. void lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// WriteLock的lock()
public void lock() {
sync.acquire(1);
}
// AQS的acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 读写锁的sync的tryAcquire
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前状态 如果为0那么就代表没有线程获取读锁或者写锁
int c = getState();
// 获取写锁 即低16位
int w = exclusiveCount(c);
// 如果有线程获取写锁或者读锁
if (c != 0) {
// 因为肯定有线程获取写锁或者读锁 而且w=0代表没有线程获取写锁
// 则肯定有线程获取读锁
// 这里代表的是有线程获取读锁
// 或者有线程获取读锁或者写锁且当前线程不是写锁拥有者
// 的时候返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果有线程获取写锁并且是当前线程的
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 如果当前没有线程获取读锁或者写锁 则尝试设置状态 不成功返回false 成功设置当前锁拥有者为当前线程并放回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

我们来看一下writerShouldBlock的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 非公平实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
// 直接return false
// 代表非公平直接竞争
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 公平实现
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 判断AQS中是否有前驱的
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. boolean tryLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean tryLock( ) {
return sync.tryWriteLock();
}
// 和lock差不多 但是当当前已经有线程持有写锁或者读锁 直接返回false 不阻塞
// 如果CAS成功直接返回true
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
  1. void unlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 成功·则通知唤醒下一个
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 判断当前线程是否是该写锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 如果减到0那么释放
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

读锁的获取域释放

  • void lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void lock() {
sync.acquireShared(1);
}
// AQS的acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// AQS需要被实现的tryAcquireShared
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 读写锁实现的
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断写锁是否被占用
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 如果当前线程是写锁拥有者也可以占用读锁
// 记录当前读锁拥有数
int r = sharedCount(c);
// 判断readerShouldBlock
// 队列里第一个是否在获取写锁 如果不是那么判断是否到达最大值 如果不是那么cas状态+1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果为0 那么设置firstReader
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前线程已经是first了那么增加firstReaderHoldCount
firstReaderHoldCount++;
} else {
// 不然放入cachedHoldCounter中
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 以上都不满足
// 即写锁不被占用
// 就自旋获取
return fullTryAcquireShared(current);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// 非公平的readerShouldBlock实现
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 如果队列里面存在一个元素 则判断第一个元素是否在尝试获取写锁
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
  • void unlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void unlock() {
sync.releaseShared(1);
}
// 读写锁实现的tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 判断当前线程是不是第一个reader
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果是1 那么设置第一个读者为空
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 不然 --
firstReaderHoldCount--;
} else {
// 那么找其他线程 并通过线程id获取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取状态
int c = getState();
// 释放读锁对读取器没有影响,但如果读锁和写锁现在都是空闲的,它可能允许等待写入器继续进行
int nextc = c - SHARED_UNIT;
// 循环知道自己的读技术-1 CAS更新成功
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

JDK8中新增的 StampedLock 探究

概述

StampedLock是JDK8中新增的锁,它提供了三种模式的读写锁

  1. 写锁。 是一个独占锁而且是不可重入锁,请求该锁成功后会返回一个stamp参数来表示该锁的版本,当释放该锁的时候需要调用unlockWrite并传递获取锁是的stamp参数,并且它提供一个非阻塞的 tryWriteLock 方法。
  2. 悲观读锁。 是一个共享锁,在没有线程占有写锁的情况下可以获取。如果有线程占用写锁,那么竞争该锁的线程会被阻塞(和读写锁的读锁很像 但是他是不可重入锁) 这里悲观指 需要先将数据加锁, 这是在读少写多的去情况的一种考虑。 该锁请求成功后也会返回一个stamp值,并且在调用unlockRead方法时需要传入获取该锁时的stamp来释放锁。并且它提供了一个非阻塞的 tryReadLock 方法。
  3. 乐观读锁。 在操作数据之前没有通过CAS设置锁的装填,仅仅通过位运算测试,如果当前没有线程持有写锁,则简单返回一个非0的stamp版本信息。获取该stamp后再具体操作数据前还需要调用validate方法验证该stamp是否已经不可用,也就是看调用tryOptimisticRead返回stamp后当当前是否有其他线程持有了写锁,如果是则返回0,否则就可以通过该stamp版本的锁对数据进行操作,适用于读多写少的情况。

    这三种锁还能在一定条件下相互转换。

最佳实践

我们可以巧妙的利用三种锁的转换和三种锁性能差异来设计一套最佳实践模板 这也是官方推荐的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 读模板
final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读⼊⽅法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读⼊⽅法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使⽤⽅法局部变量执⾏业务操作
......

// 写模板
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}
-------------本文结束感谢阅读-------------