精华内容
下载资源
问答
  • Java线程池几种实现方式

    千次阅读 2017-09-13 15:42:29
    Java线程池几种实现原理

    线程池的实现方式是通过Executors类创建几种不同类型的线程池,常用的有newFixedThreadPool(int nThreads),构造方法如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                          0L, TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>());
    }

    第一个参数代表corePoolSize,第二个代表maximumPoolSize,第三个代表keepAliveTime存活时间,最后一个是用队列保存超出的任务。
    具体代码:

    ExecutorService ec = Executors.newFixedThreadPool(2);
    ec.submit(new Runnable(){
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }
    });

    ec.submit方法调用的是抽象类AbstractExecutorService中的submit方法

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        FutureTask<Object> ftask = new FutureTask<Object>(task, null);
        execute(ftask);
        return ftask;
    }

    该方法创建一个未来任务FutureTask

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

    Executors.callable方法如下:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    
    static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable  task, T result) {
        this.task = task; 
        this.result = result;
    }
    public T call() { 
        task.run(); 
        return result; 
    }

    未来任务FutureTask创建完毕之后, 执行execute(ftask)方法,调用ThreadPoolExecutor类。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
            reject(command);
            return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
            if (workQueue.offer(command))
            return;
            Runnable r = addIfUnderMaximumPoolSize(command);
            if (r == command)
            return;
            if (r == null) {
            reject(command);
            return;
            }
            // else retry
        }
    }

    会先执行if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)),因为目前线程数量还小于设定的核心数量。addIfUnderCorePoolSize(command)代码如下:

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize)
            t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
            largestPoolSize = nt;
        }
        return t;
    }

    Worker是ThreadPoolExecutor一个内部类,将要执行的任务包装进Worker中,之后执行t.start(),调用Worker中的run方法。

    public void run() {
        try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null; // unnecessary but can help GC
        }
        } catch(InterruptedException ie) {
        // fall through
        } finally {
        workerDone(this);
        }
    }

    执行一次后,进入while循环,getTask方法如下:

    Runnable getTask() throws InterruptedException {
        for (;;) {
            switch(runState) {
            case RUNNING: {
            if (poolSize <= corePoolSize)   // untimed wait if core
                return workQueue.take();
    
            long timeout = keepAliveTime;
            if (timeout <= 0) // die immediately for 0 timeout
                return null;
            Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
            if (r != null)
                return r;
            if (poolSize > corePoolSize) // timed out
                return null;
            // else, after timeout, pool shrank so shouldn't die, so retry
            break;
            }
    
            case SHUTDOWN: {
            // Help drain queue 
            Runnable r = workQueue.poll();
            if (r != null)
                return r;
    
            // Check if can terminate
            if (workQueue.isEmpty()) {
                interruptIdleWorkers();
                return null;
            }
    
            // There could still be delayed tasks in queue.
            // Wait for one, re-checking state upon interruption
            try {
                return workQueue.take();
            } catch(InterruptedException ignore) {}
            break;
            }
    
            case STOP:
            return null;
            default:
            assert false; 
            }
        }
    }

    newFixedThreadPool创建的数量一直是poolSize 小于等于corePoolSize,所以从队列中获取下一条任务,如果没有则等待,workQueue.take();

    下面开始讲newCachedThreadPool,该类的创建方式:Executors.newCachedThreadPool();

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                          60L, TimeUnit.SECONDS,
                          new SynchronousQueue<Runnable>());
    }

    内部生成一个corePoolSize为0,poolSize 为整形最大数,存活时间为60s的线程池。他的执行任务方式和newFixedThreadPool一样,也是ec.submit。我们关心的是如何做到60s后自动回收的。主要在Worker中的getTask方法里。

    while (task != null || (task = getTask()) != null) {
       runTask(task);
        task = null; // unnecessary but can help GC
    }
     switch(runState) {
       case RUNNING: {
        if (poolSize <= corePoolSize)   // untimed wait if core
            return workQueue.take();
    
        long timeout = keepAliveTime;
        if (timeout <= 0) // die immediately for 0 timeout
            return null;
        Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
        if (r != null)
            return r;
        if (poolSize > corePoolSize) // timed out
            return null;
        // else, after timeout, pool shrank so shouldn't die, so retry
        break;
        }

    getTask方法中,由于corePoolSize为0,所以直接走下面,获取超时时间keepAliveTime,然后从队列中Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);根据超时时间获取任务,超过时限则break。之后外面run方法的循环结束,线程关闭。

    public void run() {
        try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null; // unnecessary but can help GC
        }
        } catch(InterruptedException ie) {
        // fall through
        } finally {
            workerDone(this);
        }
    }

    最后再讲一下定时及延时执行的线程newScheduledThreadPool,创建方式:ScheduledExecutorService ec = Executors.newScheduledThreadPool(int corePoolSize);

      public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue());
    }

    执行任务为:

    ec1.scheduleAtFixedRate(new Runnable(){
        public void run() {
            System.out.println("i:"+j+" ********");
        }
    },10, 2, TimeUnit.SECONDS);

    10代表首次执行延时10秒执行,之后每2秒执行一次。接着看他们的实现原理。方法如下:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
                          long initialDelay,  
                          long period, 
                          TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        if (initialDelay < 0) initialDelay = 0;
        long triggerTime = now() + unit.toNanos(initialDelay);
        ScheduledFutureTask<?> t = 
            new ScheduledFutureTask<Object>(command, 
                            null,
                            triggerTime,
                            unit.toNanos(period));
        delayedExecute(t);
        return t;
    }
    
    ScheduledFutureTask(Runnable r, V result, long ns,  long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    创建任务大体相同,将延时时间设置为time,将每次执行时间设置为period。之后执行delayedExecute(t);方法。

    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();
    
        super.getQueue().add(command);
    }

    该方法比较目前的线程数量是否小于设置的核心数量,如果小于,则创建线程,否则加入队列。当我们第一次进入时,肯定是小于的。执行prestartCoreThread()方法。

    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    该方法执行addIfUnderCorePoolSize()方法,和newFixedThreadPool相同,只不过传入的Runnable任务为null,表明只创建个线程被Worker包装,之后将该Runnable任务放入队列中。接着执行Worker的run方法,从getTask中获取任务。

      Runnable getTask() throws InterruptedException {
            for (;;) {
                switch(runState) {
                case RUNNING: {
                    if (poolSize <= corePoolSize)   // untimed wait if core
                        return workQueue.take();
    
                    long timeout = keepAliveTime;
                    if (timeout <= 0) // die immediately for 0 timeout
                        return null;
                    Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
                    if (r != null)
                        return r;
                    if (poolSize > corePoolSize) // timed out
                        return null;
                    // else, after timeout, pool shrank so shouldn't die, so retry
                    break;
                }

    这是workQueue.take()的实现类是DelayQueue队列,

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
            E first = q.peek();
            if (first == null) {
                available.await();
            } else {
                long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                long tl = available.awaitNanos(delay);
                } else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)
                    available.signalAll(); // wake up other takers
                return x;
    
                }
            }
            }
        } finally {
            lock.unlock();
        }
    }

    先获取time,第一次设置的10秒,等待10s后返回开始执行任务。执行的run方法是ScheduledThreadPoolExecutor中的run方法。

    public void run() {
        if (isPeriodic())
        runPeriodic();
        else 
        ScheduledFutureTask.super.run();
    }

    runPeriodic()方法内容:

    private void runPeriodic() {
        boolean ok = ScheduledFutureTask.super.runAndReset();
        boolean down = isShutdown();
        // Reschedule if not cancelled and not shutdown or policy allows
        if (ok && (!down ||
               (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 
            !isTerminating()))) {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = now() - p;
        ScheduledThreadPoolExecutor.super.getQueue().add(this);
        }
        // This might have been the final executed delayed
        // task.  Wake up threads to check.
        else if (down) 
        interruptIdleWorkers();
    }

    方法在runAndReset中执行完毕,之后获取每次间隔多少时间执行的参数period,将time设为time+p。这样当再次循环从队列中找任务的时候, long delay = first.getDelay(TimeUnit.NANOSECONDS);

    public long getDelay(TimeUnit unit) {
        long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
        return d;
    }

    就是等待2S后执行了。
    以上就是newFixedThreadPool、newScheduledThreadPool、newCachedThreadPool的实现原理。

    展开全文
  • 上篇《Java线程的6种状态详解及创建线程的4种方式》 前言:我们都知道,线程是稀有资源,系统频繁创建会很大程度上影响服务器的使用效率,如果不加以限制,很容易就会把服务器资源耗尽。所以,我们可以通过创建...

    在这里插入图片描述
    上篇《Java线程的6种状态详解及创建线程的4种方式
    前言:我们都知道,线程是稀有资源,系统频繁创建会很大程度上影响服务器的使用效率,如果不加以限制,很容易就会把服务器资源耗尽。所以,我们可以通过创建线程池来管理这些线程,提升对线程的使用率。

    1、什么是线程池?

    简而言之,线程池就是管理线程的一个容器,有任务需要处理时,会相继判断核心线程数是否还有空闲、线程池中的任务队列是否已满、是否超过线程池大小,然后调用或创建线程或者排队,线程执行完任务后并不会立即被销毁,而是仍然在线程池中等待下一个任务,如果超过存活时间还没有新的任务就会被销毁,通过这样复用线程从而降低开销。

    2、使用线程池有什么优点?

    可能有人就会问了,使用线程池有什么好处吗?那不用说,好处自然是有滴。大概有以下:
    1、提升线程池中线程的使用率,减少对象的创建、销毁。
    2、线程池的伸缩性对性能有较大的影响,使用线程池可以控制线程数,有效的提升服务器的使用资源,避免由于资源不足而发生宕机等问题。(创建太多线程,将会浪费一定的资源,有些线程未被充分使用;销毁太多线程,将导致之后浪费时间再次创建它们;创建线程太慢,将会导致长时间的等待,性能变差;销毁线程太慢,导致其它线程资源饥饿。)

    3、线程池的核心工作流程(重要)

    我们要使用线程池得先了解它是怎么工作的,流程如下图,废话不多说看图就行。核心就是复用线程,降低开销。
    线程池的工作流程

    4、线程池的五种状态生命周期

    • RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务。
    • SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)。
    • STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
    • TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态。
    • TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做。
      线程池的生命周期流程图

    5、创建线程池的几种方式

    • 通过 Executors 工厂方法创建
    • 通过 new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) 自定义创建
      相对而言,更建议用第二个创建线程池,Executors 创建的线程池内部很多地方用到了无界任务队列,在高并发场景下,无界任务队列会接收过多的任务对象,严重情况下会导致 JVM 崩溃,一些大厂也是禁止使用 Executors 工厂方法去创建线程池。newFixedThreadPool 和 newSingleThreadExecutor 的主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM;newCachedThreadPool 和 newScheduledThreadPool 的主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

    5.1、Executors 五个工厂方法创建不同线程池的区别

    在这里插入图片描述
    1、newCachedThreadPool()(工作队列使用的是 SynchronousQueue)
    创建一个线程池,如果线程池中的线程数量过大,它可以有效的回收多余的线程,如果线程数不足,那么它可以创建新的线程。
    不足:这种方式虽然可以根据业务场景自动的扩展线程数来处理我们的业务,但是最多需要多少个线程同时处理却是我们无法控制的。
    优点:如果当第二个任务开始,第一个任务已经执行结束,那么第二个任务会复用第一个任务创建的线程,并不会重新创建新的线程,提高了线程的复用率。
    作用:该方法返回一个可以根据实际情况调整线程池中线程的数量的线程池。即该线程池中的线程数量不确定,是根据实际情况动态调整的。
    2、newFixedThreadPool()(工作队列使用的是 LinkedBlockingQueue)
    这种方式可以指定线程池中的线程数。如果满了后又来了新任务,此时只能排队等待。
    优点:newFixedThreadPool 的线程数是可以进行控制的,因此我们可以通过控制最大线程来使我们的服务器达到最大的使用率,同时又可以保证即使流量突然增大也不会占用服务器过多的资源。
    作用:该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变,即不会再创建新的线程,也不会销毁已经创建好的线程,自始自终都是那几个固定的线程在工作,所以该线程池可以控制线程的最大并发数。
    3、newScheduledThreadPool()
    该线程池支持定时,以及周期性的任务执行,我们可以延迟任务的执行时间,也可以设置一个周期性的时间让任务重复执行。该线程池中有以下两种延迟的方法。
    scheduleAtFixedRate 不同的地方是任务的执行时间,如果间隔时间大于任务的执行时间,任务不受执行时间的影响。如果间隔时间小于任务的执行时间,那么任务执行结束之后,会立马执行,至此间隔时间就会被打乱。
    scheduleWithFixedDelay 的间隔时间不会受任务执行时间长短的影响。
    作用:该方法返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。
    4、newSingleThreadExecutor()
    这是一个单线程池,至始至终都由一个线程来执行。
    作用:该方法返回一个只有一个线程的线程池,即每次只能执行一个线程任务,多余的任务会保存到一个任务队列中,等待这一个线程空闲,当这个线程空闲了再按 FIFO 方式顺序执行任务队列中的任务。
    5、newSingleThreadScheduledExecutor()
    只有一个线程,用来调度任务在指定时间执行。
    作用:该方法返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。只不过和上面的区别是该线程池大小为 1,而上面的可以指定线程池的大小。
    使用示例:

    //创建一个会根据需要创建新线程的线程池
    ExecutorService executor= Executors.newCachedThreadPool();
    for (int i = 0; i < 20; i++) {
    	executor.submit(new Runnable() {
    		@Override
            public void run() {
            	System.out.println(i);
            }
        });
    }
    

    这五种线程池都是直接或者间接获取的 ThreadPoolExecutor 实例 ,只是实例化时传递的参数不一样。所以如果 Java 提供的线程池满足不了我们的需求,我们可以通过 ThreadPoolExecutor 构造方法创建自定义线程池。

    5.2、ThreadPoolExecutor 构造方法参数详解

    public ThreadPoolExecutor(
    int corePoolSize,//线程池核心线程大小
    int maximumPoolSize,//线程池最大线程数量
    long keepAliveTime,//空闲线程存活时间
    TimeUnit unit,//空闲线程存活时间单位,一共有七种静态属性(TimeUnit.DAYS天,TimeUnit.HOURS小时,TimeUnit.MINUTES分钟,TimeUnit.SECONDS秒,TimeUnit.MILLISECONDS毫秒,TimeUnit.MICROSECONDS微妙,TimeUnit.NANOSECONDS纳秒)
    BlockingQueue<Runnable> workQueue,//工作队列
    ThreadFactory threadFactory,//线程工厂,主要用来创建线程(默认的工厂方法是:Executors.defaultThreadFactory()对线程进行安全检查并命名)
    RejectedExecutionHandler handler//拒绝策略(默认是:ThreadPoolExecutor.AbortPolicy不执行并抛出异常)
    ) 
    

    使用示例:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
    

    5.2.1、工作队列

    jdk 中提供了四种工作队列:
    ①ArrayBlockingQueue
    基于数组的有界阻塞队列,按 FIFO 排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到 corePoolSize 后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到 maxPoolSize,则会执行拒绝策略。
    ②LinkedBlockingQuene
    基于链表的无界阻塞队列(其实最大容量为 Interger.MAX_VALUE),按照 FIFO 排序。由于该队列的近似无界性,当线程池中线程数量达到 corePoolSize 后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到 maxPoolSize,因此使用该工作队列时,参数 maxPoolSize 其实是不起作用的。
    ③SynchronousQuene
    一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到 maxPoolSize,则执行拒绝策略。
    ④PriorityBlockingQueue
    具有优先级的无界阻塞队列,优先级通过参数 Comparator 实现。

    5.2.2、拒绝策略

    当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,就会执行拒绝策略。jdk中提供了4中拒绝策略:
    ①ThreadPoolExecutor.CallerRunsPolicy
    该策略下,在调用者线程中直接执行被拒绝任务的 run 方法,除非线程池已经 shutdown,则直接抛弃任务。
    ②ThreadPoolExecutor.AbortPolicy
    该策略下,直接丢弃任务,并抛出 RejectedExecutionException 异常。
    ③ThreadPoolExecutor.DiscardPolicy
    该策略下,直接丢弃任务,什么都不做。
    ④ThreadPoolExecutor.DiscardOldestPolicy
    该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列。
    除此之外,还可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。

    6、线程池的关闭

    • shutdown():
      1、调用之后不允许继续往线程池内添加线程;
      2、线程池的状态变为 SHUTDOWN 状态;
      3、所有在调用 shutdown() 方法之前提交到 ExecutorSrvice 的任务都会执行;
      4、一旦所有线程结束执行当前任务,ExecutorService 才会真正关闭。
    • shutdownNow():
      1、该方法返回尚未执行的 task 的 List;
      2、线程池的状态变为 STOP 状态;
      3、尝试停止所有的正在执行或暂停任务的线程。
      简单点来说,就是:
      shutdown() 调用后,不可以再 submit 新的 task,已经 submit 的将继续执行
      shutdownNow() 调用后,试图停止当前正在执行的 task,并返回尚未执行的 task 的 list

    7、总结

    本文简单介绍了线程池的一些相关知识,相信大家对线程池的优点,线程池的生命周期,线程池的工作流程及线程池的使用有了一个大概的了解,也希望能对有需要的人提供一点帮助!文中有错误的地方,还请留言给予指正,谢谢~
    也欢迎大家关注我的公众号:Java的成神之路,免费领取最新面试资料,技术电子书,架构进阶相关资料等。
    在这里插入图片描述

    展开全文
  • 线程池几种方式与使用场景 在 Executors 类里面提供了一些静态工厂,生成一些常用的线程池。 newFixedThreadPool:创建固定大小的线程池线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而...

    线程池的几种方式与使用场景

    在 Executors 类里面提供了一些静态工厂,生成一些常用的线程池。

    1. newFixedThreadPool:创建固定大小的线程池。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
    2. newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
    3. newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
    4. newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
    5. newSingleThreadScheduledExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
    展开全文
  • java中线程池几种实现方式

    千次阅读 2018-09-16 03:16:23
    1、线程池简介:  多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。   假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程...

    1、线程池简介:
        多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。    
        假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

        如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                    一个线程池包括以下四个基本组成部分:
                    1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                    2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                    3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                    4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                    
        线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
        线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
        假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

        代码实现中并没有实现任务接口,而是把Runnable对象加入到线程池管理器(ThreadPool),然后剩下的事情就由线程池管理器(ThreadPool)来完成了

     

    package mine.util.thread;  
      
    import java.util.LinkedList;  
    import java.util.List;  
      
    /** 
     * 线程池类,线程管理器:创建线程,执行任务,销毁线程,获取线程基本信息 
     */  
    public final class ThreadPool {  
        // 线程池中默认线程的个数为5  
        private static int worker_num = 5;  
        // 工作线程  
        private WorkThread[] workThrads;  
        // 未处理的任务  
        private static volatile int finished_task = 0;  
        // 任务队列,作为一个缓冲,List线程不安全  
        private List<Runnable> taskQueue = new LinkedList<Runnable>();  
        private static ThreadPool threadPool;  
      
        // 创建具有默认线程个数的线程池  
        private ThreadPool() {  
            this(5);  
        }  
      
        // 创建线程池,worker_num为线程池中工作线程的个数  
        private ThreadPool(int worker_num) {  
            ThreadPool.worker_num = worker_num;  
            workThrads = new WorkThread[worker_num];  
            for (int i = 0; i < worker_num; i++) {  
                workThrads[i] = new WorkThread();  
                workThrads[i].start();// 开启线程池中的线程  
            }  
        }  
      
        // 单态模式,获得一个默认线程个数的线程池  
        public static ThreadPool getThreadPool() {  
            return getThreadPool(ThreadPool.worker_num);  
        }  
      
        // 单态模式,获得一个指定线程个数的线程池,worker_num(>0)为线程池中工作线程的个数  
        // worker_num<=0创建默认的工作线程个数  
        public static ThreadPool getThreadPool(int worker_num1) {  
            if (worker_num1 <= 0)  
                worker_num1 = ThreadPool.worker_num;  
            if (threadPool == null)  
                threadPool = new ThreadPool(worker_num1);  
            return threadPool;  
        }  
      
        // 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(Runnable task) {  
            synchronized (taskQueue) {  
                taskQueue.add(task);  
                taskQueue.notify();  
            }  
        }  
      
        // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(Runnable[] task) {  
            synchronized (taskQueue) {  
                for (Runnable t : task)  
                    taskQueue.add(t);  
                taskQueue.notify();  
            }  
        }  
      
        // 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定  
        public void execute(List<Runnable> task) {  
            synchronized (taskQueue) {  
                for (Runnable t : task)  
                    taskQueue.add(t);  
                taskQueue.notify();  
            }  
        }  
      
        // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁  
        public void destroy() {  
            while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧  
                try {  
                    Thread.sleep(10);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            // 工作线程停止工作,且置为null  
            for (int i = 0; i < worker_num; i++) {  
                workThrads[i].stopWorker();  
                workThrads[i] = null;  
            }  
            threadPool=null;  
            taskQueue.clear();// 清空任务队列  
        }  
      
        // 返回工作线程的个数  
        public int getWorkThreadNumber() {  
            return worker_num;  
        }  
      
        // 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成  
        public int getFinishedTasknumber() {  
            return finished_task;  
        }  
      
        // 返回任务队列的长度,即还没处理的任务个数  
        public int getWaitTasknumber() {  
            return taskQueue.size();  
        }  
      
        // 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数  
        @Override  
        public String toString() {  
            return "WorkThread number:" + worker_num + "  finished task number:"  
                    + finished_task + "  wait task number:" + getWaitTasknumber();  
        }  
      
        /** 
         * 内部类,工作线程 
         */  
        private class WorkThread extends Thread {  
            // 该工作线程是否有效,用于结束该工作线程  
            private boolean isRunning = true;  
      
            /* 
             * 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待 
             */  
            @Override  
            public void run() {  
                Runnable r = null;  
                while (isRunning) {// 注意,若线程无效则自然结束run方法,该线程就没用了  
                    synchronized (taskQueue) {  
                        while (isRunning && taskQueue.isEmpty()) {// 队列为空  
                            try {  
                                taskQueue.wait(20);  
                            } catch (InterruptedException e) {  
                                e.printStackTrace();  
                            }  
                        }  
                        if (!taskQueue.isEmpty())  
                            r = taskQueue.remove(0);// 取出任务  
                    }  
                    if (r != null) {  
                        r.run();// 执行任务  
                    }  
                    finished_task++;  
                    r = null;  
                }  
            }  
      
            // 停止工作,让该线程自然执行完run方法,自然结束  
            public void stopWorker() {  
                isRunning = false;  
            }  
        }  
    }  

    注意!多线程安全,或者说synchronize 一般是用在“增”,“删”,“改”代码段上;

    测试代码:

    package mine.util.thread;  
      
    //测试线程池  
    public class TestThreadPool {  
        public static void main(String[] args) {  
            // 创建3个线程的线程池  
            ThreadPool t = ThreadPool.getThreadPool(3);  
            t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
            t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
            System.out.println(t);  
            t.destroy();// 所有线程都执行完成才destory  
            System.out.println(t);  
        }  
      
        // 任务类  
        static class Task implements Runnable {  
            private static volatile int i = 1;  
      
            @Override  
            public void run() {// 执行任务  
                System.out.println("任务 " + (i++) + " 完成");  
            }  
        }  
    }  

    运行结果:

    WorkThread number:3  finished task number:0  wait task number:6
    任务 1 完成
    任务 2 完成
    任务 3 完成
    任务 4 完成
    任务 5 完成
    任务 6 完成
    WorkThread number:3  finished task number:6  wait task number:0

    分析:由于并没有任务接口,传入的可以是自定义的任何任务,所以线程池并不能准确的判断该任务是否真正的已经完成(真正完成该任务是这个任务的run方法执行完毕),只能知道该任务已经出了任务队列,正在执行或者已经完成。

     

    2、Java类库中提供的线程池简介:

         java提供的线程池更加强大,相信理解线程池的工作原理,看类库中的线程池就不会感到陌生了。

    在Java5之后,并 发线程这块发生了根本的变化,最重要的莫过于新的启动、调度、管理线程的一大堆API了。在Java5以后,通过Executor来启动线程比用 Thread的start()更好。在新特征中,可以很容易控制线程的启动、执行和关闭过程,还可以很容易使用线程池的特性。

    参考:https://www.cnblogs.com/mokafamily/p/3558886.html

    参考:https://www.cnblogs.com/zhujiabin/p/5404771.html

    下面我们来详细了解下这些线程池。

    2、Java 线程池

    Java通过Executors提供四种线程池,分别为:
    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

      当你挑选完的线程池后就需要创建以及使用线程池:

        大概步骤为以下3步:

        (1)调用执行器类(Executors)的静态方法来创建线程池

        (2)调用线程池的submit方法提交Runnable或Callable对象

        (3)当不需要添加更多的任务时,调用shutdown关闭入口

     

      下面通过代码来逐步操作:

    //创建线程池对象
            ExecutorService service = Executors.newCachedThreadPool();
            //创建一个用于递增输出i值的runnable对象
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 400; i++) {
                        System.out.println(i);
                    }
                }
            };
            //调用线程池的submit方法传入runnable(传入的runnable将会自动执行)
            service.submit(runnable);
            service.submit(runnable);
            //当不需要传入更多的任务时调用shutdown方法来关闭入口
            service.shutdown();

      需要注意的是如果希望直接停止线程池的一切任务是无法通过shutdown来操作的,因为shutdown仅仅是关闭了入口,但是已经加入的任务还是会继续执行的,这时我们可以调用线程池的shutdownNow方法来操作,shutdownNow的作用是用来关闭线程池的入口并且会尝试终止所有当前线程池内的任务。

    //用来关闭线程池入口以及终止所有正在执行的任务
      service.shutdownNow();

     service的submit方法会返回一个Future<?>类型的对象,然而这是一个怎样的类型呢?让我们来看一下api中的方法摘要:

     

       

     

       从方法摘要中可以看出该对象用于在加入线程池以后能够对此任务进行取消,查看状态等操作,如果说在加入线程池以后有可能会取消此任务的话就需要,在submit的时候就需要保存好Future对象。

     

    //保存Future<?>
            Future<?> run2 = service.submit(runnable);
            
            //用于查看是否已经执行完毕,返回类型为boolean
            System.out.println(run2.isDone());
            
            //取消任务,如果需要中断的话参数为true
            run2.cancel(true);

    3.线程池实例:

    (1). newCachedThreadPool
    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
    final int index = i;
    try {
    Thread.sleep(index * 1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
     
    cachedThreadPool.execute(new Runnable() {
     
    @Override
    public void run() {
    System.out.println(index);
    }
    });
    }

    线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

    注意! executeService.submit() 有返回值 返回类型为future ,exeuteService.execute() 没有返回值

     (2). newFixedThreadPool
    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

     

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
    final int index = i;
    fixedThreadPool.execute(new Runnable() {
     
    @Override
    public void run() {
    try {
    System.out.println(index);
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    });
    }

    因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。

    定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache

     (3) newScheduledThreadPool
    创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.schedule(new Runnable() {
     
    @Override
    public void run() {
    System.out.println("delay 3 seconds");
    }
    }, 3, TimeUnit.SECONDS);

    表示延迟3秒执行。

    定期执行示例代码如下:

    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
     
    @Override
    public void run() {
    System.out.println("delay 1 seconds, and excute every 3 seconds");
    }
    }, 1, 3, TimeUnit.SECONDS);

    表示延迟1秒后每3秒执行一次。

    ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。

    (4)、newSingleThreadExecutor
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {
     
    @Override
    public void run() {
    try {
    System.out.println(index);
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    });
    }

     

    线程池的作用:

    线程池作用就是限制系统中执行线程的数量。
         根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排 队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。

    为什么要用线程池:

    1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

    2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

    比较重要的几个类:

    ExecutorService

    真正的线程池接口。

    ScheduledExecutorService

    能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

    ThreadPoolExecutor

    ExecutorService的默认实现。

    ScheduledThreadPoolExecutor

    继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

    要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

    1. newSingleThreadExecutor

    创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    2.newFixedThreadPool

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    3. newCachedThreadPool

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

    那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    4.newScheduledThreadPool

    创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    ------------------------------------------------------------------------------------------------------

    一、创建任务

      任务就是一个实现了Runnable接口的类。

      创建的时候实run方法即可。

      二、执行任务

      通过java.util.concurrent.ExecutorService接口对象来执行任务,该接口对象通过工具类java.util.concurrent.Executors的静态方法来创建。

      Executors此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。

       ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其停止接受新任务。关闭后,执行程序将最后终止,这时没有任务在执行,也没有任务在等待执行,并且无法提交新任 务。

      executorService.execute(new TestRunnable());

      1、创建ExecutorService

      通过工具类java.util.concurrent.Executors的静态方法来创建。

      Executors此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。

      比如,创建一个ExecutorService的实例,ExecutorService实际上是一个线程池的管理工具:

      ExecutorService executorService = Executors.newCachedThreadPool();

      ExecutorService executorService = Executors.newFixedThreadPool(3);

      ExecutorService executorService = Executors.newSingleThreadExecutor();

      2、将任务添加到线程去执行

      当将一个任务添加到线程池中的时候,线程池会为每个任务创建一个线程,该线程会在之后的某个时刻自动执行。

      三、关闭执行服务对象

      executorService.shutdown();

     

      四、综合实例

    package concurrent;
    
    import java.util.concurrent.ExecutorService; 
    import java.util.concurrent.Executors;
    
     
    public class TestCachedThreadPool { 
            public static void main(String[] args) { 
    //                ExecutorService executorService = Executors.newCachedThreadPool(); 
                    ExecutorService executorService = Executors.newFixedThreadPool(5); //         ExecutorService executorService = Executors.newSingleThreadExecutor();
    
                    for (int i = 0; i < 5; i++) { 
                            executorService.execute(new TestRunnable()); 
                            System.out.println("************* a" + i + " *************"); 
                    } 
                    executorService.shutdown(); 
            } 
    }
    
    class TestRunnable implements Runnable { 
            public void run() { 
                    System.out.println(Thread.currentThread().getName() + "线程被调用了。"); 
                    while (true) { 
                            try { 
                                    Thread.sleep(5000); 
                                   System.out.println(Thread.currentThread().getName()); 
                            } catch (InterruptedException e) { 
                                    e.printStackTrace(); 
                            } 
                    } 
            } 
    }
    
      运行结果:
    
    ************* a0 ************* 
    ************* a1 ************* 
    pool-1-thread-2线程被调用了。 
    ************* a2 ************* 
    pool-1-thread-3线程被调用了。 
    pool-1-thread-1线程被调用了。 
    ************* a3 ************* 
    ************* a4 ************* 
    pool-1-thread-4线程被调用了。 
    pool-1-thread-5线程被调用了。 
    pool-1-thread-2 
    pool-1-thread-1 
    pool-1-thread-3 
    pool-1-thread-5 
    pool-1-thread-4 
    pool-1-thread-2 
    pool-1-thread-1 
    pool-1-thread-3 
    pool-1-thread-5 
    pool-1-thread-4 

     

     五、获取任务的执行的返回值

      在Java5之 后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是 Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的 submit(Callable<T> task) 方法来执行,并且返回一个 <T> Future<T>,是表示任务等待完成的 Future.

      public interface Callable<V>返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call 的方法。

      Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常。

      Executors 类包含一些从其他普通形式转换成 Callable 类的实用方法。

      Callable中的call()方法类似Runnable的run()方法,就是前者有返回值,后者没有。

      当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。

      同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null.

      遗憾的是,在Java API文档中,这块介绍的很糊涂,估计是翻译人员还没搞清楚的缘故吧。或者说是注释不到位。下面看个例子:

    import java.util.ArrayList; 
    import java.util.List; 
    import java.util.concurrent.*;
    
     
    public class CallableDemo { 
            public static void main(String[] args) { 
                    ExecutorService executorService = Executors.newCachedThreadPool(); 
                    List<Future<String>> resultList = new ArrayList<Future<String>>();
    
                    //创建10个任务并执行 
                    for (int i = 0; i < 10; i++) { 
                            //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 
                            Future<String> future = executorService.submit(new TaskWithResult(i)); 
                            //将任务执行结果存储到List中 
                            resultList.add(future); 
                    }
    
                    //遍历任务的结果 
                    for (Future<String> fs : resultList) { 
                            try { 
                                    System.out.println(fs.get());     //打印各个线程(任务)执行的结果 
                            } catch (InterruptedException e) { 
                                    e.printStackTrace(); 
                            } catch (ExecutionException e) { 
                                    e.printStackTrace(); 
                            } finally { 
                                    //启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。 
                                    executorService.shutdown(); 
                            } 
                    } 
            } 
    }
    
    
    class TaskWithResult implements Callable<String> { 
            private int id;
    
            public TaskWithResult(int id) { 
                    this.id = id; 
            }
    
             
            public String call() throws Exception { 
                    System.out.println("call()方法被自动调用,干活!!!             " + Thread.currentThread().getName()); 
                    //一个模拟耗时的操作 
                    for (int i = 999999; i > 0; i--) ; 
                    return "call()方法被自动调用,任务的结果是:" + id + "    " + Thread.currentThread().getName(); 
            } 
    }
    
      运行结果:
    
    call()方法被自动调用,干活!!!             pool-1-thread-1 
    call()方法被自动调用,干活!!!             pool-1-thread-3 
    call()方法被自动调用,干活!!!             pool-1-thread-4 
    call()方法被自动调用,干活!!!             pool-1-thread-6 
    call()方法被自动调用,干活!!!             pool-1-thread-2 
    call()方法被自动调用,干活!!!             pool-1-thread-5 
    call()方法被自动调用,任务的结果是:0    pool-1-thread-1 
    call()方法被自动调用,任务的结果是:1    pool-1-thread-2 
    call()方法被自动调用,干活!!!             pool-1-thread-2 
    call()方法被自动调用,干活!!!             pool-1-thread-6 
    call()方法被自动调用,干活!!!             pool-1-thread-4 
    call()方法被自动调用,任务的结果是:2    pool-1-thread-3 
    call()方法被自动调用,干活!!!             pool-1-thread-3 
    call()方法被自动调用,任务的结果是:3    pool-1-thread-4 
    call()方法被自动调用,任务的结果是:4    pool-1-thread-5 
    call()方法被自动调用,任务的结果是:5    pool-1-thread-6 
    call()方法被自动调用,任务的结果是:6    pool-1-thread-2 
    call()方法被自动调用,任务的结果是:7    pool-1-thread-6 
    call()方法被自动调用,任务的结果是:8    pool-1-thread-4 
    call()方法被自动调用,任务的结果是:9    pool-1-thread-3
    
    Process finished with exit code 0
    
    
    
    

    一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

     

    线程池可以解决两个不 同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行集合任务时使用的线 程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。

     

    为了便于跨大量上下文 使用,此类提供了很多可调整的参数和扩展挂钩。但是,强烈建议程序员使用较为方便的 Executors 工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、 Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。

    用法示例
    下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
         class NetworkService implements Runnable {
                    private final ServerSocket serverSocket;
                    private final ExecutorService pool;
     
                    public NetworkService(int port, int poolSize)
                        throws IOException {
                      serverSocket = new ServerSocket(port);
                      pool = Executors.newFixedThreadPool(poolSize);
                    }
                 
                    public void run() { // run the service
                      try {
                        for (;;) {
                          pool.execute(new Handler(serverSocket.accept()));
                        }
                      } catch (IOException ex) {
                        pool.shutdown();
                      }
                    }
                  }
     
                  class Handler implements Runnable {
                    private final Socket socket;
                    Handler(Socket socket) { this.socket = socket; }
                    public void run() {
                      // read and service request on socket
                    }
                 }
    下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:
        void shutdownAndAwaitTermination(ExecutorService pool) {
               pool.shutdown(); // Disable new tasks from being submitted
               try {
                 // Wait a while for existing tasks to terminate
                 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                   pool.shutdownNow(); // Cancel currently executing tasks
                   // Wait a while for tasks to respond to being cancelled
                   if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                       System.err.println("Pool did not terminate");
                 }
               } catch (InterruptedException ie) {
                 // (Re-)Cancel if current thread also interrupted
                 pool.shutdownNow();
                 // Preserve interrupt status
                 Thread.currentThread().interrupt();
               }
             }
    内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。
    

    转发:https://blog.csdn.net/w2393040183/article/details/52177572

    展开全文
  • 下面小编就为大家带来一篇Java线程池几种实现方法和区别。小编觉得挺不错的,现在分享给大家,也给大家做个参考,一起跟随小编过来看看吧,祝大家游戏愉快哦
  • 下面小编就为大家带来一篇Java线程池几种实现方法及常见问题解答。小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 129,644
精华内容 51,857
关键字:

线程池实现的几种方式