ConcurrentLinkedQueue 原理探究
ConcurrentLinkedQueue 类图结构

ConcurrentLinedQueue 内部使用单向链表的方式实现,其中有两个volatile类型的Node节点分别用来存放队列的首尾节点。
Node节点里面维护了一个使用 volatile 修饰的变量item,用来存放节点的值,next用来存放下一个节点,其内部使用 Unsafe 工具类提供的 CAS 算法来保证入队时操作链表的原子性。
从无参构造函数来看,默认头尾节点指向item为null的哨兵节点。ConcurrentLinkedQueue提供了一个传递Collection的有参构造函数,它会把Collection中的item封装成Node然后存放在队列中。
1 | public ConcurrentLinkedQueue() { |
ConcurrentLinkedQueue 原理介绍
ConcurrentLinkedQueue的原理很复杂,因为它是通过CAS实现来实现线程安全的 并且是无锁的 所以会考虑到很多种情况,这里只对代码和方法作用作简单的解析,如果想深入原理可以参考Java并发编程之美的第七章。
offer 操作
offer(E e) 是在队列末尾添加一个元素。 由于 ConcurrentLinkedQueue 是无界队列,所以该方法一直会返回true。 另外使用的是 CAS 无阻塞算法, 因此该方法不会阻塞挂起调用的线程。
1 | public boolean offer(E e) { |
对于入队操作,offer会考虑到很多中情况,比如当进行到某些操作另一个线程调用了offer或者poll方法导致队列变化,正如上面代码中无限循环中的判断语句,其中的判断语句就是为了发生的多线程并发情况做准备,而整个offer(E e)方法最重要的就是使用CAS来控制某一时刻只有一个线程可以追加元素到队列末尾,而CAS失败的线程会通过无限循环判断队列的变化来再次进行CAS尝试,也就是说offer通过 无限循环不断进行 CAS 尝试方式来代替阻塞算法挂起调用线程 。 相比阻塞算法,这就是 使用 CPU 资源换取则色所带来的的开销 。
poll 操作
poll() 是在队列头部获取并移除元素,如果队列为空则返回null。
1 | public E poll() { |
可以看到的是,poll操作也考虑到了很多并发情况并做出处理,不得不说 Goug Lee 真的是一个天才,poll操作也是采用无限循环CAS代替阻塞算法的。
peek 操作
peek() 是获取头部的一个元素但是不移除。和poll差不多 只不过这里不使用CAS来移除。
1 | public E peek() { |
size 操作
size() 是获取队列元素个数,采用的是遍历队列并自增count, 由于这里没有添加任何的锁所以在进行size操作的时候可能会有其他线程进行poll remove offer等操作,所以size并不是一个精确的值。
1 | public int size() { |
remove 操作
remove(Object object) 是删除指定元素,如果存在多个则删除第一个,如果队列为空则返回 false 。
1 | public boolean remove(Object o) { |
contains 操作
contains 是判断队列中是否含有指定元素,由于是遍历整个队列并且没有加锁,所以该操作也像size一样不是精确的。比如当调用该方法的时候该元素还在队列中,但是在遍历的时候其他线程调用了remove移除该元素,那么就会返回false。
1 | public boolean contains(Object o) { |
ConcurrentLinkedQueue 小结
ConcurrentLinkedQueue 底层是使用单向链表来保存队列元素的,每个元素包装成Node节点,队列是靠头尾节点来维护的,创建队列的时候头尾节点指向一个item为null的哨兵节点。第一次执行 peek 或者 first 操作的时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,在进行size contains操作的时候可能其他线程同时进行了offer remove等操作导致size contains不精确。
入队,出队都是操作使用 volatile 修饰的 tail, head节点。要保证在多线程情况下入队线程安全,只要保证两个Node操作的 可见性 和 原子性,由于volatile已经保证了可见性,所以只需要保证原子性就行。
而offer采用CAS来保证两个线程同时调用CAS进行设置的时候只有一个会成功,失败的线程会无限进行CAS尝试直到成功,这样就保证了两个Node节点操作的原子性,而poll方法也是类似的原理。
LinkedBlockingQueue 原理探究
LinkedBlockingQueue 类图结构

LinkedBlockingQueue 也是通过单向链表来实现的。last 和 head 是用来存放尾结点和头结点的,而原子变量 count 用来记录队列中的元素个数。另外还有两个 ReentrantLock 实例 takeLock 和 putLock,其中takeLock是线程进行获取元素的锁,putLock是线程进行添加元素的锁。
另外还有两个条件变量 notEmpty 和 notFull。 它们内部都已一个条件队列,其实这就是一个 生产者——消费者模型 。
1 | /** Lock held by take, poll, etc */ |
当线程在 LinkedBlockingQueue 实例上执行 take,poll等操作的时候需要获取takeLock,从而保证同一时刻只有一个线程能操作链表头结点。另外由于条件变量 notEmpty 内部的条件队列的维护使用的是takeLock的锁状态管理机制, 所以在调用notEmpty的await和 signal方法前线程必须获得takeLock。当获得takeLock的线程调用了notEmpty的await方法时会被阻塞释放锁并放入相应的条件队列中,只有其他线程调用了notEmpty.signal方法的时候才能被唤醒重新竞争。
和上述一样,当线程在 LinkedBlockingQueue 实例上执行 put offer等操作的时候需要获取putLock,调用notFull的await和 signal 方法前线程也需要获取putLock等。二者相似,这里不再赘述。
1 | // LinkedBlockingQueue 是一个有界队列 需要制定capacity |
LinkedBlockingQueue 原理介绍
offer 操作
向队列尾部插入一个元素,如果队列中有空闲则插入成功返回true,如果队列已满则丢弃当前元素返回false,如果元素为null 则抛出NPE,另外该方法是非阻塞的。
1 | public boolean offer(E e) { |
put 操作
向队列尾部插入一个元素,如果队列有空闲则插入成功后返回,如果队列已满则阻塞当前线程,知道队列有空闲插入成功后返回。如果在阻塞过程中被其他线程设置了中断标志,则被阻塞线程会抛出InterruptException而返回。
1 | public void put(E e) throws InterruptedException { |
poll 操作
从队列头移除一个元素,如果队列为空则返回null,该方法是非阻塞的。
1 | public E poll() { |
peek 操作
获取队列头元素但是不溢出,如果队列为空则返回null,该方法是非阻塞的。
1 | public E peek() { |
take 操作
获取当前队列头部元素并且移除,如果队列为空则阻塞知道队列不为空然后返回元素,如果阻塞过程被其他线程设置了中断标志则抛出InterruptedException然后返回。和前面的 put 操作正好相反。
1 | public E take() throws InterruptedException { |
remove 操作
删除队列中指定元素 有就删除并返回true,没有则返回false
1 | public boolean remove(Object o) { |
size 操作
获取当前队列元素个数
1 | public int size() { |
LinkedBlockingQueue 小结
LinkedBlockingQueue 的内部通过单向链表来实现的, 对头尾节点的操作分别使用了不同的独占锁,每个独占锁对应相应的Condition和条件队列,其实实现的是一个生产消费模型。
ArrayBlockingQueue 原理探究
ArrayBlockingQueue 类图结构

我们可以看出,ArrayBlockingQueue 内部有个数组items用来存放队列元素,putIndex 用来表示入队元素下标, takeIndex 用来表示出队元素下标, count 统计队列元素个数。这些变量都没有使用volatile 因为访问这些变量都是在锁块内,而锁已经保证了可见性。另外有一个独占锁lock用来保证出队入队的操作的原子性,这保证了同时只有一个线程能进行入队,出队操作,另外,notEmpty,notFull条件变量用来进行出队入队的同步。
1 | // 有界队列需要传入容量大小 |
ArrayBlockingQueue 原理介绍
offer 操作
向队列尾部插入一个元素,如果队列有空闲空间则插入返回true,如果已满则丢弃并返回false,该方法是非阻塞的。作用和LinkedBlockingQueue中的offer方法基本一样。
1 | public boolean offer(E e) { |
put 操作
向队列尾部插入一个元素,如果队列有空闲则插入返回true,如果队列已满则阻塞当前线程知道队列有空闲插入后返回true,如果阻塞过程中被设置中断标志则抛出InterruptedException返回。和LinkedBlockingQueue的put也一样。
1 | public void put(E e) throws InterruptedException { |
poll 操作
从队列头获取并移除一个元素,如果队列为空则返回null,是非阻塞方法,和LinkedBlockingQueue 的poll方法一样。
1 | public E poll() { |
take 操作
获取当前队列头部元素移除,如果队列空则阻塞线程知道队列不为空被激活然后移除返回,如果阻塞过程被设置中断标志则抛出InterruptedException然后返回。和LinkedBlockingQueue的take一样。
1 | public E take() throws InterruptedException { |
peek 操作
获取头部元素但不从队列中移除它,如果队列为空则返回null,是非阻塞方法,和LinkedBlockingQueue的peek一样。
1 | public E peek() { |
size 操作
计算当前队列元素个数
1 | public int size() { |
ArrayBlockingQueue 小结
ArrayBlockingQueue 通过使用全局独占锁实现了同时只能有一个线程操作队列(包括入队 出队 获取个数等操作),由此可见,这个锁的粒度比较大,所以并发性能有一定影响。
PriorityBlockingQueue 原理探究
PriorityBlockingQueue 介绍
PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部使用的是平衡二叉树堆实现的,所以遍历队列元素不保证有序,默认使用对象的compareTo方法提供比较规则,如果需要自定义可以自定义comparators。
PriorityBlockingQueue 类图结构

PriorityBlockingQueue 中有一个数组queue用来存放队列元素,size用来存放队列个数。 allocationSpinLock 是一个自旋锁,其使用 CAS 来保证同时只有一个线程能进行扩容操作,状态为0或者1,0表示当前没有线程进行扩容。
由于是一个优先级队列,所以有一个比较器 comparator 用来比较元素大小。lock独占锁用来控制同时只有一个线程能进行入队,出队操作。notEmpty条件用来实现take方法的阻塞,没有notFull是因为这是一个无界的优先级队列。
1 | // 默认队列大小 |
PriorityBlockingQueue 原理介绍
offer 操作
在队列中插入一个元素,由于是无界队列,所以一直返回true,这里没有在队头插入是因为这是优先级队列。
1 | public boolean offer(E e) { |
我们需要看看如果size大于容量的时候 优先队列是怎么进行扩容操作的。
1 | private void tryGrow(Object[] array, int oldCap) { |
我们还需要看如何建堆的。
1 | private static <T> void siftUpComparable(int k, T x, Object[] array) { |
poll 操作
poll操作时获取队列内部堆树的根节点元素,如果队列为空返回null
1 | public E poll() { |
take 操作
获取队列内部堆树的根节点元素,如果队列为空则阻塞。
1 | public E take() throws InterruptedException { |
size 操作
计算队列元素个数。
1 | public int size() { |
PriorityBlockingQueue 小结
PriorityBlockingQueue 内部使用 二叉树堆 来维护元素优先级,使用数组作为元素存储的数据结构,这个数组是可扩容的,当添加元素的时候需要重置顺序,通过 递归比较插入节点和父节点,当移除根元素的时候需要 删除末尾节点并将这个结点放入根节点的位置重新设置堆的顺序,这也是堆实现的通用方法。队列还维护一个comparator 用户可以自定义。总体通过一个独占锁来控制同时只有一个线程可以进行出队和入队操作,只是用了一个notEmpty的条件变量因为他是一个无界队列,执行put方法永远不会处于await和take方法是阻塞队列并且可以中断。
DelayQueue 原理探究
DelayQueue并发队列是一个 无界阻塞延迟队列 。队列中每个元素都有过期时间,当从队列中获取元素的时候只有过期元素才会出队,队列头元素就是最快要过期的元素。
Delay 类图结构

DelayQueue 内部使用 PriorityQueue 存放数据, 使用 ReentrantLock 实现线程同步。另外队列里的元素要实现 Delayed 接口, 由于每个元素都有过期时间所以要实现获知当前元素还有多少时间过期的接口,由于内部使用优先队列来实现,所以要实现元素之间相互比较的接口。
1 | public interface Delayed extends Comparable<Delayed> { |
其中 lock 是独占锁,available 是对应的条件变量, 其中 leader 变量的使用基于 Leader-Follower 模式的变体,用于尽量减少不必要线程的等待 。 当一个线程调用队列的take方法变为leader线程后,它会调用条件变量 available 的awaitNanos(delay)等待delay时间,但是其他线程(follower线程)会调用 available 条件的await()进行无限等待。 leader线程延迟时间过期后会退出take方法, 并通过调用 available.signal()方法唤醒一个follower线程被选举为leader线程。
DelayQueue 主要函数原理讲解
offer 操作
插入元素到队列,如果元素为null抛出NPE,否则由于无界一直返回true,插入元素要实现 Delayed 接口。
1 | public boolean offer(E e) { |
take 操作
获取并移除队列里延迟过期的元素,如果队列里没有延迟过期的元素则等待
1 | public E take() throws InterruptedException { |
poll 操作
获取并移除过期元素,没有过期元素返回null
1 | public E poll() { |
- size 操作
1 | public int size() { |
DelayQueue 总结
DelayQueu内部使用了PriorityQueue存放数据,使用ReentrantLock实现线程同步,队列里的元素要实现Delayed接口,在出队时要判断元素是否过期了。