Java并发编程学习——Java并发编程之美学习笔记八

Java并发包中线程池 ThreadPoolExecutor 原理探究

介绍

线程池主要解决的两个问题: 一是当执行大量异步任务是线程池能提供较好的性能,使线程可复用而不需要再次new,二是提供一种资源限制和管理手段,比如可以限制线程的个数,动态新增线程等。

另外线程池也提供了许多可调参数可扩展性接口,以满足不同情况的需要。我们可以使用更方便的 Executors 的工厂方法去创建线程池,比如其中的 newCachedThreadPool (线程池线程个数最多科大 Integet.MAX_VALUE 线程自动回收), newFixedThreadPool (固定大小)和 newSingleThreadExecutor (单个线程)。

类图介绍

ThreadPoolExecutor

ThreadPoolExecutor 继承了 AbstractExecutorService, 而 AbstractExecutorService 实现了 ExecutorService, ExecutorService 则实现了 Executor。 ThreadPoolExecutor 中有五个内部类,其中有四个是关于 policy 的,这是对于拒绝策略的实现(当线程池的状态不能再添加新任务执行的策略), 还有一个 Worker 类, 他继承了 AQS 和实现了 Runnable,里面封装了firstTask记录着第一次执行的任务,thread保存的线程,这个线程的 Runnable 是 Worker 本身,当初始化的时候会将 this 传入 thread ,当 thread 启动的时候会调用 Worker 实现的 run() 方法, 其中会调用runWorker(this),在这个方法里面就会取出 firstTask 然后调用它的 run 方法。

这里有一个 ctl 的原子变量, 他和前面的读写锁有着异曲同工之妙,它通过一个变量记录着线程池状态和线程池线程个数,其中高三位表示线程池状态,其余表示线程池个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 初始化线程池状态为RUNNING 个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池线程个数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// RUNNING代表接受新任务并且处理阻塞队列的任务
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTTDOWN表示不接受新任务但会处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP表示拒绝新任务并且抛弃阻塞队列里的任务 同时中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有任务都执行完毕(包括阻塞队列里的) 将会调用terminated方法
private static final int TIDYING = 2 << COUNT_BITS;
// 终止状态 terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;

线程池参数如下

corePoolSize: 线程池核心线程数。

workQueue: 用于保存等待执行的任务的阻塞队列,比如基于数组的有界ArrayBlockingQueue,基于链表的无界 LinkedBlockingQueue等等等。

maximunPooSize: 线程池最大线程数

threadFactory: 创建线程的工厂

RejectedExecutionHandler: 饱和拒绝策略 前面提到过

keepAliveTime: 存活时间。如果当前线程池中的线程数量比核心线程数量多的时候,并且是闲置状态,则这些闲置的线程能存活的最大时间。

TimeUnit: 存活时间的时间单位。

mainLock: 独占锁, 用来控制新增 Worker 线程操作的原子性

termination: mainLock对应的条件队列,在线程调用 awaitTermination 时被阻塞的线程放入此队列。

Worker: 上文提到过 是具体承载任务的对象,它继承了 AQS 实现了自己的简单不可重入独占锁,其中state = 0表示锁未被获取,为1表示锁已经被获取,为-1表示创建的默认状态,为了避免该线程在运行newWorker()方法前被中断。

源码分析

  • public void execute(Runnable command)

    execute 方法的作用是提交任务 command 到线程池进行执行。

    execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public void execute(Runnable command) {
// 判空
if (command == null)
throw new NullPointerException();
// 获取线程池状态和线程数量
int c = ctl.get();
// 如果线程数量小于核心数量
if (workerCountOf(c) < corePoolSize) {
// 调用addWorker增加
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果是RUNNING状态则添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取
int recheck = ctl.get();
// 如果不是RUNNING则从队列中删除任务
if (! isRunning(recheck) && remove(command))
// 实施拒绝策略
reject(command);
// 如果线程数量为0增加线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列满则新增线程 失败则实行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
// 双重循环退出第一层循环的标志
retry:
for (;;) {
// 获取到线程池状态和线程数量
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// 检查队列是否只在必要时为空
// 将!展开得
// rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
// 以下几种情况会返回false
// 当前线程池状态为STOP,TIDYING或者TERMINATED
// 当前线程状态为SHUTDOWN 并且已经有了第一个任务
// 当前线程状态为SHUTDOWN并且任务队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数量
int wc = workerCountOf(c);
// 如果线程数量大于容量或者当前添加任务不是核心的时候当前线程数量大于最大数量
// 如果是核心那么大于等于核心线程数量的时候返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加线程数量
if (compareAndIncrementWorkerCount(c))
// 成功跳出整个循环
break retry;
c = ctl.get(); // Re-read ctl
// 如果CAS失败查看线程池状态是否改变了
// 变化则重新调到最外层循环再次获取
if (runStateOf(c) != rs)
continue retry;
// 没有改变继续内层循环
}
}
// 到这里说明CAS成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker 这时候会使用工厂来创建thread 但不一定能创建成功
w = new Worker(firstTask);
final Thread t = w.thread;
// 如果thread不为空
if (t != null) {
// 获取到独占锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取状态 避免前面操作的时候线程池状态改变了
int rs = runStateOf(ctl.get());
// 如果线程池状态为RUNNING 或者为 SHUTDOWN 并且第一个任务为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 提前检查线程是否已经开始运行 是则抛出线程状态异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 新增到工作集中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 新增成功启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
  • 工作线程worker的执行

    当用户线程添加到线程池后,由worker来执行,先看下Worker的构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Worker(Runnable firstTask) {
// 设置worker状态为-1 避免当前Worker在调用runWorker之前被中断
// 可能其他线程调用了shutdownNow时 worker因为
// state >= 0而导致此时被中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 从工厂中获取线程 此线程会将this作为Runnable参数 因为Worker实现了饿Runnable接口
this.thread = getThreadFactory().newThread(this);
}
// 我们再来看一下runWorker()
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取worker中的第一个任务
Runnable task = w.firstTask;
// 置为空
w.firstTask = null;
// 解锁 其中会把state变为 0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果池停止,则确保线程被中断;如果没有,则确保线程不中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前做一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行完成做一些清理工作
afterExecute(task, thrown);
}
} finally {
// 释放
task = null;
// 完成任务++
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 清理工作
processWorkerExit(w, completedAbruptly);
}
}
  • shutdown 操作

    调用shutdown方法后 线程池就不会接受新的任务了,但是工作队里里的任务还要执行的,该方法立即返回 不等待队列任务完成再返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 获取独占锁
try {
// 检查权限
checkShutdownAccess();
// 提前设置状态
advanceRunState(SHUTDOWN);
// 给所有空闲线程设置中断标志
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没被中断 则尝试获取worker的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果打断一个 那么跳出循环 不然中断所有workers里的线程 (工作集合)
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

总结

线程池巧妙地使用了Integer类型表示线程池状态和线程数量,在整个线程池中主要的操作其实就是execute,当执行的时候会判断线程池状态 线程数量等因素。如果不满足那么会将任务(Runnable)加入阻塞队列 ,如果成功那么会加入工作集中并且执行。整个线程池能做到线程复用主要是吧Runnable和Thread分开来了,如果线程存活的时候且任务为空,当一个任务进来的时候该线程就能执行该任务。这样就减少了一些不必要的开销。当然还有很多线程池管理的细节,这里就不细说了。

-------------本文结束感谢阅读-------------