Java 线程与线程池

#线程

#创建线程的方法

  1. 继承 Thread 类,重写 run() 方法。
  2. 实现 Runnable 接口,重写 run() 方法。
  3. 实现 Callable 接口,重写 call() 方法。
  4. 使用线程池。

#Runnable 和 Callable 的区别

Callable 接口和 Runnable 接口用于封装应该由另一个线程执行的任务。Runnable 对象可以通过 Executors 工具类中的 callable(Runnable task)callable(Runnable task, T result) 转换成 Callable 对象。

  • Runnable 是 JDK 1.0 以来就有的接口,Callable 是从 JDK 1.5 开始引入的。
  • Runnable 实例可以由 Thread 类以及 ExecutorService 运行,而 Callable 只能由 ExecutorService 运行。
  • Runnable 不接受任何的参数也不返回值,Callable 可以返回任务计算的结果。
  • Runnable 不能抛出已检查的异常,Callable 可以抛出已检查的异常。
  • Runnable 需要覆盖 run() 函数,callable 需要覆盖 call() 函数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

#线程的状态

  • NEW:尚未启动的线程的线程状态。有了一个线程的实例,但还没调用线程的 start 方法。
  • RUNNABLE:可运行线程的线程状态。包括就绪(READY)和正在运行(RUNNING),当处于就绪状态的线程获得了处理器时间后就变成正在运行。
  • BLOCKED:线程阻塞等待锁。
  • WAITING:等待其他线程的操作,只能通过其他线程唤醒。
  • TIMED_WAITING:等待其他线程,但不会无限期的等待,过了指定时间之间之后自动唤醒。
  • TREMINATED:已终止线程的线程状态,线程已完成执行。

Thread Status

与线程休眠有关的方法:

方法 休眠时线程状态 需不需要捕获 InterruptedException 是否会释放持有的锁 其他
Thread.sleep() TIMED_WAITING 需要 不会 必须指定休眠的时间
Object.wait() 带参数为 TIMED_WAITING;不带参数为 WAITING 需要 可以通过 notify() 或者 notifyAll() 唤醒,但是要在 wait() 之后,否则会丢失唤醒信号。notify() 是随机唤醒一个线程。必须获得对象上的锁才能执行对象的 wait() 方法。
LockSupport.park() WAITING 不需要,但也会响应中断 不会 通过二元信号量实现的阻塞,可以通过 unpark() 唤醒,如果 unpark() 在 park() 之前,可以保证下次 park() 时不被阻塞。

🔖 park() 方法是等待许可,许可是一个二元的信号量,要么为 0 要么为 1。如果此时没有许可,那么线程需要阻塞等待,调用 unpark(Thread) 可以解除阻塞;如果在 park() 之前调用了 unpark(Thread),那么下一次park() 不会被阻塞。多次调用 unpark(Thread) 也只能是下一次不被阻塞!

#start() 方法与 run() 方法的区别

start 方法启用一个线程,并使其进入就绪状态,当获得处理器时间片之后就可以开始运行 run 方法。结果是两个线程同时运行:当前线程(从对 start 方法的调用返回,不需要等待 run 的执行)和另一个线程(执行其 run 方法)。如果在主线程中直接调用 run 方法将会把它当做是主线程下的一个函数去运行,不会以多线程的方式去执行。

#线程的优先级

线程的优先级从 1 到 10,定义了 MIN_PRIORITY = 1NORM_PRIORITY = 5MAX_PRIORITY = 10。可以通过 getPriority 和 setPriority 来获得和更改线程的优先级。在设置线程优先级时,首先要检查线程是否可访问,并且设置的优先级不能大于 MAX_PRIORITY 也不能大于线程所属的线程组的最大优先级。

#线程的终止

  • 任务执行完毕,程序正常退出。
  • 使用退出标志,某些线程需要长时间的运行,只有外部满足某些条件才会退出。例如 run() 方法中是一个 while(!exit) 的循环体,只有设置的 exit 标志为 true 时才退出,exit 是一个外部的条件。
  • 使用 Interrupt() 方法。如果线程处于阻塞状态,会抛出 InterruptedException 异常,可以捕获该异常使线程停⽌的事件得以传播;如果不是阻塞状态就会设置线程的中断状态。
  • 使用 stop()。已经弃用,不安全。

