侧边栏壁纸
博主头像
搞钱拒绝ICU

行动起来,活在当下

  • 累计撰写 26 篇文章
  • 累计创建 8 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

线程池

admin
2024-04-27 / 0 评论 / 0 点赞 / 8 阅读 / 0 字

线程池是用来管理和复用线程的工具,它可以减少线程的创建和销毁开销。在 Java 中,ThreadPoolExecutor是线程池的核心实现,它通过核心线程数、最大线程数、任务队列和拒绝策略来控制线程的创建和执行。

一、常用参数

public ThreadPoolExecutor(int corePoolSize,//1、核心线程数
                          int maximumPoolSize,//2、最大线程数量
                          long keepAliveTime,//3、线程空闲之后可以存活时间
                          TimeUnit unit,//4、时间单位
                          BlockingQueue<Runnable> workQueue, //5、用于存放任务的阻塞队列
                          ThreadFactory threadFactory,//6、线程工厂类
                          RejectedExecutionHandler handler //7、队列和线程池都满了后的饱和策略) {
                          }

拒绝策略

有四种:

  • AbortPolicy:默认的拒绝策略,会抛 RejectedExecutionException 异常。

  • CallerRunsPolicy:让提交任务的线程自己来执行这个任务,也就是调用 execute 方法的线程。

  • DiscardOldestPolicy:等待队列会丢弃队列中最老的一个任务,也就是队列中等待最久的任务,然后尝试重新提交被拒绝的任务。

  • DiscardPolicy:丢弃被拒绝的任务,不做任何处理也不抛出异常。

阻塞队列

①、ArrayBlockingQueue:

一个有界的先进先出的阻塞队列,底层是一个数组,适合固定大小的线程池。

ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10, true);

②、LinkedBlockingQueue:

底层是链表,如果不指定大小,默认大小是 Integer.MAX_VALUE,几乎相当于一个无界队列。

比如使用了 LinkedBlockingQueue 来配置 RabbitMQ 的消息队列。

技术派实战项目源码:RabbitMQ 的消息队列

③、PriorityBlockingQueue:

一个支持优先级排序的无界阻塞队列。任务按照其自然顺序或 Comparator 来排序。

适用于需要按照给定优先级处理任务的场景,比如优先处理紧急任务。

④、DelayQueue:

类似于 PriorityBlockingQueue,由二叉堆实现的无界优先级阻塞队列。

Executors 中的 newScheduledThreadPool() 就使用了 DelayQueue 来实现延迟执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
}

⑤、SynchronousQueue:

每个插入操作必须等待另一个线程的移除操作,同样,任何一个移除操作都必须等待另一个线程的插入操作。

Executors.newCachedThreadPool() 就使用了 SynchronousQueue,这个线程池会根据需要创建新线程,如果有空闲线程则会重复使用,线程空闲 60 秒后会被回收。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

二、原理图

每当有新的任务到线程池时,

第一步: 先判断线程池中当前线程数量是否达到了corePoolSize,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize,则进入下一步;

第二步: 判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;

第三步: 判断线程池中的线程数量是否达到了maximumPoolSize,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。

注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。

(由第三步可知,在一般情况下,Java线程池中会长期保持corePoolSize个线程。)

三、线程池参数如何设置

1、分析线程池中执行的任务类型是 CPU 密集型还是 IO 密集型?

①、对于 CPU 密集型任务,目标是尽量减少线程上下文切换,以优化 CPU 使用率。一般来说,核心线程数设置为处理器的核心数或核心数加一是较理想的选择。

+1 是为了以备不时之需,如果某线程因等待系统资源而阻塞时,可以有多余的线程顶上去,不至于影响整体性能。

②、对于 IO 密集型任务,由于线程经常处于等待状态,等待 IO 操作完成,所以可以设置更多的线程来提高并发,比如说 CPU 核心数的两倍。

2、根据业务需求和系统资源来调整线程池的其他参数,比如最大线程数、任务队列容量、非核心线程的空闲存活时间等。

3、通过监控和调试调整线程数

比如说通过 top 命令观察 CPU 的使用率,如果 CPU 使用率较低,可能是线程数过少;如果 CPU 使用率接近 100%,但吞吐量未提升,可能是线程数过多。

然后再通过 VisualVM 或 Arthas 分析线程运行情况,查看线程的状态、等待时间、运行时间等信息。

也可以使用 jstack 命令查看线程堆栈信息,查看线程是否处于阻塞状态。

jstack <Java 进程 ID> | grep -A 20 "BLOCKED" // 查看阻塞线程

如果有大量的 BLOCKED 线程,说明线程数可能过多,竞争比较激烈。

4、动态修改参数

