并发编程之线程池实现原理

Posted on 2023-05-24

前言

池化思想是一种空间换时间的思想,期望使用预先创建好的对象来减少频繁创建对象的性能开销,同时还可以对对象进行统一管理,减少对象使用成本。
java中有多种池化思想的应用,例如:数据库连接池、线程池、字符串常量池等。

欢迎关注个人公众号【好好学技术】交流学习

为什么使用线程池

频繁的开启线程或者停止线程,线程需要重新被cpu从就绪到运行状态调度,需要发生cpu的上下文切换,效率非常低。

线程池作用

  1. 降低线程创建和销毁的开销:通过线程池重用已经创建的线程,可以避免频繁创建和销毁线程所造成的内存和CPU资源开销。
  2. 提高线程执行效率:线程池中的线程是经过优化的,通常会采用更少的线程、更高效的调度算法、更快的执行速度等方式,以提高线程的执行效率。
  3. 降低线程间竞争的激烈程度:线程池中的线程数量是有限的,可以减少线程间竞争的激烈程度,从而降低CPU资源的消耗。
  4. 提高应用程序的可伸缩性和健壮性:线程池可以控制线程的数量和执行速度,可以更好地满足应用程序的需求,从而提高应用程序的可伸缩性和健壮性。

ThreadPoolExecutor参数

  • int corePoolSize: 核心线程数
  • int maximumPoolSize: 最大线程数
  • long keepAliveTime: 超出corePoolSize后创建的线程的存活时间
  • TimeUnit unit: keepAliveTime的时间单位
  • BlockingQueue workQueue: 任务队列,存放待执行任务
  • ThreadFactory threadFactory: 创建线程的线程工厂
  • RejectedExecutionHandler handler: 拒绝策略

image.png

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满时:
    1)若线程数小于最大线程数,创建线程
    2)若线程数等于最大线程数,执行拒绝策略

拒绝策略

当线程池中的线程数量达到最大值或者任务队列已满时,如果再有新的任务提交给线程池,线程池会拒绝接受新的任务。这时,线程池会采用一定的拒绝策略来处理这些被拒绝的任务。

Java中提供了四种拒绝策略:

  1. AbortPolicy:默认策略,直接抛出RejectedExecutionException异常,表示任务被拒绝执行。
  2. CallerRunsPolicy:这个策略会使用当前客户端线程来执行任务。如果客户端线程不够,或者存在线程被阻塞,则仍然会抛出RejectedExecutionException异常。
  3. DiscardPolicy:这个策略会直接丢弃被拒绝的任务,不会执行任何操作。
  4. DiscardOldestPolicy:这个策略会丢弃队列中等待时间最长的任务,然后执行当前被提交的任务。如果队列中没有等待时间最长的任务,则会使用CallerRunsPolicy策略来处理被拒绝的任务。

也可以自定义拒绝策略,实现RejectedExecutionHandler接口即可。

建议自定义实现拒绝策略,将任务持久化到db,后期在手动补偿。

线程池的创建方式

Executors为我们提供了四种新建线程池的方式:
newCachedThreadPool()可缓存线程池

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                    60L, TimeUnit.SECONDS,  
                                    new SynchronousQueue<Runnable>());  
}

线程池是创建一个核心线程数为0,最大线程为Inter.MAX_VALUE的线程池,线程池数量不确定,有空闲线程则优先使用,没用则创建新的线程处理任务,处理完放入线程池。

newFixedThreadPool(): 可定长度,限制最大线程数

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>());  
}

创建一个核心线程数跟最大线程数相同的线程池,线程池数量大小不变,如果有任务放入队列,等待空闲线程。

newScheduledThreadPool(): 可定时线程池\

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}

public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
        new DelayedWorkQueue());  
}

创建一个没有最大线程数限制的可以定时执行线程池,还有创建一个只有单个线程的可以定时执行线程池(Executors.newSingleThreadScheduledExecutor())

newSingleThreadExecutor(): 单线程 线程池

public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}

池里只有一个线程

