精华内容
下载资源
问答
  • java8 并行编程教程——Threads 和 Executors 欢迎java8 并行编程的第一部分。本文通过简易的示例代码让你轻松理解java8 并行编程。这时关于java 并行API系列教程的第一部分。接下来的15分钟你学习通过线程、任务、...

    java8 并行编程教程——Threads 和 Executors

    欢迎java8 并行编程的第一部分。本文通过简易的示例代码让你轻松理解java8 并行编程。这时关于java 并行API系列教程的第一部分。接下来的15分钟你学习通过线程、任务、执行服务实现并行编程。
    并行编程API首先在java5中发布,后续每个新版本逐步增强。本文的主要概念与java8之前版本一致,但示例代码基于java8按充分使用lambda表达式和其他新特性。如果你不熟悉lambda表达式,可以查看之前的内容。

    Threads 和 Runnables

    所有现代操作系统都支持并行编程,主要通过线程和进程。进程
    是程序的实例,各个进程彼此独立运行。如果你启动个java应用,操作系统生成一个进程,和其他程序并行运行。在进程里,可以利用线程并行执行代码,可以最大化使用多核CPU.

    JAVA从JDK1.0开始就支持线程,在开始线程之前,首先准备线程要执行的代码,通常称之为任务,通过实现Runnable函数式接口,其定义了一个无参的方法run(),如下面示例所示:

    Runnable task = () -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("Hello " + threadName);
    };
    
    task.run();
    
    Thread thread = new Thread(task);
    thread.start();
    
    System.out.println("Done!");
    

    因为Runnable是函数式接口,我们使用java8lambda表达式打印当前线程的名称至控制台。我们首先在主线程中直接执行runnable 任务,然后开启一个新线程执行。
    结果可能如下:

    Hello main
    Hello Thread-0
    Done!
    

    或也如下:

    Hello main
    Done!
    Hello Thread-0
    

    因为并行执行,我们不能预知子线程是否在打印’done’代码之前或之后。因为顺序不能确定,使得并行编程在大型应用中非常复杂。

    线程可以休眠一段时间,这可以方便地模拟耗时的任务,看下面示例:

    Runnable runnable = () -> {
        try {
            String name = Thread.currentThread().getName();
            System.out.println("Foo " + name);
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Bar " + name);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
    
    Thread thread = new Thread(runnable);
    thread.start();
    

    当运行上述代码,你将注意到,在第二个打印代码和第一个代码之间有1秒钟延迟。TimeUnit是非常有用单位工作时间枚举类型,也可以使用Thread.sleep(1000)代替。

    使用Thread类可能代码冗长且容易出错。因此在2004年java5中正式引入并行编程API,位于java.util.concurrent包中,其中包括许多有用的操作并行编程的类。从此,每个java新版都增强并行API,java8也提供了新类和方法。

    下面让我们深入了解并行API中最重要的部分之一,执行服务(executor services)。

    Executor

    并行API引入ExecutorService 作为替代直接使用Thread类更高层的封装。Executor一般通过管理线程池执行并行任务,因此我们无需手工创建新的线程。在线程池中的线程执行完任务后,可以重复使用,所以我们使用单个执行器服务运行应用全生命周期中尽可能多并行任务。
    这时第一个使用executor的示例:

    ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Hello " + threadName);
    });
    
    // => Hello pool-1-thread-1
    

    Executors类提供便利的工厂方法,用于创建不同类型的executor 服务。本例中Executors创建仅一个线程的线程池。
    当运行程序,执行结果与上面示例类似,但你注意到一个重要的差异:java进程没有结束,Executors 需要手动显示结束,否则始终运行监听新的任务。
    ExecutorService提供了两个方法实现关闭任务:shutdown()等待当前正在运行的任务完成,而shutdownNow()中断所有运行
    任务接着立刻关闭executor。

    这时我比较喜欢的方式关闭executor:

    try {
        System.out.println("attempt to shutdown executor");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        System.err.println("tasks interrupted");
    }
    finally {
        if (!executor.isTerminated()) {
            System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
    }
    

    executor延时关闭,用于结束当前正在运行的任务。等待最大时间5秒之后,中断未完成的任务并关闭executor。

    Callables 和 Futures

    除了Runnable,executor也可以执行另一种名为Callable任务。Callable是函数式接口,除了有返回值,其他和runnable一样。

    下面代码中lambda表达式定义一个callable,休眠1秒后返回一个整数。

    Callable<Integer> task = () -> {
        try {
            TimeUnit.SECONDS.sleep(1);
            return 123;
        }
        catch (InterruptedException e) {
            throw new IllegalStateException("task interrupted", e);
        }
    };
    

    Callable 能像Runnable一样提交给executor服务执行,那么Callable的执行结果是什么呢?submit()方法没有等待任务执行完成,executor服务不能直接返回callable执行结果。而是executor返回一个特定的Future类型,通过Fucture在之后的某个时间点可以获取实际的结果。

    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Integer> future = executor.submit(task);
    
    System.out.println("future done? " + future.isDone());
    
    Integer result = future.get();
    
    System.out.println("future done? " + future.isDone());
    System.out.print("result: " + result);
    

    提交一个Callable任务给executor执行器之后,我们通过检查future.isDone()方法,判断是否执行完成。我很确信其没有完成,因为上面的任务中返回一个整数之前,休眠了1秒。

    调用get()方法,阻塞当前线程等待直到在返回实际结果123之前Callable任务完成。最终future完成,我们可以在控制台看到结果。

    future done? false
    future done? true
    result: 123
    

    Future和底层executor服务是紧耦合的,记住,每个没有终止的future,你关闭executor,将抛出异常。

    executor.shutdownNow();
    future.get();
    

    你可能主要到创建executor,与之前的示例稍微有点差异。我们使用newFixedThreadPool(1)方法创建一个的executor服务,支持一个线程的线程池。其等价与newSingleThreadExecutor(),但我们之后可以简单通过传入一个比1大的值,增加线程池大小。

    超时

    任何调用future.get()方法,将阻塞当前线程直达后台callable执行完成。最坏的情况callable一直运行,这样是你的应用查询没有响应,可以通过传入超时时间消除这种场景。

    ExecutorService executor = Executors.newFixedThreadPool(1);
    
    Future<Integer> future = executor.submit(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
            return 123;
        }
        catch (InterruptedException e) {
            throw new IllegalStateException("task interrupted", e);
        }
    });
    
    future.get(1, TimeUnit.SECONDS);
    

    执行上述代码,结果抛出TimeoutExcepton异常。

    Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    

    你可能已经知道异常的原因了:我们设定最大等待时间为1秒,但callable实际执行时间为2秒。

    InvokeAll

    executor 支持通过invokeAll()方法一次提交多个callable任务批量执行。这个方法接受一个callable集合,并返回future集合。

    ExecutorService executor = Executors.newWorkStealingPool();
    
    List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");
    
    executor.invokeAll(callables)
        .stream()
        .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        })
        .forEach(System.out::println);
    

    这个示例中我们使用java8函数式流,为了处理invokeAll方法执行的所有返回的future。我们首先map每个future的返回值,然后print每一个至控制台。

    InvokeAny

    另外一个批量提交callable任务的方法是invokeAny(),与invokeAll()方法稍微有点不同,返回第一个callable任务执行完成的结果。

    为了测试这种行为,我们使用一个方法模拟不同执行时间的callable,该方法休眠一定时间后返回给定的执行结果。

    Callable<String> callable(String result, long sleepSeconds) {
        return () -> {
            TimeUnit.SECONDS.sleep(sleepSeconds);
            return result;
        };
    }
    

    我们使用该方法创建一组callable任务,他们拥有不同额执行时间,1~3秒。然后提交这些callable至executor通过invokeAny()方法,他们返回最快执行完成的callable任务,本例中是task2。

    ExecutorService executor = Executors.newWorkStealingPool();
    
    List<Callable<String>> callables = Arrays.asList(
        callable("task1", 2),
        callable("task2", 1),
        callable("task3", 3));
    
    String result = executor.invokeAny(callables);
    System.out.println(result);
    
    // => task2
    

    上面示例通过newWorkSteaingPool()方法创建另一种类型的executor,其为java8提供的工厂方法,返回ForkJoinPool类型的执行器,与正常的executor稍微不同,代替给定一个固定大小的线程池,ForkJoinPool创建线程池大小默认为主机CPU的核数。java7中ForkJoinPool已经存在。

    Scheduled Executors

    我们已经学习如何一次性提交并运行任务给executor,为了周期性运行常规任务多次,可以使用预定线程池。

    ScheduledExecutorService 能后调度任务周期运行或一定时间之后一次运行。下面代码演示调度任务3秒之后运行。

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
    Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
    ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
    
    TimeUnit.MILLISECONDS.sleep(1337);
    
    long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
    System.out.printf("Remaining Delay: %sms", remainingDelay);
    

    调度任务生成类型类型的future——ScheduleFuture类,除了
    Future类提供的方法,还提供getDelay()方法,其返回剩余时间,且并行执行。

    为了调度任务周期性执行,executor提供两个方法,scheduleAtFixedRate()scheduleWithFixedDelay(),第一个方法能够使用固定的时间频率执行任务,如下面代码示例的每秒1次。

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
    Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
    
    int initialDelay = 0;
    int period = 1;
    executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
    

    另外,该方法接受一个初始延迟参数,即第一次执行之前需等待的时间。需要注意的是scheduleAtFixedRate()方法没有考虑实际执行任务的时间,所以你指定1秒周期,但是任务需要2秒,那么线程执行则更快。这种情况下,应该考虑使用scheduleWithFixedDelay()方法代替。这个方法就如前面描述的方式吻合,不同之处是等待周期是任务结束和下一个任务开始的间隔。示例如下:

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
    Runnable task = () -> {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("Scheduling: " + System.nanoTime());
        }
        catch (InterruptedException e) {
            System.err.println("task interrupted");
        }
    };
    
    executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
    

    该示例调度任务使用固定时间间隔,即任务结束和下一个任务开始间的时间间隔。初始延时时间为0,执行任务所需时间为2秒。所以我们最终执行间隔时间为0s,3s,6s,9s……
    所以如果你不能预测调度任务执行时间,使用scheduleWithFixedDelay()是非常方便的。

    我希望这篇文章对你有帮助,有好的建议可以给我留言。

    展开全文
  • java并行

    千次阅读 2019-08-13 15:55:58
    java7之前,处理并行数据非常麻烦. 第一:你得明确的把包含的数据结构分成若干子部分. 第二:你要给每个子部分分配独立的线程. 第三:你需要在恰当的时候对他们进行同步,来避免不希望出现的竞争条件,等待所有线程完成,...

    在java7之前,处理并行数据非常麻烦. 第一:你得明确的把包含的数据结构分成若干子部分. 第二:你要给每个子部分分配独立的线程. 第三:你需要在恰当的时候对他们进行同步,来避免不希望出现的竞争条件,等待所有线程完成,最后把这些结果合并起来.

    在前面的文章中,我们介绍了 Stream接口,让你可以很方便的处理它的元素,可以调用ParallelStream 方法把集合转换成并行流.

    并行流就是把一个内容分成多个数据块,并用不同线程分别处理每个数据块的流.

    这样一来,你就可以把给定的工作负荷自动分配给多个处理器内核,让他们都忙起来.

    假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个 直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个 数字求和的 BinaryOperator 来归约这个流,如下所示:

    //顺序
        public static Long sequentialSum(Long n){
            return Stream.iterate(1L, i->i+1L)
                    .limit(n)
                    .reduce(0L,Long::sum);
        }
    

    用更为传统的Java术语来说,这段代码与下面的迭代等价

    //传统迭代
        public static Long iterativeSum(Long n){
            Long result = 0L;
            for (Long i=1L;i<n;i++){
                result = result+i;
            }
            return result;
        }
    

    这似乎是利用并行流的好机会,特别是n很大的时候,那该怎样做呢?

    你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?

    其实根本不必担心,并行流已经帮我们做完了这些令人头疼的工作

    将顺序流转换为并行流

    //并行流
        public static Long parallelSum(Long n){
            return Stream.iterate(1L,i->i+1L)
                    .limit(n)
                    .parallel()
                    .reduce(0L,Long::sum);
        }
    

    在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它 在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并 行执行.类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流.

    测量流性能

    我们说并行求和的方法应该比顺序迭代的方法更好.但在软件工程上, 靠猜绝对不是什么好办法,有时候经验也靠不住. 你应该始终遵循三个黄金规则;测量,测量,再测量.

    为了简化测量,我们写个方法,专门用来测试 ParallelStreams类里的三个求和方法: sequentialSum iterativeSum, parallelSum.

     public Long measureSumPref(Function<Long, Long> addr, Long n) {
            long fastest = Long.MAX_VALUE;
            for (int i = 0; i < 10; i++) {
                Long start = System.nanoTime();
                Long sum = addr.apply(n);
                Long druation = (System.nanoTime() - start)/1000000;
                if (druation < fastest) {
                    fastest = druation;
                }
            }
            return fastest;
    
        }
    

    这个方法会接收一个函数和一个Long类型参数.它会对传给方法的参数应用函数10次,记录每次执行的时间.

    下面是测试结果

     //顺序
        @Test
        public void test4() {
            Long fast = measureSumPref(ParallelStreams::sequentialSum, 1000 * 10000L);
            System.out.println("sequentialSum= " + fast);//398毫秒
    
    
        }
    
        //迭代
        @Test
        public void test5() {
            Long fast = measureSumPref(ParallelStreams::iterativeSum, 1000 * 10000L);
            System.out.println("iterativeSum= "+ fast);//153毫秒
    
    
        }
        //并行
        @Test
        public void test6(){
            Long fast = measureSumPref(ParallelStreams::parallelSum, 1000 * 10000L);
            System.out.println("parallelSum= "+fast);//1309毫秒
        }
    

    看到结果,我们发现并行流操作相当令我们失望.

    求和方法的并行版本比顺序版本要慢很多!!! 其实对这个意外的结果,有两方面的原因:

    • 一:iterate 生成的是装箱对象,必须拆箱成数字才能求和.

    • 二:我们很难把iterate分成多个独立的块来执行.

    对于第二个问题,很有意思,我们直觉上可能是这样运行的,如图:

    在这里插入图片描述

    但是,iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果

    在这里插入图片描述

    也就是说,整张数字表在归纳过程开始时还没准备好,因为Stream在遇到终端操作才会开始执行,因而无法有效的把流划分为小块进行处理. 把流标记为并行,其实是给顺序处理增加了开销,它还要把每次求和操作的结果分到一个不同的线程上.

    这就说明了并行编程肯能很复杂,如果用得不对(比如采用了一个不易并行化的操作,如 iterate ),它甚至可能让程序的整体性能更差. 所以在调用那个看似神奇的 parallel 操作时,了解背后到底发生了什么是很有必要的。

    并行流使用注意事项:使用更有针对性的方法

    对于上面那种出人意料的结果,我们万不可把锅退给并行流,其实仔细分析,不难发现,这是我们使用了不恰当的的数据结构导致的.

    对于上面的并行处理操作,我们可做如下改进.在之前的文章中,我们介绍过一个叫LongStream的流.这个流有个专门针对Long型的方法

    • LongStream.rangeClosed 直接产生原始类型的long数字,没有装箱拆箱的开销.
    • LongStream.rangeClosed 会生成数字范围,很容易查分为独立的小块.
      LongStream和Stream一样都继承了BaseStream
    public interface LongStream extends BaseStream<Long, LongStream> {...}
    
    public interface Stream<T> extends BaseStream<T, Stream<T>> {...}
    

    这两个流的用法基本完全相同,唯一的不同相比从名字就能看出来,LongStream 指明了流类型为Long,类似的还有,IntStream,DoubleStream等

    我们改进代码如下:

     //顺序流改进版 LongStream.rangeClosed
        public static Long sequentialSum2(Long n) {
            return LongStream.rangeClosed(1, n)
                    .reduce(0L,Long::sum);
    
        }
        
     //并行流改进版
        public static Long paraparallelSum2(Long n) {
            return LongStream.rangeClosed(1, n)
                    .parallel()
                    .reduce(0L,Long::sum);
    
        }
    

    然后再次进行测量

      //顺序流(改进版)
        @Test
        public void test7(){
            Long fast = measureSumPref(ParallelStreams::sequentialSum2, 1000 * 10000L);
            System.out.println("顺序流(改进版)="+fast);//56毫秒------改进之前:398毫秒
    
        }
    
    
    
        //并行流(改进版)
        @Test
        public void test8(){
            Long fast = measureSumPref(ParallelStreams::paraparallelSum2, 1000 * 10000L);
            System.out.println("并行流(改进版)="+fast);//14毫秒--------改进之前:1309毫秒
    
        }
    

    由此结果可得出结论:

    • 使用LongStream比iterate效率提高 710%

    • 在上面基础上使用 并行流 比 顺序流 效率提高 400%

    可见:选择适当的数据结构往往比并行算法正重要,使用正确的数据结构后再选择并行算法能保证最佳的性能.

    尽管如此,我们也必须知道,并行化不是没有代价的.并行化本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值.
    但在多个内核之间移动数据的代价也可能比你想的要大,所以在使用并行操作很重要的一点就是要保证并行执行的工作时间要比数据在内核之前移动的时间要长.

    在使用并行Stream加速代码之前,你必须确保用的对,如果用错了,算得快就毫无意义了.让我们看一个常见的陷阱.

    高效使用并行流

    如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事

    • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流( IntStream 、 LongStream 、 DoubleStream )来避免这种操作,但凡有可能都应该用这些流

    • 有些操作本身在并行流上的性能就比顺序流差。特别是 limit 和 findFirst 等依赖于元 素顺序的操作,它们在并行流上执行的代价非常大。例如, findAny 会比 findFirst 性 能好,因为它不一定要按顺序来执行。

    • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过 流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味 着使用并行流时性能好的可能性比较大。

    • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素 的好处还抵不上并行化造成的额外开销

    • 要考虑流背后的数据结构是否易于分解。例如, ArrayList 的拆分效率比 LinkedList 高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历

    • 还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)

    在这里插入图片描述

    需要强调的是:并行流背后使用的基础架构是java7引入的分支/合并框架.我们想要正确高效的使用并行流,了解它的内部原理至关重要.

    分支/合并框架详解

    分支框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果.

    它是 ExecutorService 接口的一个实现,他把子任务分配给线程池(ForkJoinPool)中的线程.

    使用 RecursiveTask

    要把任务提交到池,必须创建 RecursiveTask 的一个子类,其中V是并行化任务产生的结果类型,

    RecursiveTask类源码:

    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    
        /**
         * The result of the computation.
         */
        V result;
    
        /**
         * The main computation performed by this task.
         * @return the result of the computation
         */
        protected abstract V compute();
    
        public final V getRawResult() {
            return result;
        }
    
        protected final void setRawResult(V value) {
            result = value;
        }
    
        /**
         * Implements execution conventions for RecursiveTask.
         */
        protected final boolean exec() {
            result = compute();
            return true;
        }
    
    }
    

    要定义 RecursiveTask, 只需实现它唯一的抽象方法compute :

    @Override
        protected Long compute() {
    
            return null;
        }
    

    这个方法定义了将任务拆分成子任务的逻辑,以及无法再拆分或不便再拆分,生成单个子任务结果的逻辑.

    即(伪代码如下):

    if (任务足够小或不可分) {
        顺序计算该任务
    } else {
        将任务分成两个子任务
        递归调用本方法,拆分每个子任务,等待所有子任务完成
        合并每个子任务的结果
    }
    

    递归的任务拆分过程如图:
    在这里插入图片描述

    如果你了解著名的分治算法,会发现这不过是分支算法的并行版本而已.

    接下来我们举一个用分支/合并框架的实际例子,还以前面的例子为基础,让我们试着用这个框架为一个数字范围(这里用一个long[] 数组表示)求和

    /**
     * 分支合并框架测试
     *
     * @author itguang
     * @create 2017-11-18 14:22
     **/
    public class ForkJoinTest extends RecursiveTask<Long> {
    
        //要处理的任务数组
        private final long[] numbers;
    
        //子任务处理数组的起始和结束位置
        private final int start;
        private final int end;
    
        //阀值,当数组小于10000就并行执行
        public static final long THRESHOLD = 10000;
    
        //公共构造函数,用于创建子任务
    
    
        //私有构造函数,用于 以递归方式为主任务创建子任务
        public ForkJoinTest(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
    
        public ForkJoinTest(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
    
    
        @Override
        protected Long compute() {
            int length = end - start;
    
            //如果大小小于等于阀值,则顺序计算结果
            if (length <= THRESHOLD) {
                return computeSequentially();
            }
    
            //否则,创建一个子任务为数组的前一半求和
            ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
            //利用另一个 ForkJoinPool 里的线程异步执行新创建的子任务.
            leftTask.fork();//对子任务调用 fork 方法可以把它排进 ForkJoinPool 。
            //创建一个任务为数组的后一半求和
            ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
            //**递归操作**
            long rightResult = rightTask.compute();
    
    
            //遇到递归终止条件,读取本次递归第一个子任务的结果,如果尚未完成就等待
            long leftResult = leftTask.join();
    
            //递归累加
            return leftResult+rightResult;
        }
    
        //计算和
        private long computeSequentially() {
    
            long sum = 0;
            for (int i = start; i < end; i++) {
    
                    sum += numbers[i];
                }
                return sum;
        }
    
    
    
    }
    

    测试:创建一个 ForkJoinPool,并把任务传递给它的invoke()方法.在ForkPool中执行时,返回结果就是ForkJoinTest的并行递归求和结果

    @Test
        public void test9(){
            long[] numbers = LongStream.rangeClosed(1, 1000*10000).toArray();
            ForkJoinTest forkJoinTest = new ForkJoinTest(numbers);
            Long sum = new ForkJoinPool().invoke(forkJoinTest);
            System.out.println(sum);//50000005000000
    
        }
    

    请注意在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一 般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任 何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM 能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的 返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器, 但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。

    当把一个ForkJoinTask 任务交给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法. 该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinTest ,而它们也由ForkJoinPool 安排执行.

    因此这一过程可以递归重复,把原任务拆分成更小的任务执行,知道满足不可拆分的条件,在上例中是拆分数组的大小小于阀值. 这时候会从递归终止开始顺序计算每个任务的结果.然后由分支创建的二叉树遍历回它的根.接下来会合并每个子任务的部分结果,从而得到总任务的结果.

    如图:

    在这里插入图片描述

    使用分支/合并框架的最佳做法

    • 对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子 任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂, 因为每个子任务都必须等待另一个子任务完成才能启动。

    • 不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直 接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。

    • 对子任务调用 fork 方法可以把它排进 ForkJoinPool 。同时对左边和右边的子任务调用 它似乎很自然,但这样做的效率要比直接对其中一个调用 compute 低。这样做你可以为 其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销

    • 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计 算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时 有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方 法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出 同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就 像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编 译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么 做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死 码分析——删去从未被使用的计算)。

    ForkJoinTask工作窃取算法

    在 ForkJoinSumCalculator 的例子中,我们决定在要求和的数组中最多包含10 000个项目 时就不再创建子任务了。这个选择是很随意的,但大多数情况下也很难找到一个好的启发式方法 来确定它,只能试几个不同的值来尝试优化它。在我们的测试案例中,我们先用了一个有1000 万项目的数组,意味着 ForkJoinSumCalculator 至少会分出1000个子任务来。这似乎有点浪费 资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所 有的任务都受CPU约束,预计所花的时间也差不多。

    但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时, 应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每 个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如 磁盘访问慢,或是需要和外部服务协调执行。

    分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应 用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分 配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执 行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经 空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队 列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队 列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程 之间平衡负载。

    一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。如图展示 了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线 程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。

    在这里插入图片描述

    小结:

    • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。

    • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的 行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。

    • 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎 总是比尝试并行化某些操作更为重要

    • 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程 上执行,然后将各个子任务的结果合并起来生成整体结果

    • Spliterator 定义了并行流如何拆分它要遍历的数据。

    引用 《java8实战》 github

    展开全文
  • Java - 并行数据处理和性能

    千次阅读 2018-12-11 15:09:24
    Java - 并行数据处理和性能并行流配置并行流使用的线程池测量流的性能使用更专业的方法正确使用并行流fork/join框架RecursiveTask使用fork/join的最佳实践偷工作Spliterator分割过程Spliterator特征实现自己的...

    并行流

    并行流是一个把元素分成多个块的流,每个块用不同的线程处理。可以自动分区,让所有的处理器都忙起来。
    假设要写一个方法,接受一个数量n做参数,计算1-n的和。可以这样实现:

        public long sequentialSum(long n) {
            return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .reduce(0L, Long::sum);
        }
    

    也许可以使用parallel方法,简单地使用并行计算,提高程序性能:

        public long sequentialSum(long n) {
            return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .parallel()
                    .reduce(0L, Long::sum);
        }
    

    这样,流可能在内部被分成多个块,导致reduction操作可以在不同的块上互不依赖地并行地各自工作。最后,reduction操作组合每个子流的并行reductions的返回值,返回的结果就是整个流的结果。见下面的示意图
    A parallel reduction operation

    实际上,调用parallel方法,流自身不会有任何变化。在内部,设置一个布尔类型的标记,标明你想在并行模式执行操作,接下来的操作都是并行的。
    类似地,你也可以使用sequential方法,把并行流转成串行的。你也许认为可以组合这两个方法:

            stream.parallel()
                .filter(...)
                .sequential()
                .map(...)
                .parallel()
                .reduce();
    

    但是,最后一次调用parallel或者sequential才会全局地影响管道。上面的例子,管道将被并行地执行。

    配置并行流使用的线程池

    并行流内部使用ForkJoinPool。默认地,线程数量等于处理器数量(Runtime.getRuntime().availableProcessors())。但是,可以修改系统属性java.util.concurrent.ForkJoinPool.common.parallelism,配置线程数量。
    这是全局配置,所以,除非你认为对性能有帮助,否则不要修改。

    测量流的性能

    我们声称并行加法应该比串行的或者自己的迭代方法快。我们可以使用JMH测量一下。这是一个工具,使用基于注解的方法,可以为JVM程序增加
    可靠的microbenchmarks。如果使用maven,可以这样引入:

            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-core</artifactId>
                <version>1.21</version>
            </dependency>
            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-generator-annprocess</artifactId>
                <version>1.21</version>
            </dependency>
    

    第一个库是核心实现,第二个包含一个注解处理器,帮助生成JAR文件,通过它可以方便地运行你的benchmark。maven配置里还应该有下面的plugin:

                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <finalName>benchmarks</finalName>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>org.openjdk.jmh.Main</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    

    程序代码如下

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Fork;
    import org.openjdk.jmh.annotations.Level;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.TearDown;
    
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Stream;
    
    //测量平均时间
    @BenchmarkMode(Mode.AverageTime)
    //以毫秒为单位,打印benchmark结果
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    //执行两次,增加可靠性。堆空间是4Gb
    @Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
    @State(Scope.Benchmark)
    public class ParallelStreamBenchmark {
        private static final long N = 10_000_000L;
    
        @Benchmark
        public long sequentialSum() {
            return Stream.iterate(1L, i -> i + 1).limit(N)
                    .reduce(0L, Long::sum);
        }
        
        //每次执行benchmark后,执行GC
        @TearDown(Level.Invocation)
        public void tearDown() {
            System.gc();
        }
    }
    

    使用大内存,和每次迭代以后试着GC都是为了尽量减少GC的影响。尽管如此,结果应该再加一些盐。很多因素会影响执行时间,比如你的机器有多少核。
    默认地,JMH一般先执行5次热身迭代,这样可以让HotSpot优化代码,然后再执行5次迭代用来计算最终的结果。你可以使用-w和-i命令行参数修改这些配置。
    在我的机器上,使用JDK 1.8.0_121, Java HotSpot™ 64-Bit Server VM,执行结果是

    Benchmark                              Mode  Cnt   Score   Error  Units
    ParallelStreamBenchmark.sequentialSum  avgt   10  83.565 ± 1.841  ms/op
    

    你应该期望,使用经典的for循环的迭代版本运行得更快,因为它在更低层(level)工作,而且,更重要的是,它不需要执行原始类型的装箱和拆箱操作。我们测试一下这个方法:

        @Benchmark
        public long iterativeSum() {
            long result = 0;
            for (long i = 1L; i <= N; i++) {
                result += i;
            }
            return result;
        }
    

    执行结果是

    Benchmark                             Mode  Cnt  Score   Error  Units
    ParallelStreamBenchmark.iterativeSum  avgt   10  6.877 ± 0.068  ms/op
    

    证实了我们的期望:迭代版本比串行流快了10倍。让我们使用并行流试一试:

    Benchmark                            Mode  Cnt    Score   Error  Units
    ParallelStreamBenchmark.parallelSum  avgt   10  110.157 ± 1.882  ms/op
    

    非常令人失望:并行版本的求和一点都没有发挥多核的优势,比串行版还要慢。为什么会这样?有两个问题混在一起:

    • 迭代生成了装箱对象,它们在做加法前,必须拆箱成数字
    • 迭代很难划分独立的块来并行地执行

    第二点是特别有趣的,不是所有的流都是适合并行处理的。特别是,迭代的流就很难,这是因为,函数的输入依赖上一个函数的结果。见下图:
    iterate is inherently sequential

    这意味着,reduction过程并没有像第一张图里所表示的那样执行。reduction开始的时候,还没有整个数字列表,所以没法分块。把流标记为并行的,反而增加了在不同线程上执行的求和要被串行处理的负担。

    使用更专业的方法

    LongStream.rangeClosed方法使用的是原始long类型,所以不用装箱和拆箱。而且,它生产的数的范围,可以很容易地分成不依赖的块。比如,范围1-20可以被分成1-5、6-10、11-15和16-20。

        @Benchmark
        public long rangedSum() {
            return LongStream.rangeClosed(1, N)
                    .reduce(0L, Long::sum);
        }
    

    输出是

    Benchmark                          Mode  Cnt  Score   Error  Units
    ParallelStreamBenchmark.rangedSum  avgt   10  7.660 ± 1.643  ms/op
    

    可以看出来,比并行流快了很多,仅比经典的for循环慢了一点。LongStream支持并行:

        @Benchmark
        public long parallelRangedSum() {
            return LongStream.rangeClosed(1, N)
                    .parallel()
                    .reduce(0L, Long::sum);
        }
    

    输出是

    Benchmark                                  Mode  Cnt  Score   Error  Units
    ParallelStreamBenchmark.parallelRangedSum  avgt   10  4.790 ± 5.142  ms/op
    

    可以发现,并行生效了。甚至比for循环还快了1/3。

    正确使用并行流

    滥用并行流产生错误的主要原因是使用了改变共享状态的算法。下面是一个通过改变共享的累加器来实现前n个自然数求和的例子:

        public long sideEffectSum(long n) {
            Accumulator accumulator = new Accumulator();
            LongStream.rangeClosed(1, n).forEach(accumulator::add);
            return accumulator.total;
        }
        
        public class Accumulator {
            public long total = 0;
            public void add(long value) { 
                total += value; 
            }
        }
    

    这种代码很常见,特别对熟悉命令式编程范式的开发者而言。当你迭代数字列表时,经常这样做:初始化一个累加器,遍历元素,使用累加器相加。
    这代码有什么错?它是串行的,失去了并行性。让我们试着使用并行流:

        public long sideEffectParallelSum(long n) {
            Accumulator accumulator = new Accumulator();
            LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
            return accumulator.total;
        }
    

    多执行几次,你会发现,每次返回的结果都不一样,而且都不是正确的50000005000000。这是因为多线程累加的时候,total += value并不是原子操作。那么怎样才能写出并行情况下,正确的代码呢?

    • 如果有怀疑,就做测试
    • 注意装箱问题。Java提供的原始类型流(IntStream、LongStream和DoubleStream)可以避免类似的问题,尽量使用他们
    • 有些操作使用并行流性能更差。尤其是像limit和findFirst这种依赖元素顺序的操作,使用并行是非常昂贵的。比如,findAny就比findFirst性能好,因为它跟顺序无关。调用unordered方法,可以把一个有顺序的流变成无顺序的流。比如,如果你需要流的N个元素,而你对前M个感兴趣,在一个无顺序的流上调用limit比有顺序的高效
    • 如果数据量不大,不要选择并行流
    • 要考虑流的底层数据结构的可分解程度。比如,ArrayList比LinkedList分解起来更高效,因为不遍历就可以分割。使用range工厂增加的原始类型流也很容易分割。可以通过实现自己的Spliterator分割流
    • 流的特征,以及中间操作如何修改流的元素,会改变分解过程的性能。比如,一个SIZED流可以被分解成两个相等的部分,并且每个部分可以高效得并行处理,但是,filter会过滤掉任何不满足条件的元素,导致流的size成了未知的
    • 考虑结束操作是廉价的还是昂贵的merge步骤(比如,Collector的combiner方法)。如果是昂贵的,组合并行结果的代价会比并行流带来的好处还要高

    下面的表格,总结一些流在可分解性方面的并行友好性

    可分解性
    ArrayList优秀
    LinkedList
    IntStream.range优秀
    Stream.iterate
    HashSet
    TreeSet

    fork/join框架

    fork/join框架用来递归地把可并行的任务分解成小任务,然后组合每个子任务的结果,以生成总的结果。它实现了ExecutorService接口,这样所有的子任务都在一个线程池(ForkJoinPool)内工作。

    RecursiveTask

    要向ForkJoinPool提交任务,你不得不增加RecursiveTask的子类-R是并行任务(以及每个子任务)的返回类型,或者
    增加RecursiveAction的子类-当没有返回值的时候。要定义RecursiveTask,需要实现它唯一的抽象方法:

            protected abstract R compute();
    

    该方法定义分割任务和不能继续被分割时处理一个子任务的算法的逻辑。该方法的实现,经常像下面的伪代码:

    if (任务足够小,不再被分) {
        顺序执行任务
    } else {
        把任务分成两个子任务
        递归地调用本方法,尽量分割每个子任务
        等待所有子任务的完成
        组合每个子任务的结果
    }
    

    可以发现,这是分治算法的并行实现。我们继续求和的例子,演示怎么使用fork/join框架。首先需要扩展RecursiveTask类:

    import java.util.concurrent.RecursiveTask;
    
    /**
     * Created by leishu on 18-12-11.
     */
    public class ForkJoinSumCalculator extends RecursiveTask<Long> {
        //分割任务的阈值
        public static final long THRESHOLD = 10_000;
        //要被求和的数组
        private final long[] numbers;
        private final int start;
        private final int end;
    
        public ForkJoinSumCalculator(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
        //生成子任务的私有构造器
        private ForkJoinSumCalculator(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            //子任务的大小
            int length = end - start;
            if (length <= THRESHOLD) {
                return computeSequentially();//小于阈值,不分割
            }
            //增加第一个子任务
            ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
            //异步执行,新的子任务使用ForkJoinPool的另一个线程
            leftTask.fork();
            ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
            //同步执行第二个子任务,允许递归
            Long rightResult = rightTask.compute();
            //读取第一个子任务的结果,如果没完成就等待
            Long leftResult = leftTask.join();
            //组合
            return leftResult + rightResult;
        }
    
        //顺序执行
        private long computeSequentially() {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }
    

    然后写一个方法,执行并行求和:

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
    

    执行一下,输出如下

    Benchmark                             Mode  Cnt   Score   Error  Units
    ParallelStreamBenchmark.forkJoinSumB  avgt    4  28.458 ± 0.602  ms/op
    

    性能不够好,这是因为在ForkJoinSumCalculator使用的是一个long[]。
    The fork/join algorithm

    使用fork/join的最佳实践

    • 调用任务的join方法,会阻塞调用者,直到返回结果。所以,要在两个子任务都启动以后在调用它
    • 不要在RecursiveTask内使用ForkJoinPool的invoke方法
    • 子任务的fork方法是用来做调度的。在两个子任务上直接调用它似乎是很自然的,但是,在其中一个上调用compute效率更高,因为这样能重用相同的线程

    偷工作

    任务被分给ForkJoinPool里的线程。每个线程有一个保存任务的双端链表,顺序地执行链表中的任务。如果由于某种原因(比如I/O),一个线程完成了分配给他的全部任务,它会随机地从其他线程选择一个队列,从队列的尾部偷一个任务。这个过程会持续,直到所有的队列都空了为止。所以,要有大量的小任务,而不是几个大任务,这样可以更好地平衡线程的负荷。
    The work-stealing algorithm

    Spliterator

    Spliterator是Java 8 提供的新接口,意思是“splitable iterator”,用来并行地迭代源中的元素。也许你不用开发自己的Spliterator,但是,理解了它,也就明白了并行流是如何工作的。Java 8已经在Collections框架内提供了Spliterator的默认实现。Collection接口有一个default方法spliterator(),它就返回一个Spliterator对象。我们先看看Spliterator接口的定义:

    public interface Spliterator<T> {
        //用来按顺序消费Spliterator的元素,如果还有元素就返回true
        boolean tryAdvance(Consumer<? super T> action);
        //把一些元素分到一个新的Spliterator,以允许他们并行处理
        Spliterator<T> trySplit();
        //剩余的可被遍历的元素数量估值
        long estimateSize();
        int characteristics();
    }
    

    tryAdvance方法的行为类似于迭代器,用来按顺序消费Spliterator的元素,如果还有元素就返回true。trySplit方法
    用来把一些元素分到一个新的Spliterator,以允许他们并行处理。

    分割过程

    把一个流分割成多个部分是一个递归过程,如下图所示。首先,在第一个Spliterator上调用trySplit生成一个新的。然后,在这两个Spliterator上调用trySplit,这样产生四个。一直进行下去,直到该方法返回null,标志着不能再被分割。最后,当所有的trySplit都返回null时,递归过程结束。
    The recursive splitting process

    分割过程也会受到Spliterator的特征(由characteristics方法声明)的影响。

    Spliterator特征

    characteristics方法返回一个整数,用来更好地控制和优化Spliterator的用法。

    Characteristic描述
    ORDERED元素是有顺序的(比如List),所以Spliterator使用该顺序做遍历和分区
    DISTINCT对于每对遍历的元素x和y,x.equals(y)返回false
    SORTED遍历的元素遵循预定义的排序顺序
    SIZED源的size是已知的(比如set),所以estimatedSize()返回的值是精确的
    NON-NULL元素不会为空
    IMMUTABLE源是不可变的,说明遍历的时候,元素不会被增加、修改和删除
    CONCURRENT源是并发安全的,并发修改的时候,不用任何同步
    SUBSIZEDSpliterator和接下来产生的Spliterator都是SIZED

    实现自己的Spliterator

    我们开发一个简单的方法,用来计算字符串中的单词数。

        public int countWordsIteratively(String s) {
            int counter = 0;
            boolean lastSpace = true;
            for (char c : s.toCharArray()) {
                if (Character.isWhitespace(c)) {
                    lastSpace = true;
                } else {
                    if (lastSpace) counter++;
                    lastSpace = false;
                }
            }
            return counter;
        }
    

    要计算的字符串是但丁的“地域”的第一句

            public static final String SENTENCE =
                    " Nel   mezzo del cammin  di nostra  vita "
                            + "mi  ritrovai in una  selva oscura"
                            + " che la  dritta via era   smarrita ";
    
            System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
    

    注意,两个单词间的空格数是随机的。执行结果

    Found 19 words
    

    使用函数式实现

    首先需要把字符串转换成一个流。原始类型int、long和double才有原始的的流,所以,我们使用Stream:

    Stream<Character> stream = IntStream.range(0, SENTENCE.length())
                                        .mapToObj(SENTENCE::charAt);
    

    可以使用reduction计算单词数量。当reduce的时候,你不得不携带由两个变量组成的状态:整数型的总数和布尔型的字符是否是空格。因为Java没有tuples,你得增加一个新类-WordCounter-封装状态:

        class WordCounter {
            private final int counter;
            private final boolean lastSpace;
    
            public WordCounter(int counter, boolean lastSpace) {
                this.counter = counter;
                this.lastSpace = lastSpace;
            }
            
            //遍历,累加
            public WordCounter accumulate(Character c) {
                if (Character.isWhitespace(c)) {
                    return lastSpace ? this : new WordCounter(counter, true);
                } else {
                    //如果上一个字符是空格,而当前的不是,就加1
                    return lastSpace ? new WordCounter(counter + 1, false) : this;
                }
            }
    
            //组合,求和
            public WordCounter combine(WordCounter wordCounter) {
                return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
            }
    
            public int getCounter() {
                return counter;
            }
        }
    

    下面是遍历一个新字符时,WordCounter的状态图
    The state transitions

    然后,我们就可以使用流的reduce方法了

        private int countWords(Stream<Character> stream) {
            WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                    WordCounter::accumulate,
                    WordCounter::combine);
            return wordCounter.getCounter();
        }
    

    我们做一下测试

            Stream<Character> stream = IntStream.range(0, SENTENCE.length())
                    .mapToObj(SENTENCE::charAt);
            System.out.println("Found " + countWords(stream) + " words");
    

    执行结果是正确的。

    并行的实现

    我们修改一下代码

            System.out.println("Found " + countWords(stream.parallel()) + " words");
    

    执行结果不是找到19个单词了。因为源字符串在随意的位置被分割,一个字符被多次分割。要解决这个问题,就需要实现自己的Spliterator。

        class WordCounterSpliterator implements Spliterator<Character> {
    
            private final String string;
            private int currentChar = 0;
    
            private WordCounterSpliterator(String string) {
                this.string = string;
            }
    
            @Override
            public boolean tryAdvance(Consumer<? super Character> action) {
                //消费当前字符
                action.accept(string.charAt(currentChar++));
                //如果还有字符可被消费,返回true
                return currentChar < string.length();
            }
    
            @Override
            public Spliterator<Character> trySplit() {
                int currentSize = string.length() - currentChar;
                //小于阈值,不再分割
                if (currentSize < 10) {
                    return null;
                }
                //候选的分割位置是字符串的一半长度
                for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
                    //如果是空格,才分割
                    if (Character.isWhitespace(string.charAt(splitPos))) {
                        Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
                        //当前位置修改为分割位置
                        currentChar = splitPos;
                        return spliterator;
                    }
                }
                return null;
            }
    
            @Override
            public long estimateSize() {
                return string.length() - currentChar;
            }
    
            @Override
            public int characteristics() {
                return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
            }
        }
    

    然后,我们做测试

            Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
            Stream<Character> stream = StreamSupport.stream(spliterator, true);
    
            System.out.println("Found " + countWords(stream) + " words");
    

    这回没问题了。

    展开全文
  • 接口并行开发中有着重要作用 下面演示接口的例子 模拟显卡 工作 显卡应该有显示功能 和获得厂家名称的功能 interface VideoCard {void Display();void GetName();}; 下面实现Deming显卡 ,我们通

    接口在并行开发中有着重要作用        下面演示接口的例子      模拟显卡  工作

     

    显卡应该有显示功能 和获得厂家名称的功能

    interface  VideoCard 

    {

    void Display();

    void GetName();

    };

     

    下面实现Deming显卡 ,我们通过一个类来实现 上述接口 并且在类中添加自己的特性  因为显卡的厂家不同所以 实例化的类就不同    对于生产厂商只需要满足标准接口就行了

    对于谁生产 都无所谓    这就是接口的用处  我们是利用 Deming 类来实现 VideoCard这个接口 

     

    class  Deming implements VideoCard
    {
     public void Display()
     {
      System.out.println("Display");
     }
     public void GetName()
     {
      System.out.println("Deming");
     }
     String name;
     Deming()
     {
      name="Deming";
     }
     void SetName(String name)
     {
      this.name=name;
     }
     void ShowName()
     {
      System.out.println(name);
     }
     

    这是显卡的函数 我们用户需要另外一个主函数来执行这个厂商生产的显卡类     User  到此就完成了接口的应用

    class  user

    {

     public static void main(String []args)
     {
      Deming p=new Deming();
      p.ShowName();
      p.Display();
      p.SetName("New Name");
      p.ShowName();
      
      
     }

     

     



     

     

     

     

     

     

     

    展开全文
  • 1 实验题目 体操比赛计算选手成绩的办法是去掉一个最高分和一个最低分再计算平均分 而学校考 察一个班级的某科目的考试情况时是计算全班学生的平均成绩 Gymnastics 类和 School 类都实现了 ComputerAverage 接口但...
  • 在前两篇的 Java 8 函数式编程的 blog 中,我们聊了 Lambda 表达式,聊了一些常用的 Stream API 和一些收集器方法。在今天,我们要考虑一下效率了。
  • 【项目说明】:使用Runnable接口实现求和的Java多线程并行程序求1+2+3+...+1000000000,输出结果、并行、串行时间并计算加速比。【项目代码】:public class And { public static void main(String[] args) throws ...
  • 【九】Java接口

    千次阅读 2020-02-25 19:26:56
    思维导图参考:【九】Java接口思维导图 【习题与详解】 (封装、继承、多态、接口)创建三个类,组成一个继承树,表示游戏中的角色练习题 【经典接口回调案例案例】 探究Comparable接口中CompareTo的方法,即其中的...
  • java8 并行流使用

    千次阅读 2017-07-26 15:19:03
    一.并行流 1.并行流运行时:内部使用了fork-join框架  其默认线程数为处理器数量,Runtime.getRuntime().availableProcessors() ... System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
  • 项目简介 Async 是一款 Java 异步处理框架 设计目的 并行执行可以大幅度提升程序的运行速度有效利用 CPU 资源 但是单独为每次方法都使用线程池手写显然不够优雅复用性也很差 特性 支持接口类的动态代理异步 支持非...
  • 项目简介 Async 是一款 Java 异步处理框架 设计目的 并行执行可以大幅度提升程序的运行速度有效利用 CPU 资源 但是单独为每次方法都使用线程池手写显然不够优雅复用性也很差 特性 支持接口类的动态代理异步 支持非...
  • 【项目说明】:使用Runnable接口实现求10000以内素数个数的Java多线程并行程序求10000以内素数个数,输出结果、并行、串行时间,并计算加速比。【项目代码】:public class And { public static void main(String[]...
  • 今天和大家分享在业务开发中如何降低接口响应时间的一个小技巧,也是大家日常开发中比较普遍存在的一个问题,即如何提高程序的并行计算能力? 本文主要包含以下内容: 顺序执行很慢 线程池+...
  • JAVA并行计算中的外部排序

    万次阅读 2018-08-01 23:50:52
    并行计算中的外部排序 一、并行计算的方法 (1)将数据拆分到每个节点上【如何拆分】 (2)每个节点并行的计算结果【什么结果】 (3)将结果汇总 【如何汇总】 二、外部排序 如何排序10G个元素? .....
  • 接口并行并发稳定安全策略 一个jvm时: 1、使用java中的Lock加锁 2、synchronized重量锁 注:Lock只能在一个jvm中起效,如果集群部署,在多个JVM中就会造成无锁模式 多个jvm时: 1、数据库-行级排他锁(写锁) ...
  • Java并行编程–从并行任务集获取反馈

    千次阅读 多人点赞 2012-04-01 12:53:24
    Java并行编程–从并行任务集获取反馈在并行任务启动后,强制性地从并行任务得到反馈。假想有一个程序,可以发送批邮件,还使用了多线程机制。你想知道有多少邮件成功发送吗?你想知道在实际发送过程期间,这个批处理...
  • java接口自动化接口测试

    千次阅读 2019-08-27 10:24:09
    这里测试的接口为spring boot开发接口文章中开发好的接口。 测试用例
  • JavaCV还具有硬件加速的全屏图像显示( CanvasFrame和GLCanvasFrame ),易于使用的方法以在多核上并行执行代码( Parallel ),相机和投影仪的用户友好型几何和颜色校准( GeometricCalibrator , ...
  • Java8 并行流多线程操作

    千次阅读 2018-11-12 16:59:52
    并行流是一个把内容分成多个数据块的,并用不同的线程分别处理每个数据块的流。下面通过简单的示例介绍一下顺序流和并行流的特点。后面后时间在详细记录。 并行流: public static void main(String[] args) { ...
  • java 异步并行框架 async-01-入门教程

    万次阅读 2019-03-12 19:49:47
    Async 是一款 Java 异步处理框架。 设计目的 并行执行可以大幅度提升程序的运行速度,有效利用 CPU 资源。 但是单独为每次方法都使用线程池手写,显然不够优雅,复用性也很差。 特性 支持接口类的动态代理异步 ...
  • 背景 1.通常我们在获取到一个list列表后需要一个挨着一个的进行遍历处理数据,如果每次处理都需要长时间操作,...2.Java8的stream接口极大地减少了for循环写法的复杂性,stream提供了map/reduce/collect等一系列聚...
  • JAVA并行程序基础

    千次阅读 2017-03-29 23:55:35
    JAVA并行程序基础 在面向线程设计的计算机结构中,进程是线程的容器。我们都知道,程序是对于指令、数据及其组织形式的描述,而进程是程序的实体。 线程是轻量级的进程,是程序执行的最小单位。(PS:使用多线程去进行...
  • 归并排序可以将问题分成独立的子问题,所以很适合使用ForkJoinPool进行并行排序,以充分利用多核cpu,ForkJoinPool的默认构造器创建的线程数量为CPU核心数。归并排序改进后好像是不需要额外的O(n)存储空间需求的,...
  • 需求: 获取3个有一定关联的业务接口的数据并返回 接口返回数据格式: A: {“msg”:“操作成功”,“success”:true,“AStatus”:7} B: {“msg”:“操作成功”,“success”:true,“BStatus”:1} C: {“msg”:“操作成功...
  • 使用Runnable接口实现并行的算法设计... 3 2.3继承Thread类实现并行的算法设计... 3 2.4 理论加速比分析... 3 3.使用Runnable接口并行算法实现... 4 3.1 代码及注释... 4 3.2 执行结果截图... 6
  • Java接口开发流程总结

    千次阅读 2020-03-23 17:01:43
    之前课上讲过好多次接口开发的流程以及什么是接口开发,但是仍有好多同学不理解,在这做一个总结,希望能帮助到“不明白什么是接口开发”以及“不知道如何进行接口开发”的同学。 一、什么是接口开发? 接口开发,...
  • 走进Java接口测试之测试框架TestNG

    千次阅读 2018-12-14 10:02:07
    Demo基本注释常用断言方法testng.xml测试方法,测试类和测试组测试方法测试组群组排除组部分组参数化testng.xml 中的参数使用 DataProviders 的参数依赖性带注释的依赖关系XML 中的依赖关系工厂忽略测试并行和超时...
  • Java多线程并行计算

    2020-04-19 17:26:47
    945+0800 INFO [main] com.qx.test.ParallelTest - costs: 1001ms 总结:以上就是如何让接口并行计算的三种实现方式,属于日常开发中比较常用的代码优化技巧。这里没有做过多的说明和比较,需要大家查阅更多的相关...
  • Java 7之前,并行处理数据集合非常麻烦。 第一,你得明确地把包含数据的数据结构分成若干子部分。 第二,你要给每个子部分分配一个独立的线程。 第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 125,478
精华内容 50,191
关键字:

java接口如何并行

java 订阅