#创建线程池的方法

#为什么使用线程池

提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建和销毁、实现重复利用。

  • 提高响应速度(减少了创建新线程的时间)
  • 降低资源消耗(重复利用线程池中线程,不需要每次都创建)
  • 便于线程的管理

#使用 ThreadPoolExecutor 类

#线程池的状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  // aka 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;  // aka 111
private static final int SHUTDOWN   =  0 << COUNT_BITS;  // aka 000
private static final int STOP       =  1 << COUNT_BITS;  // aka 001
private static final int TIDYING    =  2 << COUNT_BITS;  // aka 010
private static final int TERMINATED =  3 << COUNT_BITS;  // aka 011

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

使用一个 AtomicInteger 类型的 ctl 字段来表示线程池的两个概念字段:线程池的运行状态(run state,32 位整型的高 3 位)和线程池的有效线程数(work count,低 29 位)。因此,在设置线程池最大线程数时,可以设置到 Integer.MAX_VALUE,但实际上要受到表示范围 CAPACITY 的限制,最大为 $2^29 - 1$。使用 runStateOf() 在 ctl 中获得运行状态,使用 workerCountOf() 在 ctl 中获得存活的线程数,使用 ctlOf() 获得 ctl 本身。

各个状态的含义:

  • RUNNING:接受新的任务并处理阻塞队列中的任务。
  • SHUTDOWN:不接受新的任务,但是处理阻塞队列中的任务。
  • STOP:不接受新的任务,不处理阻塞队列中的任务,并中断正在进行的任务。
  • TIDYING:所有任务已经终止,workCount 等于 0,转换到 TIDYING,将会执行 terminated() 钩子方法。