线程池提供的 setter 方法就可以在运行时动态修改参数,比如说 setCorePoolSize 可以用来修改核心线程数、setMaximumPoolSize 可以用来修改最大线程数。

三分恶面渣逆袭:JDK 线程池参数设置

需要注意的是,调用 setCorePoolSize() 时如果新的核心线程数比原来的大,线程池会创建新的线程;如果更小,线程池不会立即销毁多余的线程,除非有空闲线程超过 keepAliveTime。

还可以利用 Nacos 配置中心,或者实现自定义的线程池,监听参数变化去动态调整参数。

四、四种常用线程池

1、固定大小的线程池

Executors.newFixedThreadPool(int nThreads);,适合用于任务数量确定,且对线程数有明确要求的场景。例如,IO 密集型任务、数据库连接池等。

线程池大小是固定的,corePoolSize == maximumPoolSize,默认使用 LinkedBlockingQueue 作为阻塞队列,适用于任务量稳定的场景,如数据库连接池、RPC 处理等。

new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                       new LinkedBlockingQueue<>());

新任务提交时,如果线程池有空闲线程,直接执行;如果没有,任务进入 LinkedBlockingQueue 等待。

缺点是任务队列默认无界,可能导致任务堆积,甚至 OOM。

2、缓存线程池

Executors.newCachedThreadPool();,适用于短时间内任务量波动较大的场景。例如,短时间内有大量的文件处理任务或网络请求。

线程池大小不固定,corePoolSize = 0maximumPoolSize = Integer.MAX_VALUE。空闲线程超过 60 秒会被销毁,使用 SynchronousQueue 作为阻塞队列,适用于短时间内有大量任务的场景。

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                       new SynchronousQueue<>());

提交任务时,如果线程池没有空闲线程,直接新建线程执行任务;如果有,复用线程执行任务。线程空闲 60 秒后销毁,减少资源占用。

缺点是线程数没有上限,在高并发情况下可能导致 OOM。

3、定时任务线程池

Executors.newScheduledThreadPool(int corePoolSize);,适用于需要定时执行任务的场景。例如,定时发送邮件、定时备份数据等。

定时任务线程池的大小可配置,支持定时 & 周期性任务执行,使用 DelayedWorkQueue 作为阻塞队列,适用于周期性执行任务的场景。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

执行定时任务时,schedule() 方法可以将任务延迟一定时间后执行一次;scheduleAtFixedRate() 方法可以将任务延迟一定时间后以固定频率执行;scheduleWithFixedDelay() 方法可以将任务延迟一定时间后以固定延迟执行。

缺点是,如果任务执行时间 > 设定时间间隔,scheduleAtFixedRate 可能会导致任务堆积。

4、单线程线程池

Executors.newSingleThreadExecutor();,适用于需要按顺序执行任务的场景。例如,日志记录、文件处理等。

线程池只有 1 个线程,保证任务按提交顺序执行,使用 LinkedBlockingQueue 作为阻塞队列,适用于需要按顺序执行任务的场景。

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                       new LinkedBlockingQueue<>());

始终只创建 1 个线程,新任务必须等待前一个任务完成后才能执行,其他任务都被放入 LinkedBlockingQueue 排队执行。

缺点是无法并行处理任务。

五、线程池五种状态

不同状态控制着线程池的任务调度和关闭行为。

状态由 RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED 依次流转。

三分恶面渣逆袭:线程池状态切换图

状态

状态码

是否接收新任务

是否执行队列中的任务

是否中断正在执行的任务

RUNNING

111

✅ 是

✅ 是

❌ 否

SHUTDOWN

000

❌ 否

✅ 是

❌ 否

STOP

001

❌ 否

❌ 否

✅ 是

TIDYING

010

❌ 否

❌ 否

❌ 否

TERMINATED

011

❌ 否

❌ 否

❌ 否

调用线程池的shutdownshutdownNow方法来关闭线程池。

shutdown 不会立即停止线程池,而是等待所有任务执行完毕后再关闭线程池。

ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));

executor.shutdown(); // 不会立刻关闭,而是等待所有任务执行完毕

shutdownNow 会尝试通过一系列动作来停止线程池,包括停止接收外部提交的任务、忽略队列里等待的任务、尝试将正在跑的任务 interrupt 中断。

ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(() -> {
    try {
        Thread.sleep(5000); // 模拟长时间运行任务
        System.out.println("Task executed");
    } catch (InterruptedException e) {
        System.out.println("任务被中断");
    }
});

List<Runnable> unexecutedTasks = executor.shutdownNow(); // 立即关闭线程池
System.out.println("未执行的任务数: " + unexecutedTasks.size());

注意,shutdownNow 不会真正终止正在运行的任务,只是给任务线程发送 interrupt 信号,任务是否能真正终止取决于线程是否响应 InterruptedException。

参考链接

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

0

评论区