这四种底层都是基于ThreadPoolExecutor构造函数封装,且都采用的无界队列,使用时需注意防止内存溢出

自定义线程名称

可以通过自定义ThreadFactory来为线程池中的线程指定名称。

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("custom-thread-%d").build();

线程池的五种状态

        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
  1. RUNNING:线程池运行状态,此时线程池中的任务队列可能有等待中的任务,但线程池会持续从队列中取出任务执行。
  2. SHUTDOWN:线程池关闭状态,此时线程池会拒绝接受新的任务,但会执行完已经排队的任务,不接受新任务并不意味着已经排队的任务必须执行完。
  3. STOP:线程池强制停止状态,此时线程池不仅会拒绝接受新的任务,而且会中断正在执行的任务,并终止线程池。
  4. TIDYING:线程池在转换为终止状态时的一种特殊状态,此时线程池会执行钩子方法terminated(),并等待所有任务执行完成。
  5. TERMINATED:线程池终止状态,此时线程池中的所有任务已经执行完成,线程池被彻底终止。

线程数设置

  • 核心线程数:线程池中始终存在的线程数量。当任务被提交到线程池时,如果当前运行的线程少于核心线程数,则会创建一个新的线程来执行该任务,即使其他的核心线程正在空闲状态。因此,核心线程数通常应该设置为预期的并发数。
  • 最大线程数:允许线程池中同时存在的最大线程数量。当线程池中的线程数量达到最大线程数时,后续提交到线程池中的任务将被暂存到任务队列中等待处理。因此,最大线程数不应该设置过高,否则可能会导致系统资源紧张。

合理地设置核心线程数和最大线程数可以优化线程池的性能和响应时间。下面是一些设置建议:

  • 核心线程数 = CPU核心数 + 1
  • 最大线程数 = 核心线程数 * 2
  • 如果任务执行时间较长,可以适当增加最大线程数,以避免任务堆积在队列中无法及时处理

具体的设置需要根据实际情况来考虑,如果线程池主要执行的是I/O密集型任务,可以适当增加核心线程数和最大线程数,以充分利用系统资源。如果线程池主要执行的是CPU密集型任务,则需要根据系统的CPU核心数来设置核心线程数和最大线程数,避免过度消耗CPU资源。

springboot集成线程池

package com.fandf.common.config;  
  
import com.fandf.common.utils.CustomThreadPoolTaskExecutor;  
import lombok.Getter;  
import lombok.Setter;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.context.annotation.Bean;  
import org.springframework.core.task.TaskExecutor;  
import org.springframework.scheduling.annotation.EnableAsync;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
  
import java.util.concurrent.ThreadPoolExecutor;  
  
/**  
* @author fandongfeng   
*/  
@EnableAsync(proxyTargetClass = true) 
@Configuration
public class DefaultAsycTaskConfig {  
  
    /**  
    * 线程池维护线程的最小数量.  
    */  
    @Value("${asyc-task.corePoolSize:10}")  
    private int corePoolSize;  
    /**  
    * 线程池维护线程的最大数量  
    */  
    @Value("${asyc-task.maxPoolSize:200}")  
    private int maxPoolSize;  
    /**  
    * 队列最大长度  
    */  
    @Value("${asyc-task.queueCapacity:10000}")  
    private int queueCapacity;  
    /**  
    * 线程池前缀  
    */  
    @Value("${asyc-task.threadNamePrefix:FdfExecutor-}")  
    private String threadNamePrefix;  

    @Bean  
    public TaskExecutor taskExecutor() {  
        ThreadPoolTaskExecutor executor = new CustomThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);  
        executor.setMaxPoolSize(maxPoolSize);  
        executor.setQueueCapacity(queueCapacity);  
        executor.setThreadNamePrefix(threadNamePrefix);  
        /*  
        rejection-policy:当pool已经达到max size的时候,如何处理新任务  
        CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  
        */  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();  
        return executor;  
    }  
  
}

使用

@Service
public class MyService {

    @Async("taskExecutor")
    public void doSomething() {
        // 异步执行的任务
    }
}