线程池是用来管理和复用线程的工具,它可以减少线程的创建和销毁开销。在 Java 中,ThreadPoolExecutor
是线程池的核心实现,它通过核心线程数、最大线程数、任务队列和拒绝策略来控制线程的创建和执行。
一、常用参数
public ThreadPoolExecutor(int corePoolSize,//1、核心线程数
int maximumPoolSize,//2、最大线程数量
long keepAliveTime,//3、线程空闲之后可以存活时间
TimeUnit unit,//4、时间单位
BlockingQueue<Runnable> workQueue, //5、用于存放任务的阻塞队列
ThreadFactory threadFactory,//6、线程工厂类
RejectedExecutionHandler handler //7、队列和线程池都满了后的饱和策略) {
}
拒绝策略
有四种:
AbortPolicy:默认的拒绝策略,会抛 RejectedExecutionException 异常。
CallerRunsPolicy:让提交任务的线程自己来执行这个任务,也就是调用 execute 方法的线程。
DiscardOldestPolicy:等待队列会丢弃队列中最老的一个任务,也就是队列中等待最久的任务,然后尝试重新提交被拒绝的任务。
DiscardPolicy:丢弃被拒绝的任务,不做任何处理也不抛出异常。
阻塞队列
①、ArrayBlockingQueue:
一个有界的先进先出的阻塞队列,底层是一个数组,适合固定大小的线程池。
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10, true);
②、LinkedBlockingQueue:
底层是链表,如果不指定大小,默认大小是 Integer.MAX_VALUE,几乎相当于一个无界队列。
比如使用了 LinkedBlockingQueue 来配置 RabbitMQ 的消息队列。
③、PriorityBlockingQueue:
一个支持优先级排序的无界阻塞队列。任务按照其自然顺序或 Comparator 来排序。
适用于需要按照给定优先级处理任务的场景,比如优先处理紧急任务。
④、DelayQueue:
类似于 PriorityBlockingQueue,由二叉堆实现的无界优先级阻塞队列。
Executors 中的 newScheduledThreadPool()
就使用了 DelayQueue 来实现延迟执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
⑤、SynchronousQueue:
每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都必须等待另一个线程的插入操作。
Executors.newCachedThreadPool()
就使用了 SynchronousQueue,这个线程池会根据需要创建新线程,如果有空闲线程则会重复使用,线程空闲 60 秒后会被回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
二、原理图
每当有新的任务到线程池时,
第一步: 先判断线程池中当前线程数量是否达到了corePoolSize,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize,则进入下一步;
第二步: 判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;
第三步: 判断线程池中的线程数量是否达到了maximumPoolSize,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。
注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。
(由第三步可知,在一般情况下,Java线程池中会长期保持corePoolSize个线程。)
三、线程池参数如何设置
1、分析线程池中执行的任务类型是 CPU 密集型还是 IO 密集型?
①、对于 CPU 密集型任务,目标是尽量减少线程上下文切换,以优化 CPU 使用率。一般来说,核心线程数设置为处理器的核心数或核心数加一是较理想的选择。
+1 是为了以备不时之需,如果某线程因等待系统资源而阻塞时,可以有多余的线程顶上去,不至于影响整体性能。
②、对于 IO 密集型任务,由于线程经常处于等待状态,等待 IO 操作完成,所以可以设置更多的线程来提高并发,比如说 CPU 核心数的两倍。
2、根据业务需求和系统资源来调整线程池的其他参数,比如最大线程数、任务队列容量、非核心线程的空闲存活时间等。
3、通过监控和调试调整线程数
比如说通过 top 命令观察 CPU 的使用率,如果 CPU 使用率较低,可能是线程数过少;如果 CPU 使用率接近 100%,但吞吐量未提升,可能是线程数过多。
然后再通过 VisualVM 或 Arthas 分析线程运行情况,查看线程的状态、等待时间、运行时间等信息。
也可以使用 jstack 命令查看线程堆栈信息,查看线程是否处于阻塞状态。
jstack <Java 进程 ID> | grep -A 20 "BLOCKED" // 查看阻塞线程
如果有大量的 BLOCKED 线程,说明线程数可能过多,竞争比较激烈。
4、动态修改参数
线程池提供的 setter 方法就可以在运行时动态修改参数,比如说 setCorePoolSize 可以用来修改核心线程数、setMaximumPoolSize 可以用来修改最大线程数。
需要注意的是,调用 setCorePoolSize()
时如果新的核心线程数比原来的大,线程池会创建新的线程;如果更小,线程池不会立即销毁多余的线程,除非有空闲线程超过 keepAliveTime。
还可以利用 Nacos 配置中心,或者实现自定义的线程池,监听参数变化去动态调整参数。
四、四种常用线程池
1、固定大小的线程池
Executors.newFixedThreadPool(int nThreads);
,适合用于任务数量确定,且对线程数有明确要求的场景。例如,IO 密集型任务、数据库连接池等。
线程池大小是固定的,corePoolSize == maximumPoolSize
,默认使用 LinkedBlockingQueue 作为阻塞队列,适用于任务量稳定的场景,如数据库连接池、RPC 处理等。
new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
新任务提交时,如果线程池有空闲线程,直接执行;如果没有,任务进入 LinkedBlockingQueue 等待。
缺点是任务队列默认无界,可能导致任务堆积,甚至 OOM。
2、缓存线程池
Executors.newCachedThreadPool();
,适用于短时间内任务量波动较大的场景。例如,短时间内有大量的文件处理任务或网络请求。
线程池大小不固定,corePoolSize = 0
,maximumPoolSize = Integer.MAX_VALUE
。空闲线程超过 60 秒会被销毁,使用 SynchronousQueue 作为阻塞队列,适用于短时间内有大量任务的场景。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
提交任务时,如果线程池没有空闲线程,直接新建线程执行任务;如果有,复用线程执行任务。线程空闲 60 秒后销毁,减少资源占用。
缺点是线程数没有上限,在高并发情况下可能导致 OOM。
3、定时任务线程池
Executors.newScheduledThreadPool(int corePoolSize);
,适用于需要定时执行任务的场景。例如,定时发送邮件、定时备份数据等。
定时任务线程池的大小可配置,支持定时 & 周期性任务执行,使用 DelayedWorkQueue 作为阻塞队列,适用于周期性执行任务的场景。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
执行定时任务时,schedule()
方法可以将任务延迟一定时间后执行一次;scheduleAtFixedRate()
方法可以将任务延迟一定时间后以固定频率执行;scheduleWithFixedDelay()
方法可以将任务延迟一定时间后以固定延迟执行。
缺点是,如果任务执行时间 >
设定时间间隔,scheduleAtFixedRate 可能会导致任务堆积。
4、单线程线程池
Executors.newSingleThreadExecutor();
,适用于需要按顺序执行任务的场景。例如,日志记录、文件处理等。
线程池只有 1 个线程,保证任务按提交顺序执行,使用 LinkedBlockingQueue 作为阻塞队列,适用于需要按顺序执行任务的场景。
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
始终只创建 1 个线程,新任务必须等待前一个任务完成后才能执行,其他任务都被放入 LinkedBlockingQueue 排队执行。
缺点是无法并行处理任务。
五、线程池五种状态
不同状态控制着线程池的任务调度和关闭行为。
状态由 RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED 依次流转。
调用线程池的shutdown
或shutdownNow
方法来关闭线程池。
shutdown 不会立即停止线程池,而是等待所有任务执行完毕后再关闭线程池。
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));
executor.shutdown(); // 不会立刻关闭,而是等待所有任务执行完毕
shutdownNow 会尝试通过一系列动作来停止线程池,包括停止接收外部提交的任务、忽略队列里等待的任务、尝试将正在跑的任务 interrupt 中断。
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> {
try {
Thread.sleep(5000); // 模拟长时间运行任务
System.out.println("Task executed");
} catch (InterruptedException e) {
System.out.println("任务被中断");
}
});
List<Runnable> unexecutedTasks = executor.shutdownNow(); // 立即关闭线程池
System.out.println("未执行的任务数: " + unexecutedTasks.size());
注意,shutdownNow 不会真正终止正在运行的任务,只是给任务线程发送 interrupt 信号,任务是否能真正终止取决于线程是否响应 InterruptedException。
参考链接
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
评论区