0%

Java 线程池学习(一)

声明: 本文基于 JDK1.8

引子

线程池的作用

  1. 减少资源开销

    线程可以重用,减少每次创建线程、销毁线程的开销。

  2. 提高响应速度

    每次请求到来,由于线程已经创建好,故可以直接执行任务。

  3. 提高线程的可管理性

    线程过多会占用大量资源,导致 OOM 等问题,同时会造成相互之间竞争。线程池可以对线程的创建与停止、线程的数量等加以控制,提高系统稳定性。

  4. 提供定期、延时等功能

线程池的实现原理

线程池一般由两种角色构成:多个工作线程和一个阻塞队列。

Executor 框架

Executor 框架便是 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

因此,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架三要素

任务

执行任务需要实现的 Runnable 接口Callable接口

两者区别:

  1. Runnable 是自从 Java1.0 就有了,而 Callable 是 1.5 之后才加上去的
  2. Runnable 规定的方法是 run(),Callable 规定的方法是 call()
  3. Runnable 的任务执行后没有返回值,Callable 的任务执行后可返回值
  4. Runnable 的 run() 不能抛出受检查异常,Callable 的 call() 可以
  5. Runnable 可以直接被 Thread 执行,Callable 只能提交给线程池执行
1
2
3
4
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

任务的执行

  • Executor 是任务执行的核心接口
  • ExecutorService 提供了线程池生命周期管理的方法
  • AbstractExecutorService 提供了默认实现
  • ThreadPoolExecutor 是最常用的线程池
  • ScheduledExecutorService 和 ScheduledThreadPoolExecutor 提供延后与定期执行功能
  • ForkJoinPool 是 JDK1.7 新增的,是一种支持任务分解的线程池
1
2
3
public interface Executor {
void execute(Runnable command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1
2
3
4
5
6
7
8
9
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit);
}

异步计算的结果

Future 接口以及其 实现类 FutureTask类

当我们把 Runnable 接口Callable 接口的实现类提交(调用submit 方法)给 ThreadPoolExecutorScheduledThreadPoolExecutor 时,会返回一个 FutureTask 对象

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

Executor 框架运行图

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象
  2. 然后可以把创建完成的 Runnable 对象直接交给 ExecutorService 执行
  3. 如果执行 ExecutorService.submit(…),ExecutorService将返回一个实现 Future 接口的对象
  4. 最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

ThreadPoolExecutor

四个构造方法

1
2
3
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue);
1
2
3
4
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) // with thread factory
1
2
3
4
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // with rejected execution handler
1
2
3
4
5
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, // with thread factory
RejectedExecutionHandler handler) // with rejected execution handler

其实前三个构造方法都是调用最后一个构造方法,分别利用 Executors.defaultThreadFactory() 和 defaultHandler 。

四个必须参数

核心线程池大小、最大线程池大小、空闲线程存活时长、任务队列。

1
private volatile int corePoolSize;
1
private volatile int maximumPoolSize;
1
private volatile long keepAliveTime;    // 纳秒数
1
private final BlockingQueue<Runnable> workQueue;

两个可选参数

1
private volatile ThreadFactory threadFactory;
1
private volatile RejectedExecutionHandler handler;

如果缺省,会使用如下默认值:

1
2
3
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
1
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

线程池状态控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池使用一个 AtomicInteger 存储线程池的状态和线程池中的任务数量。

  • 高 3 位存储线程池的状态
  • 低 29 位存储线程池中的任务数量

线程池有以下五个状态:

  • RUNNING:线程池能够接受新任务,以及对新添加的任务进行处理。
  • SHUTDOWN:线程池不可以接受新任务,但是可以对已添加的任务进行处理。
  • STOP:线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
  • TIDYING:当所有的任务已终止,ctl 记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated()。terminated() 在 ThreadPoolExecuto r类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。
  • TERMINATED:线程池彻底终止的状态
