精华内容
下载资源
问答
  • 如何自定义线程池

    2020-09-20 21:51:08
    如何自定义线程池 //自定义线程池 public class ThreadPoolExecutorDemo { public static void main(String[] args) { //得到自己电脑cpu是几核的 一般maximumPoolSize默认 为自己电脑cpu为几核 +1 或者 +2 ...

    如何自定义线程池

    //自定义线程池
    public class ThreadPoolExecutorDemo {
        public static void main(String[] args) {
            //得到自己电脑cpu是几核的 一般maximumPoolSize默认 为自己电脑cpu为几核 +1 或者 +2
            System.out.println(Runtime.getRuntime().availableProcessors());
            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,
                    9,
                    2L,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    Executors.defaultThreadFactory(),
                    //拒绝访问策略
                    /**
                     * new ThreadPoolExecutor.AbortPolicy() :AbortPolicy(默认)
                     *      直接抛出RejectedExecutionException异常阻止系统正常运行
                     * DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
                     * 尝试再次提交当前任务。
                     * DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。
                     * 如果允许任务丢失,这是最好的一种策略。
                     */
                    //CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
                    new ThreadPoolExecutor.CallerRunsPolicy()
                    );
             try{
                 for (int i = 1; i <=10 ; i++) {
                    threadPool.execute(
                            new Thread(()->{
                                System.out.println(Thread.currentThread().getName()+"开始执行工作");
                            })
                    );
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭线程池
                threadPool.shutdown();
            }
        }
    }

    可以仿照底层ThreadPoolExecutor源码来进行编写

    展开全文
  • 线程池各个参数详解以及如何自定义线程池

    万次阅读 多人点赞 2019-01-06 21:59:46
    虽然jdk提供了几种常用特性的线程池给我们,但是很多时候,我还是需要自己去自定义自己需要特征的线程池。并且阿里巴巴规范手册里面,就是不建议使用jdk的线程池,而是建议程序员手动创建线程池。 为什么呢?原文...

    一:为什么要自己定义线程池

    虽然jdk提供了几种常用特性的线程池给我们,但是很多时候,我还是需要自己去自定义自己需要特征的线程池。并且阿里巴巴规范手册里面,就是不建议使用jdk的线程池,而是建议程序员手动创建线程池。
    为什么呢?原文如下
    【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
    说明:Executors 返回的线程池对象的弊端如下:

    1、FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

    2、CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

    但是除此之外,自己通过ThreadPoolExecutor定义线程池还有很多好处。
    比如说,
    1.自己根据需求定义自己的拒绝策略,如果线程过多,任务过多 如何处理。
    2.补充完善的线程信息,比如线程名,这一点在将来如果出现线上bug的时候,你会感谢自己,因为你绝不想在线上看到什么threa-1 threa-2 等这种线程爆出的错误,而且是看到自己 “处理xxx线程 错误”,可以一眼看到
    3.可以通过ThreadPoolExecutor的beforExecute(),
    afterExecute()和terminated()方法去拓展对线程池运行前,运行后,结束后等不同阶段的控制。比如说通过拓展打印日志输出一些有用的调试信息。在故障诊断是非常有用的。
    4.可以通过自定义线程创建,可以自定义线程名称,组,优先级等信息,包括设置为守护线程等,根据需求。

    二: 线程池详解

    之前我们在Executors工厂里面看到的方法,

     /*
        newFixedThreadPool 固定线程线程数量的线程池,该线程池内的线程数量始终不变,  
        如果任务到来,内部有空闲线程,则立即执行,如果没有或任务数量大于线程数,多出来的任务,  
        则会被暂存到任务队列中,待线程空闲,按先入先出的顺序处理。
        该任务队列是LinkedBlockingQueue,是无界队列,如果任务数量特别多,可能会导致内存不足
        */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
        
        
        /*
        newSingleThreadExecutor(),该方法返回一个只有一个线程的线程池,多出来的任务会被放到任务队列内,  
        待线程空闲,按先入先出的顺序执行队列中的任务。队列也是无界队列
        */
         public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        
    

    可以看到,本质都是对ThreadPoolExecutor进行封装。
    那么强大的ThreadPoolExecutor又是如何使用?
    先看构造方法定义

          public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    
    

    1.corePoolSize 指定了线程池里的线程数量
    2.maximumPoolSize 指定了线程池里的最大线程数量
    3.keepAliveTime 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁。
    4.unit 时间单位
    5.workQueue 任务队列,用于存放提交但是尚未被执行的任务。
    6.threadFactory 线程工厂,用于创建线程,一般可以用默认的
    7.handler 拒绝策略,当任务过多时候,如何拒绝任务。

    主要是workQueue和handler的差异比较大

    workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。
    ThreadPoolExecutor的构造函数中,可使用以下几种BlockingQueue

    1.直接提交队列: 即SynchronousQueue ,这是一个比较特殊的BlockKingQueue, SynchronousQueue没有容量,每一个插入操作都要等待对应的删除操作,反之 一个删除操作都要等待对应的插入操作。 也就是如果使用SynchronousQueue,提交的任务不会被真实保存,而是将新任务交给空闲线程执行,如果没有空闲线程,则创建线程,如果线程数都已经大于最大线程数,则执行拒绝策略。使用这种队列,需要将maximumPoolSize设置的非常大,不然容易执行拒绝策略。比如说

    没有最大线程数限制的newCachedThreadPool()

     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    但是这个在大量任务的时候,会启用等量的线程去处理,有风险造成系统资源不足。

    2.有界任务队列。 有界任务队列可以使用ArrayBlockingQueue实现。需要给一个容量参数表示该队列的最大值。当有新任务进来时,如果当前线程数小于corePoolSize,则会创建新线程执行任务。如果大于,则会将任务放到任务队列中,如果任务队列满了,在当前线程小于将maximumPoolSize的情况下,将会创建新线程,如果大于maximumPoolSize,则执行拒绝策略。
    也就是,一阶段,当线程数小于coresize的时候,创建线程;二阶段,当线程任务数大于coresize的时候,放入到队列中;三阶段,队列满,但是还没大于maxsize的时候,创建新线程。 四阶段,队列满,线程数也大于了maxsize, 则执行拒绝策略。
    可以发现,有界任务队列,会大概率将任务保持在coresize上,只有队列满了,也就是任务非常繁忙的时候,会到达maxsie。

    3.无界任务队列。
    使用linkedBlockingQueue实现,队列最大长度限制为integer.MAX。无界任务队列,不存在任务入队失败的情况, 当任务过来时候,如果线程数小于coresize ,则创建线程,如果大于,则放入到任务队列里面。也就是,线程数几乎会一直维持在coresize大小。FixedThreadPool和singleThreadPool即是如此。 风险在于,如果任务队列里面任务堆积过多,可能导致内存不足。
    4.优先级任务队列。使用PrioriBlockingQueue ,特殊的无界队列,和普通的先进先出队列不同,它是优先级高的先出。

    因此 ,自定义线程池的时候,应该根据实际需要,选择合适的任务队列应对不同的场景。

    这里给出ThreadPool的核心调度代码

    //runnable为线程内的任务
     public void execute(Runnable command) {
     //为null则抛出异常
            if (command == null)
                throw new NullPointerException();
           
           //获取工作的线程数量
            int c = ctl.get();
            //数量小于coresize
            if (workerCountOf(c) < corePoolSize) {
            //addWorker 直接执行新线程
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            
            //如果大于corsize 则放到等待队列中
            //workQueue.offer()表示放到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果放到队列中失败,直接提交给线程池执行
            //如果提交失败,则执行reject() 拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    jdk内置的拒绝策略如下:

    拒绝策略
    1.AbortPolicy 该策略会直接抛出异常,阻止系统正常工作。
    2.CallerRunsPolic 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,这样做不会真正的丢弃任务,但是 任务提交线程的性能可能会急剧下降。
    3.DisCardOledestPolicy 策略: 该策略默默地丢弃无法处理的任务,不予任务处理,如果允许任务丢失,这是最好的方法了。

    jdk内置的几种线程池,主要采用的是AbortPolicy 策略,会直接抛出异常,defaultHandler如下。

    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    

    以上内置的策略均实现了RejectedExecutionHandler接口,因此我们也可以实现这个接口,自定义我们自己的策略。

     public static class AbortPolicy implements RejectedExecutionHandler {
          
            public AbortPolicy() { }
    
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    最后的最后,所有的线程池里面,线程是从哪里来的?
    答案是ThreadFactory
    这个是一个接口,里面只有一个方法,用来创建线程

    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    

    线程池的默认ThreadFactory如下

      static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
    
    

    同理我们也可以通过实现ThreadFactory来定义我们自己的线程工厂,比如说自定义线程名称,组,优先级等信息,可以跟着线程池究竟在何时创建了多少线程等等。

    展开全文
  • ThreadPoolExecutor:线程池的创建方式有许多种,如FixedThreadPool、CachedThreadPool、ScheduledThreadPool 、SingleThreadPool、ForkJoinPool,但是在实际开发过程中往往都是使用自定线程池。 public class ...

    ThreadPoolExecutor:线程池的创建方式有许多种,如FixedThreadPool、CachedThreadPool、ScheduledThreadPool 、SingleThreadPool、ForkJoinPool,但是在实际开发过程中往往都是使用自定线程池。

    在这里插入图片描述

    public class Test_19 {
        /** 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut*/
        static int corePoolSize = 10;
    
        /** 池中允许的最大线程数*/
        static int maximumPoolSize  = 100;
    
        /**当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间*/
        static long keepAliveTime = 1;
    
        /**在执行任务之前用于保存任务的队列, 该队列将仅保存execute方法提交的Runnable任务*/
        static ArrayBlockingQueue workQueue = new ArrayBlockingQueue(10);
        
        /**指定有意义的线程池名称*/
        static MyFactory myFactory = new MyFactory("某线程组");
    
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    TimeUnit.SECONDS,
                    workQueue,myFactory);
            System.out.println(executor);
            for (int i = 0 ;i<5;i++){
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.MICROSECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName()+" - test executor - ");
                    }
                });
            }
            System.out.println(executor);
            executor.shutdown();
            executor.isShutdown();
            executor.isTerminated();
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(executor);
        }
    }
    /**
    * 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
    * */
    class MyFactory implements ThreadFactory{
        private final String namePrefix;
        private AtomicInteger nextId = new AtomicInteger(1);
        MyFactory(String whatFeatureOfGroup){
            this.namePrefix = "From TestFactory's " + whatFeatureOfGroup + " - worker - ";
        }
        @Override
        public Thread newThread(Runnable task) {
            String name = this.namePrefix + nextId.getAndIncrement();
            Thread thread=new Thread(null, task, name, 0);
            System.out.println(thread.getName());
            return thread;
        }
    }
    

    阅读原文

    展开全文
  • 代码比较复制代码package multiplethread...public class ThreadPool {// 线程池大小int threadPoolSize;// 任务容器LinkedList tasks = new LinkedList();// 试图消费任务的线程public ThreadPool() {threadPoolSiz...

    代码比较

    复制代码

    package multiplethread;

    import java.util.LinkedList;

    public class ThreadPool {

    // 线程池大小

    int threadPoolSize;

    // 任务容器

    LinkedList tasks = new LinkedList();

    // 试图消费任务的线程

    public ThreadPool() {

    threadPoolSize = 10;

    // 启动10个任务消费者线程

    synchronized (tasks) {

    for (int i = 0; i < threadPoolSize; i++) {

    new TaskConsumeThread("任务消费者线程 " + i).start();

    }

    }

    }

    public void add(Runnable r) {

    synchronized (tasks) {

    tasks.add(r);

    // 唤醒等待的任务消费者线程

    tasks.notifyAll();

    }

    }

    class TaskConsumeThread extends Thread {

    public TaskConsumeThread(String name) {

    super(name);

    }

    Runnable task;

    public void run() {

    System.out.println("启动: " + this.getName());

    while (true) {

    synchronized (tasks) {

    while (tasks.isEmpty()) {

    try {

    tasks.wait();

    } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    task = tasks.removeLast();

    // 允许添加任务的线程可以继续添加任务

    tasks.notifyAll();

    }

    System.out.println(this.getName() + " 获取到任务,并执行");

    task.run();

    }

    }

    }

    }

    package multiplethread;

    import java.util.LinkedList;

    public class ThreadPool {

    // 线程池大小

    int threadPoolSize;

    // 任务容器

    LinkedList tasks = new LinkedList();

    // 试图消费任务的线程

    public ThreadPool() {

    threadPoolSize = 10;

    // 启动10个任务消费者线程

    synchronized (tasks) {

    for (int i = 0; i < threadPoolSize; i++) {

    new TaskConsumeThread("任务消费者线程 " + i).start();

    }

    }

    }

    public void add(Runnable r) {

    synchronized (tasks) {

    tasks.add(r);

    // 唤醒等待的任务消费者线程

    tasks.notifyAll();

    }

    }

    class TaskConsumeThread extends Thread {

    public TaskConsumeThread(String name) {

    super(name);

    }

    Runnable task;

    public void run() {

    System.out.println("启动: " + this.getName());

    while (true) {

    synchronized (tasks) {

    while (tasks.isEmpty()) {

    try {

    tasks.wait();

    } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    task = tasks.removeLast();

    // 允许添加任务的线程可以继续添加任务

    tasks.notifyAll();

    }

    System.out.println(this.getName() + " 获取到任务,并执行");

    task.run();

    }

    }

    }

    }

    展开全文
  • java有预置线程池:newSingleThreadExecutor,newFixedThreadPool...如果不适合,还可以使用ThreadPoolExecutor创建自定义线程池。主要构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
  • 文章目录一、定时和异步业务场景描述二、定时调度任务的实现方式三、定时调度任务的问题...本案例主要讲述如何通过 @Scheduled 注解来完成定时任务的调度工作,以及非定时任务的接口异步任务的自定义线程池的配置工作。
  • 自定义线程池

    2020-09-09 13:47:52
    自定义线程池参数解释自定义线程池示例演示新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居...
  • /*** 自定义线程池 * *@authorxl,lang **/ public classMyExecutorService {//初始化线程 protected ListinitThreads;//执行任务列表 protected BlockingQueuetaskQueues;//线程执行状态 protected volatile boolean...
  • 但是如果使用自定义线程池,又该如何来控制异步调用的并发呢? 1.首先在启动类中定义好线程池。 @SpringBootApplication public class ExecutorApplication { public static void main(String[] args) { ...
  • 最近写了个自定义的线程池,用于处理持续时间短、频次高的任务...自定义线程池不是重点,本文的重点在线程池捕获异常的问题。 在实现该线程池的时候,笔者为其设置了一个ThreadFacotry,其中实现了UncaughtExceptionHan
  • java.util.concurrent.Executors 线程池工厂类提供了很多创建线程池的方法,常见的方法如下: newFixedThreadPool(int nThreads) : 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 ...
  • 主要给大家介绍了关于Spring Boot利用@Async如何实现异步调用:自定义线程池的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 自定义线程池ThreadPoolExecutor

    多人点赞 2020-11-28 15:34:29
    优雅使用线程池ThreadPoolExecutor实现自定义 一.引言 线程池想必大家也都用过,JDK的Executors 也自带一些线程池。但是不知道大家有没有想过,如何才是最优雅的方式去使用过线程池吗? 生产环境要怎么去配置自己的...
  • 话说接触自定义线程池的时候还是几年前用Volley框架的时候,当时就注意到了一个东西叫队列,后来在实际开发中也没有用到相关的东西,只是写过简单的demo而已,现在各种框架很成熟,用多了就缺少了动手能力,所以我就...
  • 自定义线程池-参数设计分析 通过观察Java中的内置线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个好的线程池,就必须合理的设置线程池的4个参数;那到底该如何合理的设计4个参数的值呢?我们一起往下看. ...
  • 示例 1 : synchronized 同步的方式 package multiplethread; import java.text.SimpleDateFormat; import java.util.Date; public class TestThread { public static String now(){ return new SimpleDateFormat(...
  • JAVA 如何开发一个自定义线程池

    千次阅读 2018-01-09 11:11:22
    每一个线程的启动和结束都是比较消耗...线程池的模式很像生产者消费者模式,消费的对象是一个一个的能够运行的任务 线程池设计思路 线程池的思路和生产者消费者模型是很接近的。 1. 准备一个任务容器 2. 一次
  • 一、线程池简介 线程池就是预先创建一些线程,它们的集合称为...二、如何创建线程池 Executors是jdk1.5之后的一个新类,提供了一些静态方法,帮助我们方便的生成一些常见的线程池: newSingleThreadExecutor:创建
  • 每一个线程的启动和结束都是比较消耗时间和占用资源的。 如果在系统中用到了很多...步骤1:线程池设计思路步骤2:开发一个自定义线程池步骤3:测试线程池步骤4:使用java自带线程池步骤5:练习- 借助线程池同步查找文件内...
  • python如何实现线程池#这个类是线程类,用来在主程序中调用生成一个线程。其实线程池就是线程的集合地,#能够解决有效统一的管理线程,基本就达到了线程池的目的;#这一段代码是我的爬虫程序中的一部分,希望对你...

空空如也

空空如也

1 2 3 4 5 ... 14
收藏数 264
精华内容 105
关键字:

如何自定义线程池