精华内容
下载资源
问答
  • 线程池中如果发生OOM后会如何? 线程池ThreadPoolExecutor当有线程发生了OOM线程池会停止工作吗? public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, ...

    环境:JDK1.8


    线程池中如果发生OOM后会如何?

    线程池ThreadPoolExecutor当有线程发生了OOM,线程池会停止工作吗?

    public static void main(String[] args) {
      ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2)) ;
      pool.execute(() -> {
        int i = 0 ;
        for (;;) {
          System.out.println(Thread.currentThread().getName() + ", i = " + (i++) + "," + pool) ;
    		  try {
            TimeUnit.MILLISECONDS.sleep(50) ;
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
      pool.execute(() -> {
        int j = 0 ;
        for (;;) {
          System.out.println(Thread.currentThread().getName() + ", j = " + (j++) + "," + pool) ;
          try {
            TimeUnit.MILLISECONDS.sleep(50) ;
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
      pool.execute(() -> {
        int k = 0 ;
        List<byte[]> datas = new ArrayList<>() ;
        for (;;) {
          System.out.println(Thread.currentThread().getName() + ", k = " + (k++) + "," + pool) ;
          byte[] buf = new byte[1024 * 100] ;
          datas.add(buf) ;
          try {
            TimeUnit.MILLISECONDS.sleep(20) ;
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
    }

    调整JVM运行内存

    -Xms10m -Xmx10m

    执行结果:

    Java线程池中的线程发生OOM会如何?

     

    一开始3个线程交替执行,当其中一个线程发生OOM后,线程池中除了发生OOM的线程池不再继续工作外,其它的线程都继续工作。

    自定义线程异常处理

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
      new ArrayBlockingQueue<Runnable>(2), new ThreadFactory() {
        private final ThreadGroup group = new ThreadGroup("Pack-Group");
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix = "pool-custom-thread-";
    
       @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread t, Throwable e) {
            System.out.println("自定义线程异常处理:" + t.getName());
            e.printStackTrace();
         }});
        return t;
      }
    });

    线程池的拒绝策略

    代码:

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.AbortPolicy()) ;
      for (int i = 0; i < 6; i++) {
        pool.execute(() -> {
          System.out.println(Thread.currentThread().getName() + ", 进入执行") ;
          try {
            TimeUnit.SECONDS.sleep(2) ;
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        });
      }
    System.out.println("ActiveCount: " + pool.getActiveCount()) ;
    System.out.println("PoolSize: " + pool.getPoolSize()) ;
    System.out.println("TaskCount: " + pool.getTaskCount()) ;

    1、AbortPolicy策略

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.AbortPolicy()) ;

    Java线程池中的线程发生OOM会如何?

     

    当任务数 > 线程数 + 队列大小。超过的任务直接拒绝并且抛出异常。程序不会继续往下执行。

    2、CallerRunsPolicy策略

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.CallerRunsPolicy()) ;

    Java线程池中的线程发生OOM会如何?

     

    超过的任务会由调用者线程(执行execute方法所在的线程)执行任务。

    3、DiscardOldestPolicy策略

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.DiscardOldestPolicy()) ;
    for (int i = 0; i < 5; i++) {
      pool.execute(() -> {
        System.out.println(Thread.currentThread().getName() + ", 进入执行") ;
        try {
          TimeUnit.SECONDS.sleep(2) ;
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    }
    System.out.println("ActiveCount: " + pool.getActiveCount()) ;
    System.out.println("PoolSize: " + pool.getPoolSize()) ;
    System.out.println("TaskCount: " + pool.getTaskCount()) ;
    pool.execute(() -> {
      System.out.println(Thread.currentThread().getName() + ", 我是新入任务") ;
    });

    执行结果:

    Java线程池中的线程发生OOM会如何?

     

    该策略会把任务队列中的列头删除,然后将自己放入队列(列尾)

    Java线程池中的线程发生OOM会如何?

     

    4、DiscardPolicy策略

    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.DiscardPolicy()) ;
    for (int i = 0; i < 7; i++) {
      pool.execute(() -> {
        System.out.println(Thread.currentThread().getName() + ", 进入执行") ;
        try {
          TimeUnit.SECONDS.sleep(2) ;
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    }
    System.out.println("ActiveCount: " + pool.getActiveCount()) ;
    System.out.println("PoolSize: " + pool.getPoolSize()) ;
    System.out.println("TaskCount: " + pool.getTaskCount()) ;
    TimeUnit.SECONDS.sleep(5) ;
    pool.execute(() -> {
      System.out.println(Thread.currentThread().getName() + ", 新任务执行") ;
    });

    执行结果:

    Java线程池中的线程发生OOM会如何?

     

    该策略什么也不做,线程池能够正常继续执行下去。

    Java线程池中的线程发生OOM会如何?

     

    自定义线程池

    自定义线程池监控线程执行耗时时间

    public class CustomThreadPool extends ThreadPoolExecutor {
      public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
      }
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        if (r instanceof Task) {
          ((Task) r).setStart(System.currentTimeMillis()) ;
          System.out.println(t.getName() + ", 开始执行") ;
        }
      }
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (r instanceof Task) {
          ((Task) r).times();
        }
      }
      public void execute(Task command) {
        super.execute(command);
      }
      public static class Task implements Runnable {
        private long start ;
        private Callback callback ;
        public Task(Callback callback) {
          this.callback = callback ;
        }
        @Override
        public void run() {
          if (this.callback != null) {
            this.callback.callback();
          }
        }
        public void times() {
          System.out.println(Thread.currentThread().getName() + " 执行耗时:" + (System.currentTimeMillis() - start) + "ms") ;
        }
        public void setStart(long start) {
          this.start = start;
        }
    		
      }
      public static interface Callback {
        void callback() ;
      }
    }

    重写beforeExecute,afterExecute父类的这两个方法,线程执行前和执行后。

    CustomThreadPool pool = new CustomThreadPool(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2)) ;
    pool.execute(new Task(() -> {
      try {
        TimeUnit.SECONDS.sleep(1) ;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    })) ;
    pool.execute(new Task(() -> {
      try {
        TimeUnit.SECONDS.sleep(3) ;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    })) ;
    pool.execute(new Task(() -> {
      try {
        TimeUnit.SECONDS.sleep(2) ;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    })) ;

    执行结果:

    Java线程池中的线程发生OOM会如何?

    给个关注谢谢

    Java线程池中的线程发生OOM会如何?

    展开全文
  • 当出现千万量级的push,free的内存从20g一下子打到只剩5g,最后导致OOM,服务直接重启 利用MAT检查内存泄露 heapdump文件生成 在故障定位(尤其是OOM)和性能分析的时候,经常会用到一些文件辅助我们排除代码问题。...

    问题发现

    一次偶然用户反馈自己创建的Push任务数据异常,于是后台查看日志排查发现服务异常重启,没有其他异常信息;观察发现,近期出现此类问题较为频繁,当下发的push任务在千万级别时,会出现较高频率重启:

    系统内存free图:

    当出现千万量级的push,free的内存从20g一下子打到只剩5g,最后导致OOM,服务直接重启

    利用MAT检查内存泄露

    heapdump文件生成

    在故障定位(尤其是OOM)和性能分析的时候,经常会用到一些文件辅助我们排除代码问题。这些文件记录了JVM运行期间的内存占用、线程执行等情况,这就是我们常说的dump文件。常用的有heap dump和thread dump(也叫javacore,或java dump)。我们可以这么理解:heap dump记录内存信息的,thread dump记录CPU信息。

    为了确定是不是OOM导致服务异常被kill,使用指令dmesg,grep相应的进程号,可以确定是因为OOM导致被系统kill,如下图所示:

    因为题主的系统是LINUX,所以使用如下指令生成相应的HEAP文件:

    jmap -dump:format=b,file=文件存放目录/heap.hprof  进程号

    下载安装好Mat工具(可参考链接:https://blog.csdn.net/wanghuiqi2008/article/details/50724676)

    启动打开 File - Open Heap Dump... 菜单,然后选择生成的Heap DUmp文件,选择 "Leak Suspects Report",然后点击 "Finish" 按钮

    打开heap文件之后,显示如下:

     

    界面常用到功能:

    a、Overview(视图):概要界面,显示了概要的信息,并展示了MAT常用的一些功能。

    * Details 显示了一些统计信息,包括整  个堆内存的大小、类(Class)的数量、对象(Object)的数量、类加载器(Class Loader)的数量。
    * Biggest Objects by Retained Size 使用饼图的方式直观地显示了在JVM堆内存中最大的几个对象,当光标移到饼图上的时候会在左边Inspector和Attributes窗口中显示详细的信息。
    * Actions 这里显示了几种常用到的操作,算是功能的快捷方式,包括 Histogram、Dominator Tree、Top Consumers、Duplicate Classes,具体的含义和用法见下面;
    * Reports 列出了常用的报告信息,包括 Leak Suspects和Top Components,具体的含义和内容见下;
    * Step By Step 以向导的方式引导使用功能。

    b、Histogram(直方图):可以查看每个类的实例(即对象)的数量和大小。

    c、Dominator Tree(支配树):列出Heap Dump中处于活跃状态中的最大的几个对象,默认按 retained size进行排序,因此很容易找到占用内存最多的对象。

    在leak suspects页面,列出了两个可能存在内存泄漏的方法,其中第一个是程序代码中用到的线程池方法:

                                   

    点击Details进入详情页面。在详情页面Shortest Paths To the Accumulation Point表示GC root到内存消耗聚集点的最短路径,如果某个内存消耗聚集点有路径到达GC root,则该内存消耗聚集点不会被当做垃圾被回收。

                  

    在All Accumulated Objects by Class列举了该对象所存储的所有内容。

    可以基本定位到问题可能是是在线程池使用不当导致OOM;

    于是开始找代码,注意到一段这样的代码提交:

     ThreadPoolExecutor executor = new ThreadPoolExecutor(wakeupConfiguration.wakeupTaskThreadNum, wakeupConfiguration.wakeupTaskThreadNum,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    /***
    ***
    ***/
    executor.execute(() -> {
                            checkAndSendWakeUpMessage1(imeiMd5, fullPushMessage);
                        });

    代码大概的意思是定义了一个线程池,然后处理push任务的用户集的分发,代码是读取一个用户信息,然后起一个线程分发push,进行业务逻辑的处理,问题点就在于,定义的线程池没有设置有限的队列大小,并且没有限制任务的qps,当进行checkAndSendWakeupMessage的时候,会涉及到hbae的查询,hbase在任务高峰期时,会出现大量的延迟,一边是千万级别的用户push塞进队列,一边是队列的延迟,不限长度的队列直接导致队列拥堵,导致内存爆掉.

    最后使用限制大小的队列,引入调用者模式饱和策略,成功缓解服务oom的异常.下面总结下关于线程池的知识点,避免大家在以后的工作中踩坑.

    Java线程池

    虽然Java线程池理论,以及构造线程池的各种参数,以及 Executors 提供的默认实现之前研读过,不过线上还没有发生过线程池误用引发的事故,所以有必要把这些参数再仔细琢磨一遍。

    优先补充一些线程池的工作理论,有助于展开下面的内容。线程池顾名思义,就是由很多线程构成的池子,来一个任务,就从池子中取一个线程,处理这个任务。这个理解是我在第一次接触到这个概念时候的理解,虽然整体基本切入到核心,但是实际上会比这个复杂。例如线程池肯定不会无限扩大的,否则资源会耗尽;当线程数到达一个阶段,提交的任务会被暂时存储在一个队列中,如果队列内容可以不断扩大,极端下也会耗尽资源,那选择什么类型的队列,当队列满如何处理任务,都有涉及很多内容。线程池总体的工作过程如下图:

    线程池内的线程数的大小相关的概念有两个,一个是核心池大小,还有最大池大小。如果当前的线程个数比核心池个数小,当任务到来,会优先创建一个新的线程并执行任务。当已经到达核心池大小,则把任务放入队列,为了资源不被耗尽,队列的最大容量可能也是有上限的,如果达到队列上限则考虑继续创建新线程执行任务,如果此刻线程的个数已经到达最大池上限,则考虑把任务丢弃。

    在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    } 

    既然有了刚刚对线程池工作原理对概述,这些参数就很容易理解了:

    corePoolSize- 核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。

    maximumPoolSize-池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。

    keepAliveTime - 当线程数大于核心时,多于的空闲线程最多存活时间

    unit - keepAliveTime 参数的时间单位。

    workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:无界队列,有界队列和同步移交。将在下文中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接口的任务。 别看这个参数位置很靠后,但是真的很重要,因为楼主的坑就因这个参数而起,这些细节有必要仔细了解清楚。

    threadFactory - 执行程序创建新线程时使用的工厂。

    handler - 阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。

    可选择的阻塞队列BlockingQueue详解

    在重复一下新任务进入时线程池的执行策略:
    如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行)
    如果运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
    主要有3种类型的BlockingQueue:

    无界队列

    队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。(ps:题主的问题就出现在这个地方,定义了一个无限长度的Queue,)

    有界队列

    常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
    使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

    在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。

    同步移交队列

    如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

    可选择的饱和策略RejectedExecutionHandler详解

    JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。

    AbortPolicy中止策略

    该策略是默认饱和策略。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
     } 

    使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

    Demo:

    public class AbortPolicyDemo {  
    
        private static final int THREADS_SIZE = 1;  
        private static final int CAPACITY = 1;  
    
        public static void main(String[] args) throws Exception {  
    
            // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
            ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                    new ArrayBlockingQueue<Runnable>(CAPACITY));  
            // 设置线程池的拒绝策略为"抛出异常"  
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());  
    
            try {  
    
                // 新建10个任务,并将它们添加到线程池中。  
                for (int i = 0; i < 10; i++) {  
                    Runnable myrun = new MyRunnable("task-"+i);  
                    pool.execute(myrun);  
                }  
            } catch (RejectedExecutionException e) {  
                e.printStackTrace();  
                // 关闭线程池  
                pool.shutdown();  
            }  
        }  
    }  

    DiscardPolicy抛弃策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }

    如代码所示,不做任何处理直接抛弃任务

    Demo:

    public class DiscardPolicyDemo {  
    
        private static final int THREADS_SIZE = 1;  
        private static final int CAPACITY = 1;  
    
        public static void main(String[] args) throws Exception {  
    
            // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
            ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));  
            // 设置线程池的拒绝策略为"丢弃"  
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());  
    
            // 新建10个任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {  
                Runnable myrun = new MyRunnable("task-"+i);  
                pool.execute(myrun);  
            }  
            // 关闭线程池  
            pool.shutdown();  
        }  
    }  

    DiscardOldestPolicy抛弃旧任务策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
    } 

    如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

    Demo:

    public class DiscardOldestPolicyDemo {  
    
        private static final int THREADS_SIZE = 1;  
        private static final int CAPACITY = 1;  
    
        public static void main(String[] args) throws Exception {  
    
            // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
            ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                    new ArrayBlockingQueue<Runnable>(CAPACITY));  
            // 设置线程池的拒绝策略为"DiscardOldestPolicy"  
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());  
    
            // 新建10个任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {  
                Runnable myrun = new MyRunnable("task-"+i);  
                pool.execute(myrun);  
            }  
            // 关闭线程池  
            pool.shutdown();  
        }  
    } 

    CallerRunsPolicy调用者运行

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
    } 

    既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

    Demo:

    public class CallerRunsPolicyDemo {  
    
        private static final int THREADS_SIZE = 1;  
        private static final int CAPACITY = 1;  
    
        public static void main(String[] args) throws Exception {  
    
            // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
            ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                    new ArrayBlockingQueue<Runnable>(CAPACITY));  
            // 设置线程池的拒绝策略为"CallerRunsPolicy"  
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
    
            // 新建10个任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {  
                Runnable myrun = new MyRunnable("task-"+i);  
                pool.execute(myrun);  
            }  
    
            // 关闭线程池  
            pool.shutdown();  
        }  
    }  

    Java提供的四种常用线程池解析

    简而言之 Executors 工厂方法Executors.newCachedThreadPool() 提供了无界线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用无界队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。

    详细介绍一下上述四种线程池。

    newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    } 

    在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程?
    这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。

    newFixedThreadPool

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
     }

    看代码一目了然了,线程数量固定,使用无限大的队列。再次强调,楼主就是踩的这个无限大队列的坑。

    newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    在来看看ScheduledThreadPoolExecutor()的构造函数

     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        } 

    ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列,在上面没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。简单的说,DelayedWorkQueue是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。

    newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
     } 

    首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                super(executor);
                e = executor;
    } 

    在看看它的父类

    DelegatedExecutorService(ExecutorService executor) { 
               e = executor; 
    } 

    其实就是使用装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有一个线程顺序执行任务,也保证线程意外终止后会重新创建一个线程继续执行任务。

    参考:https://zhuanlan.zhihu.com/p/32867181

    https://www.jianshu.com/p/c34af977ade1

    https://blog.csdn.net/wanghuiqi2008/article/details/50724676

    https://www.jianshu.com/p/aa420c7df275

    展开全文
  • springboot-@Async默认线程池导致OOM问题

    万次阅读 多人点赞 2019-01-02 11:23:44
    1.最近项目上在测试人员压测过程发现了OOM问题,项目使用springboot搭建项目工程,通过查看日志包含信息:unable to create new native thread 内存溢出的三种类型: 1.第一种OutOfMemoryError: PermGen space...

    前言:
    1.最近项目上在测试人员压测过程中发现了OOM问题,项目使用springboot搭建项目工程,通过查看日志中包含信息:unable to create new native thread

    内存溢出的三种类型:
    1.第一种OutOfMemoryError: PermGen space,发生这种问题的原意是程序中使用了大量的jar或class
    2.第二种OutOfMemoryError: Java heap space,发生这种问题的原因是java虚拟机创建的对象太多
    3.第三种OutOfMemoryError:unable to create new native thread,创建线程数量太多,占用内存过大

    初步分析:

    1.初步怀疑是线程创建太多导致,使用jstack 线程号 > /tmp/oom.log将应用的线程信息打印出来。查看oom.log,发现大量线程处于Runnable状态,基本可以确认是线程创建太多了。

    代码分析:

    1.出问题的微服务是日志写库服务,对比日志,锁定在writeLog方法上,wirteLog方法使用spring-@Async注解,写库操作采用的是异步写入方式。
    2.之前没有对@Async注解深入研究过,只是知道可以自定义内部线程池,经查看,日志写库服务并未自定义异步配置,使用的是spring-@Async默认异步配置
    3.首先简单百度了下,网上提到@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,在压测情况下,会有大量写库请求进入日志写库服务,这时就会不断创建大量线程,极有可能压爆服务器内存。

    借此机会也学习了下SimpleAsyncTaskExecutor源码,总结如下:
    1.SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务,核心代码如下:

    public void execute(Runnable task, long startTimeout) {
       Assert.notNull(task, "Runnable must not be null");
       Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
       //判断是否开启限流机制
       if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
          //执行前置操作,进行限流
          this.concurrencyThrottle.beforeAccess();
          //执行完线程任务,会执行后置操作concurrencyThrottle.afterAccess(),配合进行限流
          doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
       }
       else {
          doExecute(taskToUse);
       }
    }
    

    2.SimpleAsyncTaskExecutor限流实现
    首先任务进来,会循环判断当前执行线程数是否超过concurrencyLimit,如果超了,则当前线程调用wait方法,释放monitor对象锁,进入等待

    protected void beforeAccess() {
    	if (this.concurrencyLimit == NO_CONCURRENCY) {
    		throw new IllegalStateException(
    				"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
    	}
    	if (this.concurrencyLimit > 0) {
    		boolean debug = logger.isDebugEnabled();
    		synchronized (this.monitor) {
    			boolean interrupted = false;
    			while (this.concurrencyCount >= this.concurrencyLimit) {
    				if (interrupted) {
    					throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
    							"but concurrency limit still does not allow for entering");
    				}
    				if (debug) {
    					logger.debug("Concurrency count " + this.concurrencyCount +
    							" has reached limit " + this.concurrencyLimit + " - blocking");
    				}
    				try {
    					this.monitor.wait();
    				}
    				catch (InterruptedException ex) {
    					// Re-interrupt current thread, to allow other threads to react.
    					Thread.currentThread().interrupt();
    					interrupted = true;
    				}
    			}
    			if (debug) {
    				logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
    			}
    			this.concurrencyCount++;
    		}
    	}
    }
    

    2.SimpleAsyncTaskExecutor限流实现:首先任务进来,会循环判断当前执行线程数是否超过concurrencyLimit,如果超了,则当前线程调用wait方法,释放monitor对象锁,进入等待状态。

    protected void beforeAccess() {
    	if (this.concurrencyLimit == NO_CONCURRENCY) {
    		throw new IllegalStateException(
    				"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
    	}
    	if (this.concurrencyLimit > 0) {
    		boolean debug = logger.isDebugEnabled();
    		synchronized (this.monitor) {
    			boolean interrupted = false;
    			while (this.concurrencyCount >= this.concurrencyLimit) {
    				if (interrupted) {
    					throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
    							"but concurrency limit still does not allow for entering");
    				}
    				if (debug) {
    					logger.debug("Concurrency count " + this.concurrencyCount +
    							" has reached limit " + this.concurrencyLimit + " - blocking");
    				}
    				try {
    					this.monitor.wait();
    				}
    				catch (InterruptedException ex) {
    					// Re-interrupt current thread, to allow other threads to react.
    					Thread.currentThread().interrupt();
    					interrupted = true;
    				}
    			}
    			if (debug) {
    				logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
    			}
    			this.concurrencyCount++;
    		}
    	}
    }
    

    线程任务执行完毕后,当前执行线程数会减一,会调用monitor对象的notify方法,唤醒等待状态下的线程,等待状态下的线程会竞争monitor锁,竞争到,会继续执行线程任务。

    protected void afterAccess() {
    	if (this.concurrencyLimit >= 0) {
    		synchronized (this.monitor) {
    			this.concurrencyCount--;
    			if (logger.isDebugEnabled()) {
    				logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
    			}
    			this.monitor.notify();
    		}
    	}
    }
    

    虽然看了源码了解了SimpleAsyncTaskExecutor有限流机制,实践出真知,我们还是测试下:
    一、测试未开启限流机制下,我们启动20个线程去调用异步方法,查看Java VisualVM工具如下:
    在这里插入图片描述
    二、测试开启限流机制,开启限流机制的代码如下:

    @Configuration
    @EnableAsync
    public class AsyncCommonConfig extends AsyncConfigurerSupport {
        @Override
        public Executor getAsyncExecutor() {
            SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
            //设置允许同时执行的线程数为10
     executor.setConcurrencyLimit(10);
            return executor;
        }
    }
    

    同样,我们启动20个线程去调用异步方法,查看Java VisualVM工具如下:
    在这里插入图片描述

    通过上面验证可知:
    1.开启限流情况下,能有效控制应用线程数
    2.虽然可以有效控制线程数,但执行效率会降低,会出现主线程等待,线程竞争的情况。
    3.限流机制适用于任务处理比较快的场景,对于应用处理时间比较慢的场景并不适用。==

    最终解决办法:
    1.自定义线程池,使用LinkedBlockingQueue阻塞队列来限定线程池的上限
    2.定义拒绝策略,如果队列满了,则拒绝处理该任务,打印日志,代码如下:

    public class AsyncConfig implements AsyncConfigurer{
        private Logger logger = LogManager.getLogger();
    
        @Value("${thread.pool.corePoolSize:10}")
        private int corePoolSize;
    
        @Value("${thread.pool.maxPoolSize:20}")
        private int maxPoolSize;
    
        @Value("${thread.pool.keepAliveSeconds:4}")
        private int keepAliveSeconds;
    
        @Value("${thread.pool.queueCapacity:512}")
        private int queueCapacity;
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setKeepAliveSeconds(keepAliveSeconds);
            executor.setQueueCapacity(queueCapacity);
            executor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor exe) -> {
                    logger.warn("当前任务线程池队列已满.");
            });
            executor.initialize();
            return executor;
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new AsyncUncaughtExceptionHandler() {
                @Override
                public void handleUncaughtException(Throwable ex , Method method , Object... params) {
                    logger.error("线程池执行任务发生未知异常.", ex);
                }
            };
        }
    }
    
    展开全文
  • 利用自定义线程池解决OOM问题

    千次阅读 2018-05-19 13:07:07
    从Camera获取到了byte[]类型的图像数据之后,需要送到so库,让so库进行相应的处理,并对其处理的结果进行相应的反馈;1s大概有30帧的数据,但是除去一些装载数据的事件,时间就变大了,实际可拿到的帧率就变小了;...

    Camera获取到了byte[]类型的图像数据之后,需要送到so库中,让so库进行相应的处理,并对其处理的结果进行相应的反馈;1s大概有30帧的数据,但是除去一些装载数据的事件,时间就变大了,实际可拿到的帧率就变小了;在此每秒取5帧,每张图片大致1~3M。这大致就是解决这个问题的背景框架。

    起因

    处理图片时,耗时较长,放在主线程中,会造成卡顿,严重的时候会造成ANR。但是算法库处理时,会依赖于上一帧的数据,所以还是要按照顺序,一帧一帧来处理。
    Camera取数据后,立即开启线程处理的代码如下:

    //取数据
    mCamera.addCallbackBuffer(new byte[size.width * size.height * ImageFormat.getBitsPerPixel(ImageFormat.NV21) / 8]);
    mCamera.setPreviewCallbackWithBuffer(new Camera.PreviewCallback() {
        @Override
        public void onPreviewFrame(byte[] data, Camera camera) {
            if (mCallback != null && mCamera != null){
                // 处理数据回调
                mCallback.onImageAvaliable(data, size.width, size.height);
                mCamera.addCallbackBuffer(new byte[size.width * size.height * ImageFormat.getBitsPerPixel(ImageFormat.NV21) / 8]);
            }
        }
    });
    
    // 开启线程处理部分
    ExecutorService pool = Executors.newSingleThreadExecutor();
    ...
    if (isSwitchOn(R.id.fall_detect)) {
        pool.execute(new CheckFallAndMoveThread(buf, width, height));
    }

    这样的话,只允许一个线程执行,多来了的,就排队等待处理,这样便实现了一帧一帧处理,并且还不会阻塞主线程。但是很可惜,这样会造成OOM。

    原因

    so库处理的时间达到了2s左右(处理的图片没有经过resize),有点长。这样的话,每个需要处理的Runnable都需要一定的空间去存储这个图片,并且此种Executor是Runnable是无限长的,长度会自动变化,空间很快就会用完,也是合情合理。

    解决办法

    看了一下,默认提供的几种线程池,并不能达到自己的需求,只能自己重新设置一下其中的参数,然后利用这些参数,设置成自己想要的那种模式,即队列长度有限,按顺序执行,然后宁愿少处理几帧,也要避免OOM。

    public class CalTaskThreadExecutor {
        private static final ExecutorService instance = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadFactory() {
                    private final AtomicInteger mCount = new AtomicInteger(1);
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "SingleTaskPoolThread #" + mCount.getAndIncrement());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        Log.e("TAG", "超了");
                        executor.remove(r);
                    }
                });
        public static ExecutorService getInstance(){
            return instance;
        }
    }

    十分感谢这篇博客,让我大致明白了其中参数所代表的意义,并按照自己的需求整个相应线程池出来。线程池,这一篇或许就够了

    展开全文
  • 地址:http://suo.im/5Y3RGF作者:ignorewho前言:最近项目上在测试人员压测过程发现了OOM问题,项目使用springboot搭建项目工程,通过查看日志包含信息:unable to create new native thread内存溢出的三种类型:...
  • Spring异步线程池的接口类,其实质是java.util.concurrent.Executor Spring 已经实现的异常线程池:1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。(我们使用了这...
  • 很多人都知道阿里的Java开发严令禁止使用Executors的方式来创建线程池,禁止的理由是“为了让开发者更加明确线程池的运行规则,更加了解线程池的底层工作原理,从而避免不规范的使用造成服务器资源耗尽的风险”,本...
  • Java线程池

    2020-05-24 15:36:13
    Java线程池详解 构造一个线程池为什么需要几个参数?如果避免线程池出现OOM?Runnable和Callable的区别是什么?本文将对这些问题一一解答,同时还将给出使用线程池的常见场景和代码片段。 基础知识 Executors创建...
  • Java线程池详解

    2019-08-04 11:19:48
    Java线程池详解 构造一个线程池为什么需要几个参数?如果避免线程池出现OOM?Runnable和Callable的区别是什么?本文将对这些问题一一解答,同时还将给出使用线程池的常见场景和代码片段。 基础知识 Executors创建...
  • java线程池详解

    2020-02-24 22:54:50
    Java线程池详解 构造一个线程池为什么需要几个参数?如果避免线程池出现OOM?Runnable和Callable的区别是什么?本文将对这些问题一一解答,同时还将给出使用线程池的常见场景和代码片段。 基础知识 Executors创建...
  • 介绍: 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建...这里借用《Java并发编程的艺术》提到的来说一下使用线程池的好处: 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成...
  • Java 线程池详解

    2019-06-03 16:58:54
    构造一个线程池为什么需要几个参数?如果避免线程池出现OOM?Runnable和Callable的区别...Java中创建线程池很简单,只需要调用Executors相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但...
  • java线程池

    2018-09-03 16:00:49
    java线程池 介绍new Thread的弊端及Java四种线程池的使用 1、new Thread的弊端 2、Java 线程池 (1). newCachedThreadPool (2). newFixedThreadPool (3) newScheduledThreadPool (4)、newSingleThreadExecutor...
  • 为什么需要线程池2.java中线程池的实现3.创建线程的工厂方法Executors3.1 newFixedThreadPool3.2 newSingleThreadExecutor3.3 newCachedThreadPool2.4 newScheduledThreadPool3.5 newWorkStealingPool4.线程池的使用...
  • Java 线程池

    2019-07-19 08:56:31
    ​创建一个线程池 ThreadPoolExecutor ,如何使用java线程池,线程池源码浅析 2014拍摄于四川羌族藏族自治区郎木寺下早课。 微信公众号 王皓的GitHub:https://github.com/TenaciousDWang 前面已经说了...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,120
精华内容 7,248
关键字:

java线程池中的oom

java 订阅