精华内容
下载资源
问答
  • 多线程模板特点: 0)....如遇执行失败需要重试的任务直接压入队列 排队重试! (如果限制比较大的网站,账号登陆被限制IP了,在同一个IP子线程里面循环重试又浪费时间,不如试试揪出来扔后面排队重试吧)
  • 资源介绍:鱼刺线程池 拨号,失败重试框架易语言源码,源码调用了精易模块和鱼刺多线程模块。源码实现了多线程操作,并且具有宽带拨号,失败重试等功能。资源作者:Joker4tb资源界面:资源下载:
  • springboot提供了org.springframework.scheduling.annotation.AsyncConfigurer接口让开发人员可以自定义线程池执行器;框架默认提供了一个空的实现类AsyncConfigurerSupport,不过两个方法体内部提供的都是空实现; ...

    springboot提供了org.springframework.scheduling.annotation.AsyncConfigurer接口让开发人员可以自定义线程池执行器;框架默认提供了一个空的实现类AsyncConfigurerSupport,不过两个方法体内部提供的都是空实现;

    1.首先看下AsyncConfigurer接口
    public interface AsyncConfigurer {
    
    	/**
    	 * 方法返回一个实际执行线程的线程池
    	 */
    	@Nullable
    	default Executor getAsyncExecutor() {
    		return null;
    	}
    
    	/**
    	 * 当线程池执行异步任务时会抛出AsyncUncaughtExceptionHandler异常,
    	 * 此方法会捕获该异常
    	 */
    	@Nullable
    	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    		return null;
    	}
    
    }
    
    2.通过上面的接口实现异步线程池

    先看下我们的目标:

    • 自定义一个异步线程池替换掉springboot框架默认的线程池;
    • 自动捕获异步线程实例抛出的异常;
    • 线程池的核心指标可以通过配置文件自定义配置;
    • 封装成为一个组件,可以通过配置控制容器是否加载;

    自动化配置类如下:

    package com.sgrain.boot.autoconfigure.threadpool;
    
    import com.sgrain.boot.common.exception.PrintExceptionInfo;
    import com.sgrain.boot.common.utils.LoggerUtils;
    import com.sgrain.boot.common.utils.constant.CharacterUtils;
    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.Objects;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @program: spring-parent
     * @description: 异步线程池配置 AsyncConfigurer在applicationContext早期初始化,如果需要依赖于其它的bean,尽可能的将它们声明为lazy
     * @create: 2020/08/21
     */
    @EnableAsync
    @Configuration
    @EnableConfigurationProperties(AsyncThreadPoolProperties.class)
    @ConditionalOnProperty(prefix = "spring.sgrain.async-thread-pool", name = "enable", havingValue = "true", matchIfMissing = false)
    public class AsyncThreadPoolAutoConfiguration implements AsyncConfigurer {
    
        @Autowired
        private AsyncThreadPoolProperties asyncThreadPoolProperties;
    
        /**
         * 定义线程池
         * 使用{@link java.util.concurrent.LinkedBlockingQueue}(FIFO)队列,是一个用于并发环境下的阻塞队列集合类
         * ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉,容器会自动调用
         *
         * @return
         */
        @Bean("asyncTaskExecutor")
        @Override
        public Executor getAsyncExecutor() {
            //Java虚拟机可用的处理器数
            int processors = Runtime.getRuntime().availableProcessors();
            //定义线程池
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //核心线程数
            taskExecutor.setCorePoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors);
            //线程池最大线程数,默认:40000
            taskExecutor.setMaxPoolSize(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 40000);
            //线程队列最大线程数,默认:80000
            taskExecutor.setQueueCapacity(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 80000);
            //线程名称前缀
            taskExecutor.setThreadNamePrefix(StringUtils.isNotEmpty(asyncThreadPoolProperties.getThreadNamePrefix()) ? asyncThreadPoolProperties.getThreadNamePrefix() : "Async-ThreadPool-");
            //线程池中线程最大空闲时间,默认:60,单位:秒
            taskExecutor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveSeconds());
            //核心线程是否允许超时,默认:false
            taskExecutor.setAllowCoreThreadTimeOut(asyncThreadPoolProperties.isAllowCoreThreadTimeOut());
            //IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
            taskExecutor.setWaitForTasksToCompleteOnShutdown(asyncThreadPoolProperties.isWaitForTasksToCompleteOnShutdown());
            //阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
            taskExecutor.setAwaitTerminationSeconds(asyncThreadPoolProperties.getAwaitTerminationSeconds());
            /**
             * 拒绝策略,默认是AbortPolicy
             * AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
             * DiscardPolicy:丢弃任务但不抛出异常
             * DiscardOldestPolicy:丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务
             * CallerRunsPolicy:执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务
             */
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            //初始化
            //taskExecutor.initialize();
    
            return taskExecutor;
        }
    
        /**
         * 异步方法执行的过程中抛出的异常捕获
         *
         * @return
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (throwable, method, objects) -> {
                String msg = StringUtils.EMPTY;
                if (ArrayUtils.isNotEmpty(objects) && objects.length > 0) {
                    msg = StringUtils.join(msg, "参数是:");
                    for (int i = 0; i < objects.length; i++) {
                        msg = StringUtils.join(msg, objects[i], CharacterUtils.ENTER);
                    }
                }
                if (Objects.nonNull(throwable)) {
                    msg = StringUtils.join(msg, PrintExceptionInfo.printErrorInfo(throwable));
                }
                LoggerUtils.error(method.getDeclaringClass(), msg);
            };
        }
    }
    
    

    线程池的自定义属性及异常处理代码中的注解已经标的很清晰了,不在重复说明;@Configuration注解描述当前类是一个配置类,@EnableAsync注解描述启动线程池执行器开启异步执行功能;@EnableConfigurationProperties注解描述启动对@ConfigurationProperties注解标注的bean的支持,该bean可以被注入到IOC容器之中;@ConditionalOnProperty是一个条件注解,用来控制容器是否将当前的配置类注入到IOC容器之中;

    线程池使用到的外部属性配置类如下:

    package com.sgrain.boot.autoconfigure.threadpool;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * @program: spring-parent
     * @description: 异步线程池配置文件
     * @create: 2020/08/21
     */
    @ConfigurationProperties(prefix = "spring.sgrain.async-thread-pool")
    public class AsyncThreadPoolProperties {
        /**
         * 是否启动异步线程池,默认 false
         */
        private boolean enable;
        /**
         * 核心线程数,默认:Java虚拟机可用线程数
         */
        private Integer corePoolSize;
        /**
         * 线程池最大线程数,默认:40000
         */
        private Integer maxPoolSize;
        /**
         * 线程队列最大线程数,默认:80000
         */
        private Integer queueCapacity;
        /**
         * 自定义线程名前缀,默认:Async-ThreadPool-
         */
        private String threadNamePrefix;
        /**
         * 线程池中线程最大空闲时间,默认:60,单位:秒
         */
        private Integer keepAliveSeconds = 60;
        /**
         * 核心线程是否允许超时,默认false
         */
        private boolean allowCoreThreadTimeOut;
        /**
         * IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
         */
        private boolean waitForTasksToCompleteOnShutdown;
        /**
         * 阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
         */
        private int awaitTerminationSeconds = 10;
    
        public boolean isEnable() {
            return enable;
        }
    
        public void setEnable(boolean enable) {
            this.enable = enable;
        }
    
        public Integer getCorePoolSize() {
            return corePoolSize;
        }
    
        public void setCorePoolSize(Integer corePoolSize) {
            this.corePoolSize = corePoolSize;
        }
    
        public Integer getMaxPoolSize() {
            return maxPoolSize;
        }
    
        public void setMaxPoolSize(Integer maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }
    
        public Integer getQueueCapacity() {
            return queueCapacity;
        }
    
        public void setQueueCapacity(Integer queueCapacity) {
            this.queueCapacity = queueCapacity;
        }
    
        public String getThreadNamePrefix() {
            return threadNamePrefix;
        }
    
        public void setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
        }
    
        public Integer getKeepAliveSeconds() {
            return keepAliveSeconds;
        }
    
        public void setKeepAliveSeconds(Integer keepAliveSeconds) {
            this.keepAliveSeconds = keepAliveSeconds;
        }
    
        public boolean isAllowCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }
    
        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
            this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
        }
    
        public boolean isWaitForTasksToCompleteOnShutdown() {
            return waitForTasksToCompleteOnShutdown;
        }
    
        public void setWaitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
            this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
        }
    
        public int getAwaitTerminationSeconds() {
            return awaitTerminationSeconds;
        }
    
        public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
            this.awaitTerminationSeconds = awaitTerminationSeconds;
        }
    }
    
    

    properties属性配置示例如下:

    #异步线程池
    #异步线程池组件开关,默认false
    spring.sgrain.async-thread-pool.enable=true
    #核心线程数,默认:Java虚拟机可用线程数
    spring.sgrain.async-thread-pool.core-pool-size=4
    #线程池最大线程数,默认:40000
    spring.sgrain.async-thread-pool.max-pool-size=40000
    #线程队列最大线程数,默认:80000
    spring.sgrain.async-thread-pool.queue-capacity=80000
    #自定义线程名前缀,默认:Async-ThreadPool-
    spring.sgrain.async-thread-pool.thread-name-prefix=Async-ThreadPool-
    #线程池中线程最大空闲时间,默认:60,单位:秒
    spring.sgrain.async-thread-pool.keep-alive-seconds=60
    #核心线程是否允许超时,默认false
    spring.sgrain.async-thread-pool.allow-core-thread-time-out=false
    #IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
    spring.sgrain.async-thread-pool.wait-for-tasks-to-complete-on-shutdown=false
    #阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
    spring.sgrain.async-thread-pool.await-termination-seconds=10
    

    上面的开发任务都做好之后还需要最后一步,将com.sgrain.boot.autoconfigure.threadpool.AsyncThreadPoolAutoConfiguration配置到resources/META-INF目录的spring.factories文件中:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      com.sgrain.boot.autoconfigure.threadpool.AsyncThreadPoolAutoConfiguration
    

    看到配置的属性key是EnableAutoConfiguration大概你就可以猜测到是为了启用自动化配置功能;到现在整个异步线程池自动化配置组件已经开发完成了,那如何使用呢?我就不再举例说明了,网上有很多示例,只说几个重点;

    • 使用@Async注解标注方法为异步任务
    • 异步任务返回值为void
    • 异步任务方法必须使用@Override标注,即是一个接口的实现方法
    3.异步线程池原理分析

    springboot框架自带ThreadPoolTaskExecutor线程池,org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration自动化配置类自动的创建默认的线程池,如果没有自定义ThreadPoolTaskExecutor线程池,那么@EnableAsync异步线程自动关联默认的线程池;

    • 看下@EnableAsync的源码是如何启动异步执行器的
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {
    
    	Class<? extends Annotation> annotation() default Annotation.class;
    	boolean proxyTargetClass() default false;
    	AdviceMode mode() default AdviceMode.PROXY;
    	int order() default Ordered.LOWEST_PRECEDENCE;
    
    }
    

    上述代码的核心是@Import注解导入的AsyncConfigurationSelector选择器类;

    • AsyncConfigurationSelector选择器类用来确定具体实现的配置类,一共有两种模式一种是使用JDK自带的动态代理模式实现动态代理,另外一种是使用ASPECTJ实现动态代理;默认使用JDK模式;
    public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    
    	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
    			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    
    
    	/**
    	 * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
    	 * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
    	 * respectively.
    	 */
    	@Override
    	@Nullable
    	public String[] selectImports(AdviceMode adviceMode) {
    		switch (adviceMode) {
    			case PROXY:
    				return new String[] {ProxyAsyncConfiguration.class.getName()};
    			case ASPECTJ:
    				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
    			default:
    				return null;
    		}
    	}
    
    }
    
    • ProxyAsyncConfiguration配置类源码如下
    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    
    	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    		bpp.configure(this.executor, this.exceptionHandler);
    		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
    			bpp.setAsyncAnnotationType(customAsyncAnnotation);
    		}
    		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    		return bpp;
    	}
    
    }
    

    上面的类是AbstractAsyncConfiguration类的子类,该类通过setConfigurers方法注入实现了AsyncConfigurer配置的自定义线程池,源码如下:

    	@Autowired(required = false)
    	void setConfigurers(Collection<AsyncConfigurer> configurers) {
    		if (CollectionUtils.isEmpty(configurers)) {
    			return;
    		}
    		if (configurers.size() > 1) {
    			throw new IllegalStateException("Only one AsyncConfigurer may exist");
    		}
    		AsyncConfigurer configurer = configurers.iterator().next();
    		this.executor = configurer::getAsyncExecutor;
    		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    	}
    

    ProxyAsyncConfiguration源码中的AsyncAnnotationBeanPostProcessor是一个Bean的后置处理器,所以会在setBeanFactory方法中对AOP切面类AsyncAnnotationAdvisor进行了初始化,源码如下:

    	@Override
    	public void setBeanFactory(BeanFactory beanFactory) {
    		super.setBeanFactory(beanFactory);
    
    		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    		if (this.asyncAnnotationType != null) {
    			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    		}
    		advisor.setBeanFactory(beanFactory);
    		this.advisor = advisor;
    	}
    

    切面类的构造函数中有两个核心点,对通知和切点进行初始化,源码如下:

    	public AsyncAnnotationAdvisor(
    			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    
    		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    		asyncAnnotationTypes.add(Async.class);
    		try {
    			asyncAnnotationTypes.add((Class<? extends Annotation>)
    					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    		}
    		catch (ClassNotFoundException ex) {
    			// If EJB 3.1 API not present, simply ignore.
    		}
    		this.advice = buildAdvice(executor, exceptionHandler);
    		this.pointcut = buildPointcut(asyncAnnotationTypes);
    	}
    

    其中buildAdvice方法对拦截器AnnotationAsyncExecutionInterceptor进行了初始化,该类实现了MethodInterceptor接口,源码如下:

    	protected Advice buildAdvice(
    			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    
    		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    		interceptor.configure(executor, exceptionHandler);
    		return interceptor;
    	}
    

    上述拦截器类调用了configure方法,该方法对默认线程池和异常执行器进行了初始化,源码如下:

    	public void configure(@Nullable Supplier<Executor> defaultExecutor,
    			@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    
    		this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
    		this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    	}
    

    这个地方是一个关键点,如果没有自定义实现AsyncConfigurer接口,则此处获取到的是默认的线程池,如果自定义实现了AsyncConfigurer接口,则此处获取到的就是自定义线程池;

    • 接下来我们看下拦截器的invoke方法,此方法是在AnnotationAsyncExecutionInterceptor的父类AsyncExecutionInterceptor中,源码如下:
    	@Override
    	@Nullable
    	public Object invoke(final MethodInvocation invocation) throws Throwable {
    		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    		//此处是用来获取具体执行线程任务线程池
    		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    		if (executor == null) {
    			throw new IllegalStateException(
    					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
    		}
    
    		Callable<Object> task = () -> {
    			try {
    				Object result = invocation.proceed();
    				if (result instanceof Future) {
    					return ((Future<?>) result).get();
    				}
    			}
    			catch (ExecutionException ex) {
    				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    			}
    			catch (Throwable ex) {
    				handleError(ex, userDeclaredMethod, invocation.getArguments());
    			}
    			return null;
    		};
    
    		return doSubmit(task, executor, invocation.getMethod().getReturnType());
    	}
    

    上述代码的核心determineAsyncExecutor方法用来确定执行线程的线程池,源码如下:

    @Nullable
    	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    		AsyncTaskExecutor executor = this.executors.get(method);
    		if (executor == null) {
    			Executor targetExecutor;
    			String qualifier = getExecutorQualifier(method);
    			if (StringUtils.hasLength(qualifier)) {
    				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
    			}
    			else {
    				targetExecutor = this.defaultExecutor.get();
    			}
    			if (targetExecutor == null) {
    				return null;
    			}
    			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
    					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
    			this.executors.put(method, executor);
    		}
    		return executor;
    	}
    

    至此,通过@EnableAsync注解启动异步线程池,如何加载默认线程池配置,如何定义AOP切面及拦截器,通过@Async标注异步任务如何确定执行的线程池的原理及源码分析已经完成。

    GitHub源码:https://github.com/mingyang66/spring-parent/tree/master/emily-spring-boot-autoconfigure

    展开全文
  • 线程池

    千次阅读 多人点赞 2017-07-17 11:22:43
    使用线程池的优缺点什么是线程池线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务线程池类图线程池重要的类 Executor 接口只有一个 方法,execute,并且需要 传入一个 ...

    线程池

    什么是线程池
    线程池的组成部分
    线程池的实现原理
    线程池的应用场景
    使用线程池的优缺点

    什么是线程池

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务

    线程池类图

    这里写图片描述

    线程池重要的类

    这里写图片描述
    Executor 接口只有一个 方法,execute,并且需要 传入一个 Runnable 类型的参数
    ExecutorService 接口继承了 Executor,并且提供了一些其他的方法
    shutdownNow : 关闭线程池,返回放入了线程池,但是还没开始执行的线程。
    submit : 执行的任务 允许拥有返回值。
    invokeAll : 运行把任务放进集合中,进行批量的执行,并且能有返回
    AbstractExecutorService 是一个抽象类,主要完成了 对 submit 方法,invokeAll 方法 的实现
    ThreadPoolExecutor 继承了 AbstractExecutorService,并且实现了最重要的 execute 方法

    线程池的组成部分

    1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
    2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
    3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

    线程池实现原理

    1.线程池状态

    2.任务的执行

    3.线程池中的线程初始化

    4.任务缓存队列及排队策略

    5.任务拒绝策略

    6.线程池的关闭

    7.线程池容量的动态调整

    线程池状态

    RUNNING :接受提交任务
    SHUTDOWN:关闭状态 不在接受但能处理
    STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程
    TIDYING:有效线程数)为0
    TERMINATED:在terminated() 方法执行完后进入该状态

    // 得到线程数,也就是后29位的数字。 直接跟CAPACITY做一个与操作即可,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 与操作的话前面3位肯定为0,相当于直接取后29位的值
    private static int workerCountOf(int c) { return c & CAPACITY; }

    // 得到状态,CAPACITY的非操作得到的二进制位11100000000000000000000000000000,然后做在一个与操作,相当于直接取前3位的的值
    private static int runStateOf(int c) { return c & ~CAPACITY; }

    // 或操作。相当于更新数量和状态两个操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    详情 https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/

    线程池状态转换

    这里写图片描述

    execute方法执行流程

    这里写图片描述

    线程池中的线程初始化

    prestartCoreThread():初始化一个核心线程;
    prestartAllCoreThreads():初始化所有核心线程

    任务缓存队列及排队策略

    workQueue,它用来存放等待执行的任务。
    workQueue的类型为BlockingQueue,通常可以取下面三种类型:
    1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

    2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

    3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
    (有限队列、无线队列、队列 、栈)

    任务拒绝策略

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    线程池的关闭

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    线程池容量的动态调整

    ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    setCorePoolSize:设置核心池大小
    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

    线程池的创建

    new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
    创建一个线程池需要输入几个参数:

    corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
    runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
    maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
    ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
    RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    AbortPolicy:直接抛出异常。
    CallerRunsPolicy:只用调用者所在线程来运行任务。
    DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    DiscardPolicy:不处理,丢弃掉。
    当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
    keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
    TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
    四种实现
    1.newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    3.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    4.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    线程池的应用场景

    单个任务处理时间比较短
    需要处理的任务数量很大

    线程池的优缺点

    线程复用
    控制最大并发数
    管理线程
    缺点、死锁、资源不足、线程泄露、并发错误、请求过载
    使用线程池的风险
    虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
    1。死锁
    任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程死锁 了。死锁的最简单情形是:线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),否则死锁的线程将永远等下去。
    虽然任何多线程程序中都有死锁的风险,但线程池却引入了另一种死锁可能,在那种情况下,所有池线程都在执行已阻塞的等待队列中另一任务的执行结果的任务, 但这一任务却因为没有未被占用的线程而不能运行。当线程池被用来实现涉及许多交互对象的模拟,被模拟的对象可以相互发送查询,这些查询接下来作为排队的任 务执行,查询对象又同步等待着响应时,会发生这种情况。
    2.资源不足
    线程池的一个优点在于:相对于其它替代调度机制(有些我们已经讨论过)而言,它们通常执行得很好。但只有恰当地调整了线程池大小时才是这样的。线程消耗包括内存和其它系统资源在内的大量资源。除了Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java 线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性能。
    如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏 问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例 如 JDBC 连接、套接字或文件。这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。
    3、并发错误
    线程池和其它排队机制依靠使用wait() 和 notify() 方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。使用这些方法时,必须格外小心;即便是专家也可能在它们上面出错。而最好使用现有的、已经知道能工作的实现,例如在下面的 无需编写自己的线程池中讨论的util.concurrent 包。
    4、线程泄漏
    各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。
    有些任务可能会永远等待某些资源或来自用户的输入,而这些资源又不能保证变得可用,用户可能也已经回家了,诸如此类的任务会永久停止,而这些停止的任务也 会引起和线程泄漏同样的问题。如果某个线程被这样一个任务永久地消耗着,那么它实际上就被从池除去了。对于这样的任务,应该要么只给予它们自己的线程,要 么只让它们等待有限的时间。
    请求过载
    仅仅是请求就压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会 消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可 以用一个指出服务器暂时很忙的响应来拒绝请求。

    有效使用线程池的准则

    只要您遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:
    • 不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都很忙。
    • 在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得某些 进展。
    • 理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队 列可能会有意义,这样可以相应地调整每个池。

    问题

    工作线程数是不是设置的越大越好?
    调用sleep()函数的时候,线程是否一直占用CPU?
    单核CPU,设置多线程有意义么,是否能提高并发性能

    线程数不是越多越好
    sleep()不占用CPU
    单核设置多线程不但能使得代码清晰,还能提高吞吐量

    N核服务器,通过日志分析出任务执行过程中,本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化

    展开全文
  • 发现一个比较好玩的东西: 如果你在使用多线程的使用中异常结束了,你应该如何操作呢? 例子: 正常情况下: ...项目一启动都可以跑完,如果有一段代码出现错误呢。...有没有发生过这样的情况,你写的工作线程莫名其妙...

    发现一个比较好玩的东西:

    如果你在使用多线程的使用中异常结束了,你应该如何操作呢?

    例子:

    正常情况下:

     

    项目一启动都可以跑完,如果有一段代码出现错误呢。

    系统丢出了一个异常出来。

    有没有发生过这样的情况,你写的工作线程莫名其妙的挂了,如果不是被你刚好看到,拿只能抓瞎了,不知道啥原因了,因为异常的时候只会把stack trace打在控制台上,不会记在你想记得错误日志里,头皮都抓破了也没能找到确切的地方,最后只能在能加try catch 的地方都给加上,但你却并没有找到准确的错误地址。

    好的,接下来我们把代码更改一下:

    然后我们再运行一下看看:

     

    在完成任务之前,这个线程会持续运行。通过其他多线程的思想同样可以实现这种情况。

     

    转载于:https://www.cnblogs.com/java7115/p/9699720.html

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,935
精华内容 12,774
关键字:

线程池自动重试