Java并发编程学习——Java并发编程之美学习笔记七

ConcurrentLinkedQueue 原理探究

ConcurrentLinkedQueue 类图结构

ConcurrentLinkedQueue

ConcurrentLinedQueue 内部使用单向链表的方式实现,其中有两个volatile类型的Node节点分别用来存放队列的首尾节点。

Node节点里面维护了一个使用 volatile 修饰的变量item,用来存放节点的值,next用来存放下一个节点,其内部使用 Unsafe 工具类提供的 CAS 算法来保证入队时操作链表的原子性。

从无参构造函数来看,默认头尾节点指向item为null的哨兵节点。ConcurrentLinkedQueue提供了一个传递Collection的有参构造函数,它会把Collection中的item封装成Node然后存放在队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}

ConcurrentLinkedQueue 原理介绍

ConcurrentLinkedQueue的原理很复杂,因为它是通过CAS实现来实现线程安全的 并且是无锁的 所以会考虑到很多种情况,这里只对代码和方法作用作简单的解析,如果想深入原理可以参考Java并发编程之美的第七章。

  • offer 操作

    offer(E e) 是在队列末尾添加一个元素。 由于 ConcurrentLinkedQueue 是无界队列,所以该方法一直会返回true。 另外使用的是 CAS 无阻塞算法, 因此该方法不会阻塞挂起调用的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(E e) {
// 判断是否为null 如果为null抛出 NPE 异常
checkNotNull(e);
// 封装成Node
final Node<E> newNode = new Node<E>(e);
// 通过循环加入队列末尾
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// 如果 p 的next为null 代表p为最后一个
// 执行插入
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
// 多线程操作的时候由于 poll操作可能会把head变成自引用
else if (p == q)
// 需要重新设置head节点
p = (t != (t = tail)) ? t : head;
else
// 不然重新寻找尾结点
p = (p != t && t != (t = tail)) ? t : q;
}
}

对于入队操作,offer会考虑到很多中情况,比如当进行到某些操作另一个线程调用了offer或者poll方法导致队列变化,正如上面代码中无限循环中的判断语句,其中的判断语句就是为了发生的多线程并发情况做准备,而整个offer(E e)方法最重要的就是使用CAS来控制某一时刻只有一个线程可以追加元素到队列末尾,而CAS失败的线程会通过无限循环判断队列的变化来再次进行CAS尝试,也就是说offer通过 无限循环不断进行 CAS 尝试方式来代替阻塞算法挂起调用线程 。 相比阻塞算法,这就是 使用 CPU 资源换取则色所带来的的开销

  • poll 操作

    poll() 是在队列头部获取并移除元素,如果队列为空则返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E poll() {
// goto标记
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 当前节点有值则通过CAS设置为null
if (item != null && p.casItem(item, null)) {
// 更新头结点
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 如果当前节点被自引用了 则跳出循环在进入循环寻找新的头结点
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

可以看到的是,poll操作也考虑到了很多并发情况并做出处理,不得不说 Goug Lee 真的是一个天才,poll操作也是采用无限循环CAS代替阻塞算法的。

  • peek 操作

    peek() 是获取头部的一个元素但是不移除。和poll差不多 只不过这里不使用CAS来移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
  • size 操作

    size() 是获取队列元素个数,采用的是遍历队列并自增count, 由于这里没有添加任何的锁所以在进行size操作的时候可能会有其他线程进行poll remove offer等操作,所以size并不是一个精确的值。

1
2
3
4
5
6
7
8
9
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
  • remove 操作

    remove(Object object) 是删除指定元素,如果存在多个则删除第一个,如果队列为空则返回 false 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = p.casItem(item, null);
}
next = succ(p);
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}
  • contains 操作

    contains 是判断队列中是否含有指定元素,由于是遍历整个队列并且没有加锁,所以该操作也像size一样不是精确的。比如当调用该方法的时候该元素还在队列中,但是在遍历的时候其他线程调用了remove移除该元素,那么就会返回false。

1
2
3
4
5
6
7
8
9
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}

ConcurrentLinkedQueue 小结

ConcurrentLinkedQueue 底层是使用单向链表来保存队列元素的,每个元素包装成Node节点,队列是靠头尾节点来维护的,创建队列的时候头尾节点指向一个item为null的哨兵节点。第一次执行 peek 或者 first 操作的时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,在进行size contains操作的时候可能其他线程同时进行了offer remove等操作导致size contains不精确。

