一、ThreadPoolExecutor 参数说明
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory
threadFactory, RejectedExecutionHandler handler)
*
corePoolSize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的
prestartAllCoreThreads() 方法,核心线程池会提前创建并启动所有核心线程。
* workQueue:任务队列。当核心线程池中没有线程时,所提交的任务会被暂存在队列中。Java 提供了多种阻塞队列
<https://www.cnblogs.com/jmcui/p/11442616.html>。
*
maximumPoolSize:线程池允许创建的最大线程数。如果队列也满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的空闲线程执行任务。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。
* keepAliveTime:当线程池中的线程数大于 corePoolSize 时,keepAliveTime
为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。
* TimeUnit:线程活动保持时间的单位。
*
threadFactory:创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置符合业务的名字。
// 依赖 guava new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
*
handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。Java 提供了以下4种策略:

* AbortPolicy:默认。直接抛出异常。
* CallerRunsPolicy:只用调用者所在线程来运行任务。
* DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
* DiscardPolicy:不处理,丢弃掉。
tips: 一般我们称核心线程池中的线程为核心线程,这部分线程不会被回收;超过任务队列后,创建的线程为空闲线程,这部分线程会被回收(回收时间即
keepAliveTime)

二、常见的 ThreadPoolExecutor 介绍

Executors 是创建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工厂类。

Java 提供了多种类型的 ThreadPoolExecutor,比较常见的有
FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。

FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new
ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>()); }
FixedThreadPool 被称为可重用固定线程数的线程池。可以看到 corePoolSize 和 maximumPoolSize 都被设置成了
nThreads;keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止;使用了阻塞队列 LinkedBlockingQueue
作为线程的工作队列(队列的容量为 Integer.MAX_VALUE)。

FixedThreadPool 所存在的问题是,由于队列的容量为 Integer.MAX_VALUE,基本可以认为是无界的,所以
maximumPoolSize 和 keepAliveTime 参数都不会生效,饱和拒绝策略也不会执行,会造成任务大量堆积在阻塞队列中。

FixedThreadPool 适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。









SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new
FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor 是使用单个线程的线程池。可以看到 corePoolSize 和 maximumPoolSize
被设置为1,其他参数与 FixedThreadPool 相同,所以所带来的风险也和 FixedThreadPool 一致,就不赘述了。

SingleThreadExecutor 适用于需要保证顺序的执行各个任务。









CachedThreadPool
public static ExecutorService newCachedThreadPool() { return new
ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>()); }
CachedThreadPool 是一个会根据需要创建新线程的线程池。可以看到 corePoolSize 被设置为
0,所以创建的线程都为空闲线程;maximumPoolSize 被设置为
Integer.MAX_VALUE(基本可认为无界),意味着可以创建无限数量的空闲线程;keepAliveTime
设置为60L,意味着空闲线程等待新任务的最长时间为60秒;使用没有容量的 SynchronousQueue 作为线程池的工作队列。

CachedThreadPool 所存在的问题是, 如果主线程提交任务的速度高于maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadPool 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

三、自建 ThreadPoolExecutor 线程池

鉴于上面提到的风险,我们更提倡使用 ThreadPoolExecutor 去创建线程池,而不用 Executors 工厂去创建。

以下是一个 ThreadPoolExecutor 创建线程池的 Demo 实例:
public class Pool { static ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("pool-task-%d").build(); static
ExecutorService executor = new
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 200, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), threadFactory, new
ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args)
throws ExecutionException, InterruptedException { // 1. 无返回值的任务执行 -> Runnable
executor.execute(() -> System.out.println("Hello World")); // 2. 有返回值的任务执行 ->
Callable Future<String> future = executor.submit(() -> "Hello World"); // get
方法会阻塞线程执行等待返回结果 String result = future.get(); System.out.println(result); // 3.
监控线程池 monitor(); // 4. 关闭线程池 shutdownAndAwaitTermination(); monitor(); }
private static void monitor() { ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor) Pool.executor;
System.out.println("【线程池任务】线程池中曾经创建过的最大线程数:" +
threadPoolExecutor.getLargestPoolSize()); System.out.println("【线程池任务】线程池中线程数:"
+ threadPoolExecutor.getPoolSize()); System.out.println("【线程池任务】线程池中活动的线程数:" +
threadPoolExecutor.getActiveCount()); System.out.println("【线程池任务】队列中等待执行的任务数:"
+ threadPoolExecutor.getQueue().size());
System.out.println("【线程池任务】线程池已执行完任务数:" +
threadPoolExecutor.getCompletedTaskCount()); } /** * 关闭线程池 * 1.
shutdown、shutdownNow 的原理都是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程。 * 2.
shutdownNow:将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。 * 3.
shutdown:将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。 */ private static void
shutdownAndAwaitTermination() { // 禁止提交新任务 executor.shutdown(); try { //
等待现有任务终止 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // 取消当前正在执行的任务
executor.shutdownNow(); // 等待一段时间让任务响应被取消 if (!executor.awaitTermination(60,
TimeUnit.SECONDS)) { System.err.println("Pool did not terminate"); } } } catch
(InterruptedException ie) { // 如果当前线程也中断,则取消 executor.shutdownNow(); // 保留中断状态
Thread.currentThread().interrupt(); } } }
创建线程池需要注意以下几点:

* CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程。
* IO 密集型任务(数据库读写等)应配置尽可能多的线程,如配置 Ncpu*2 个线程。
* 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。
* 建议使用有界队列。可以避免创建数量非常多的线程,甚至拖垮系统。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
四、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory
threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new
DelayedWorkQueue(), threadFactory); }
ScheduledThreadPoolExecutor 的功能与 Timer
<https://www.cnblogs.com/jmcui/p/7519759.html> 类似,但功能更强大、更灵活。Timer
对应的是单个后台线程,而ScheduledThreadPoolExecutor 可以在构造函数中指定多个对应的后台线程数。

Java 提供了多种类型的 ScheduledThreadPoolExecutor ,可以通过 Executors 创建,比较常见的有
ScheduledThreadPool、SingleThreadScheduledExecutor
等。适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。
public class ScheduleTaskTest { static ThreadFactory threadFactory = new
BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build(); static
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5, threadFactory); public static void
main(String[] args) throws ExecutionException, InterruptedException { // 1. 延迟
3 秒后执行 Runnable 方法 scheduledExecutorService.schedule(() ->
System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS); // 2. 延迟 3
秒后执行 Callable 方法 ScheduledFuture<String> scheduledFuture =
scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000,
TimeUnit.MILLISECONDS); System.out.println(scheduledFuture.get()); // 3. 延迟 1
秒后开始每隔 3 秒周期执行。 // 如果中间任务遇到异常,则禁止后续执行。 // 固定的频率来执行某项任务,它不受任务执行时间的影响。到时间,就执行。
scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello
ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS); // 4. 延迟 1 秒后,每个任务结束延迟
3 秒后再执行下个任务。 // 如果中间任务遇到异常,则禁止后续执行。 // 受任务执行时间的影响,等待任务执行结束后才开始计算延迟。
scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello
ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS); } }
ScheduledThreadPoolExecutor 的执行步骤大抵如下:

* 当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者
scheduleWithFixedDelay()方 法时,会向 DelayedWorkQueue 队列添加 ScheduledFutureTask 任务。
* 线程池中的线程从 DelayedWorkQueue队列中获取执行时间已到达的 ScheduledFutureTask,然后执行任务。
* 线程修改 ScheduledFutureTask 任务的执行时间为下次将要被执行的时间。
* 线程把修改后的 ScheduledFutureTask 重新放回队列。