线程池的状态转变:

  • RUNNING -> SHUTDOWN:调用 shutdown() 时,可能隐含在 finalize() 中。
  • (RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow()
  • SHUTDOWN -> TIDYING:当阻塞队列和线程池都为空时。
  • STOP -> TIDYING:当线程池为空时。
  • TIDYING -> TERMINATED:terminated() 方法执行完毕进入 TERMINATED 状态。

线程池状态转变

shutdown()shutdownNow() 的区别:

shutdown() 方法在实现上调用的了 interruptIdleWorkers() 方法,中断所有等待任务的线程(空闲线程),也即不再接受新的任务但是会处理阻塞队列中的任务,进入的状态是 SHUTDOWN,关闭的速度慢但是较为安全;shutdownNow() 在内部调用的是 interruptWorkers(),中断所有的线程,并返回在阻塞队列中的任务列表,进入 STOP 状态,关闭的速度较快但是因为会打断任务的执行所以存在风险。

#线程池构造函数

重点在于理解构造方法的参数,从而避免资源耗尽

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

以参数最多的一个构造方法解释各个参数的含义,其他的构造方法就是设置了一些默认值。

  • corePoolSize:核心线程池的大小,核心线程池中的线程是不会被空闲超时销毁的,除非设置了 allowCoreThreadTimeOut。
  • maximumPoolSize:线程池能够包含的最大的线程数,可以设置成整型的最大值相当于无界,但内部其实是要受到 CAPACITY 的限制。
  • keepAliveTime:超过核心线程池的线程会在经过 KeepAliveTime 的空闲时间后被终止。
  • unit:keepAliveTime 的时间单位。
  • workQueue:阻塞队列,用于在执行任务之前保存任务的队列。此队列将仅保存execute 方法提交的 Runnable 任务!
  • threadFactory:创建线程使用的工厂。使用自定义的线程工厂可以对线程命名,更改线程组、优先级、守护状态等。
  • handler:当线程达到了最大线程数并且阻塞队列也已经满了的时候的处理策略。

#任务提交给线程池的处理

  1. 如果不使用 public boolean prestartCoreThread()public int prestartAllCoreThreads(),前者在线程池启动时创建一个核心线程使其空闲等待任务,后者在线程池启动时创建所有的核心线程并返回创建的线程数,那么启动时线程池内没有线程;默认情况下,当有任务到来的时候才会创建一个线程,每当新到一个任务就创建一个新的线程去执行,直到达到核心线程池的大小。
  2. 当线程数达到 corePoolSize 后,再来新的任务就尝试排队(添加到阻塞队列),如果成功添加到阻塞队列也不意味着任务一定能被成功执行,需要有一些额外的判断(如下源代码中说明)。
  3. 当阻塞队列满了的时候,在线程数小于 maximumPoolSize 时,尝试创建新的线程去处理任务。
  4. 否则,执行拒绝策略。

execute 流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获得线程池的控制变量
    int c = ctl.get();
    // 如果工作线程数小于 corePoolSize,就尝试创建 核心线程 执行任务
    if (workerCountOf(c) < corePoolSize) {
        // 如果创建执行任务的线程成功,就直接返回
        if (addWorker(command, true))
            return;
        // 说明创建失败,重新获得线程池的控制变量
        c = ctl.get();
    }
    // 如果线程池是运行状态并且添加到阻塞队列成功了
    if (isRunning(c) && workQueue.offer(command)) {
        // 添加到阻塞队列不意味着任务就一定能被执行,需要加一些判断
        // 因为线程池没有加锁控制,在此期间可能存在其他线程把线程池的状态改变了,所以需要再次获得 ctl
        int recheck = ctl.get();
        // 如果线程池不在 RUNNING 状态需要把任务移除出队列
        if (! isRunning(recheck) && remove(command))
            // 能够成功移除的话,执行拒绝策略
            reject(command);
        // 如果线程池是运行状态并且工作线程数为 0
        // 或者线程池不是运行状态但是移除任务失败,并且工作线程数为 0
        else if (workerCountOf(recheck) == 0)
            // 添加一个任务为空的非核心线程去处理阻塞队列中的任务
            // 任务为 null 的原因是阻塞队列中任务可能不止一个,需要按一定的顺序执行
            // 非核心线程是因为 corePoolSize 可能设置成 0,调用 addWorker 创建核心线程会失败
            // 导致阻塞队列的任务没法执行
            addWorker(null, false);
    }
    // 如果无法添加到阻塞队列(阻塞队列已满),尝试创建 非核心线程
    else if (!addWorker(command, false))
        // 线程数已经达到 maximumPoolSize,创建失败,执行拒绝策略
        reject(command);
}

#execute() 和 submit() 的区别

  • execute()Executor 接口中的方法,其参数为 Runnable 对象,返回值为空,因此不会返回任何的结果,用于执行不需要返回值的任务。

  • submit()ExecutorService 接口中的方法, ExecutorService 接口继承了 Executor 接口,并且添加了 submit() 方法,用来提交需要返回值的任务。submit() 方法返回一个 Future 对象, Future 表示了一个任务的生命周期,提供了相应的方法来判断任务是否已经完成或者取消。Futureget() 方法的行为取决于任务的状态,如果任务已经完成,会立即返回或者抛出一个异常,如果没有完成,get() 就会阻塞直到任务完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public interface Executor {
	// 在将来的某个时间执行给定的 command。
	// 根据 Executor 实现的判断,该命令可以在新线程、线程池中的线程或调用者线程中执行。
    void execute(Runnable command);
}

public interface ExecutorService extends Executor {
    // 所有的 submit 方法都会返回一个 Future 对象
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
}

#排队策略和阻塞队列的类型

workQueue 的类型为 BlockingQueue<Runnable>,它用来存放等待执行的任务。

排队策略有以下三种:

  1. Direct handoffs.(直接移交) 在这种策略中使用的是 SynchronousQueue,它没有容量,每个插入操作必须等待另一个线程的删除操作,反之亦然。它将任务直接交给工作线程,本身不存储任务;如果没有空闲的线程可以执行任务,那么任务排队失败,就会创建一个新的线程。通常需要将 maximumPoolSize 设置为 Integer.MAX_VALUE 来避免拒绝新提交的任务。在处理可能具有内部依赖关系的任务集时,此策略可避免死锁。如果任务的平均到达速度超过线程的处理速度的话,可能会导致线程无限增长
  2. Unbounded queues.(无界队列) 使用无界队列(例如没有预定义容量LinkedBlockingQueue,默认为 Integer.MAX_VALUE)在核心线程池中的线程都在繁忙时将新任务都放在队列中,线程池中的线程数最大为 corePoolSize,因此 maximumPoolSize 的没有任何影响。每个任务之间相互独立时,使用这种策略是合适的。这种排队方式有助于平滑瞬间的请求突发,但是当任务继续以超过处理速度的速度到达时,可能会导致队列无限增长
  3. Bounded queues.(有界队列) 使用有界队列(例如 ArrayBlockingQueue)和有限的 maximumPoolSize 可以防止资源耗尽。使用时需要权衡最大线程数和队列的大小:使用大型队列和小型池可以最大限度地减少 CPU 使用、操作系统资源和上下文切换开销,但可能会导致人为的低吞吐量;使用小队列通常需要更大的池大小,这使 CPU 更繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。

🔖 以上所提到的“界”、“限”都是指 Integer.MAX_VALUE,等于 Integer.MAX_VALUE 就认为是无界的。

🔖 LinkedBlockingQueue 包含有指定初始容量的构造函数和默认的构造函数,默认构造函数设置队列最大值为 Integer.MAX_VALUE,可以通过给定容量来构建一个基于链表的有界队列使用。链接队列通常比基于数组的队列具有更高的吞吐量,但在大多数并发应用程序中性能更不可预测。

🔖 前面提到 workQueue 中仅保存execute 方法提交的 Runnable 任务。那么 submit 提交的任务怎么存到阻塞队列中呢?ThreadPoolExecutor 继承了 AbstractExecutorService 类,AbstractExecutorService 类实现了 ExecutorService 接口,AbstractExecutorService 类重写了 submit 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractExecutorService implements ExecutorService {
    // 对给定的 Runnable 和默认的值返回一个 RunnableFuture 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // 对给定的 Callable 返回一个 RunnableFuture 
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);  // execute 方法在 ThreadPoolExecutor 中重写了
        return ftask;
    }
    
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

从上面的源代码可以看出,对于 submit 提交的 Runnable 以及 Callable,都对其使用 newTaskFor 进行了封装返回一个 RunnableFuture。RunnableFuture 实现了 Runnable 和 Future 接口,这样就可以将 Callable 的任务变成 Runnable 去执行了。

继承关系

#拒绝策略

  • ThreadPoolExecutor.AbortPolicy 默认策略,丢弃任务并抛出 RejectedExecutionException 异常。
  • ThreadPoolExecutor.CallerRunsPolicy 调用 execute 自身的线程运行任务。这提供了一种简单的反馈控制机制,可以降低提交新任务的速度。
  • ThreadPoolExecutor.DiscardPolicy 直接丢弃,不抛异常,仅使用在不关注任务的完成结果的罕见情况中。
  • ThreadPoolExecutor.DiscardOldestPolicy 如果线程池没有关闭的话,丢弃最旧的未处理任务,即阻塞队列中头部的任务,然后重试 execute。在此过程中,可能还会被拒绝,重复该过程。