入队,出队都是操作使用 volatile 修饰的 tail, head节点。要保证在多线程情况下入队线程安全,只要保证两个Node操作的 可见性 和 原子性,由于volatile已经保证了可见性,所以只需要保证原子性就行。

而offer采用CAS来保证两个线程同时调用CAS进行设置的时候只有一个会成功,失败的线程会无限进行CAS尝试直到成功,这样就保证了两个Node节点操作的原子性,而poll方法也是类似的原理。

LinkedBlockingQueue 原理探究

LinkedBlockingQueue 类图结构

LinkedBlockingQueue

LinkedBlockingQueue 也是通过单向链表来实现的。last 和 head 是用来存放尾结点和头结点的,而原子变量 count 用来记录队列中的元素个数。另外还有两个 ReentrantLock 实例 takeLock 和 putLock,其中takeLock是线程进行获取元素的锁,putLock是线程进行添加元素的锁。

另外还有两个条件变量 notEmpty 和 notFull。 它们内部都已一个条件队列,其实这就是一个 生产者——消费者模型

1
2
3
4
5
6
7
8
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

当线程在 LinkedBlockingQueue 实例上执行 take,poll等操作的时候需要获取takeLock,从而保证同一时刻只有一个线程能操作链表头结点。另外由于条件变量 notEmpty 内部的条件队列的维护使用的是takeLock的锁状态管理机制, 所以在调用notEmpty的await和 signal方法前线程必须获得takeLock。当获得takeLock的线程调用了notEmpty的await方法时会被阻塞释放锁并放入相应的条件队列中,只有其他线程调用了notEmpty.signal方法的时候才能被唤醒重新竞争。

和上述一样,当线程在 LinkedBlockingQueue 实例上执行 put offer等操作的时候需要获取putLock,调用notFull的await和 signal 方法前线程也需要获取putLock等。二者相似,这里不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// LinkedBlockingQueue 是一个有界队列 需要制定capacity
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
// 首先调用默认狗仔方法
this(Integer.MAX_VALUE);
// 因为要存入元素 所以要事先获取putLock
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
// 设置大小
count.set(n);
} finally {
putLock.unlock();
}
}

LinkedBlockingQueue 原理介绍

  • offer 操作

    向队列尾部插入一个元素,如果队列中有空闲则插入成功返回true,如果队列已满则丢弃当前元素返回false,如果元素为null 则抛出NPE,另外该方法是非阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public boolean offer(E e) {
// 判空
if (e == null) throw new NullPointerException();
// 获取count
final AtomicInteger count = this.count;
// 如果队列已满
if (count.get() == capacity)
return false;
int c = -1;
// 封装成Node
Node<E> node = new Node<E>(e);
// 获取putLock
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 为什么获取锁还要再次判断?
// 因为有可能出现两个线程同时进入方法然后同时要获取锁
// 比如线程A首先获取锁然后添加元素导致队列已满
// 线程B原本以判断过队列是否已满 那时是未满 但这次进去就满了
// 所以还要判断一次
// 如果队列未满
if (count.get() < capacity) {
// 入队
enqueue(node);
// 增加
c = count.getAndIncrement();
// 如果队列还有空闲位置
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 这个时候count肯定是大于0的 因为前面做了自增
// 这个时候则通知因为队列空而阻塞的线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
  • put 操作

    向队列尾部插入一个元素,如果队列有空闲则插入成功后返回,如果队列已满则阻塞当前线程,知道队列有空闲插入成功后返回。如果在阻塞过程中被其他线程设置了中断标志,则被阻塞线程会抛出InterruptException而返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void put(E e) throws InterruptedException {
// 判空
if (e == null) throw new NullPointerException();
int c = -1;
// 封装Node
Node<E> node = new Node<E>(e);
// 获取锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 这里调用了lockInterruptibly 这是能够响应中断的关键
putLock.lockInterruptibly();
try {
// 放置虚假唤醒
// 如果队列满
while (count.get() == capacity) {
notFull.await();
}
// 不然入队
enqueue(node);
c = count.getAndIncrement();
// 队列还有空闲位置 通知因为队列满而被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 此时count大于0 通知因为空而被阻塞的线程
if (c == 0)
signalNotEmpty();
}
  • poll 操作

    从队列头移除一个元素,如果队列为空则返回null,该方法是非阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E poll() {
final AtomicInteger count = this.count;
// 判断队列是否为空
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 获取takeLock
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 如果队列不为空则出队
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
// 如果出队之后还有元素 通知因为队列空而被阻塞的队列
if (c > 1)
notEmpty.signal();
}
} finally {
// 释放锁
takeLock.unlock();
}
// 此时队列肯定有空闲位置
if (c == capacity)
signalNotFull();
return x;
}
  • peek 操作

    获取队列头元素但是不溢出,如果队列为空则返回null,该方法是非阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
if (count.get() == 0)
return null;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
  • take 操作

    获取当前队列头部元素并且移除,如果队列为空则阻塞知道队列不为空然后返回元素,如果阻塞过程被其他线程设置了中断标志则抛出InterruptedException然后返回。和前面的 put 操作正好相反。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 抛出InterruptedException的关键
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
  • remove 操作

    删除队列中指定元素 有就删除并返回true,没有则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean remove(Object o) {
// 为空则直接false
if (o == null) return false;
// 获取双重锁 因为这里要先获取然后移除
fullyLock();
try {
// 遍历链表
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
// 移除
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 释放双重锁
fullyUnlock();
}
}
// 注意这里双重锁的获取和释放的顺序是颠倒的
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
  • size 操作

    获取当前队列元素个数

1
2
3
4
public int size() {
// 直接返回count 因为count原子变量一直被维护着。
return count.get();
}

LinkedBlockingQueue 小结

LinkedBlockingQueue 的内部通过单向链表来实现的, 对头尾节点的操作分别使用了不同的独占锁,每个独占锁对应相应的Condition和条件队列,其实实现的是一个生产消费模型。

ArrayBlockingQueue 原理探究

ArrayBlockingQueue 类图结构

ArrayBlockingQueue

我们可以看出,ArrayBlockingQueue 内部有个数组items用来存放队列元素,putIndex 用来表示入队元素下标, takeIndex 用来表示出队元素下标, count 统计队列元素个数。这些变量都没有使用volatile 因为访问这些变量都是在锁块内,而锁已经保证了可见性。另外有一个独占锁lock用来保证出队入队的操作的原子性,这保证了同时只有一个线程能进行入队,出队操作,另外,notEmpty,notFull条件变量用来进行出队入队的同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 有界队列需要传入容量大小
public ArrayBlockingQueue(int capacity) {
// 默认非公平锁
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 同时生成锁和条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 首先构建
this(capacity, fair);
// 获取锁然后将集合中的元素放入数组中
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
// 维持count
count = i;
// 如果个数已经到了容量那么设置putIndex为0
putIndex = (i == capacity) ? 0 : i;
} finally {
// 释放锁
lock.unlock();
}
}

