精华内容
下载资源
问答
  • java.util.concurrent.RejectedExecutionException异常报错。 具体看了下代码,里面的执行逻辑也不难,没有外部依赖都是内存多线程cpu类型计算的逻辑。 下面是报错的线程池状态变化,由上到下,按照时间增长,最后...

    场景

    业务一切正常,突然收到一堆告警,发现全是
    java.util.concurrent.RejectedExecutionException异常报错。
    具体看了下代码,里面的执行逻辑也不难,没有外部依赖都是内存多线程cpu类型计算的逻辑。
    下面是报错的线程池状态变化,由上到下,按照时间增长,最后达到饱和。

    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 99, active threads = 75, queued tasks = 5000, completed tasks = 1595259]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 101, active threads = 74, queued tasks = 4999, completed tasks = 1595298]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 90, active threads = 70, queued tasks = 4998, completed tasks = 1595354]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 93, active threads = 75, queued tasks = 5000, completed tasks = 1595378]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 85, active threads = 72, queued tasks = 5000, completed tasks = 1595443]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 83, active threads = 70, queued tasks = 5000, completed tasks = 1595447]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 90, active threads = 81, queued tasks = 5000, completed tasks = 1595476]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 85, active threads = 74, queued tasks = 4994, completed tasks = 1595574]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 91, active threads = 80, queued tasks = 4999, completed tasks = 1595678]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 95, active threads = 85, queued tasks = 5000, completed tasks = 1595696]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 95, active threads = 87, queued tasks = 5000, completed tasks = 1595721]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 93, active threads = 85, queued tasks = 4999, completed tasks = 1595727]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 94, active threads = 85, queued tasks = 4998, completed tasks = 1595737]
    ...
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 109, active threads = 96, queued tasks = 4999, completed tasks = 1596150]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 113, active threads = 102, queued tasks = 4998, completed tasks = 1596191]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 109, queued tasks = 5000, completed tasks = 1596355]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 110, queued tasks = 5000, completed tasks = 1596517]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 119, active threads = 110, queued tasks = 4999, completed tasks = 1596519]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 113, active threads = 105, queued tasks = 5000, completed tasks = 1596553]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 122, active threads = 109, queued tasks = 4999, completed tasks = 1596602]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    java.util.concurrent.ThreadPoolExecutor@57b5ea0f[Running, pool size = 120, active threads = 120, queued tasks = 5000, completed tasks = 1597061]
    

    因为cpu4核,所以coresize = 40,maxsize = 120,都正常。

        private static BlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(5000);
        private static ExecutorService executorService = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors() * 10,
                Runtime.getRuntime().availableProcessors() * 30,
                60, TimeUnit.SECONDS, executorQueue);
    

    通过对ThreadPoolExecutor类分析,引发java.util.concurrent.RejectedExecutionException主要有两种原因:

    1. 线程池显示的调用了shutdown()之后,再向线程池提交任务的时候,如果你配置的拒绝策略是ThreadPoolExecutor.AbortPolicy的话,这个异常就被会抛出来。
    2. 当你的排队策略为有界队列,并且配置的拒绝策略是ThreadPoolExecutor.AbortPolicy,当线程池的线程数量已经达到了maximumPoolSize的时候,你再向它提交任务,就会抛出ThreadPoolExecutor.AbortPolicy异常。

    排查

    因为全机房、只有一个节点报错,而其他机器都正常,在这个一定保证正确的前提下。
    1、看了线上的请求量、并没有突增,所以排除外部因素
    2、看了代码逻辑,逻辑内部并没有打印报错日志,说明不是线程执行耗时导致后面的其他线程排队
    3、排除线程池是否提前关闭。并没有,因为手动没有显示关闭,另外看日志也知道里面的线程数还在变化,所以不存在关闭的说法。

    考虑是否是机器本身的原因,后面经过排查,看到那个点线上的cpu使用率突然升高、系统负载突然飙升,网卡流出、流入报文数目、tcp连接数也突然为0。具体干了什么,已经让运营去看了。至少说明我们代码没有问题。

    如下图,可以感受下:
    在这里插入图片描述

    解决

    线上第一时间出现这个问题,没想2秒,看了下报错代码逻辑,然后重启了这个节点实例(当时重启完后,观察又没问题了,带着疑惑然后才去排查)

    展开全文
  • 要学习JAVA中是如何实现线程间...package java.util.concurrent.locks; import java.util.concurrent.*; import sun.misc.Unsafe; public class LockSupport { private LockSupport() {} // Cannot be instantiated.

    要学习JAVA中是如何实现线程间的锁,就得从LockSupport这个类先说起,因为这个类实现了底层的一些方法,各种的锁实现都是这个基础上发展而来的。

    package java.util.concurrent.locks;
    import java.util.concurrent.*;
    import sun.misc.Unsafe;
     
    public class LockSupport {
        private LockSupport() {} // Cannot be instantiated.
     
        // Hotspot implementation via intrinsics API
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long parkBlockerOffset;
     
        static {
            try {
                parkBlockerOffset = unsafe.objectFieldOffset
                    (java.lang.Thread.class.getDeclaredField("parkBlocker"));
            } catch (Exception ex) { throw new Error(ex); }
        }
     
        private static void setBlocker(Thread t, Object arg) {
            // Even though volatile, hotspot doesn't need a write barrier here.
            unsafe.putObject(t, parkBlockerOffset, arg);
        }
     
        public static void unpark(Thread thread) {
            if (thread != null)
                unsafe.unpark(thread);
        }
     
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(false, 0L);
            setBlocker(t, null);
        }
     
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                unsafe.park(false, nanos);
                setBlocker(t, null);
            }
        }
     
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(true, deadline);
            setBlocker(t, null);
        }
     
        public static Object getBlocker(Thread t) {
            return unsafe.getObjectVolatile(t, parkBlockerOffset);
        }
     
        public static void park() {
            unsafe.park(false, 0L);
        }
     
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                unsafe.park(false, nanos);
        }
     
        public static void parkUntil(long deadline) {
            unsafe.park(true, deadline);
        }
    }
    

    这个类提供的都是静态方法,且无法被实例化。

    在LockSupport中有两个私有的成员变量:

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long parkBlockerOffset;
    

    大家都知道JAVA语言是平台无关的,一次编译,可以在任何平台上运行,但是如果真的不可以调用一些平台相关的方法吗?其实unsafe类是可以做到的。

    unsafe:是JDK内部用的工具类。它通过暴露一些Java意义上说“不安全”的功能给Java层代码,来让JDK能够更多的使用Java代码来实现一些原本是平台相关的、需要使用native语言(例如C或C++)才可以实现的功能。该类不应该在JDK核心类库之外使用。

    parkBlokcerOffset:parkBlocker的偏移量,从字面上理解是这么个东东。但是parkBlocker又是干嘛的?偏移量又是做什么的呢?让我们来看看Thread类的实现:

    //java.lang.Thread的源码
    /**
     * The argument supplied to the current call to 
     * java.util.concurrent.locks.LockSupport.park.
     * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
     * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
     */
    volatile Object parkBlocker;
    

    问题1:parkBlocker又是干嘛的?

    原来java.lang.Thread的实现当中有这么一个对象。从注释上看,这个对象被LockSupport的setBlocker和getBlocker调用。查看JAVADOC会发现这么一段解释:
    在这里插入图片描述大致意思是,这个对象是用来记录线程被阻塞时被谁阻塞的。用于线程监控和分析工具来定位原因的。主要调用了LockSupport的getBlocker方法。

    原来,parkBlocker是用于记录线程是被谁阻塞的。可以通过LockSupport的getBlocker获取到阻塞的对象。用于监控和分析线程用的。

    问题2:偏移量又是做什么的?

    private static void setBlocker(Thread t, Object arg)
    
    public static Object getBlocker(Thread t) {
            return unsafe.getObjectVolatile(t, parkBlockerOffset);
        }
    

    参数:

    Thread t 需要被赋值Blocker的线程
    
    Object arg 具体的Blocker对象
    

    解读:有了之前的理解,这个方法就很好理解了。对给定线程t的parkBlocker赋值。为了防止,这个parkBlocker被误用,该方法是不对外公开的。

    public static Object getBlocker(Thread t)
    
    public static Object getBlocker(Thread t) {
    	return unsafe.getObjectVolatile(t, parkBlockerOffset);
    }
    

    参数:Thread t, 被操作的线程对象

    返回:parkBlocker对象

    解读:从线程t中获取他的parkerBlocker对象。这个方法是对外公开的。

    是不是可以利用这个方法来写一个监控程序,炫耀一把.

    再讲其他几个方法之前,先谈谈park和unpark是做什么的.

    /**  
    * Unblock the given thread blocked on <tt>park</tt>, or, if it is
    * not blocked, cause the subsequent call to <tt>park</tt> not to
    * block.  Note: this operation is "unsafe" solely because the
    * caller must somehow ensure that the thread has not been
    * destroyed. Nothing special is usually required to ensure this
    * when called from Java (in which there will ordinarily be a live
    * reference to the thread) but this is not nearly-automatically
    * so when calling from native code.
    * @param thread the thread to unpark.
    * 
    */
    public native void unpark(Object thread);
    
    /**
    * Block current thread, returning when a balancing
    * <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has
    * already occurred, or the thread is interrupted, or, if not
    * absolute and time is not zero, the given time nanoseconds have
    * elapsed, or if absolute, the given deadline in milliseconds
    * since Epoch has passed, or spuriously (i.e., returning for no  
     * "reason"). Note: This operation is in the Unsafe class only
    * because <tt>unpark</tt> is, so it would be strange to place it
    * elsewhere.
    */
    public native void park(boolean isAbsolute, long time);
    

    字面理解park,就算占住,停车的时候不就把这个车位给占住了么?起这个名字还是很形象的。unpark,占住的反义词,就是释放。把车从车位上开走。

    翻译一下:

    • park:阻塞当前线程,(1)当配对的unpark发生或者(2)配对的unpark已经发生或者线程被中断时恢复(unpark先行,再执行park)。 (3)当absolute是false时,如果给定的时间是非0(负数)或者给定的时间(正数, 时间单位时毫秒)已经过去了(0的时候会一直阻塞着)。(4)当Absolute是true时,如果给定的时间(时间单位是纳秒)过去了或者伪造的(在我理解是参数不合法时)线程会恢复中断。这个操作是不安全的,所以在其他调用会很奇怪(奇怪?反正就是用的时候要小心)

    • unpark:当指定线程被park命令阻塞时unpark命令可以恢复阻塞。在park命令没有被先调用过的时候,调用unpark,线程仍然不被阻塞。(翻译的有点那个…).

    理解一下,park与unpark命令是成对出现的。unpark必须要在park命令后执行。但是线程的恢复并不一定要用unpark, 因为park的时间参数,有些情况下线程会自己恢复。

     public static void unpark(Thread thread)
    
    public static void unpark(Thread thread) {
        if (thread != null)
            unsafe.unpark(thread);
    }
    

    参数:Thread thread, 需要被中止挂起的线程

    带blocker参数的park方法

    public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(false, 0L);
            setBlocker(t, null);
        }
     
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                unsafe.park(false, nanos);
                setBlocker(t, null);
            }
        }
     
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(true, deadline);
            setBlocker(t, null);
        }
    

    参数:

    1、Object blocker:用于记录到线程中的parkBlocker对象。
    
    2、nanos:在nanos时间后线程自动恢复挂起
    
    3、deadline:在deadline时刻线程自动(这个毫秒其实就是自1970年1月1日0时起的毫秒数)
    

    解读:这三个方法其实是一个意思,把blocker放到线程当中,注意,这个park方法是一个阻塞的方法,除非4个条件

    1、当配对的unpark发生或者
    
    2、配对的unpark已经发生或者线程被中断时恢复(unpark先行,再执行park)
    
    3、当absolute是false时,如果给定的时间是非0(负数)或者给定的时间(正数, 时间单位时毫秒)已经过去了(0的时候会一直阻塞着)。
    
    4、当Absolute是true时,如果给定的时间(时间单位是纳秒)过去了或者伪造的(在我理解是参数不合法时)线程会恢复中断。
    

    不带blocker参数的park方法

    public static void park() {
            unsafe.park(false, 0L);
        }
     
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                unsafe.park(false, nanos);
        }
     
        public static void parkUntil(long deadline) {
            unsafe.park(true, deadline);
        }
    

    这三个方法跟上面一样,唯一区别是没有做parkBlocker的赋值操作。
    我们继续看一下JVM是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的

    os::PlatformEvent::park方法,代码如下:
    void os::PlatformEvent::park() {      
    
                 int v ;
    
             for (;;) {
    
            v = _Event ;
    
             if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
    
             }
    
             guarantee (v >= 0, "invariant") ;
    
             if (v == 0) {
    
             // Do this the hard way by blocking ...
    
             int status = pthread_mutex_lock(_mutex);
    
             assert_status(status == 0, status, "mutex_lock");
    
             guarantee (_nParked == 0, "invariant") ;
    
             ++ _nParked ;
    
             while (_Event < 0) {
    
             status = pthread_cond_wait(_cond, _mutex);
    
             // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
    
             // Treat this the same as if the wait was interrupted
    
             if (status == ETIME) { status = EINTR; }
    
             assert_status(status == 0 || status == EINTR, status, "cond_wait");
    
             }
    
             -- _nParked ;
    
              
    
             // In theory we could move the ST of 0 into _Event past the unlock(),
    
             // but then we'd need a MEMBAR after the ST.
    
             _Event = 0 ;
    
             status = pthread_mutex_unlock(_mutex);
    
             assert_status(status == 0, status, "mutex_unlock");
    
             }
    
             guarantee (_Event >= 0, "invariant") ;
    
             }
    
         }
    
    展开全文
  • 报错:java.util.concurrent.RejectedExecutionException,排查发现是等待队列设小了,导致 拒绝策略,当队列满时,处理策略报错异常。 上代码: package aqs; import java.util.concurrent.*; /** * @...

    今天学习了java的并发,线程池,同一时间执行一个操作。

    报错:java.util.concurrent.RejectedExecutionException,排查发现是等待队列设小了,导致

    拒绝策略,当队列满时,处理策略报错异常。

    上代码:

    package aqs;
    
    import java.util.concurrent.*;
    
    /**
     * @author WHM
     * 实现指定时间内做一定事情
     * @date 2021年08月06日 16:27
     */
    public class CountDownLatchTest {
        public static int clientTotal = 2000;
        // 核心线程数,当线程池空闲时保留的线程数
        static int corePoolSize = 2;
        // 线程池最大线程数,线程池繁忙时能够扩容到的最大线程数
        static int maximumPoolSize = 5;
        // 线程活跃时间,当线程数大于核心数时,并且线程开始空闲,此时多余的线程经过活跃时间后自动关闭
        static int keepAliveTime = 1;
        // 线程活跃时间单位
        static TimeUnit unit = TimeUnit.SECONDS;
        static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(20); // 等待队列,ArrayBlockingQueue为有界阻塞队列,当队列满时进行阻塞
        static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略,当队列满时,处理策略
    
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, handler);
    
        public static void main(String[] args) throws  Exception{
    
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for(int i = 0;i < clientTotal; i++) {
                final int now = i;
                executor.execute(()->{
                    try{
                        test(now);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            System.out.println("thread finish");
            executor.shutdown();
        }
    
        private static void test(final int threadNum) throws Exception {
            Thread.sleep(3000);
            System.out.println("thread: {}" + threadNum);
        }
    }
    

    本来以为写的还不错的代码执行报错了:

    分析
    通过对ThreadPoolExecutor类分析,引发java.util.concurrent.RejectedExecutionException主要有两种原因:
    1. 线程池显示的调用了shutdown()之后,再向线程池提交任务的时候,如果你配置的拒绝策略是ThreadPoolExecutor.AbortPolicy的话,这个异常就被会抛出来。
    2. 当你的排队策略为有界队列,并且配置的拒绝策略是ThreadPoolExecutor.AbortPolicy,当线程池的线程数量已经达到了maximumPoolSize的时候,你再向它提交任务,就会抛出ThreadPoolExecutor.AbortPolicy异常。 (我们设定了)

    static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(20); 
    
    但是
    
    线程有2000个

    所以,被拒绝啦。

    对于分析的第一个我们可以做个例子:

    这一点很好理解。比如说,你向一个仓库去存放货物,一开始,仓库管理员把门给你打开了,你放了第一件商品到仓库里,但是当你放好出去后,有人把仓库门关了,那你下次再来存放物品时,你就会被拒绝。示例代码如下:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
     
    public class TextExecutor {
    	public ExecutorService fixedExecutorService = Executors.newFixedThreadPool(5);
    	public ExecutorService cachedExecutorService = Executors.newCachedThreadPool();
    	public ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
    	
    	public void testExecutorException() {
    		for (int i = 0; i < 10; i ++) {
    			fixedExecutorService.execute(new SayHelloRunnable());
    			fixedExecutorService.shutdown();
    		}
    	}
    	
    	private class SayHelloRunnable implements Runnable {
     
    		@Override
    		public void run() {
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			} finally {
    				System.out.println("hello world!");
    			}
    			
    		}
    	}
    	
    	public static void main(String[] args) {
    		TextExecutor testExecutor = new TextExecutor();
    		testExecutor.testExecutorException();
    	}
    }

    解决方案
    1. 不要显示的调用shutdown方法,例如Android里,只有你在Destory方法里cancel掉AsyncTask,则线程池里没有活跃线程会自己回收自己。
    2. 调用线程池时,判断是否已经shutdown,通过API方法isShutDown方法判断,示例代码:
     

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
     
    public class TextExecutor {
    	public ExecutorService fixedExecutorService = Executors.newFixedThreadPool(5);
    	public ExecutorService cachedExecutorService = Executors.newCachedThreadPool();
    	public ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
    	
    	public void testExecutorException() {
    		for (int i = 0; i < 10; i ++) {
    			// 增加isShutdown()判断
    			if (!fixedExecutorService.isShutdown()) {
    				fixedExecutorService.execute(new SayHelloRunnable());
    			}
    			fixedExecutorService.shutdown();
    		}
    	}
    	
    	private class SayHelloRunnable implements Runnable {
     
    		@Override
    		public void run() {
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			} finally {
    				System.out.println("hello world!");
    			}
    			
    		}
    	}
    	
    	public static void main(String[] args) {
    		TextExecutor testExecutor = new TextExecutor();
    		testExecutor.testExecutorException();
    	}
    }

    第二种报错代码已经给出:

    我们看如何解决:

    1.提大排队队列

    static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2000); 

     2.使用LinkedBlockingQueue

     static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 

    问题延伸

    1.不建议使用Executors创建线程

    较为方便的Executors工厂方法Executors.newCachedThreadPool() (无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),但是通过源码我们可以发现最后他们均调用了ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 方法,因此我们在分析java.util.concurrent.RejectedExecutionException之前,需要深入学习一下ThreadPoolExecutor的使用。

     2.了解一下:TreadPoolExecutor

    核心池和最大池的大小
    TreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。当新任务在方法execute(java.lang.Runnable)中提交时,如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于corePoolSize而少于maximumPoolSize,则仅当队列满时才创建新的线程。如果设置的corePoolSize和maximumPoolSize相同,则创建了固定大小的线程池。如果将maximumPoolSize设置为基本的无界值(如Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

    3. BlockingQueue/LinkedBlockingQueue我们通过异同快速了解一下:

    相同:

    1、LinkedBlockingQueue和ArrayBlockingQueue都实现了BlockingQueue接口;

    2、LinkedBlockingQueue和ArrayBlockingQueue都是可阻塞的队列

      内部都是使用ReentrantLock和Condition来保证生产和消费的同步;

      当队列为空,消费者线程被阻塞;当队列装满,生产者线程被阻塞;

    使用Condition的方法来同步和通信:await()和signal()

    不同:

    1、由上图可以看出,他们的锁机制不同

      LinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLock

      而ArrayBlockingQueue生产者和消费者使用的是同一把锁;

    2、他们的底层实现机制也不同

      LinkedBlockingQueue内部维护的是一个链表结构

    在生产和消费的时候,需要创建Node对象进行插入或移除,大批量数据的系统中,其对于GC的压力会比较大

      而ArrayBlockingQueue内部维护了一个数组

     

     在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例

     3、构造时候的区别

      LinkedBlockingQueue有默认的容量大小为:Integer.MAX_VALUE,当然也可以传入指定的容量大小

    ArrayBlockingQueue在初始化的时候,必须传入一个容量大小的值

      看其提供的构造方法就能知道 (ideaALT+7 可以查看类方法)

     

    4、执行clear()方法

      LinkedBlockingQueue执行clear方法时,会加上两把锁

     要问什么锁,想想,肯定是生产/消费锁

     ArrayBlockingQueue是添加一把锁

     5、统计元素的个数

      LinkedBlockingQueue中使用了一个AtomicInteger对象来统计元素的个数

           毕竟有2个锁,所以保障count的原子性,需要使用AtomicInteger来控制,底层使用CAS来控制同步。  好烦是不是又要看CAS (之前有写过一篇哦)

          ArrayBlockingQueue则使用int类型来统计元素

    作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

    展开全文
  • import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; public class AtomicReferenceMain { private static AtomicReference ar = new AtomicReference(0...

    1、AtomicReference用以支持对象的原子操作:AtomicReference 可以封装引用一个V实例。

    2、public final boolean compareAndSet(V expect, V update) ,可以支持并发访问,set的时候进行对比判断,如果当前值和操作之前一样则返回false,否则表示数据没有变化。

    实例代码:package com.what21;

    import java.util.concurrent.CountDownLatch;

    import java.util.concurrent.atomic.AtomicReference;

    public class AtomicReferenceMain {

    private static AtomicReference ar = new AtomicReference(0);

    /**

    * @throws InterruptedException

    */

    public static void test() throws InterruptedException {

    int t = 100;

    final int c = 100;

    final CountDownLatch latch = new CountDownLatch(t);

    for (int i = 0; i 

    new Thread(new Runnable() {

    @Override

    public void run() {

    for (int i = 0; i 

    while (true) {

    Integer temp = ar.get();

    System.out.println("temp=" + temp);

    if (ar.compareAndSet(temp, temp + 1)) {

    break;

    }

    }

    }

    latch.countDown();

    }

    }).start();

    }

    latch.await();

    System.out.println(ar.get());

    }

    /**

    * @param args

    * @throws InterruptedException

    */

    public static void main(String[] args) throws InterruptedException {

    test();

    }

    }

    实例代码,原子量实现的计数器:package com.what21;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.atomic.AtomicInteger;

    public class AtomicCounter {

    private AtomicInteger value = new AtomicInteger();

    public int getValue() {

    return value.get();

    }

    public int increase() {

    return value.incrementAndGet();// 内部使用死循环for(;;)调用compareAndSet(current, next)

    // return value.getAndIncrement();

    }

    public int increase(int i) {

    return value.addAndGet(i);// 内部使用死循环for(;;)调用compareAndSet(current, next)

    // return value.getAndAdd(i);

    }

    public int decrease() {

    return value.decrementAndGet();// 内部使用死循环for(;;)调用compareAndSet(current, next)

    // return value.getAndDecrement();

    }

    public int decrease(int i) {

    return value.addAndGet(-i);// 内部使用死循环for(;;)调用compareAndSet(current, next)

    // return value.addAndGet(-i);

    }

    public static void main(String[] args) {

    final AtomicCounter counter = new AtomicCounter();

    ExecutorService service = Executors.newCachedThreadPool();

    for (int i = 0; i 

    service.execute(new Runnable() {

    @Override

    public void run() {

    System.out.println(counter.increase());

    }

    });

    }

    service.shutdown();

    }

    }

    实例代码,原子量实现的银行取款:package com.what21;

    import java.util.Random;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.atomic.AtomicLong;

    public class AtomicAccount {

    private AtomicLong balance;

    public AtomicAccount(long money) {

    balance = new AtomicLong(money);

    System.out.println("Total Money:" + balance);

    }

    public void deposit(long money) {

    balance.addAndGet(money);

    }

    public void withdraw(long money) {

    for (; ; ) {//保证即时同一时间有人也在取款也可以再次尝试取款,如果不需要并发尝试取款,可以去掉这句

    long oldValue = balance.get();

    if (oldValue 

    System.out.println(Thread.currentThread().getName() + " 余额不足! 余额:" + balance);

    break;

    }

    try {Thread.sleep(new Random().nextInt(1000));} catch (Exception e) { }// 模拟取款时间

    if (balance.compareAndSet(oldValue, oldValue - money)) {

    System.out.println(Thread.currentThread().getName() + " 取款 " + money + " 成功! 余额:" + balance);

    break;

    }

    System.out.println(Thread.currentThread().getName() + " 遇到并发,再次尝试取款!");

    }

    }

    /**

    * @param args

    */

    public static void main(String[] args) {

    final AtomicAccount account = new AtomicAccount(1000);

    ExecutorService pool = Executors.newCachedThreadPool();

    int i = 0;

    while (i++ 

    pool.execute(new Runnable() {

    @Override

    public void run() {

    account.withdraw(100);

    }

    });

    }

    pool.shutdown();

    }

    }

    展开全文
  • 线程池报java.util.concurrent.RejectedExecutionException异常java.util.concurrent.RejectedExecutionExceptionat java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor....
  • java.util.concurrent.TimeoutException: null

    千次阅读 2020-12-30 16:14:53
    今天出现了一个很诡异的事情,我们的一个单据页面打开...java.util.concurrent.TimeoutException: null at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) at com.netflix
  • java.util.concurrent.ExecutionException错误信息,这里给出解决方案,大家根据具体要求更改。SEVERE: A child container failed during startjava.util.concurrent.ExecutionException: org.apache.catalina....
  • java.util.concurrent简介

    2021-02-12 16:42:58
    java.util.concurrent包提供了很多有用的类,方便我们进行并发程序的开发。本文将会做一个总体的简单介绍。主要的组件java.util.concurrent包含了很多内容, 本文将会挑选其中常用的一些类来进行大概的说明:...
  • java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: A child container failed during start SEVERE: A child container failed during start java.util.concurrent....
  • 参考:在项目中,我们通常会遇见各种情况导致的java.util.concurrent.TimeoutException#34705 java.util.concurrent.TimeoutExceptionandroid.content.res.AssetManager.finalize() timed out after 120 ...
  • 首先是concurrent里面最简单也是最基础的一个接口 Executor,先在这里卖个关子,为什么这个接口只有一个方法。...上源代码:packagejava.util.concurrent;publicinterfaceExecutor{voidexecute(Runnab...
  • java.util.concurrent.atomic包下,有AtomicBoolean , AtomicInteger, AtomicLong, AtomicReference等类,它们的基本特性就是在多线程环境下,执行这些类实例包含的方法时,具有排他性,即当某个线程进入方法,...
  • java.util.concurrent.CompletionException: java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id 这个报错的原因,找了一下,发现是线程A的锁,但是线程B...
  • 看到这一串错误有点不想看,不过...Error:java.util.concurrent.ExecutionException: com.android.ide.common.process.ProcessException: Error while executing process E:\and\android-sdk-windows\build-tools\26...
  • Internal error....java.util.concurrent.CompletionException: java.net.BindException: Address already in use: bind at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFutu
  • } 虽然concurrent包提供了能实现所有需求的类,但是本文出于学习其原理和其思想的目的,有必要说一下AQS最重要的编程范式,也就是说,你要使用AQS,那么就必须得按照这个代码模板去写,因为Doug Lee就是这样要求的...
  • java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: PermGen space 在vm里添加 -Xms512m -Xmx1024m -XX:MaxNewSize=512m -XX:MaxPermSize=512m
  • Caused by: java.util.concurrent.ExecutionException: org.apache.dubbo.remoting.RemotingException: Fail to decode request due to: java.lang.UnsupportedOperationException: ...java.lang.UnsupportedOperat.
  • AtomicInteger (保证多线程情况下的原子操作)public class AtomicIntegerDemo {public static AtomicInteger count = new AtomicInteger(0);public static void inc(){try{Thread.sleep(1); //延迟1毫秒}catch ...
  • 2.synchronized 是Java 语言层面的,是内置的关键字;Lock 则是JDK 5中出现的一个包,在使用时,synchronized 同步的代码块可以由JVM自动释放;Lock 需要程序员在finally块中手工释放,如果不释放,可能会引起难以...
  • [main] java.lang.ArithmeticException: / by zero thenApply 当上个阶段正常结束的时候,接收上个阶段的结果, 接着执行指定Action的新的阶段, 该阶段有返回值 接受上个阶段的结果作为本阶段的入参, 如果上个阶段...
  • 1556) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334) at java.util.concurrent.FutureTask.run(FutureTask.java:166) at java.util.concurrent.ThreadPoolExecutor.runWorker...
  • 解决IDEA启动java.util.concurrent.CompletionException 今天早上启动IDEA突然报这个错误!可以看到异常后面lua.luacheck.LuaCheckSettingsduplicated,大概可以知道之前安装的lua插件可能出问题了。 第一时间去...
  • Exception in thread "main" java.util.concurrent.TimeoutException at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77) at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell....
  • Caused by : java.util.concurrent.TimeoutException: Futures timed out after 1000s 参考此网站 This happens because Spark tries to do Broadcast Hash Join and one of the DataFrames is very large, so ...
  • 一、ReentrantReadWriteLock读写锁 private... 说明:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程 二、java.util.concurrent.ConcurrentHashMap 三、BlockingQueue 阻塞队列 转载:...
  • 今天在项目代码中,注意到有使用AtomicInteger类,这个类主要是在java.util.concurrent.atomic并发包下的。 Java并发机制的三个特性,如下所示: (1)原子性 (2)可见性 (3)有序性 volatile关键字能禁止指令重...
  • java.util.concurrent.locks.Lock类的lock和lockInterruptibly方法的区别什么是可中断的锁获取呢?就是:线程在等待获取锁的过程中,是否能够响应中断,以便在被中断的时候能够解除阻塞状态,而不是傻傻地一直在等待...
  • 今天在使用springBoot整合rabbitMQ的时候,突然出现一个报错:java.util.concurrent.TimeoutException 并且发现在使用rabbit的后台管理系统登录的时候也特别慢。 然后就开始在网上查找对应的解决方案,搜到的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 279,672
精华内容 111,868
关键字:

java.util.concurrent

java 订阅