状态 RUNNING SHUTDOWN STOP TIDYING TERMINATED
高三位 111 000 001 010 011

各状态之间的转换如下:

核心方法 execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 1.获取当前线程池状态
int c = ctl.get();
// 2.当前线程数量小于 coreSize 时创建一个新的线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 3.如果当前线程池处于运行状态,并且写入阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 4.重检查,再次获取线程状态;如果线程池状态变了(非运行状态)就需要从阻塞队列移除任务,
// 并尝试判断线程是否全部执行完毕。同时执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 5.如果当前线程池为空就新创建一个线程并执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 6.如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

ScheduledThreadPoolExecutor

四个构造函数

都是调用的 ThreadPoolExecutor 的构造函数,线程池最大容量为 Integer 的最大值,线程空闲的时间为 0,用 DelayWorkQueue 存放线程而非 BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

核心方法 schedule

1
2
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

分别用于无返回值和有返回值的延迟任务。

1
2
3
4
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)

用于固定频率周期的任务,以固定的周期开始下一次执行,不管上一次执行是否完毕。

1
2
3
4
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit)

用于固定延迟周期的任务,下一次执行的开始与上次执行的结束之间保持固定的延迟时间。

ForkJoinPool

ForkJoinPoo l是 JDK1.7 引入的线程池,核心思想是将大的任务拆分成多个小任务(即 fork ),然后在将多个小任务处理汇总到一个结果上(即 join ),非常像 MapReduce 处理原理。

使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,ForkJoin 框架提供了以下两个子类:

1
2
public abstract class RecursiveAction extends ForkJoinTask<Void>  // 用于没有返回结果的任务
public abstract class RecursiveTask<V> extends ForkJoinTask<V> // 用于有返回结果的任务

示例1:无返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolDemo {

public static void main(String[] args) throws InterruptedException {
PrintTask printTask = new PrintTask(1, 300);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(printTask);
// 或者可以用以下调用方法
// forkJoinPool.submit(printTask);
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
forkJoinPool.shutdown();
}
}

class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 50; // 任务分割的最小单元
private int start;
private int end;

public PrintTask(int start, int end) {
super();
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start < THRESHOLD) {
for (int i = start; i <= end; i++) {
System.out.println(Thread.currentThread().getName() + " print value " + i);
}
} else {
int mid = (end - start) / 2 + start;
PrintTask leftTask = new PrintTask(start, mid);
PrintTask rightTask = new PrintTask(mid + 1, end);
leftTask.fork();
rightTask.fork();
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
ForkJoinPool-1-worker-1 print value 264
ForkJoinPool-1-worker-0 print value 39
ForkJoinPool-1-worker-0 print value 40
ForkJoinPool-1-worker-3 print value 189
ForkJoinPool-1-worker-2 print value 114
ForkJoinPool-1-worker-3 print value 190
ForkJoinPool-1-worker-0 print value 41
ForkJoinPool-1-worker-0 print value 42
...

示例2:有返回结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.Random;
import java.util.concurrent.*;

public class ForkJoinPoolDemo2 {

public static void main(String[] args) throws InterruptedException, ExecutionException {
int[] nums = new int[100];
Random random = new Random();
int total = 0;
for (int i = 0; i <100; i++) {
nums[i] = random.nextInt(100);
total += nums[i];
}
System.out.println("Expect result: " + total);

SumTask sumTask = new SumTask(nums, 0, nums.length);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> future = forkJoinPool.submit(sumTask);
Integer result = future.get();
// 或者可以用以下调用方法
// Integer result = forkJoinPool.invoke(sumTask);
System.out.println("Fork and Join result: " + result);

forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
forkJoinPool.shutdown();
}
}

class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20; // 任务分割的最小单元
private int[] array;
private int start;
private int end;

public SumTask(int[] array, int start, int end) {
super();
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (end - start) / 2 + start;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join();
}
}
}

输出结果:

1
2
Expect result: 4907
Fork and Join result: 4907

-

参考: