精华内容
下载资源
问答
  • 上一篇文章《java8 stream运行原理之顺序流原理详解》介绍了顺序流的执行原理,本文接着上一篇介绍并行流的执行原理。 一、如何创建并行流 调用parallel()方法可以创建并行流,如下: public static void main...

    上一篇文章《java8 stream运行原理之顺序流原理详解》介绍了顺序流的执行原理,本文接着上一篇介绍并行流的执行原理。

    一、如何创建并行流

    调用parallel()方法可以创建并行流,如下:

        public static void main(String argv[]){
            Stream<String> stream=Stream.of("1","2","","123");
            stream.filter(x->x.length()>=1).parallel().forEach(System.out::println);
        }
    

    二、并行流原理

    下面以第一小节的代码为例,介绍一下并行流原理。
    这里只介绍最后的终端操作(forEach),对于如何创建Stream流,以及中间操作原理请参见上一篇文章。

    1、parallel()

    首先来看一下parallel()方法:

        public final S parallel() {
        	//sourceStage是Head对象引用
        	//将Head对象的parallel属性设置为true
            sourceStage.parallel = true;
            return (S) this;
        }
    

    parallel()方法仅仅将Head对象的parallel属性设置为true。

    2、forEach()

    下面是forEach()方法源码:

        public void forEach(Consumer<? super P_OUT> action) {
        	//makeRef()创建TerminalOp对象
            evaluate(ForEachOps.makeRef(action, false));
        }
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    		//检查sourceStage.parallel的值,如果为true,表示是并行流
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    

    在forEach()里面调用了terminalOp.evaluateParallel()进行并行处理。下面是terminalOp.evaluateParallel()的方法:

        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            //ordered表示是否有序遍历,true表示有序
            //forEach操作默认ordered都是false
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }
    

    在evaluateParallel()方法里面创建ForEachTask对象。
    ForEachTask实现了ForkJoinTask类,而这个ForkJoinTask是Fork/Join框架中的,这是java7新增的功能。Fork/Join框架对并行处理做了很多优化。
    上面代码的最后调用了invoker()方法,这个方法会调用到ForEachTask.compute()方法:

    public void compute() {
    	  //将数据源拆分为两部分,分别交给两个不同的线程处理,
    	  //这两部分使用rightSplit和leftSplit记录
          Spliterator<S> rightSplit = spliterator, leftSplit;
          long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
          if ((sizeThreshold = targetSize) == 0L)
              targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
          boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
          boolean forkRight = false;
          Sink<S> taskSink = sink;
          ForEachTask<S, T> task = this;
          while (!isShortCircuit || !taskSink.cancellationRequested()) {
              //rightSplit.trySplit()可以对数据源的数据进行拆分,将数据一分为二
              //如果剩余的数据量不足以进行再次拆分,则直接使用当前线程处理
              if (sizeEstimate <= sizeThreshold ||
                  (leftSplit = rightSplit.trySplit()) == null) {
                  task.helper.copyInto(taskSink, rightSplit);
                  break;
              }
              //将数据拆分为两部分后,左半部分的数据再创建ForEachTask对象
              ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
              task.addToPendingCount(1);
              ForEachTask<S, T> taskToFork;
              //forkRight相当于一个开关,如果上次启动任务处理左半部分数据,那么这次启动任务处理右半部分数据
              if (forkRight) {
                  forkRight = false;
                  rightSplit = leftSplit;
                  taskToFork = task;
                  task = leftTask;
              }
              else {
                  forkRight = true;
                  taskToFork = leftTask;
              }
              //调用fork()可以将任务加入到待处理队列中,后续线程池中的线程会将任务取走处理
              taskToFork.fork();
              sizeEstimate = rightSplit.estimateSize();
          }
          task.spliterator = null;
          task.propagateCompletion();
      }
    }		
    

    并行流处理的核心逻辑就在compute()方法里面,下面总结一下并行流的执行流程:

    1. 调用Spliterator.trySplit()拆分流,将流一拆为二,如果不能再拆分,那么调用Sink对象链表处理数据;
    2. 拆分为两部分数据后,右半部分的数据使用当前ForEachTask对象处理,左半部分数据新创建一个ForEachTask对象处理,之后分别启动线程处理这两个任务。

    如果处理流的操作都是无状态的,那么执行会像上面介绍的一样在终端操作里面创建并行任务,如果中间操作有有状态的,那么在创建Spliterator对象时,会先将该有状态操作以及之前的所有操作并行执行一次,得到的结果作为Spliterator对象,然后将该Spliterator对象传递给终端操作,这一段处理逻辑可以参见方法sourceSpliterator():

        private Spliterator<?> sourceSpliterator(int terminalFlags) {
            // Get the source spliterator of the pipeline
            Spliterator<?> spliterator = null;
            if (sourceStage.sourceSpliterator != null) {
                spliterator = sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
            }
            else if (sourceStage.sourceSupplier != null) {
                spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
                sourceStage.sourceSupplier = null;
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
    		//如果是并行流且有有状态的操作,那么下面的if分支判断为true
            if (isParallel() && sourceStage.sourceAnyStateful) {
                // Adapt the source spliterator, evaluating each stateful op
                // in the pipeline up to and including this pipeline stage.
                // The depth and flags of each pipeline stage are adjusted accordingly.
                int depth = 1;
                //从Head开始遍历各个操作对象
                for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                     u != e;
                     u = p, p = p.nextStage) {
    
                    int thisOpFlags = p.sourceOrOpFlags;
                    //如果当前操作是有状态的
                    if (p.opIsStateful()) {
                        depth = 0;//操作对象链表深度记为0
                        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                            // Clear the short circuit flag for next pipeline stage
                            // This stage encapsulates short-circuiting, the next
                            // stage may not have any short-circuit operations, and
                            // if so spliterator.forEachRemaining should be used
                            // for traversal
                            thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                        }
    					//将数据进行并行处理,只执行当前有状态的操作以及它前面的操作
    					//将操作的结果创建一个Spliterator对象,
    					//该Spliterator对象接下来就作为后面流处理的数据源
                        spliterator = p.opEvaluateParallelLazy(u, spliterator);
    
                        // Inject or clear SIZED on the source pipeline stage
                        // based on the stage's spliterator
                        thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                                ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                                : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                    }
                    //操作对象链表深度加1 ,如果中间操作有有状态的,那么该有状态的操作深度为0,
                    //相当于接下来以该有状态操作对象作为链表起点
                    p.depth = depth++;
                    p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
                }
            }
            if (terminalFlags != 0)  {
                // Apply flags from the terminal operation to last pipeline stage
                combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
            }
            return spliterator;
        }
    
    展开全文
  • 一、顺序流原理总述 下图是Stream的继承结构: 蓝框表示接口,灰框表示抽象类,绿框表示非抽象类。 因为Integer、Double、Long比较常用且特殊,java8提供了专门的Stream类。不过这三个类的原理与ReferencePipeline...

    接下来将通过两篇文章介绍stream的原理,本文介绍顺序流,下篇文章介绍并行流。

    一、顺序流原理总述

    下图是Stream的继承结构:
    在这里插入图片描述
    蓝框表示接口,灰框表示抽象类,绿框表示非抽象类。
    因为Integer、Double、Long比较常用且特殊,java8提供了专门的Stream类。不过这三个类的原理与ReferencePipeline是一样的,本文接下来就以ReferencePipeline为例做介绍。
    ReferencePipeline是最常用的流对象,一般使用的流对象都是该类。该类是抽象类,它有三个子类,分别是StatelessOp、StatefulOp、Head,前面两个类表示操作类型,分别表示无状态操作和有状态操作。java将流经过的操作组装成一个链表,这个链表的头结点就是Head对象,Head对象不表示任何操作,仅仅标示一个流的开始,之后的中间操作分为有状态的和无状态的,最后一个终端操作使用TerminalOp表示,在这个链条上每创建一个操作对象,便使用该对象的属性previousStage记录前一个操作对象的引用,同时也会更新前一个操作的nextStage属性值为当前操作对应的引用,最后形成下图的链表,注意终端操作并没有接到链表上:
    在这里插入图片描述

    当创建了TerminalOp对象之后,便要从TerminalOp对象开始向前遍历每个操作对象,将每个操作对象转换为Sink对象,这样每个元素需要的操作处理就从操作对象转换到了Sink对象上。Sink对象也会形成一个链表,每个Sink对象里面有一个属性downstream记录了当前操作的后一个Sink对象。Head没有对应的Sink对象,就上图来说,Sink对象形成的链表头结点是StatefulOp对应的节点,上图形成的Sink对象链表如下图:
    在这里插入图片描述
    Sink对象链表创建好后,接下来就开始遍历每个元素,对元素执行Sink对象链表上的每个操作。当执行完后,就可以得到本次流操作的结果。
    下图是Sink和TerminalOp的继承关系图:
    在这里插入图片描述
    蓝框表示接口,绿框表示非抽象类。collect()/max()/min()/count()这几个终端操作从上图没有找到对应的Op类,其实它们都归属到了ReduceOp类中。
    Sink接口对所有操作都提供了对应的实现类,只不过有些实现类有类名字,有些是内部匿名类,没有类名字,我没有在上图一一画出这些实现类。
    除了Sink、Stream之外,还有一个大的集成体系,那便是Spliterator继承结构:
    在这里插入图片描述
    Spliterator是一个分离器,数据流的源便保存在分离器中,分离器提供了一些方法,包括遍历流中的元素,检查是否还有未处理元素,流中元素的总个数等。
    如果数据源是一个数组,该数组对象会保存到ArraySpliterator对象中;如果是集合或者Iterator,那么便使用IteratorSpliterator保存;如果使用Stream.concat()融合两个流,分离器使用ConcatSpliterator;如果使用Stream.generate()创建流,分离器使用InfiniteSupplyingSpliterator。
    除了上面这些类之外,java还提供了一些工具类,这些类里面主要是静态方法:
    在这里插入图片描述

    二、源码分析

        public static void main(String argv[]){
            Stream<String> stream=Stream.of("1","2","","123");
            stream.filter(x->x.length()>=1).forEach(System.out::println);
        }
    

    下面我以上述代码为例介绍Stream源码的执行过程。

    1、Stream.of()

        public static<T> Stream<T> of(T... values) {
            return Arrays.stream(values);
        }
    

    Stream.of()的入参为数组,里面将请求传给Arrays,最终调用到Arrays.stream()方法上,

    	//startInclusive是数组的开始下标,在本例中是0,
    	//endExclusive是数组的结束下标,在本地中是数组的长度
        public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        	//最后的入参表示是否是并行流
            return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
        }
    

    在Arrays.stream()里面首先要调用spliterator()方法创建一个Spliterator对象:

        public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
        	//创建Spliterator对象
        	//spliterator()的最后一个入参表示本分离器的特征:结构上不可变,元素与元素之间关联有序
            return Spliterators.spliterator(array, startInclusive, endExclusive,
                                            Spliterator.ORDERED | Spliterator.IMMUTABLE);
        }
        //下面这个方法是Spliterators.spliterator()
        public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
                                                     int additionalCharacteristics) {
            //对入参做合法性校验
            checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex);
            //创建ArraySpliterator对象
            return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
        }
        	//ArraySpliterator的构造方法
            public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
                this.array = array;
                this.index = origin;
                this.fence = fence;
                //特征值增加两个:元素个数已知,且对数据源做任意切分,元素个数都是已知的
                this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
            }
    

    创建出Spliterator对象后,下面要执行StreamSupport.stream()方法:

        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            //创建出Head对象,也就是操作对象链表的头结点,最后一个入参表示是否是并行流
            //Head对象继承自AbstractPipeline,创建Head对象构造方法调用AbstractPipeline
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
        //AbstractPipeline类的构造方法
        AbstractPipeline(Spliterator<?> source,
                         int sourceFlags, boolean parallel) {
            this.previousStage = null;//前一个操作对象引用,这里是null
            this.sourceSpliterator = source;//分离器,持有分离器也就持有了数据源
            this.sourceStage = this;//当前操作对象
            this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;//特征值
            this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
            this.depth = 0;//表示操作链表的深度
            this.parallel = parallel;//是否是并行流
        }
    

    Stream.of()方法最后创建出Head对象,并将Head对象返回给调用方。

    2、stream.filter()

    filter()是一个无状态的操作,因此执行该方法时,内部创建一个StatelessOp对象:

        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            //StatelessOp继承自AbstractPipeline,创建StatelessOp对象需要调用AbstractPipeline的构造方法
            //注意:这里的this是Head对象
            return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
                //当执行到终端方法时,使用下面的方法将当前操作对象转换为Sink对象 
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
        //下面是AbstractPipeline的构造方法
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
    
            this.previousStage = previousStage;//记录前一个操作,形成一个双向链表
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);//融合特征值
            this.sourceStage = previousStage.sourceStage;//sourceStage记录的是Head对象的引用
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;//记录当前链表的深度
        }
    

    3、stream.forEach()

        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));
        }
    

    先来看一下ForEachOps.makeRef():

        public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                      boolean ordered) {
            Objects.requireNonNull(action);
            //创建一个ForEachOp对象,OfRef继承自ForEachOp
            //OfRef内部属性会记录action引用
            return new ForEachOp.OfRef<>(action, ordered);
        }
    

    创建完ForEachOp对象之后,接下来执行stream.evaluate()方法:

        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    		//因为是顺序流,接下来执行terminalOp.evaluateSequential()
    		//注意:这里的this对象是前面filter()方法创建出的StatelessOp对象
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
        //下面是terminalOp.evaluateSequential()方法
        //helper是filter()方法创建出的StatelessOp对象
        //this是ForEachOp对象
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                               Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }
        //下面是helper.wrapAndCopyInto()
        //入参sink是ForEachOp对象,ForEachOp实现了Sink接口
        final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
            copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
            return sink;
        }
    

    在wrapAndCopyInto()方法里面调用了两个很关键的方法wrapSink()和copyInto(),前者负责创建Sink对象链表,后者负责对每个元素执行Sink链表上的操作,注意调用的这两个方法属于StatelessOp对象,也就是filter()创建的对象,下面首先来看wrapSink()方法:

        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
            //从操作对象链表的最后开始向前遍历,因为Head对象的depth=0,所以遍历不会访问到Head
            //opWrapSink()将每个操作转换为Sink对象,然后将Sink作为入参接着向下传递
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            	//下面会以filter操作对象的opWrapSink()方法为例做介绍
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
        //下面是filter()方法里面创建StatelessOp对象时实现的方法,这个在上面已经展示过了
        //为了方便看,再展示一遍
        //下面这个方法将filter的StatelessOp操作对象转换为Sink对象
        //ChainedReference是Sink接口的抽象类
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
                        @Override
                        public void accept(P_OUT u) {
                        	//predicate是过滤条件,也就是filter()方法的入参
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            //下面是ChainedReference构造方法,使用属性downstream记录Sink对象链表上下一个Sink对象
            public ChainedReference(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    

    wrapSink()遍历操作对象链表,将每个操作对象转换为Sink对象,使用属性downstream记录下一个Sink对象,这样形成一个Sink对象链表,将链表传递给copyInto()方法。

        final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
            //下面这个if判断是检查终端操作是否是短路操作
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            	//调用Sink链表上每个begin()方法,表示下面要开始遍历流中的数据了
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                //forEachRemaining()用于遍历每个元素,入参为Sink链表
                spliterator.forEachRemaining(wrappedSink);
                //调用Sink链表上每个end()方法
                wrappedSink.end();
            }
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
    

    下面是spliterator.forEachRemaining()方法:

           public void forEachRemaining(Consumer<? super T> action) {
                Object[] a; int i, hi; // hoist accesses and checks from loop
                if (action == null)
                    throw new NullPointerException();
                //下面的while循环遍历每个元素
                if ((a = array).length >= (hi = fence) &&
                    (i = index) >= 0 && i < (index = hi)) {
                    //action对象是Sink链表上的头结点,也就是filter操作对应的Sink对象
                    do { action.accept((T)a[i]); } while (++i < hi);
                }
            }
    		//下面是filter操作对应的Sink对象的accept()方法
    		//这个方法在介绍wrapSink()时已经展示过,它是在创建Sink对象时使用匿名内部类实现的
            @Override
            public void accept(P_OUT u) {
                if (predicate.test(u))
                	//downstream()是Sink对象链表上的下一个Sink对象,
                	//如果符合filter条件便将元素交给下一个Sink处理
                    downstream.accept(u);
            }
    

    到这里为止,按照本小节的例子介绍了一遍执行流程,其他的Stream方法执行流程都是类似的,大家可以参考本小节的内容了解其他方法。

    从上面的源码可以看到,对流进行处理是在最后执行终端方法时才进行的。

    展开全文
  • Stream 串行 原理

    2019-01-10 18:49:17
    Stream 为什么会出现? Stream 出现之前,遍历一个集合最传统的做法大概是用 Iterator,或者 for 循环。这种两种方式都属于外部迭代,然而外部迭代存在着一些问题。 开发者需要自己手写迭代的逻辑,虽然大部分...

    Stream 为什么会出现?

    Stream 出现之前,遍历一个集合最传统的做法大概是用 Iterator,或者 for 循环。这种两种方式都属于外部迭代,然而外部迭代存在着一些问题。

    • 开发者需要自己手写迭代的逻辑,虽然大部分场景迭代逻辑都是每个元素遍历一次。

    • 如果存在像排序这样的有状态的中间操作,不得不进行多次迭代。

    • 多次迭代会增加临时变量,从而导致内存的浪费。

    虽然 Java 5 引入的 foreach 解决了部分问题,但也引入了新的问题。

    • foreach 遍历不能对元素进行赋值操作

    • 遍历的时候,只有当前被遍历的元素可见,其他不可见

    随着大数据的兴起,传统的遍历方式已经无法满足开发者的需求。

    就像小作坊发展到一定程度要变成大工厂才能满足市场需求一样。大工厂和小作坊除了规模变大、工人不多之外,最大的区别就是多了流水线。流水线可以将工人们更高效的组织起来,使得生产力有质的飞跃。

    所以不安于现状的开发者们想要开发一种更便捷,更实用的特性。

    • 它可以像流水线一样来处理数据

    • 它应该兼容常用的集合

    • 它的编码应该更简洁

    • 它应该具有更高的可读性

    • 它可以提供对数据集合的常规操作

    • 它可以拼装不同的操作

    经过不懈的能力,Stream 就诞生了。加上 lambda 表达式的加成,简直是如虎添翼。

    你可以用 Stream 干什么?

    下面以简单的需求为例,看一下 Stream 的优势:

    从一列单词中选出以字母a开头的单词,按字母排序后返回前3个。

    传统实现方式

    List<String> list = Lists.newArrayList("are", "where", "advance", "anvato", "java", "abc");
    
    List<String> tempList = Lists.newArrayList();
    
    List<String> result = Lists.newArrayList();
    
    for( int i = 0; i < list.size(); i++) {
    
        if (list.get(i).startsWith("a"))
    
            tempList.add(list.get(i));
    
    }
    
    tempList.sort(Comparator.naturalOrder());
    
    result = tempList.subList(0,3);

    Stream实现方式

    List<String> list = Arrays.asList("are", "where", "anvato", "java", "abc");
    
    List<String> result =
    
        list.stream()                //定义输入源,得到Stage0 Head节点(流)
    
          .filter(s -> s.startsWith("a"))   //定义中间操作,得到Stage1 StatelessOp节点(流) 
    
          .sorted()                //定义中间操作,得到Stage2 StatefulOp节点(流),SortedOps.OfRef实例
    
          .limit(3)                //定义中间操作,得到Stage3 StatefulOp节点(流),SliceOps的StatefulOp实例
    
          .collect(Collectors.toList());    //定义终端操作, TerminalOp节点(流),ReduceOp实例

    Stream 是怎么实现的?

    需要解决的问题:

    • 如何定义流水线?

    • 原料如何流入?

    • 如何让流水线上的工人将处理过的原料交给下一个工人?

    • 流水线何时开始运行?

    • 流水线何时结束运行?

    总观全局

    Stream 处理数据的过程可以类别成工厂的流水线。数据可以看做流水线上的原料,对数据的操作可以看做流水线上的工人对原料的操作。

    事实上 Stream 只是一个接口,并没有操作的缺省实现。最主要的实现是 ReferencePipeline,而 ReferencePipeline 继承自 AbstractPipeline ,AbstractPipeline 实现了 BaseStream 接口并实现了它的方法。但 ReferencePipeline 仍然是一个抽象类,因为它并没有实现所有的抽象方法,比如 AbstractPipeline 中的 opWrapSink()抽象方法,该方法是由具体的中间操作和终端操作来实现的 。ReferencePipeline内部定义了三个静态内部类,分别是:Head, StatelessOp, StatefulOp,但只有 Head 不再是抽象类。

    流水线的结构有点像双向链表,节点之间通过引用连接。节点可以分为三类,控制数据输入的节点、操作数据的中间节点和控制数据输出的节点。

    ReferencePipeline 包含了控制数据流入的 Head ,中间操作 StatelessOp, StatefulOp,终止操作 TerminalOp。

    Stream 常用的流操作包括:

    • 中间操作(Intermediate Operations)

      • 无状态(Stateless)操作:每个元素是彼此独立的。如 filter()、flatMap()、map()、peek()、unordered() 等

      • 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如 distinct()、sorted()、sorted(comparator)、limit()、skip() 等

    • 终止操作(Terminal Operations)

      • 非短路操作:处理完所有数据才能得到结果。如 collect()、reduce()、count()、forEach()、forEachOrdered()、max()、min()、toArray() 等。

      • 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不在处理所剩元素。如 anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() 等。

    源码分析

    了解了流水线的结构和定义,接下来我们基于上面的例子逐步看一下源代码。

    • 定义输入源 Head节点(Stage0阶段)

    stream() 是 Collection 中的 default 方法,实际上调用的是 StreamSupport.stream() 方法,返回的是 ReferencePipeline.Head的实例。

    ReferencePipeline.Head 的构造函数传递是 ArrayList 中实现的 spliterator 。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

    • 定义流水线节点 中间操作节点(Stage1..N阶段 StatelessOp流对象/StatefulOp流对象)

    filter() 是 Stream 中定义的方法,在 ReferencePipeline 中实现,返回 StatelessOp 的实例。

    可以看到 filter() 接收的参数是谓词,可以用 lambda 表达式。StatelessOp的构造函数接收了 this,也就是 ReferencePipeline.Head 实例的引用。并且实现了 AbstractPipeline 中定义的 opWrapSink 方法。

    public final Stream<E> filter(Predicate<? super E> predicate) {  //调用filter方法时,是当前流Stream,也即currentStage

     

        return new StatelessOp<E, E>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { 

     

        //所以这里的this,是代表当前流Stream的currentStage

                //new StatelessOp()是创建一个代表filter方法的中间操作,也即创建当前流Stream的下一个阶段流nextStage

                //new StatelessOp()构造函数内部的this不是currentStage,而是即将创建的nextStage

     

            Sink<E> opWrapSink(int flags, Sink<E> sink) {

                return new Sink.ChainedReference<E, E>(sink) {

     

                    public void begin(long size) {

                        downstream.begin(-1);

                    }

     

                    public void accept(E u) {

                        if (predicate.test(u))

                            downstream.accept(u);

                    }

                };

            }

        };

    }

    sorted() 和 limit() 的返回值和也都是 Stream 的实现类,并且都接收了 this 。

    sorted() 返回的是 ReferencePipeline.StatefulOp 的子类 SortedOps.OfRef 的实例。

    limit() 返回的是 ReferencePipeline.StatefulOp 的实例。

    现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的 sorted() 和 limit() 都只是仅仅返回一个包含上一节点引用的中间节点而已,其它啥都没做,这也就是 Stream 延时执行的特性原因之所在。

    有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作。

    参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。

    previousStage.nextStage = this;

    所以各个节点组成了一个双向链表的结构。

    • 组装流水线

    最后来看一下终止操作 .collect() 接收的是返回类型对应的 Collector。

    此例中的 Collectors.toList() 是 Collectors 针对 ArrayList 的创建的 CollectorImpl 的实例。

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {

        A container;

        if (isParallel()

          && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))

          && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {

            container = collector.supplier().get();

            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();

            forEach(u -> accumulator.accept(container, u));

        }

        else {

            container = evaluate(ReduceOps.makeRef(collector));//1

        }

        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)

                ? (R) container

                : collector.finisher().apply(container);

    }

    先忽略并行的情况,来看一下加注释了1的代码:

    A:ReduceOps.makeRef 接收此 Collector 返回了一个 ReduceOp(实现了 TerminalOp 接口)的实例。

    B:返回的 ReduceOp 实例又被传递给 AbstractPipeline 中的 evaluate(){TerminalOp.evaluateSequential();} 方法。

    C:在evaluate()方法内,调用了 ReduceOp实例的evaluateSequential()方法,并将上流水线上最后一个节点ReducingSink的实例和 sourceSpliterator 传递进去。

    public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {

        return helper.wrapAndCopyInto(makeSink(), spliterator).get();

    }

    1. 然后调用 ReduceOp 实例的 makeSink() 方法返回其 makeRef() 方法内部类 ReducingSink 的实例。

    2. 接着 ReducingSink 的实例作为参数和 spliterator 一起传入最后一个节点的 wrapAndCopyInto() 方法,返回值是 Sink 。

    • 启动流水线

    流水线组装好了,现在就该启动流水线了。这里的核心方法是 wrapAndCopyInto,根据方法名也能看出来这里应该做了两件事,wrapSink() 和 copyInto()。

    wrapSink()

    将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。参见附录I可以发现

    每个节点都记录了上一节点的引用( previousStage )和每一个节点的深度( depth )。

    所以这个 for 循环是从当前Sink(终止操作的Sink)开始,倒退至第一个Sink结束(Stage0是Head节点,无Sink)。每一次循环都是将上一阶段的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

    (Head.combinedFlags,           //Stage1的中间filter的Sink,使用Stage0的combinedFlags

       (StatelessOp.combinedFlags,    //Stage2的中间sorted的Sink,使用Stage1的combinedFlags   

          (StatefulOp.combinedFlags,  //Stage3的中间limit的Sink,使用Stage2的combinedFlags   

            TerminalOp.sink      //终端操作的Sink

          )

       )

    )

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {

        for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {

            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);

        }

        return (Sink<P_IN>) sink;

    }

    copyInto()

    终于到了要真正开始迭代的时候,这个方法接收两个参数 Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator 。wrappedSink对应的是 Head节点后面的第一个操作节点(它相当于这串 Sink 的头),spliterator 对应着数据源。

    这个时候我们回过头看一下 Sink 这个接口,它继承自 Consumer 接口,又定义了 begin()、end()、cancellationRequested() 方法。Sink 直译过来是水槽,如果把数据流比作水,那水槽就是水会流过的地方。begin() 用于通知水槽的水要过来了,里面会做一些准备工作,同样 end() 是做一些收尾工作。cancellationRequested() 是判断是不是可以停下来了。Consumer里的accept() 是消费数据的地方。

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { //无短路操作的执行流

            wrappedSink.begin(spliterator.getExactSizeIfKnown()); //1

            spliterator.forEachRemaining(wrappedSink); //2

            wrappedSink.end(); //3

        }

        else { //有短路操作的执行流

            copyIntoWithCancel(wrappedSink, spliterator);

        }

    }

    有了完整的水槽链,就可以让水流进去了。copyInto() 里做了三个动作:

    1. 通知第一个水槽(Sink)水要来了,准备一下。

    2. 让水流进水槽(Sink)里。

    3. 通知第一个水槽(Sink)水流完了,该收尾了。

    注:图中蓝色线表示数据实际的处理流程。

    每一个 Sink 都有自己的职责,但具体表现各有不同。

    无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的 下游。

    有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

    例如 sorted() 就是一个有状态的操作,一般会有一个属于自己的容器,用来记录处自己理过的数据的状态。sorted() 是在执行 begin 的时候初始化这个容器,在执行 accept 的时候把数据放到容器中,最后在执行 end 方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

    最后数据流到终止节点,终止节点将数据收集起来就结束了。

    然后就没有然后了,copyInto() 返回类型是 void ,没有返回值。

    wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

    回顾

    1:准备Head:首先是将 Collection 转化为 Stream,也就是流水线的头。

    2:中间操作:然后将各个中间操作节点像拼积木一样拼接起来。每个中间操作节点都定义了自己对应的 Sink,并重写了 makeSink() 方法用来返回自己的 Sink 实例。

    3:终止操作:终止操作节点出现时才开始将 Sink 实例化并串起来。然后就是上面提到的那三步:通知、数据流入、结束。

    本文介绍和分析了最常规的 stream 用法和实现,实际上 stream 还有很多高阶用法,比如利用协程实现的并行流,本文并没介绍。

    参考

    附录I

    以下是初始化 Head 节点和中间操作的实现。

    /**

    * Constructor for the head of a stream pipeline.

    *

    * @param source {@code Spliterator} describing the stream source

    * @param sourceFlags the source flags for the stream source, described in {@link StreamOpFlag}

    * @param parallel {@code true} if the pipeline is parallel

    */

    //构造函数:初始化Head节点,也即初始化Stage0

    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {

        this.previousStage = null;     //Head节点无前序stage, Head节点就是Stage0

        this.sourceSpliterator = source;

        this.sourceStage = this;

        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;

        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;

        this.depth = 0;

        this.parallel = parallel;

    }

     

    /**

    * Constructor for appending an intermediate operation stage onto an existing pipeline.

    *

    * @param previousStage the upstream pipeline stage

    * @param opFlags the operation flags for the new stage, described in {@link StreamOpFlag}

    */

    //构造函数:初始化中间操作StatelessOp和StatefulOp

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {

        if (previousStage.linkedOrConsumed)

            throw new IllegalStateException(MSG_STREAM_LINKED);

        previousStage.linkedOrConsumed = true; //这三句的作用能反应出:流对象只能被使用一次的概念

        previousStage.nextStage = this;   //双向链表,this代表当前中间操作,入参previousStage是调用该方法时的前序阶段

        this.previousStage = previousStage; //双向链表,

        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

        this.sourceStage = previousStage.sourceStage;

        this.depth = previousStage.depth + 1;

        f (opIsStateful())

            sourceStage.sourceAnyStateful = true;

    }

     

     

    展开全文
  • 是不是又开始懵比了,哈哈,本文就运用一问三连的形式来进行争取不那么麻烦的解释Stream流式编程的实现原理。 Java8新特性系列总结: 《Lambda表达式你会吗》 《Stream流式编程知识总结》 Stream怎么用 其实上篇...

    Java8新特性系列:

    上一篇《Stream流式编程知识总结》我们主要针对Stream流式编程的具体使用方法进行了深入的探讨,但是如果再来一个一问三连what?-why?-then?是不是又开始懵比了,哈哈,本文就运用一问三连的形式来进行争取不那么麻烦的解释Stream流式编程的实现原理。

    Stream怎么用

    其实上篇已经讲过,Stream没用之前我们针对集合的便利帅选等操作更多的是for-loop/while-loop,用了Stream后发现原来代码可以如此简洁,并且越发形似SQL语句。甚至可以做很多复杂的动作:

    ap<Integer, List<String>> lowCaloricDishesNameGroup = 
        dishes.parallelStream() // 开启并行处理
              .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
              .sorted(comparing(Dish::getCalories)) // 按照热量进行排序
              .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
                  Dish::getCalories, 
                  Collectors.mapping(Dish::getName, Collectors.toList())
              ));
    

    Stream流式编程过滤操作哈哈图

    Stream的操作分类

    Stream使用一种类似SQL语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等。
    数据流在管道中经过中间操作(intermediate operation)处理,由终止操作(terminal operation)得到前面处理的结果。这些也在《Stream流式编程知识总结》有相应的说明。

    Stream操作分为两类:

    • 中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted等。
      中间操作又分为有状态(stateful)无状态(stateless)

      • 有状态:必须等上一步操作完拿到全部元素后才可操作,如sorted
      • 无状态:该操作的数据不收上一步操作的影响,如filter map
    • 终止操作:触发数据的流动,并收集结果,如collect findFirst forEach等。终止操作又分为短路操作(short-circuiting)非短路操作(non-short-circuiting)

      • 短路操作:会在适当的时刻终止遍历,类似于break,如anyMatch findFirst等
      • 非短路操作:会遍历所有元素,如collect max等

    Stream相关操作符分类

    Stream的实现过程

    Stream的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。

    Stream采用某种方式记录用户每一步的操作,中间操作会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,面对如此简洁高效的API不由得使我们有所疑问:

    • 用户的操作如何记录?
    • 操作如何叠加?
    • 叠加后的操作如何执行?
    • 执行后的结果(如果有)在哪里?

    操作如何记录

    Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将各Pipeline按照先后顺序连接到一起,就构成了整个流水线。

    与Stream相关类和接口的继承关系如下图,其中蓝色表示继承关系,绿色表示接口实现:

    Stream相关类和接口的继承关系图

    Head用于表示第一个Stage,该Stage不包含任何操作。StatelessOp和StatefulOp分别表示无状态和有状态的Stage。

    Stream流水线组织结构示意图

    其中:

    • Head记录Stream起始操作,将包装为Spliterator的原始数据存放在Stage中
    • StatelessOp记录无状态的中间操作
    • StatefulOp记录有状态的中间操作
    • TerminalOp用于触发数据数据在各Stage间的流动及处理,并收集最终数据(如果有)

    使用Collection.streamArrays.streamStream.of等接口会生成Head,其内部均采用StreamSupport.stream方法,将原始数据包装为Spliterator存放在Stage中。

    Head、StatelessOp、StatefulOp三个操作实例化会指向其父类AbstractPipeline。

    对于Head:

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }
    

    Head操作会将包装为Spliterator的原始数据存放在该Stage中,将自身存放sourceStage中,并把串并行操作也记录在内。Head的前期功能就是记录这些源数据。

    对于StatelessOp及StatefulOp:

    /**
     * Constructor for appending an intermediate operation stage onto an
     * existing pipeline.
     *
     * @param previousStage the upstream pipeline stage
     * @param opFlags the operation flags for the new stage, described in
     * {@link StreamOpFlag}
     */
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
    
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    中间操作通过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操作。

    每一个中间操作Stage中的sourceStage都指向前一个Stage的soureStage,如此递归,最终指向Head。卧槽,似乎是不是明白些啥了,接着往下看吧,现在仅仅是第一阶段。

    Stage记录操作双链表结构示意图

    操作如何叠加

    上一个问题解决了每一步操作数据源以及内部实现是怎么记录的,此时并没有执行,Stage只是保存了当前的操作,并不能确定下一个Stage需要何种操作,所以想要让pipeline运行起来,需要一种将所有操作叠加到一起的方案。

    Stream类库采用了Sink接口来协调各Stage之间的关系:

    interface Sink<T> extends Consumer<T> {
        /**
         * Resets the sink state to receive a fresh data set.  This must be called
         * before sending any data to the sink.  After calling {@link #end()},
         * you may call this method to reset the sink for another calculation.
         * @param size The exact size of the data to be pushed downstream, if
         * known or {@code -1} if unknown or infinite.
         *
         * <p>Prior to this call, the sink must be in the initial state, and after
         * this call it is in the active state.
         *
         * 开始遍历前调用,通知Sink做好准备
         */
        default void begin(long size) {}
    
        /**
         * Indicates that all elements have been pushed.  If the {@code Sink} is
         * stateful, it should send any stored state downstream at this time, and
         * should clear any accumulated state (and associated resources).
         *
         * <p>Prior to this call, the sink must be in the active state, and after
         * this call it is returned to the initial state.
         *
         * 所有元素遍历完成后调用,通知Sink没有更多元素了
         */
        default void end() {}
        
        /**
         * Indicates that this {@code Sink} does not wish to receive any more data.
         *
         * @implSpec The default implementation always returns false.
         *
         * @return true if cancellation is requested
         *
         * 是否可以结束操作,可以让短路操作尽早结束
         */
        default boolean cancellationRequested() {}
    
        /**
         * Accepts a value.
         *
         * @implSpec The default implementation throws IllegalStateException.
         *
         * @throws IllegalStateException if this sink does not accept values
         *
         * 遍历时调用,接收的一个待处理元素,并对元素进行处理。
         * Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前
         * Stage.accept方法即可
         */
        default void accept(T value) {}
    }
    

    其实Stream的各种操作实现的本质,就是如何重载Sink的这四个接口方法,各个操作通过Sink接口accept方法依次向下传递执行。

    下面结合具体源码来理解Stage是如何将自身的操作包装成Sink,以及Sink是如何将处理结果转发给下一个Sink的。

    无状态Stage(Stream.map):

    // Stream.map 将生成一个新Stream
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            // 该方法将回调函数(处理逻辑)包装成Sink
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        // 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    

    有状态Stage(Stream.sorted):

    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        // 存放用于排序的元素
        private ArrayList<T> list;
    
        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }
    
        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            // 创建用于存放排序元素的列表
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }
    
        @Override
        public void end() {
            // 只有在接收到所有元素后才开始排序
            list.sort(comparator);
            downstream.begin(list.size());
            // 排序完成后,将数据传递给下游Sink
            if (!cancellationWasRequested) {
                // 下游Sink不包含短路操作,将数据依次传递给下游Sink
                list.forEach(downstream::accept);
            }
            else {
                // 下游Sink包含短路操作
                for (T t : list) {
                    // 对于每一个元素,都要询问是否可以结束处理
                    if (downstream.cancellationRequested()) break;
                    // 将元素传递给下游Sink
                    downstream.accept(t);
                }
            }
            // 告知下游Sink数据传递完毕
            downstream.end();
            list = null;
        }
    
        @Override
        public void accept(T t) {
            // 依次将需要排序的元素加入到临时列表中
            list.add(t);
        }
    }
    

    Stream.sorted会在接收到所有元素之后再进行排序,之后才开始将数据依次传递给下游Sink。

    两个操作之间通过Sink接口的accept方法进行挂钩,此时如果从第一个Sink开始执行accept方法便可以把整个管道流动起来,但是这个“如果”怎么实现呢?另外记着每一个操作中的opWrapSink是用于包装Sink的,也就是说只有包装后的Sink才具有条件使得整个管道流动起来。

    叠加后的操作如何执行

    终止操作(TerminalOp)之后不能再有别的操作,终止操作会创建一个包装了自己操作的Sink,这个Sink只处理数据而不会将数据传递到下游Sink(没有下游了)。

    在调用Stream的终止操作时,会执行AbstractPipeline.evaluate:

    /**
     * Evaluate the pipeline with a terminal operation to produce a result.
     *
     * @param <R> the type of result
     * @param terminalOp the terminal operation to be applied to the pipeline.
     * @return the result
     */
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
    }
    

    最终会根据并行还是串行执行TerminalOp中不同的的evaluate方法。如果是串行执行,接下来在TerminalOp的evaluate方法中会调用wrapAndCopyInto来包装、串联各层Sink,触发pipeline,并获取最终结果。

    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    

    其中wrapSink(包装)实现:

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
    
        // AbstractPipeline.this,最后一层Stage
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            // 从下游向上游遍历,不断包装Sink
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */);
        }
        return (Sink<P_IN>) sink;
    }
    

    wrapSink方法通过下游Stage的“opWrapSink”方法不断将下游Stage的Sink从下游向上游遍历包装,最终得到上文我说的第一个Sink

    Sink包装执行链

    有了第一个Sink,如何执行呢,还记的wrapAndCopyInto中的copyInto吧:

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
    
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            // 不包含短路操作
            
            // 1. begin
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            // 2. 遍历调用 sink.accept
            spliterator.forEachRemaining(wrappedSink);
            // 3. end
            wrappedSink.end();
        }
        else {
            // 包含短路操作
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    
    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍历调用 sink.accept
        //    每一次遍历都询问cancellationRequested结果
        //    如果cancellationRequested为true,则中断遍历
        p.forEachWithCancel(spliterator, wrappedSink);
        // 3. end
        wrappedSink.end();
    }
    

    copyInto会根据不同的情况依次调用:

    • sink.bigin
    • sink.accept(遍历调用,如果包含短路操作,则每次遍历都需要询问cancellationRequested,适时中断遍历)
    • sink.end

    执行结果在哪儿

    每一种TerminalSink中均会提供一个获取最终结果的方法:

    各种TerminalSink获取最终结果图示

    TerminalOp通过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中:

    @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }
    

    Stream并行执行原理

    使用Collection.parallelStreamStream.parallel等方法可以将当前的Stream流标记为并行执行。

    上文提到在调用Stream的终止操作时,会执行AbstractPipeline.evaluate方法,根据paraller标识是执行并行操作还是串行操作:

    ...
    return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
    

    如果被标记为sequential,则会调用TerminalOp.evaluateSequential,evaluateSequential的调用过程上文已经讲述的很清楚。

    如果被标记为parallel,则会调用TerminalOp.evaluateParallel,对于该方法不同的TerminalOp会有不同的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果。

    ForkJoin在《Stream流式编程知识总结》有所说明。默认情况下,ForkJoin的线程数即为机器的CPU核数,如果想自定义Stream并行执行的线程数,可以参考Custom Thread Pools In Java 8 Parallel Streams

    最后

    本文详细讲述了Stream流的实现原理,刚开始研究的时候也是云里雾里,弄懂后才知道原来是“一波三折”,用这个词再合适不过了:

    1. Head包装最初的数据源,它不属于Stream流中的任何操作,并且Stream流中每一个操作都会指向Head,用于将来数据源便捷取出。
    2. 每一个操作都是一个Stage,每一个Stage都有上下游指针,使得每一个Stage进行挂钩,形成双链表。
    3. 每一个Stage都会通过Sink接口协议使得两个Stage之间的操作进行挂钩,上游执行下游。
    4. 终止操作根据“从下游向上游”原则依次包装Sink,最终得到第一个Sink。
    5. 从第一个Sink执行使得整个管道的流动,得到最终结果。

    我是i猩人,总结不易,转载注明出处,喜欢本篇文章的童鞋欢迎点赞、关注哦。

    参考

    • https://segmentfault.com/a/1190000018919146
    • https://www.cnblogs.com/Dorae/p/7779246.html
    • https://segmentfault.com/a/1190000019143092#
    展开全文
  • Java 8 - Stream流骚操作解读

    千次阅读 2021-03-08 11:49:36
    java.util.stream.Stream 中的 Stream 接口定义了许多操作。 我们来看个例子 可以分为两大类操作 filter 、 map 和 limit 可以连成一条流水线 collect 触发流水线执行并关闭它 可以连接起来的操作称为中间操作...
  • Java8的Stream流详解

    万次阅读 多人点赞 2018-07-24 13:10:22
    首先,Stream流有一些特性: Stream流不是一种数据结构,不保存数据,它只是在原数据集上定义了一组操作。 这些操作是惰性的,即每当访问到流中的一个元素,才会在此元素上执行这一系列操作。 Stream不保存数据,...
  • 主要介绍了Java Stream 实现合并操作,结合实例形式详细分析了Java Stream 实现合并操作原理与相关注意事项,需要的朋友可以参考下
  • 深入理解Java8中Stream的实现原理

    万次阅读 多人点赞 2019-02-25 15:40:26
    二.Stream的实现原理 1.一种直白的实现方式 2.Stream流水线解决方案 1).操作如何记录 2).操作如何叠加 3).叠加之后的操作如何执行 4).执行后的结果在哪里 一.容器执行Lambda表达式的方式 1.回顾 首先回顾...
  • Java8 Stream原理深度解析

    千次阅读 2019-06-03 11:30:16
    在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1所示。 表1-1 Stream的常用操作分类(表格引自这里) 如表1-1中所示,Stream中的操作可以分为两大类: 中间操作与结束...
  • Stream流,是对集合对象操作的增强 基本使用 比如有一个Person类的集合:List<Person> personList,可以通过stream()对集合中的元素进行操作, 下面的操作流程可以归纳为 过滤-映射-收集。 List<...
  • Java中的Stream的所有操作都是针对的,所以,使用Stream必须要得到Stream对象: 1、初始化一个Stream stream = Stream.of("a", "b", "c"); 2、数组转换为一个: String [] strArray = new String[] {...
  • 我们都知道在java 使用strem做多线程处理是非常方便的。 list.parallelStream().forEach(s -> { // 业务处理 }); 但是parallelStream是如何实现多线程处理的呢?其实看源码我们会发现parallelStream是使用...
  • JDK1.8新增的Stream的使用及原理

    万次阅读 多人点赞 2019-06-10 17:31:34
    在学习Apache Flink 的时候,经常遇到算子运算,其实这就是一种最常用的Stream流编程运算,今天就来研究一下Java 的JDK1.8提供的Stream流式编程,以便对后续学习Flink和Spark还有Strom这些流式和批式处理框架有所...
  •  java8新增了stream流的特性,能够让用户以函数式的方式、更为简单的操纵集合等数据结构,并实现了用户无感知的并行计算。 1.2从零开始实现一个stream流  相信很多人在使用过java8的streamAPI接口之后,都会对其...
  • 下面中的map和forEach函数式接口方法 通俗来讲,map其实就是类型转换的作用, 我只是测试所以还是返回的数组,forEach我用作输出控制台 int[] ints ={1,2,3}; Arrays.stream(ints).map(arr-&amp;amp;gt;{...
  • Stream并行详解

    千次阅读 2020-12-24 16:19:21
    1、并行与并发的区别 在说到并行的时候,相信很多人都会想到并发的概念。那么并行和并发两者一字之差,有什么区别呢?...2、并行流原理介绍 对于并行流,其在底层实现中,是沿用了Java7提供的fork/joi
  • Stream流】Sort排序详解

    千次阅读 2021-01-04 14:54:48
    很多时候由于需求的复杂性,很多直接从数据库查出的数据并不能直接返回前端,需要进行处理,处理之后又需要排序,这时候一般都会使用Stream流的Sort排序 场景一:普通排序 正序(升序) list=list.stream().sorted()...
  • Java 8-stream实现原理分析(一)

    千次阅读 2018-03-07 14:43:46
    背景介绍 ...来源 实现方式 代码分析 Sink执行分析 filter的begin() map的begin() sorted的begin() filter的accpet() map的accpet() sorted的accpet() filter的end() map的end() sorted的end() Re...
  • 主要介绍了JDK8并行及串行区别原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了node.js中stream流中可读流和可写流的实现与使用方法,结合实例形式分析了node.js stream流可读流和可写流基本分类、原理、定义、使用方法及相关注意事项,需要的朋友可以参考下
  • java8 Stream流 flatmap方法使用

    千次阅读 2020-09-09 17:55:06
    lists.stream().flatMap(Collection::stream) .forEach(map -> { map.forEach((k,v)->{ System.out.println("k:"+k+"\tvalue:"+v); }); }); 输出结果如下: k:a value:a k:c value:c k:b value:b 由此知道 flatmap ...
  • Java8 stream流分页(手动分页)

    千次阅读 2021-02-25 15:27:06
    项目中对于返回数据的分页,有时候不方便用PageHelper的时候们可以试试java8stream流中的分页,谨以此篇博客和大家分享下。 遇到数据库查询出来的结果还需要进行处理或过滤后分页这种情况可以采用stream里面的分页。...
  • Java Stream流之sorted

    千次阅读 2019-05-31 15:30:44
    List<Integer> list = new ArrayList(); list.add(1); list.add(4); list.add(9); list.add(6); list.add(2);...list.stream().sorted(Comparator.comparing(Integer::intValue).reversed())...
  • php中stream()的用法

    2020-12-18 10:28:19
    (stream)的概念源于UNIX中管道(pipe)的概念。在UNIX中,管道是一条不间断的字节,用来实现程序或进程间的通信,或读写外围设备、外部文件等。根据的方向又可以分为输入和输出,同时可以在其外围再套上其它...
  • Java8 Stream流遍历的方式

    千次阅读 2020-08-07 23:32:54
    Stream流是一个集合元素的函数模型,它并不是集合,也不是数据结构,其本身并不存储任何元素(或其地址值),它只是在原数据集上定义了一组操作。 Stream流不保存数据,Stream操作是尽可能惰性的,即
  • jdk8,stream流水线原理

    千次阅读 2017-07-21 00:25:31
    深入理解Java Stream流水线 前面我们已经学会如何使用Stream API,用起来真的很爽,但简洁的方法下面...本节我们学习Stream流水线的原理,这是Stream实现的关键所在。 首先回顾一下容器执行Lambda表达式的方式,以
  • 该博客没有Stream原理的讲述,这里只是教你如何使用Stream流map方法,看完这个博客。你会对Stream流编程会有简单的了解。若想了解其原理,我其他博客上有写。这里仅仅做一个入门级代码练习 前提准备数据 Person.java...
  • 我们在之前文章:集合之 Stream 流式操作 和Stream流 collect() 方法的详细使用介绍中使用到的示例,使用的都是串行的流,就是说在一个线程上执行的流。 JDK8 还为我们提供了并行的 Stream 流,即多线程执行的流。...
  • scala的stream流

    千次阅读 2018-10-13 19:17:48
    Scala的的实现在Stream中。 主要用到的实现类是Cons类。 @SerialVersionUID(-602202424901551803L) final class Cons[+A](hd: A, tl: =&gt; Stream[A]) extends Stream[A] { override def isEmpty = false...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 58,706
精华内容 23,482
关键字:

stream流原理