精华内容
下载资源
问答
  • :因为我要对接京东订单服务 拉取订单的时候需要100个商户同时拉取订单服务,必须是异步的。 首先要在springboot 启动处加入 @EnableAsync @Configuration class TaskPoolConfig { @Bean("taskExecutor") ...

    背景

    :因为我要对接京东订单服务 拉取订单的时候需要100个商户同时拉取订单服务,必须是异步的。

    首先要在springboot 启动处加入

     

    @EnableAsync
    @Configuration
    class TaskPoolConfig {
        @Bean("taskExecutor")
        public Executor taskExecutor() {
            //注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
            //这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程数10:线程池创建时候初始化的线程数
            executor.setCorePoolSize(10);
             //最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
             //maxPoolSize 当系统负载大道最大值时,核心线程数已无法按时处理完所有任务,这是就需要增加线程.每秒200个任务需要20个线程,那么当每秒1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60;
             executor.setMaxPoolSize(30);
            //缓冲队列200:用来缓冲执行任务的队列
            executor.setQueueCapacity(400);
            //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
            executor.setKeepAliveSeconds(60);
            //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
            executor.setThreadNamePrefix("taskExecutor");
            //理线程池对拒绝任务的处策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
            /*CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
            这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
            AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
            这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
            DiscardPolicy:不能执行的任务将被删除
            这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
            DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
            该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心*/
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            return executor;
        }
    }
    

    每个配置文件代表什么意思可以看一下

    这个时候启动的时候我们异步线程池是已经创建好

    我们创建一个task 类

    public class Task {
        public static Random random = new Random();
    
    
    
        @Async("taskExecutor")
        public void doTask(Integer i) throws Exception {
            System.out.println("开始做任务");
            long start = System.currentTimeMillis();
            //这里写业务代码
            long end = System.currentTimeMillis();
            System.out.println("完成任务耗时:" + (end - start)/1000 + "秒");
    
        }

    }

    这个时候我们就可以使用了我们把task 注入到 controller 层

    @RestController
    @RequestMapping("test/")
    public class TestController {
    
        @Autowired
        private TaskService taskService;
    
        @Autowired
        private Executor taskExecutor;
    
    
        private Logger logger = LogManager.getLogger(JDcontroller.class);
    
        @PostMapping("order")
        public String addOrder(@RequestBody RequestParameterDTO requestParameters){
                //这里会执行你开启的任务,都是异步的,调用这个接口会立马返回 OK  然后业务是在后台运行的
                taskService.doTask(requestParameters);
                return "OK"; 
        }
        //这里我们可以通过接口实时观看效果 具体效果如下图
        @GetMapping("order/asyncExceutor")
        public Map getThreadInfo() {
            Map map =new HashMap();
            Object[] myThread = {taskExecutor};
            for (Object thread : myThread) {
                ThreadPoolTaskExecutor threadTask = (ThreadPoolTaskExecutor) thread;
                ThreadPoolExecutor threadPoolExecutor =threadTask.getThreadPoolExecutor();
                System.out.println("提交任务数"+threadPoolExecutor.getTaskCount());
                System.out.println("完成任务数"+threadPoolExecutor.getCompletedTaskCount() );
                System.out.println("当前有"+threadPoolExecutor.getActiveCount()+"个线程正在处理任务");
                System.out.println("还剩"+threadPoolExecutor.getQueue().size()+"个任务");
                map.put("提交任务数-->",threadPoolExecutor.getTaskCount());
                map.put("完成任务数-->",threadPoolExecutor.getCompletedTaskCount());
                map.put("当前有多少线程正在处理任务-->",threadPoolExecutor.getActiveCount());
                map.put("还剩多少个任务未执行-->",threadPoolExecutor.getQueue().size());
                map.put("当前可用队列长度-->",threadPoolExecutor.getQueue().remainingCapacity());
                map.put("当前时间-->",DateFormatUtil.stringDate());
            }
            return map;
        }
    }
    

     

    上图那个名字要和你在springboot启动处定义的名字要相同 这样spring 才能找到 才能监控你的线程池 ,当然这做的好处是你可以监控多个线程池 的线程,只需要在启动处 在加入  类似 的代码,名字不一样就行了

     

    展开全文
  • 由于工作的需要,写了一份异步调用的小框架,分享出来。。。   启动类:   /** * 线程启动 * @author yfguopeng */ public class ThreadExecutorListener implements ServletContextListener{ ...

    由于工作的需要,写了一份异步调用的小框架,分享出来。。。

     

    启动类:

     

    /**
     * 线程启动
     * @author yfguopeng
     */
    public class ThreadExecutorListener  implements ServletContextListener{
    	private final static Log log = LogFactory.getLog(ThreadExecutorListener.class);
    
    	@SuppressWarnings("unchecked")
    	public void contextInitialized(ServletContextEvent sce) {
    		ServletContext servletContext = sce.getServletContext();
    		WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
    		List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); 
    		log.info("=====================初始化线程池========================");
    		//创建线程组
    		SecurityManager s = System.getSecurityManager();
    		ThreadGroup  father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    		ThreadGroup group = new ThreadGroup(father, "root-threadgroup");
    		
    		for (ThreadConfigBean configBean : worders) {
    			//设置排队队列大小
    			ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity());
    			//设置线程工厂
    			ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId());
    			
    			ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(),
                        TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler());
    			
    			ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker);
    		}
    		
    		log.info("=====================线程池初始化完毕========================");
    		log.info("=====================初始化监控线程========================");
    		ThreadGroupUtil.monitorThreadStart(group,2000l);
    		log.info("=====================监控线程初始化完毕========================");
    	}
    
    	public void contextDestroyed(ServletContextEvent sce) {
    	}
    
    	
    }

     

     

    业务监控配置:

     

    <bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean">
        	<property name="businessId" value="xxxIndex"></property><!-- 业务ID,唯一 -->
        	<property name="max" value="40"></property><!-- 最好为请求线程的倍数 -->
        	<property name="min" value="10"></property><!-- 最好为请求线程的倍数 -->
        	<property name="queueCapacity" value="80"></property><!-- 最好为请求线程的倍数 -->
        	<property name="keepAliveTime" value="600"></property><!-- 线程空闲保存时间 -->
        	<property name="rejectHandler" ><!--任务拒绝处理策略 -->
    	    	<bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" >
    	    		<property name="bizName" value="pay-index"></property><!-- 业务ID,唯一 -->
    	    	</bean>
        	</property>
        </bean>
    
    	<bean id="workers" class="java.util.ArrayList">
    		<constructor-arg >
    			<list> 
    				<ref bean="xxxIndex"/>
    			</list>
    		</constructor-arg>
    	</bean>

     线程工厂:

     

     

    /**
     *  线程工厂, 加入了线程名的业务描述
     * 
     * @User: guopeng
     * @Date: 2013-02-28
     */
    public class DecorateThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;
    
        public DecorateThreadFactory(final ThreadGroup group,final String bizName) {
            this.group = group;
            namePrefix = bizName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

     监控线程:

    /**
     * 监控业务线程池运行情况
     * @author yfguopeng
     * @Date 2013-02-28
     */
    public class MonitorThread implements Runnable {
    	private final static Log log = LogFactory.getLog(MonitorThread.class);
    	private final ThreadGroup group;
    	
    	public MonitorThread(ThreadGroup group) {
    		this.group = group;
    	}
    	
    	public void run() {
    		Map<String, ThreadPoolExecutor>workers =  ThreadGroupUtil.getThreadWorkers();
    		Iterator<String> iterator = workers.keySet().iterator();
    		
    		log.info("total threadpools:[ "+workers.size()+" ],total threads:[ "+group.activeCount()+" ]");
    		while(iterator.hasNext()) {
    			ThreadPoolExecutor worker = ThreadGroupUtil.getThreadWorker(iterator.next());
    			
    			RejectedExecutionHandler handler = worker.getRejectedExecutionHandler();
    			String rejectedSize = "";
    			if (RejectedPolicyHandlerInteface.class.isAssignableFrom(handler.getClass())) {
    				rejectedSize = " ],rejected threads:[ "+((RejectedPolicyHandlerInteface) handler).getRejectedSize();
    			}
    			
    			log.info("business name:[ "+worker.getBizName()+" ]" +
    					", core threads:[ "+worker.getCorePoolSize()+" ], max threads:[ "+worker.getMaximumPoolSize()+" ]" +
    							", queue capacitys:[ "+worker.getQueue().size()+" ], running threads:[ "+worker.getActiveCount()+"] " +
    									", reject threads:[ "+rejectedSize+" ],  largest threads:[ "+worker.getLargestPoolSize()+" ], complete threads:[ "+worker.getCompletedTaskCount()+" ]");
    		}
    	}
    
    }

     线程拒绝处理器:

    /**
     * 线程拒绝执行控制球
     * @author yfguopeng
     * @Date 2013-02-28
     */
    public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{
    	private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class);
    	private static AtomicLong totals = new AtomicLong(0l);
    	
    	private String bizName;
    	
    	public RejectedPolicyHandler(){}
    	
    	@Override
    	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    		String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size();
    		log.info(tip);
    		//业务报警 TODO
    		totals.addAndGet(1);
    		super.rejectedExecution(r, executor);
    	}
    
    	public String getBizName() {
    		return bizName;
    	}
    
    	public void setBizName(String bizName) {
    		this.bizName = bizName;
    	}
    	
    	public long getRejectedSize() {
    		return totals.get();
    	}
    }

     

    import java.util.concurrent.RejectedExecutionHandler;
    
    public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{
    	public long getRejectedSize() ;
    }

     线程配置bean:

    @SuppressWarnings("serial")
    public class ThreadConfigBean implements Serializable{
    	/**
    	 * 业务ID
    	 */
    	private String businessId;
    	/**
    	 * 任务队列最大线程数
    	 * 默认:80
    	 */
    	private Integer max = 160;
    	/**
    	 * 任务队列最小线程数
    	 * 默认:40
    	 */
    	private Integer min = 80;
    	/**
    	 * 等待队列请求数
    	 * 默认:300
    	 */
    	private Integer queueCapacity = 300;
    	/**
    	 * 空闲线程存活时间
    	 * 默认:3分钟
    	 */
    	private Long keepAliveTime = 3 * 60l;
    	/**
    	 * 线程拒绝策略
    	 */
    	private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
    	
    	public ThreadConfigBean() {
    		super();
    	}
    	
    	public Integer getMax() {
    		return max;
    	}
    	public void setMax(Integer max) {
    		this.max = max;
    	}
    	public Integer getMin() {
    		return min;
    	}
    	public void setMin(Integer min) {
    		this.min = min;
    	}
    	public Integer getQueueCapacity() {
    		return queueCapacity;
    	}
    	public void setQueueCapacity(Integer queueCapacity) {
    		this.queueCapacity = queueCapacity;
    	}
    	public Long getKeepAliveTime() {
    		return keepAliveTime;
    	}
    	public void setKeepAliveTime(Long keepAliveTime) {
    		this.keepAliveTime = keepAliveTime;
    	}
    	public RejectedExecutionHandler getRejectHandler() {
    		return rejectHandler;
    	}
    	public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
    		this.rejectHandler = rejectHandler;
    	}
    	public String getBusinessId() {
    		return businessId;
    	}
    	public void setBusinessId(String businessId) {
    		this.businessId = businessId;
    	}
    }

     线程组:

    /**
     * 各个业务获取响应线程池
     * @author yfguopeng
     */
    public class ThreadGroupUtil {
    	private static Map<String, ThreadPoolExecutor> threadworkers;
    	private static ScheduledExecutorService monitorThread;//监视线程
    	private static final long delay = 200l;
    	private static long cycle_default = 5000l;
    	
    	static {
    		threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>();
    		monitorThread = Executors.newScheduledThreadPool(1);
    	}
    	
    	public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){
    		threadworkers.put(bizName, executor);
    	}
    	
    	public static ThreadPoolExecutor getThreadWorker(String bizName) {
    		return threadworkers.get(bizName);
    	}
    	
    	public static Map<String, ThreadPoolExecutor> getThreadWorkers(){
    		return threadworkers;
    	}
    
    	public static ScheduledExecutorService getMonitorThread() {
    		return monitorThread;
    	}
    
    	public static void setMonitorThread(ScheduledExecutorService monitorThread) {
    		ThreadGroupUtil.monitorThread = monitorThread;
    	}
    	
    	public static void monitorThreadClosed(){
    		if (monitorThread != null)
    			if (!monitorThread.isTerminated()) 
    				monitorThread.shutdown();
    	}
    	
    	public static void monitorThreadStart(ThreadGroup group,Long cycle){
    		MonitorThread monitor = new MonitorThread(group);
    		if (cycle > 0l) {
    			try {
    				cycle_default = cycle;
    			} catch (Exception e) {
    			}
    		}
    		monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS);
    	}
    }

     线程池实现类:

    /**
     * 线程池
     * @author yfguopeng
     *
     */
    public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    
    	private String bizName;
    	
    	public ThreadPoolExecutor(String bizName,int corePoolSize, int maximumPoolSize,
    			long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
    			RejectedExecutionHandler handler) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    				threadFactory, handler);
    		this.bizName = bizName;
    	}
    
    	public String getBizName() {
    		return bizName;
    	}
    
    	public void setBizName(String bizName) {
    		this.bizName = bizName;
    	}
    	
    }

     

    web.xml配置:

      <listener>
    		<listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class>
    	</listener> 

    调用:

    		ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex");
    		
    		String payOrgInfo = null;
    		String cards = null;
    		Future<String> xxxFuture = null;
    		Future<String> yyyFuture = null;
    		long start = System.currentTimeMillis();
    		xxxTask xxxTask = new xxxTask(//参数);
    		yyyTask yyyTask = new yyyTask(//参数);
    		System.out.println("开始......");
    		xxxFuture = exc.submit(xxxTask );
    		yyyFuture = exc.submit(yyyTask );
    		
    		try {
    			xxx= xxxFuture .get();
    			yyy= yyyFuture .get();
    			System.out.println(xxx);
    			System.out.println(yyy);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} catch (ExecutionException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		long end = System.currentTimeMillis();
    		System.out.println("结束......   "+(end-start));
    		
    		return "";
    	

     

    展开全文
  • 最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。...线程池的运行状况,也需要监控关于线程...

    最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。

    从这个问题中,我们学到了两点:

    线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池;

    线程池的运行状况,也需要监控

    关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的,分享下我的代码片段,如下:

    public class AsyncThreadExecutor implements AutoCloseable {

    private static final int DEFAULT_QUEUE_SIZE = 1000;

    private static final int DEFAULT_POOL_SIZE = 10;

    @Setter

    private int queueSize = DEFAULT_QUEUE_SIZE;

    @Setter

    private int poolSize = DEFAULT_POOL_SIZE;

    /**

    * 用于周期性监控线程池的运行状态

    */

    private final ScheduledExecutorService scheduledExecutorService =

    Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

    /**

    * 自定义异步线程池

    * (1)任务队列使用有界队列

    * (2)自定义拒绝策略

    */

    private final ThreadPoolExecutor threadPoolExecutor =

    new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),

    new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),

    (r, executor) -> log.error("the async executor pool is full!!"));

    private final ExecutorService executorService = threadPoolExecutor;

    @PostConstruct

    public void init() {

    scheduledExecutorService.scheduleAtFixedRate(() -> {

    /**

    * 线程池需要执行的任务数

    */

    long taskCount = threadPoolExecutor.getTaskCount();

    /**

    * 线程池在运行过程中已完成的任务数

    */

    long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();

    /**

    * 曾经创建过的最大线程数

    */

    long largestPoolSize = threadPoolExecutor.getLargestPoolSize();

    /**

    * 线程池里的线程数量

    */

    long poolSize = threadPoolExecutor.getPoolSize();

    /**

    * 线程池里活跃的线程数量

    */

    long activeCount = threadPoolExecutor.getActiveCount();

    log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",

    taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);

    }, 0, 10, TimeUnit.MINUTES);

    }

    public void execute(Runnable task) {

    executorService.execute(task);

    }

    @Override

    public void close() throws Exception {

    executorService.shutdown();

    }

    }

    这里的主要思路是:(1)使用有界队列的固定数量线程池;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。

    在查看监控日志的时候,看到下图所示的监控日志:

    760d118107417581e2fd631e7c203732.png

    屏幕快照 2018-03-28 21.55.19.png

    这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(在这里,都是10),一个线程池里的最大线程数是10,那么为什么largestPooSize可以是39呢?我去翻这块的源码:

    /**

    * Returns the largest number of threads that have ever

    * simultaneously been in the pool.

    *

    * @return the number of threads

    */

    public int getLargestPoolSize() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

    return largestPoolSize;

    } finally {

    mainLock.unlock();

    }

    }

    注释的翻译是:返回在这个线程池里曾经同时存在过的线程数。再看这个变量largestPoolSize在ThreadExecutor中的赋值的地方,代码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

    int c = ctl.get();

    int rs = runStateOf(c);

    // Check if queue empty only if necessary.

    if (rs >= SHUTDOWN &&

    ! (rs == SHUTDOWN &&

    firstTask == null &&

    ! workQueue.isEmpty()))

    return false;

    for (;;) {

    int wc = workerCountOf(c);

    if (wc >= CAPACITY ||

    wc >= (core ? corePoolSize : maximumPoolSize))

    return false;

    if (compareAndIncrementWorkerCount(c))

    break retry;

    c = ctl.get(); // Re-read ctl

    if (runStateOf(c) != rs)

    continue retry;

    // else CAS failed due to workerCount change; retry inner loop

    }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

    w = new Worker(firstTask);

    final Thread t = w.thread;

    if (t != null) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

    // Recheck while holding lock.

    // Back out on ThreadFactory failure or if

    // shut down before lock acquired.

    int rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN ||

    (rs == SHUTDOWN && firstTask == null)) {

    if (t.isAlive()) // precheck that t is startable

    throw new IllegalThreadStateException();

    workers.add(w);

    int s = workers.size();

    if (s > largestPoolSize)

    largestPoolSize = s;//这里这里!

    workerAdded = true;

    }

    } finally {

    mainLock.unlock();

    }

    if (workerAdded) {

    t.start();

    workerStarted = true;

    }

    }

    } finally {

    if (! workerStarted)

    addWorkerFailed(w);

    }

    return workerStarted;

    }

    发现两点:

    largestPoolSize是worker集合的历史最大值,只增不减。largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关;

    largestPoolSize<=maximumPoolSize。

    PS:杨青同学是这篇文章的灵感来源,他做了很多压测。给了我很多思路,并跟我一起分析了一些代码。

    展开全文
  • #####spring异步线程池 如报表,因为业务量大,比较耗时 实现AsyncConfigurer接口,使用@EnableAsync开启异步可用 在使用时,添加注解@Async即可 异步消息JMS 点对点模式 发布订阅模式,为了更好的扩展,更多系统...

    #####spring异步线程池
    如报表,因为业务量大,比较耗时

    • 实现AsyncConfigurer接口,使用@EnableAsync开启异步可用
    • 在使用时,添加注解@Async即可
    异步消息JMS
    • 点对点模式
    • 发布订阅模式,为了更好的扩展,更多系统使用监控得到消息,此项用的多。
    ActiveMQ
    • springboot配置
      spring.JMS.
      spring.activemq.
    • 发送消息
      使用jmsTemplate.convertwAndSend发送消息,默认转换规则SimpleMessageConverter
    • 接收消息
      注解@JmsListener接收消息
    • 发送或接收对象,对象需要序列化,并且在配置文件中加入到信任列表中,spring.Activemq.packages.trusted=对象所在包名,java.lang,因为对象中的id属性为Long,信任所有包为trust-all
    • RabbitMQ
      Amqp的rabbitmq
      springboot配置spring.rabbitmq.
    • ConfirmCallback接口的Confirm方法为回调方法
      rabbitTemplate进行操作,
      rabbitTemplate.setConfirmCallback设置回调
      rabbitTemplate.ConvertAndSend发送消息
      @RabbitListener接收消息
    定时任务

    原因:统计,耗时操作,月末,年末等统计报表
    @EnableScheduling启用
    @Scheduled配置如何定时

    • cron表达式
      秒 分 时 天 月 星期 年,最后一项年可以不配置,通配符如下:
      任意值 *
      ? 不指定值,用于处理天和星期配置的冲突
      _ 指定时间区间
      / 指定时间间隔执行
      ,列举多个项
      第几个#
      L 最后的
    websocket

    websocket协议,浏览器与服务器全双工通信
    @ServerEndpoint定义端点服务器类
    @onOpen 连接建立成功
    @onClose 连接关闭
    @onMessage 接收消息
    @OnError 错误时调用

    • 通过子协议STOMP兼容websocket不支持的浏览器
      @EnableWebSockitMessageBroker注解启动websockit的子协议stomp,通过实现WebSockitMessageBrokerConfigurer的两个方法,springboot使用SimpMessagingTemplate进行消息转发,
      @MessageMapping定义websocket请求路径,并使用模板进行转发。如果使用了springsecurity可以使用Principal获取当前用户信息
      @SendTo定义结果发送到特定路径
      客户端可使用Sockjs进行连接
    展开全文
  • java线程池监控

    2018-12-02 16:19:00
    使用到了一线程去做一些异步的事情,在开发环境和测试环境验证没有任何问题,但是在程序在生产运行一段时间后,发现没有得到自己想要的结果,为此开始了漫长的排查bug的之路,因为用到了一些线程,但是实际又没有对...
  • Java线程池监控小结

    2019-03-08 19:26:46
    最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发...
  • 线程池注入: import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import...
  • Spring 线程池定时监控

    千次阅读 2018-07-09 21:59:53
    在上一篇Spring异步线程池:https://blog.csdn.net/timchen525/article/details/80672186,该文介绍了如何使用Spring的注解来配置异步线程操作。本篇博文中,补充介绍如何通过Spring定时任务来配置定时检测线程池的...
  • Java线程池该如何监控

    千次阅读 2018-09-25 22:36:03
    Java线程池该如何监控? 日常开发中,当我们开发一些复合型任务时,尝尝会使用线程池通过异步的方式解决一些对时效性要求不高的任务。下面小编列举几种线程池的使用方式,以便参考! Java JDK中默认封装好的...
  • 我们在做项目开发中,有时需要对某个服务进行监控,如果只监控一个服务的话,就不需要使用多线程了,但是如果监控的对象有很多,就需要使用线程池来减少资源的消耗了。 要想使用线程池,就必须先创建一个线程池。...
  • 线程池

    2021-03-27 10:31:00
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。 1.线程池的好处 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 提高相应速度。任务...
  • 在spring boot应用监控线程池的状态

    万次阅读 2018-05-05 11:27:26
    背景废话不多说,做这个监控的背景很简单,我们的项目都是以spring boot框架为基础开发的,代码里所有的异步线程都是通过@Async标签标注的,并且标注的时候都是指定对应线程池的,如果不知@Asyn...
  • 1.线程池的好处几乎所有需要异步操作或者并发执行任务的程序都可以使用线程池,三个好处: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗; 提高响应速度:当任务到达的时候,任务可以不...
  • 线程池对执行同步或异步的任务很重要。本文展示如何利用Spring开发并监控线程池服务。创建线程池的其他两种方法已讲解过。 使用技术 JDK 1.6.0_21Spring 3.0.5Maven 3.0.2 第1步:创建Maven工程 下面是一个...
  • 线程池在IO密集、并发任务、异步任务、后台监控任务场景中都发挥作用。而线程话题无论是操作系统和高级编程语言上都是一个复杂的话题。本文的讨论是基于Posix标准讨论的。题外话,作为一名pytho...
  • 点击下方公众号「关注」和「星标」回复“1024”获取独家整理的学习资料!1. Java的线程池① 合理使用线程池的好处Java的线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行...
  • 线程池总结

    2020-08-03 10:45:24
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。开发中使用线程池的三个优点如下: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁带来的消耗。 ...
  • 线程池简述

    2021-04-08 14:22:00
    java的线程是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池 合理使用线程池带来的好处 降低资源的消耗 通过重复利用已经创建的线程,降低频繁的创建与销毁线程所带来的资源消耗...
  • 最近在做推荐排单时,有些走规则进行实时查询的需求,需要并行的远程调用处理数据,所以使用了线程池异步并行处理。但是原生线程池没有监控,没有监控就没法统计数据,没法调参数,需要一个能统计任务数据的线程池。...
  • Java中的线程池

    2020-01-04 16:21:50
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建...
  • Java的线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。 合理使用线程池能带来的好处: 降低资源消耗。 通过重复利用已经创建的线程降低线程创建的和销毁造成的消耗。...
  • 线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED。因为服务 B 依赖外部接口,刚开始误以为外部接口...
  • 线程池原理分析

    2020-05-25 23:51:12
    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和...
  • Java多并发之线程池

    2020-12-16 14:16:53
    目录一、为什么需要线程池二、线程池实现原理三、线程池的使用3.1 线程池的创建3.2 向线程池提交任务3.3 关闭线程池3.4 线程池监控 本文参考自《并发编程艺术》 一、为什么需要线程池 Java中的线程池是运用场景...

空空如也

空空如也

1 2 3 4 5 ... 9
收藏数 166
精华内容 66
关键字:

异步线程池监控