#addWorker 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
private boolean addWorker(Runnable firstTask, boolean core) {
    // 标记 for 循环,用来在内层循环跳到外层循环
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 这个 if 语句给出的是构建工作线程失败的情况,首先就是线程池状态 >= SHUTDOWN,即不在 Running 状态
        // 接着下面的 3 条语句中有一个或者多个为 false 的时候,直接返回 false,表示创建线程失败
        // 1. 线程池状态不是 SHUTDOWN,那么只能是 SHUTDOWN 之上的状态了,也就不需要添加线程处理任务
        // 2. 到这说明是 SHUTDOWN 状态,如果任务不为空的话,说明不是处理阻塞队列中的任务(根据 execute 的代码
        //    可以知道只有一种情况会传递 null 作为参数),该状态下不接受新任务,也返回 false
        // 3. 到这说明是 SHUTDOWN 状态且 firstTask 为空,如果此时队列为空,也就没必要创建线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // java 10 中的对代码做了改变,逻辑上是一样的,但比原来好理解
        // if (runStateAtLeast(c, SHUTDOWN)
        //    && (runStateAtLeast(c, STOP)
        //        || firstTask != null
        //        || workQueue.isEmpty()))
        //    return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 如果是核心线程不能超过 corePoolSize,如果是非核心线程不能超过 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS 方式增加 workerCount
            if (compareAndIncrementWorkerCount(c))
                // 成功则退出外层循环
                break retry;
            // CAS 失败就重新获得 ctl
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态发生了变化,就要重新开始外层循环;没变就重新开始内层循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    // ========== 经过以上的代码,实现的就是工作线程数加一 ========== 

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建 Worker,把任务交给 Worker
        w = new Worker(firstTask);
        // 获得 Worker 中的线程
        final Thread t = w.thread;
        if (t != null) {
            // 获得线程池的全局锁,避免添加任务时其他线程对线程池操作造成线程池状态改变
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                int rs = runStateOf(ctl.get());
				// 如果是 RUNNING 状态;或者是 SHUTDOWN 状态,创建空任务的工作线程去处理阻塞队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将工作线程添加到 workers 中,workers 是一个 HashSet<Worker>
                    workers.add(w);
                    // 工作线程数
                    int s = workers.size();
                    // 记录历史最大工作线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 如果工作线程添加成功,就启动线程,并设置启动状态
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 启动失败,调用 addWorkerFailed 方法
            // 在该方法中,获得线程池的锁,把该线程从 workers 中移除,将 workerConut 减一(CAS 方式)
            addWorkerFailed(w);
    }
    return workerStarted;
}

#Worker 的作用

Worker 是内部封装类,线程池创建线程时,会把线程封装成 Worker,Worker 中的 firstTask 是线程要执行的第一个任务,执行完后循环获取阻塞队列中的任务去执行。

#线程池的监控

  • getPoolSize():线程池中当前的线程数。
  • getActiveCount():线程池中当前活动的线程数。
  • getLargestPoolSize():线程池⾥曾经创建过的最⼤线程数量。
  • getTaskCount():获得已经安排执行的任务的大致数量。
  • getCompletedTaskCount():获得已经完成的任务的数量。

🔖 通过继承 ThreadPoolExecutor 并重写其中的 beforeExecute()afterExecute() 来定义在执行之前和执行之后的行为。

#使用 Executors 工具类(不推荐)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
  • newFixedThreadPool:corePoolSize 和 maximumPoolSize 相同,可以控制线程的最大并发数。使用无界队列,当线程池中的线程已经达到最大值且所有线程都在工作时,新来的任务在阻塞队列中排队。
  • newSingleThreadExecutor:创建一个使用单个工作线程在无界队列上运行的 Executor。如果该单线程在关闭前的执行过程中因失败而终止,则如果需要执行后续任务,则新线程将取代它。任务保证按顺序执行,并且在任何给定时间不会有多个任务处于活动状态。newSingleThreadExecutor() 与 newFixedThreadPool(1) 不同,前者返回的是包装之后的 FinalizableDelegatedExecutorService,不可以重新配置线程池,而后者可以重新配置线程池。配置线程池指改变线程池的属性,比如通过 setCorePoolSize 和 setMaximumPoolSize 等方法改变线程池中的线程数。
  • newCachedThreadPool:创建一个线程池,使用 SynchronousQueue 根据需要创建新线程,但有空闲线程时将重用以前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,对执行的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源。
  • newScheduledThreadPool:创建一个线程池,可以安排命令在**给定延迟后运行** schedule(),或定期执行 scheduleAtFixedRate() 以及 scheduleWithFixedDelay()

🔖 scheduleAtFixedRate(command, initialDelay, period, unit) 是在一个 initialDelay 时间后首次启用任务,然后每隔一个 period 时间再启用该任务,不强调之前的任务是否已经完成。scheduleWithFixedDelay(command, initialDelay, delay, unit) 是在一个 initialDelay 时间后首次启用任务,然后等到任务结束后隔上 delay 的时间后再启用该任务。

#参考

[1] Class ThreadPoolExecutor

updatedupdated2022-08-212022-08-21