精华内容
下载资源
问答
  • 自定义线程池

    2020-11-11 16:12:35
    自定义线程池创建API 参数详解 自定义线程池创建demo
    1. 自定义线程池创建API
      线程池创建通过JUC的接口 Executor 实现, 平时我们使用其实现类 ThreadPoolExecutor 实现自定义线程池。
      常用构造函数:
          public ThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
              if (corePoolSize < 0 ||
                  maximumPoolSize <= 0 ||
                  maximumPoolSize < corePoolSize ||
                  keepAliveTime < 0)
                  throw new IllegalArgumentException();
              if (workQueue == null || threadFactory == null || handler == null)
                  throw new NullPointerException();
              this.acc = System.getSecurityManager() == null ?
                      null :
                      AccessController.getContext();
              this.corePoolSize = corePoolSize;
              this.maximumPoolSize = maximumPoolSize;
              this.workQueue = workQueue;
              this.keepAliveTime = unit.toNanos(keepAliveTime);
              this.threadFactory = threadFactory;
              this.handler = handler;
          }

       

    2. 参数详解
      corePoolSize:核心线程数。
      maximumPoolSize:最大线程数
      keepAliveTime:空闲最大存活时间
      unit:存活时间单位
      workQueue:任务缓存队列
      threadFactory:线程工厂类
      handler:拒绝策略
      线程池创建线程的规则:默认情况下,线程池创建时启动的线程为0,当任务执行时创建线程来执行任务,并发任务时,首先创建的线程数为设置的corePoolSize ,多余的任务存到 workQueue 队列中如果队列满了之后,且设置的 maximumPoolSize 大于 corePoolSize ,则会创建新的线程执行任务,对于大于 corePoolSize 数量的线程,再空闲 keepAliveTime 时间后会销毁线程。如果创建的线程达到 maximumPoolSize 还有并发任务等待,则会执行 设置的拒绝策略。

      这里有个点需要注意:只有队列满了之后才会创建新的线程数量至maximumPoolSize ,和执行拒绝策略,所以如果队列的大小必须是有界队列,这也是不使用 Executors 默认五种api创建线程池的原因。
    3. 自定义线程池创建demo 
      private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,16,10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(400),
              Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
      这里设置的核心线程池10,最大线程池16,空闲线程时间10分钟,队列大小为400,默认工厂,抛出异常策略创建。
    4. 队列选择
    5. 线程工厂类
      线程工厂类顾名思义指定线程创建过程,默认的线程创建源码为:
          static class DefaultThreadFactory implements ThreadFactory {
              private static final AtomicInteger poolNumber = new AtomicInteger(1);
              private final ThreadGroup group;
              private final AtomicInteger threadNumber = new AtomicInteger(1);
              private final String namePrefix;
      
              DefaultThreadFactory() {
                  SecurityManager s = System.getSecurityManager();
                  group = (s != null) ? s.getThreadGroup() :
                                        Thread.currentThread().getThreadGroup();
                  namePrefix = "pool-" +
                                poolNumber.getAndIncrement() +
                               "-thread-";
              }
      
              public Thread newThread(Runnable r) {
                  Thread t = new Thread(group, r,
                                        namePrefix + threadNumber.getAndIncrement(),
                                        0);
                  if (t.isDaemon())
                      t.setDaemon(false);
                  if (t.getPriority() != Thread.NORM_PRIORITY)
                      t.setPriority(Thread.NORM_PRIORITY);
                  return t;
              }
          }

      主要是实现 ThreadFactory 实现 newThread 来指定创建线程的过程 ,从源码中我们可以看出,指定线程为用户线程(非守护线程),设置优先级为默认优先级。 使用 Thread的构造函数创建Thread对象。Thread的构造函数这里不展开介绍
      我们也可以自定义一个线程工厂类,例:
       

      public class NameThreadFactory implements ThreadFactory {
          @Override
          public Thread newThread(Runnable r) {
              Thread thread = new Thread(r);
              return thread;
          }
      }

      这里使用最简单方式演示,当然我们也可以使用类似默认线程池工厂类类似,指定名称,线程组更多属性。

    6. 拒绝策略
       当线程池的任务缓存队列已满,且线程池的线程数据达到maximumPoolSize 时,如果还会任务来到就会采用配置的拒绝策略,拒绝策略实现 RejectedExecutionHandler 接口,通过查看类结构可以看到线程池提供拒绝策略常用的有以下:

      ThreadPoolExecutor.AbortPolicy :丢弃任务并发抛出 RejectedExecutionException 异常 ;

      ThreadPoolExecutor.DiscardPolicy : 丢弃任务但是不抛出异常

      ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

      ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理改任务
    7. 线程池简单使用
          /**
           * 异步发送消息
           *
           * @param msgReq
           * @return
           */
          public static boolean sendMsgAsync(MsgReq msgReq) {
              threadPoolExecutor.execute(() -> {
                  sendMsgSync(msgReq);
              });
              return true;
          }

       

    展开全文
  • 自定义线程池实现

    2020-11-26 09:37:21
    线程池 - 自定义线程池实现 用线程池来管理线程,减少系统消耗,线程数可控。 目录 线程池 - 自定义线程池实现 一、自定义线程池代码 package com.yuantiaokj.commonmodule.threadpool; import org.spring...

    线程池 - 自定义线程池实现

    用线程池来管理线程,减少系统消耗,线程数可控。


    目录

    线程池 - 自定义线程池实现

    一、自定义线程池代码 

    二、测试


    一、自定义线程池代码 

    自定义使用 ThreadPoolTaskExecutor 

    package com.yuantiaokj.commonmodule.threadpool;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * ***********************************************************
     * Copyright © 2019  Inc.All rights reserved.  *    *
     * **********************************************************
     *
     * @program: pay_inspector
     * @name: TaskExecutePool
     * @author: Mr.Cnzz
     * @create: 2019-11-28 15:25
     * @description: 创建线程池
     **/
    
    
    @Configuration
    @EnableAsync
    public class TaskExecutePool {
    
        /**
         * 1.这种形式的线程池配置是需要在使用的方法上面@Async("taskExecutor"),
         * 2.如果在使用的方法上面不加该注解那么spring就会使用默认的线程池
         * 3.所以如果加@Async注解但是不指定使用的线程池,又想自己定义线程池那么就可以重写spring默认的线程池
         * 4.所以第二个方法就是重写默认线程池
         * 注意:完全可以把线程池的参数写到配置文件中
         */
        @Autowired
        private TaskThreadPoolConfig config;
    
        @Bean("taskExecutePoolCnzz")
        public Executor taskExecutePoolCnzz() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("taskExecutePoolCnzz");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 等待所有任务结束后再关闭线程池
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.initialize();
    
            return executor;
        }
    
    }
    
    

     

    配置类

    package com.yuantiaokj.commonmodule.threadpool;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * ***********************************************************
     * Copyright © 2019  Inc.All rights reserved.  *    *
     * **********************************************************
     *
     * @program: pay_inspector
     * @name: TaskThreadPoolConfig
     * @author: Mr.Cnzz
     * @create: 2019-11-28 15:21
     * @description: 线程池配置类
     **/
    @ConfigurationProperties(prefix = "task.pool")
    @Data
    @Component
    public class TaskThreadPoolConfig {
        
        //核心线程
        private int corePoolSize;
        //最大线程
        private int maxPoolSize;
        //空闲时间
        private int keepAliveSeconds;
        //队列容量
        private int queueCapacity;
    }
    

    yml 配置线程池参数

    #线程池配置参数
    task:
      pool:
        corePoolSize: 20 #设置核心线程数
        maxPoolSize: 100  #设置最大线程数
        keepAliveSeconds: 60 #设置线程活跃时间(秒)
        queueCapacity: 50 #设置队列容量

     

    启动类Application 添加异步注解

    //开启线程异步支持
    @EnableAsync
    // 开启配置属性支持
    @EnableConfigurationProperties({TaskThreadPoolConfig.class})

     

    使用,springboot直接使用@Async异步执行

     


    二、测试

      @Async
        public void printNum(String str) {
            log.info("str={}", str);
        }
     @Resource
        AsyncTest asyncTest;
    
        @Test
        public void testAsync(){
            for (int i = 0; i <20 ; i++) {
                asyncTest.printNum(String.valueOf(i));
            }
    
        }

    这里注意调用方法要在不同的类才会生效,由于容器加载原因,在同一个类时需要容器重新加载下。

    1、队列,最大线程数 都较小的情况下,会生成到10个线程,触发饱和策略,也会有主线程执行。

    测试配置,实际按环境情况配置,这里为了看到测试效果

    #线程池配置参数
    task:
      pool:
        corePoolSize: 2 #设置核心线程数
        maxPoolSize: 10  #设置最大线程数
        keepAliveSeconds: 10 #设置线程活跃时间(秒)
        queueCapacity: 5 #设置队列容量

     

     

     

    2、增大队列容量,让在队列等待,只有2个主线程在跑

    #线程池配置参数
    task:
      pool:
        corePoolSize: 2 #设置核心线程数
        maxPoolSize: 10  #设置最大线程数
        keepAliveSeconds: 10 #设置线程活跃时间(秒)
        queueCapacity: 20 #设置队列容量

     

     

    3、稍微减少队列,看下当队列满了,及未达到线程总数,未饱和的情况

    #线程池配置参数
    task:
      pool:
        corePoolSize: 2 #设置核心线程数
        maxPoolSize: 20  #设置最大线程数
        keepAliveSeconds: 10 #设置线程活跃时间(秒)
        queueCapacity: 15 #设置队列容量

     

     

    具体参数配置及原理可参考   线程池-线程池参数及配置  https://blog.csdn.net/xinpz/article/details/110132365


     

    展开全文
  • 主要介绍了Python自定义线程池实现方法,结合实例形式较为详细的分析了Python自定义线程池的概念、原理、实现方法及相关注意事项,需要的朋友可以参考下
  • 文章目录一、定时和异步业务场景描述二、定时调度任务的实现方式三、定时调度任务的问题...本案例主要讲述如何通过 @Scheduled 注解来完成定时任务的调度工作,以及非定时任务的接口异步任务的自定义线程池的配置工作。

    一、定时和异步业务场景描述

    • 在项目的开发过程中,定时任务调度的场景是非常常见的项目功能,随着业务的不断迭代和日益复杂化,可能一个项目中还会出现定时任务调度和异步接口调度共同存在的情况。

    • 本案例主要讲述如何通过 @Scheduled 注解来完成定时任务的调度工作,以及非定时任务的接口异步任务的自定义线程池的配置工作。

    二、定时调度任务的实现方式

    1. 在启动类添加 @EnableScheduling 注解,用以允许应用程序启动定时任务调度器

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * Spring Boot Scheduled Sample 演示启动类
       *
       * @author  Rambo
       * @date    2021/02/19 17:06
       * @since   1.0.0.1
       */
      @SpringBootApplication
      @EnableScheduling
      public class SpringBootScheduleSampleApplication {
          public static void main(String[] args) {
              SpringApplication.run(SpringBootScheduleSampleApplication.class, args);
          }
      
    2. 编写定时任务

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * 定时任务组件类
       *
       * @author  Rambo
       * @date    2021/2/20 09:50
       * @since   1.0.0.1
       */
      @Component
      @Slf4j
      public class ScheduledTasks {
          @Scheduled(cron = "0/1 * * * * ?")
          public void scheduledCron1() throws InterruptedException {
              // 模拟该任务响应时间较长,导致所有采用 @Scheduled 注解调度的定时任务都被阻塞
              TimeUnit.SECONDS.sleep(10);
              log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron1()],执行频率:1秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());
          }
      
          @Scheduled(cron = "0/2 * * * * ?")
          public void scheduledCron2() {
              log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron2()],执行频率:2秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());
          }
      
          @Scheduled(cron = "0/3 * * * * ?")
          public void scheduledCron3() {
              log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron3()],执行频率:3秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());
          }
      }
      

      P.S
      @Scheduled 注解所支持的参数

      参数属性属性描述
      croncron表达式,指定任务在特定时间执行
      fixedDelay表示上一次任务执行完成后多久再次执行,参数类型为long,单位ms
      fixedDelayString与fixedDelay含义一样,只是参数类型变为String
      fixedRate表示按一定的频率执行任务,参数类型为long,单位ms
      fixedRateString与fixedRate的含义一样,只是将参数类型变为String
      initialDelay表示延迟多久再第一次执行任务,参数类型为long,单位ms
      initialDelayString与initialDelay的含义一样,只是将参数类型变为String
      zone时区,默认为当前时区,一般没有用到
    3. 查看启动效果
      1

    三、定时调度任务的问题描述

    • 按照以上方式配置,不管在哪个类中,只要采用 @Scheduled 注解来调度任务方法,所有的任务方法都默认使用同一个线程池中的同一个线程来进行任务调度。

    • 所有采用 @Scheduled 注解的方法,可以交替执行(表面上),但是如果任何一个被调度的方法响应时间过长或者执行时间过长,将会导致所有被 @Scheduled 注解调度的任务呈阻塞状态。

    • 一旦出现阻塞状态,所谓的定时调度也就失去了原本预定的意义,需要特别注意。

    四、定时调度多线程解决方案(方案一)

    1. 配置文件配置任务调度线程池的配置信息

      # 自定义调度任务线程池
      schedule-pool:
        # 核心线程池大小
        core-pool-size: 20
        # 自定义线程名称
        thread-name-prefix: SCHEDULE-BIZ-
        # 设置终止等待时间:秒
        await-termination-time: 60
        # 线程结束前,是否等待线程队列中的任务执行完成
        wait-tasks-complete: true
        # 线程拒绝策略(ABORT_POLICY、CALLER_RUNS_POLICY、DISCARD_OLDEST_POLICY、DISCARD_POLICY)
        rejected-policy: CALLER_RUNS_POLICY
      
    2. 创建配置类读取配置文件的配置信息

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * "@Scheduled" 注解定时调度器线程池配置类
       *
       * @author  Rambo
       * @date    2021/2/20 16:56
       * @since   1.0.0.1
       */
      @Component
      @ConfigurationProperties(prefix = "schedule-pool")
      @Data
      public class SchedulePoolConfig {
      
          /** 核心线程池大小*/
          private int corePoolSize = 1;
      
          /** 自定义线程名称*/
          private String threadNamePrefix = "SCHEDULE";
      
          /** 设置终止等待时间:秒*/
          private int awaitTerminationTime = 30;
      
          /** 线程结束前,是否等待线程队列中的任务执行完成*/
          private boolean waitTasksComplete = true;
      
          /** 线程拒绝策略 RejectedPolicy.java*/
          private String rejectedPolicy = "ABORT_POLICY";
      }
      
    3. 根据自定义线程配置信息创建 ThreadPoolTaskScheduler 线程池

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * 线程拒绝策略常量类
       *
       * @author  Rambo
       * @date    2021/2/22 14:14
       * @since   1.0.0.1
       */
      public interface RejectedPolicy {
          /** 默认的拒绝策略,会 throw RejectedExecutionException 拒绝*/
          String ABORT_POLICY = "ABORT_POLICY";
      
          /** 提交任务的主线程自己去执行该任务*/
          String CALLER_RUNS_POLICY = "CALLER_RUNS_POLICY";
      
          /** 丢弃最老的任务,然后把新任务加入到工作队列*/
          String DISCARD_OLDEST_POLICY = "DISCARD_OLDEST_POLICY";
      
          /** 直接丢弃任务,没有任何异常抛出*/
          String DISCARD_POLICY = "DISCARD_POLICY";
      }    
      
      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * 多线程并发调度任务配置类
       *
       * 此配置弊端:线程池大小无界限、默认线程队列长度 16
       *
       * @author  Rambo
       * @date    2021/2/20 10:36
       * @since   1.0.0.1
       */
      @Configuration
      @Slf4j
      public class ScheduleThreadPoolExecutor implements SchedulingConfigurer {
      
          @Resource
          private SchedulePoolConfig poolConfig;
      
          @Override
          public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
              // 未自定义线程池,线程池没有界限,有出现 OOM 的风险
              // taskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
      
              // 自定义线程池
              taskRegistrar.setScheduler(taskScheduler());
          }
      
          /**
           * destroyMethod = shutdown 进程结束前,执行完成等待队列中的所有任务后退出应用程序
           *
           * @author  Rambo
           * @date    2021/2/22 14:26
           * @return  org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
           */
          @Bean(destroyMethod = "shutdown")
          public ThreadPoolTaskScheduler taskScheduler() {
              // 1. 实例化任务调度线程
              ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
              // 2. 设置线程池大小
              scheduler.setPoolSize(poolConfig.getCorePoolSize());
              // 3. 设置线程名称
              scheduler.setThreadNamePrefix(poolConfig.getThreadNamePrefix());
              // 4. 设置等待终止时间:秒
              scheduler.setAwaitTerminationSeconds(poolConfig.getAwaitTerminationTime());
              // 5. 进程结束前,等待线程队列中的任务执行完成
              scheduler.setWaitForTasksToCompleteOnShutdown(poolConfig.isWaitTasksComplete());
              // 6. 设置拒绝策略
              // setRejectedExecutionHandler:当线程池已经达到 max size 的时候,如何处理新任务
              // AbortPolicy          默认的拒绝策略,会 throw RejectedExecutionException 拒绝
              // CallerRunsPolicy     提交任务的主线程自己去执行该任务
              // DiscardOldestPolicy  丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
              // DiscardPolicy        相当大胆的策略,直接丢弃任务,没有任何异常抛出
              scheduler.setRejectedExecutionHandler(chooseRejectedPolicy(poolConfig.getRejectedPolicy()));
              // 7. 设置异常输出格式
              scheduler.setErrorHandler(throwable -> log.error("调度任务发生异常", throwable));
              log.info("------>== @Schedule 业务处理线程配置成功,核心线程池:[{}],程名称前缀:[{}] ==<------",poolConfig.getCorePoolSize(), poolConfig.getThreadNamePrefix());
              return scheduler;
          }
      
          /**
           * 实例化线程拒绝策略
           *
           * @author  Rambo
           * @date    2021/2/22 14:23
           * @param	rejectedPolicy  拒绝策略枚举
           * @return  java.util.concurrent.RejectedExecutionHandler
           */
          private RejectedExecutionHandler chooseRejectedPolicy(String rejectedPolicy) {
              RejectedExecutionHandler handler;
              switch (rejectedPolicy) {
                  case RejectedPolicy.CALLER_RUNS_POLICY:
                      handler = new ThreadPoolExecutor.CallerRunsPolicy();
                      break;
                  case RejectedPolicy.DISCARD_OLDEST_POLICY:
                      handler = new ThreadPoolExecutor.DiscardOldestPolicy();
                      break;
                  case RejectedPolicy.DISCARD_POLICY:
                      handler = new ThreadPoolExecutor.DiscardPolicy();
                      break;
                  default:
                      handler = new ThreadPoolExecutor.AbortPolicy();
                      break;
              }
              return handler;
          }
      }    
      

      2

    弊端

    • 如果自定义线程池配置不合理,调度任务数量大于线程池数量,并且各个线程都处于工作状态,那么新来的任务将会被阻塞,等待前面的线程执行完成后,再被执行

    • 线程池大小无界限、默认线程队列长度 16

    五、异步多线程程序实现方式

    1. 应用程序启动类或者需要异步的实现类上添加 @EnableAsync 注解

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * Spring Boot Scheduled Sample 演示启动类
       *
       * @author  Rambo
       * @date    2021/02/19 17:06
       * @since   1.0.0.1
       */
      @SpringBootApplication
      @EnableScheduling
      @EnableAsync
      public class SpringBootScheduleSampleApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(SpringBootScheduleSampleApplication.class, args);
          }
      }
      
    2. 配置文件配置异步任务线程池的配置信息

      # 自定义异步任务线程池
      async-pool:
        # 核心线程池大小
        core-pool-size: 20
        # 最大线程数大小
        maximum-pool-size: 40
        # 活跃时间:秒
        keep-alive-seconds: 300
        # 线程等待队列大小
        queue-capacity: 50
        # 自定义线程名称前缀
        thread-name-prefix: ASYNC-BIZ-
        # 设置终止等待时间:秒
        await-termination-time: 60
        # 线程结束前,是否等待线程队列中的任务执行完成
        wait-tasks-complete: true   
      
    3. 创建配置类读取配置文件异步任务线程池的配置信息

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * @Async 注解异步线程池配置类,如果需要根据不同业务指定多个不同的异步线程池,则可以通过 @Async 指定不同自定义线程池的方式实现
       *
       * @author  Rambo
       * @date    2021/2/22 11:28
       * @since   1.0.0.1
       */
      @Configuration
      @ConfigurationProperties(prefix = "async-pool")
      @Data
      public class AsyncPoolConfig {
          /** 核心线程池大小*/
          private int corePoolSize = 10;
      
          /** 最大线程数大小*/
          private int maximumPoolSize = 20;
      
          /** 活跃时间:秒*/
          private int keepAliveSeconds = 60;
      
          /** 线程等待队列大小*/
          private int queueCapacity = 30;
      
          /** 自定义线程名称前缀*/
          private String threadNamePrefix = "ASYNC-";
      
          /** 设置终止等待时间:秒*/
          private int awaitTerminationTime = 30;
      
          /** 线程结束前,是否等待线程队列中的任务执行完成*/
          private boolean waitTasksComplete = true;
      }
      
    4. 根据自定义线程配置信息创建 ThreadPoolTaskExecutor 线程池

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * "@Async" 注解 异步任务自定义线程池装配类
       *
       * @author  Rambo
       * @date    2021/1/13 18:55
       * @since   1.0.0.1
       */
      @Configuration
      @Slf4j
      public class AsyncThreadPoolExecutor {
      
          @Resource
          private AsyncPoolConfig poolConfig;
      
          @Bean
          public Executor asyncExecutor() {
              // 1. 实例化异步任务线程池
              ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              // 2. 设置核心线程池大小
              executor.setCorePoolSize(poolConfig.getCorePoolSize());
              // 3. 设置最大线程数
              executor.setMaxPoolSize(poolConfig.getMaximumPoolSize());
              // 4. 设置线程等待队列大小
              executor.setQueueCapacity(poolConfig.getQueueCapacity());
              // 5. 设置活跃时间:秒
              executor.setKeepAliveSeconds(poolConfig.getKeepAliveSeconds());
              // 6. 设置线程名字前缀
              executor.setThreadNamePrefix(poolConfig.getThreadNamePrefix());
              // setRejectedExecutionHandler:当线程池已经达到 max size 的时候,如何处理新任务
              // AbortPolicy          默认的拒绝策略,会 throw RejectedExecutionException 拒绝
              // CallerRunsPolicy     提交任务的主线程自己去执行该任务
              // DiscardOldestPolicy  丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
              // DiscardPolicy        相当大胆的策略,直接丢弃任务,没有任何异常抛出
              executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
              // 7. 设置等待终止时间:秒
              executor.setAwaitTerminationSeconds(poolConfig.getAwaitTerminationTime());
              // 8. 进程结束前,等待线程队列中的任务执行完成
              executor.setWaitForTasksToCompleteOnShutdown(poolConfig.isWaitTasksComplete());
              // 9. 手动初始化线程池
              executor.initialize();
              log.info("------>== @Async 业务处理线程配置成功,核心线程池:[{}],最大线程池:[{}],队列容量:[{}],线程名称前缀:[{}] ==<------",poolConfig.getCorePoolSize(), poolConfig.getMaximumPoolSize(), poolConfig.getQueueCapacity(), poolConfig.getThreadNamePrefix());
              return executor;
          }
      
          @Bean
          public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
              return (throwable, method, objects) -> {
                  log.error("@Async 业务 ----> 异常精简信息:[{}],异常Throwable:{}", throwable.getMessage(), throwable);
                  log.error("@Async 业务 ----> 触发异常的方法名称:{}", method.getName());
              };
          }
      }
      
    5. 在需要使用异步线程的方法上添加 @Async(value = "自定义ThreadPoolTaskExecutor的 Bean 的方法名")

      /**
       * Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd
       * 异步任务线程演示控制器
       *
       * @author  Rambo
       * @date    2021/2/22 16:09
       * @since   1.0.0.1
       */
      @RestController
      @RequestMapping("/async")
      @Slf4j
      public class AsyncController {
      
          @GetMapping("/info")
          @Async(value = "asyncExecutor")
          public void info() throws InterruptedException {
              log.info("-------------> 调度线程名称:[{}],被调度方法名称:[AsyncController.info()],当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());
              // 模拟业务任务处理耗时
              TimeUnit.SECONDS.sleep(5);
          }
      }
      
    6. 频繁疯狂请求的效果

      3

    六、定时调度多线程解决方案(方案二)

    1. 根据上一步骤,我们可以知道,采用 @Async 注解也可以自定义线程池来实现异步任务

    2. 那么我们是否可以将 @Async 注解 和 @Schedule 注解同时使用,来解决默认 @Schedule 注解同一线程池同一线程处理的弊端?

    3. 代码实现

      @Async(value = "asyncExecutor")
      @Scheduled(cron = "0/2 * * * * ?")
      public void scheduledCron2() throws InterruptedException {
          log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron2()],执行频率:2秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());
          // 模拟该任务响应时间
          TimeUnit.SECONDS.sleep(20);
      }
      
    4. 调度效果

      4

    展开全文
  • SpringBoot 自定义线程池

    千次阅读 2019-07-03 21:10:16
    2.自定义线程池有两种方法,第一种自定义线程池然后使用自己的自定义的,第二种重写spring默认的线程池,然后使用自己重写过的线程池 一:自定义线程池 1.1 修改application.yml #线程池配置参数 task: pool: ...

    1.我们都知道spring只是为我们简单的处理线程池,每次用到线程总会new 一个新的线程,效率不高,所以我们需要自定义一个线程池。

    2.自定义线程池有两种方法,第一种自定义线程池然后使用自己的自定义的,第二种重写spring默认的线程池,然后使用自己重写过的线程池

    一:自定义线程池

    1.1 修改application.yml

    #线程池配置参数
    task:
      pool:
        corePoolSize: 5 #设置核心线程数
        maxPoolSize: 20  #设置最大线程数
        keepAliveSeconds: 300 #设置线程活跃时间(秒)
        queueCapacity: 50 #设置队列容量

    1.2 线程池配置属性类TaskThreadPoolConfig .java

    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * 线程池配置属性类
     */
    @ConfigurationProperties(prefix = "task.pool")
    public class TaskThreadPoolConfig {
        private int corePoolSize;
    
        private int maxPoolSize;
    
        private int keepAliveSeconds;
    
        private int queueCapacity;
        ...getter and setter methods...
    }

    1.3  注意启动类上一定要开启线程异步支持

    @EnableAsync
    @EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持

    1.4 创建线程池 TaskExecutePool .java

    /**
     * 创建线程池配置类
     */
    @Configuration
    public class TaskExecutePool {
    
        @Autowired
        private TaskThreadPoolConfig config;
    
        /**
         * 1.这种形式的线程池配置是需要在使用的方法上面@Async("taskExecutor"),
         * 2.如果在使用的方法上面不加该注解那么spring就会使用默认的线程池
         * 3.所以如果加@Async注解但是不指定使用的线程池,又想自己定义线程池那么就可以重写spring默认的线程池
         * 4.所以第二个方法就是重写默认线程池
         * 注意:完全可以把线程池的参数写到配置文件中
         */
    
        @Bean
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("TaskExecutePool-");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
             // 等待所有任务结束后再关闭线程池
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.initialize();
            return executor;
        }
    }
    

    1.5 测试方法

    
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiOperation;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author qijx
     */
    @Api(description = "测试控制类11111")
    @RestController
    @RequestMapping("/threadPoolController1")
    public class ThreadPoolController1 {
    
            @Autowired
            private ThreadPoolService1 threadPoolService;
    
    
            @ApiOperation(value = "测试方法")
            @ResponseBody
            @RequestMapping(value = "/test",method = RequestMethod.GET)
            public String threadPoolTest() {
                threadPoolService.executeAsync();
                return "hello word!";
            }
    
    
    }

    1.6 service测试方法

    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    /**
     * @author qijx
     * @date 2019-07-03 17:40
     */
    
    @Service
    public class ThreadPoolService1 {
        private static final Logger logger = LoggerFactory.getLogger(ThreadPoolService1.class);
    
        @Async("taskExecutor") //指定使用那个线程池配置,不然会使用spring默认的线程池
        public void executeAsync() {
            logger.info("start executeAsync");
            try {
                System.out.println("当前运行的线程名称:" + Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info("end executeAsync");
        }
    
    
    
    }

    二:配置默认的线程池

    2.1 第一种方式的那个线程池使用时候总要加注解@Async("taskExecutor"),而这种方式是重写spring默认线程池的方式,使用的时候只需要加@Async注解就可以,不用去声明线程池类。

    2.2 这个和上面的TaskThreadPoolConfig类相同,这里不重复

    2.3 NativeAsyncTaskExecutePool.java 装配线程池

    **
     * 原生(Spring)异步任务线程池装配类,实现AsyncConfigurer重写他的两个方法,这样在使用默认的
     *  线程池的时候就会使用自己重写的
     */
    @Slf4j
    @Configuration
    public class NativeAsyncTaskExecutePool implements AsyncConfigurer{
    
    
        //注入配置类
        @Autowired
        TaskThreadPoolConfig config;
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程池大小
            executor.setCorePoolSize(config.getCorePoolSize());
            //最大线程数
            executor.setMaxPoolSize(config.getMaxPoolSize());
            //队列容量
            executor.setQueueCapacity(config.getQueueCapacity());
            //活跃时间
            executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
            //线程名字前缀
            executor.setThreadNamePrefix("NativeAsyncTaskExecutePool-");
    
            // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
            // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 等待所有任务结束后再关闭线程池
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.initialize();
            return executor;
        }
    
    
        /**
         *  异步任务中异常处理
         * @return
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new AsyncUncaughtExceptionHandler() {
    
                @Override
                public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
                    log.error("=========================="+arg0.getMessage()+"=======================", arg0);
                    log.error("exception method:"+arg1.getName());
                }
            };
        }
    }

    2.4 测试controller

    /**
     * @author qijx
     */
    @Api(description = "测试控制类22222")
    @RestController
    @RequestMapping("/threadPoolController2")
    public class ThreadPoolController2 {
            @Autowired
            private ThreadPoolService2 threadPoolService;
    
            @ApiOperation(value = "测试方法")
            @ResponseBody
            @RequestMapping(value = "/test",method = RequestMethod.GET)
            public String threadPoolTest() {
                threadPoolService.executeAsync();
                return "hello word!";
            }
    }
    

    2.5 测试service方法

    /**
     * @author qijx
     */
    
    @Service
    public class ThreadPoolService2 {
        private static final Logger logger = LoggerFactory.getLogger(ThreadPoolService2.class);
    
        /**
         * @Async该注解不需要在指定任何bean
         */
        @Async
        public void executeAsync() {
            logger.info("start executeAsync");
            try {
                System.out.println("当前运行的线程名称:" + Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info("end executeAsync");
        }
    
    
    }

     

    展开全文
  • 通过观察Java中的内置线程池参数和线程池工作流程总结,从而发现要设计好一个好的线程池,就必须合理的设置线程池的4个参数 1、核心线程数(corePoolSize) 核心线程数的设计需要根据“任务的处理时间”和“每秒...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 84,575
精华内容 33,830
关键字:

如何自定义线程池