ArrayBlockingQueue 原理介绍

  • offer 操作

    向队列尾部插入一个元素,如果队列有空闲空间则插入返回true,如果已满则丢弃并返回false,该方法是非阻塞的。作用和LinkedBlockingQueue中的offer方法基本一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public boolean offer(E e) {
// 判空
checkNotNull(e);
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果已满则丢弃返回false
if (count == items.length)
return false;
else {
// 入队
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 获取items数组
final Object[] items = this.items;
// 存入入队索引对应的空间
items[putIndex] = x;
// 如果队列满 则将putIndex置为0
if (++putIndex == items.length)
putIndex = 0;
count++;
// 通知因为队列空而被阻塞的线程
notEmpty.signal();
}
  • put 操作

    向队列尾部插入一个元素,如果队列有空闲则插入返回true,如果队列已满则阻塞当前线程知道队列有空闲插入后返回true,如果阻塞过程中被设置中断标志则抛出InterruptedException返回。和LinkedBlockingQueue的put也一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
// 关键
lock.lockInterruptibly();
try {
// 防止虚假唤醒使用while
while (count == items.length)
// 如果满则阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
  • poll 操作

    从队列头获取并移除一个元素,如果队列为空则返回null,是非阻塞方法,和LinkedBlockingQueue 的poll方法一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll() {
final ReentrantLock lock = this.lock;
// 获取独占锁
lock.lock();
try {
// 如果不为空则出队 为空返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 将takeIndex索引下的元素置为null
items[takeIndex] = null;
// 如果出队索引是最后一个 那么重置为0
if (++takeIndex == items.length)
takeIndex = 0;
// count减一
count--;
if (itrs != null)
itrs.elementDequeued();
// 通知因为队列满而被阻塞的线程
notFull.signal();
return x;
}
  • take 操作

    获取当前队列头部元素移除,如果队列空则阻塞线程知道队列不为空被激活然后移除返回,如果阻塞过程被设置中断标志则抛出InterruptedException然后返回。和LinkedBlockingQueue的take一样。

1
2
3
4
5
6
7
8
9
10
11
12
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 关键
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
  • peek 操作

    获取头部元素但不从队列中移除它,如果队列为空则返回null,是非阻塞方法,和LinkedBlockingQueue的peek一样。

1
2
3
4
5
6
7
8
9
10
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列为空返回null
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
  • size 操作

    计算当前队列元素个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int size() {
final ReentrantLock lock = this.lock;
// 在这里要加锁
// 因为不加锁 如果其他线程调用offer poll等操作的时候会对count进行修改
// 而这里count没有使用volatile修饰 不能保证可见性,所以需要在锁块里。
// 这样返回的就是从主内存中的count,如果不加锁count调用此方法的时候会被放入缓存中!
lock.lock();
try {
// 直接返回维护的count
return count;
} finally {
lock.unlock();
}
}

ArrayBlockingQueue 小结

ArrayBlockingQueue 通过使用全局独占锁实现了同时只能有一个线程操作队列(包括入队 出队 获取个数等操作),由此可见,这个锁的粒度比较大,所以并发性能有一定影响。

PriorityBlockingQueue 原理探究

PriorityBlockingQueue 介绍

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部使用的是平衡二叉树堆实现的,所以遍历队列元素不保证有序,默认使用对象的compareTo方法提供比较规则,如果需要自定义可以自定义comparators。

PriorityBlockingQueue 类图结构

PriorityBlockingQueue

PriorityBlockingQueue 中有一个数组queue用来存放队列元素,size用来存放队列个数。 allocationSpinLock 是一个自旋锁,其使用 CAS 来保证同时只有一个线程能进行扩容操作,状态为0或者1,0表示当前没有线程进行扩容。

由于是一个优先级队列,所以有一个比较器 comparator 用来比较元素大小。lock独占锁用来控制同时只有一个线程能进行入队,出队操作。notEmpty条件用来实现take方法的阻塞,没有notFull是因为这是一个无界的优先级队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 默认队列大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 默认构造
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定大小
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 指定大小和比较器 如果默认则使用当前元素自身的比较器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

PriorityBlockingQueue 原理介绍

  • offer 操作

    在队列中插入一个元素,由于是无界队列,所以一直返回true,这里没有在队头插入是因为这是优先级队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean offer(E e) {
// 判空
if (e == null)
throw new NullPointerException();
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 如果size 队列个数大于容量了则进行扩容操作
while ((n = size) >= (cap = (array = queue).length))
// 扩容
tryGrow(array, cap);
try {
// 将队列比较器赋值过去
Comparator<? super E> cmp = comparator;
// 如果为空
if (cmp == null)
siftUpComparable(n, e, array);
else
// 不为空
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 入队成功后通知因为队伍空阻塞的线程
notEmpty.signal();
} finally {
// 解锁
lock.unlock();
}
return true;
}

我们需要看看如果size大于容量的时候 优先队列是怎么进行扩容操作的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void tryGrow(Object[] array, int oldCap) {
// 首先将进行offer时所抢占的锁释放
// 因为扩容是一个费事的操作 这个时候释放出来让其他线程执行
// 会提高并发能力
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 判断allocationSpinLock自旋锁标志 如果为0代表没有线程进行扩容
// 如果为0使用CAS设置为1 控制只有当前线程能执行扩容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
// oldCapr如果小于64 则扩容为 oldCap + 2
// 如果否则扩容50%,并且限制最大为MAX_ARRAY_SIZE
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// 如果同时多个线程进行CAS CAS失败的线程会进入这里
// newArray == null 代表第一个线程进去还没扩容完成 其余CAS失败的线程执行到这
// yield尽量让出CPU资源 目的是让扩容的线程执行完扩容之后优先获得锁
// 但这里的yield让出资源时不保证的 有可能扩容线程还未扩容完 yield就已经结束
// 然后CAS失败的线程获取锁直接退出到offer方法
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
// 如果扩容完毕并且获取到锁 复制当前queue里面的元素到newArray
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

我们还需要看如何建堆的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
// n为队列个数
// 如果大于0代表队列中有元素 所以需要“排序”
while (k > 0) {
// 通过位移获取插入节点的父节点索引 相当于 (k - 1) / 2
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 如果key 大于 父节点 那么跳出循环
// 不然交换并继续循环 由此可见这是一个最小堆
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
  • poll 操作

    poll操作时获取队列内部堆树的根节点元素,如果队列为空返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// 获取最大索引
int n = size - 1;
// 队列为空返回null
if (n < 0)
return null;
else {
Object[] array = queue;
// 获取队首元素
E result = (E) array[0];
// 获取队尾元素并赋值为null
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 将变量x插入数组下标为0的位置
// 就是将堆底元素删除并插入堆顶然后重新排序
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
  • take 操作

    获取队列内部堆树的根节点元素,如果队列为空则阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 调用对中断响应的lockInterruptibly
lock.lockInterruptibly();
E result;
try {
// 防止虚假唤醒
while ( (result = dequeue()) == null)
// 阻塞到条件队列中
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
  • size 操作

    计算队列元素个数。

1
2
3
4
5
6
7
8
9
10
public int size() {
// 获取锁 由于是加锁的所以获取的size是精确的。
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}

PriorityBlockingQueue 小结

PriorityBlockingQueue 内部使用 二叉树堆 来维护元素优先级,使用数组作为元素存储的数据结构,这个数组是可扩容的,当添加元素的时候需要重置顺序,通过 递归比较插入节点和父节点,当移除根元素的时候需要 删除末尾节点并将这个结点放入根节点的位置重新设置堆的顺序,这也是堆实现的通用方法。队列还维护一个comparator 用户可以自定义。总体通过一个独占锁来控制同时只有一个线程可以进行出队和入队操作,只是用了一个notEmpty的条件变量因为他是一个无界队列,执行put方法永远不会处于await和take方法是阻塞队列并且可以中断。

DelayQueue 原理探究

DelayQueue并发队列是一个 无界阻塞延迟队列 。队列中每个元素都有过期时间,当从队列中获取元素的时候只有过期元素才会出队,队列头元素就是最快要过期的元素。

Delay 类图结构

DelayQueue

DelayQueue 内部使用 PriorityQueue 存放数据, 使用 ReentrantLock 实现线程同步。另外队列里的元素要实现 Delayed 接口, 由于每个元素都有过期时间所以要实现获知当前元素还有多少时间过期的接口,由于内部使用优先队列来实现,所以要实现元素之间相互比较的接口。

1
2
3
4
public interface Delayed extends Comparable<Delayed> {
// 获取还剩多长时间
long getDelay(TimeUnit unit);
}

其中 lock 是独占锁,available 是对应的条件变量, 其中 leader 变量的使用基于 Leader-Follower 模式的变体,用于尽量减少不必要线程的等待当一个线程调用队列的take方法变为leader线程后,它会调用条件变量 available 的awaitNanos(delay)等待delay时间,但是其他线程(follower线程)会调用 available 条件的await()进行无限等待。 leader线程延迟时间过期后会退出take方法, 并通过调用 available.signal()方法唤醒一个follower线程被选举为leader线程。

DelayQueue 主要函数原理讲解

  • offer 操作

    插入元素到队列,如果元素为null抛出NPE,否则由于无界一直返回true,插入元素要实现 Delayed 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e) {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 添加元素到优先级队列
q.offer(e);
// 获取头元素 如果是e
// 那么设置leader为null
if (q.peek() == e) {
leader = null;
// 通知某个等待线程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
  • take 操作

    获取并移除队列里延迟过期的元素,如果队列里没有延迟过期的元素则等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 可以被中断
lock.lockInterruptibly();
try {
for (;;) {
// 获取优先队列队首元素,如果为空则等待
E first = q.peek();
if (first == null)
available.await();
else {
// 获取延迟时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于0那么直接移除
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// 查看leader是否为null 不为null说明其他线程也在执行take
if (leader != null)
// 其他线程在执行则等待无限时间
available.await();
else {
// 将当前线程设为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞delay时间
// 这是会释放锁 其他线程能进行操作
available.awaitNanos(delay);
} finally {
// 最后将leader设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader为null 且队头元素不为null 激活等待线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
  • poll 操作

    获取并移除过期元素,没有过期元素返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
// 移除后调整堆顺序
if (s != 0)
siftDown(0, x);
return result;
}
  • size 操作
1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}

DelayQueue 总结

DelayQueu内部使用了PriorityQueue存放数据,使用ReentrantLock实现线程同步,队列里的元素要实现Delayed接口,在出队时要判断元素是否过期了。

-------------本文结束感谢阅读-------------