精华内容
下载资源
问答
  • 是不是又开始懵比了,哈哈,本文就运用一问三连的形式来进行争取不那么麻烦的解释Stream流式编程的实现原理。 Java8新特性系列总结: 《Lambda表达式你会吗》 《Stream流式编程知识总结》 Stream怎么用 其实上篇...

    Java8新特性系列:

    上一篇《Stream流式编程知识总结》我们主要针对Stream流式编程的具体使用方法进行了深入的探讨,但是如果再来一个一问三连what?-why?-then?是不是又开始懵比了,哈哈,本文就运用一问三连的形式来进行争取不那么麻烦的解释Stream流式编程的实现原理。

    Stream怎么用

    其实上篇已经讲过,Stream没用之前我们针对集合的便利帅选等操作更多的是for-loop/while-loop,用了Stream后发现原来代码可以如此简洁,并且越发形似SQL语句。甚至可以做很多复杂的动作:

    ap<Integer, List<String>> lowCaloricDishesNameGroup = 
        dishes.parallelStream() // 开启并行处理
              .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
              .sorted(comparing(Dish::getCalories)) // 按照热量进行排序
              .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
                  Dish::getCalories, 
                  Collectors.mapping(Dish::getName, Collectors.toList())
              ));
    

    Stream流式编程过滤操作哈哈图

    Stream的操作分类

    Stream使用一种类似SQL语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等。
    数据流在管道中经过中间操作(intermediate operation)处理,由终止操作(terminal operation)得到前面处理的结果。这些也在《Stream流式编程知识总结》有相应的说明。

    Stream操作分为两类:

    • 中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted等。
      中间操作又分为有状态(stateful)无状态(stateless)

      • 有状态:必须等上一步操作完拿到全部元素后才可操作,如sorted
      • 无状态:该操作的数据不收上一步操作的影响,如filter map
    • 终止操作:触发数据的流动,并收集结果,如collect findFirst forEach等。终止操作又分为短路操作(short-circuiting)非短路操作(non-short-circuiting)

      • 短路操作:会在适当的时刻终止遍历,类似于break,如anyMatch findFirst等
      • 非短路操作:会遍历所有元素,如collect max等

    Stream相关操作符分类

    Stream的实现过程

    Stream的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。

    Stream采用某种方式记录用户每一步的操作,中间操作会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,面对如此简洁高效的API不由得使我们有所疑问:

    • 用户的操作如何记录?
    • 操作如何叠加?
    • 叠加后的操作如何执行?
    • 执行后的结果(如果有)在哪里?

    操作如何记录

    Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将各Pipeline按照先后顺序连接到一起,就构成了整个流水线。

    与Stream相关类和接口的继承关系如下图,其中蓝色表示继承关系,绿色表示接口实现:

    Stream相关类和接口的继承关系图

    Head用于表示第一个Stage,该Stage不包含任何操作。StatelessOp和StatefulOp分别表示无状态和有状态的Stage。

    Stream流水线组织结构示意图

    其中:

    • Head记录Stream起始操作,将包装为Spliterator的原始数据存放在Stage中
    • StatelessOp记录无状态的中间操作
    • StatefulOp记录有状态的中间操作
    • TerminalOp用于触发数据数据在各Stage间的流动及处理,并收集最终数据(如果有)

    使用Collection.streamArrays.streamStream.of等接口会生成Head,其内部均采用StreamSupport.stream方法,将原始数据包装为Spliterator存放在Stage中。

    Head、StatelessOp、StatefulOp三个操作实例化会指向其父类AbstractPipeline。

    对于Head:

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }
    

    Head操作会将包装为Spliterator的原始数据存放在该Stage中,将自身存放sourceStage中,并把串并行操作也记录在内。Head的前期功能就是记录这些源数据。

    对于StatelessOp及StatefulOp:

    /**
     * Constructor for appending an intermediate operation stage onto an
     * existing pipeline.
     *
     * @param previousStage the upstream pipeline stage
     * @param opFlags the operation flags for the new stage, described in
     * {@link StreamOpFlag}
     */
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
    
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    中间操作通过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操作。

    每一个中间操作Stage中的sourceStage都指向前一个Stage的soureStage,如此递归,最终指向Head。卧槽,似乎是不是明白些啥了,接着往下看吧,现在仅仅是第一阶段。

    Stage记录操作双链表结构示意图

    操作如何叠加

    上一个问题解决了每一步操作数据源以及内部实现是怎么记录的,此时并没有执行,Stage只是保存了当前的操作,并不能确定下一个Stage需要何种操作,所以想要让pipeline运行起来,需要一种将所有操作叠加到一起的方案。

    Stream类库采用了Sink接口来协调各Stage之间的关系:

    interface Sink<T> extends Consumer<T> {
        /**
         * Resets the sink state to receive a fresh data set.  This must be called
         * before sending any data to the sink.  After calling {@link #end()},
         * you may call this method to reset the sink for another calculation.
         * @param size The exact size of the data to be pushed downstream, if
         * known or {@code -1} if unknown or infinite.
         *
         * <p>Prior to this call, the sink must be in the initial state, and after
         * this call it is in the active state.
         *
         * 开始遍历前调用,通知Sink做好准备
         */
        default void begin(long size) {}
    
        /**
         * Indicates that all elements have been pushed.  If the {@code Sink} is
         * stateful, it should send any stored state downstream at this time, and
         * should clear any accumulated state (and associated resources).
         *
         * <p>Prior to this call, the sink must be in the active state, and after
         * this call it is returned to the initial state.
         *
         * 所有元素遍历完成后调用,通知Sink没有更多元素了
         */
        default void end() {}
        
        /**
         * Indicates that this {@code Sink} does not wish to receive any more data.
         *
         * @implSpec The default implementation always returns false.
         *
         * @return true if cancellation is requested
         *
         * 是否可以结束操作,可以让短路操作尽早结束
         */
        default boolean cancellationRequested() {}
    
        /**
         * Accepts a value.
         *
         * @implSpec The default implementation throws IllegalStateException.
         *
         * @throws IllegalStateException if this sink does not accept values
         *
         * 遍历时调用,接收的一个待处理元素,并对元素进行处理。
         * Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前
         * Stage.accept方法即可
         */
        default void accept(T value) {}
    }
    

    其实Stream的各种操作实现的本质,就是如何重载Sink的这四个接口方法,各个操作通过Sink接口accept方法依次向下传递执行。

    下面结合具体源码来理解Stage是如何将自身的操作包装成Sink,以及Sink是如何将处理结果转发给下一个Sink的。

    无状态Stage(Stream.map):

    // Stream.map 将生成一个新Stream
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            // 该方法将回调函数(处理逻辑)包装成Sink
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        // 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    

    有状态Stage(Stream.sorted):

    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        // 存放用于排序的元素
        private ArrayList<T> list;
    
        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }
    
        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            // 创建用于存放排序元素的列表
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }
    
        @Override
        public void end() {
            // 只有在接收到所有元素后才开始排序
            list.sort(comparator);
            downstream.begin(list.size());
            // 排序完成后,将数据传递给下游Sink
            if (!cancellationWasRequested) {
                // 下游Sink不包含短路操作,将数据依次传递给下游Sink
                list.forEach(downstream::accept);
            }
            else {
                // 下游Sink包含短路操作
                for (T t : list) {
                    // 对于每一个元素,都要询问是否可以结束处理
                    if (downstream.cancellationRequested()) break;
                    // 将元素传递给下游Sink
                    downstream.accept(t);
                }
            }
            // 告知下游Sink数据传递完毕
            downstream.end();
            list = null;
        }
    
        @Override
        public void accept(T t) {
            // 依次将需要排序的元素加入到临时列表中
            list.add(t);
        }
    }
    

    Stream.sorted会在接收到所有元素之后再进行排序,之后才开始将数据依次传递给下游Sink。

    两个操作之间通过Sink接口的accept方法进行挂钩,此时如果从第一个Sink开始执行accept方法便可以把整个管道流动起来,但是这个“如果”怎么实现呢?另外记着每一个操作中的opWrapSink是用于包装Sink的,也就是说只有包装后的Sink才具有条件使得整个管道流动起来。

    叠加后的操作如何执行

    终止操作(TerminalOp)之后不能再有别的操作,终止操作会创建一个包装了自己操作的Sink,这个Sink只处理数据而不会将数据传递到下游Sink(没有下游了)。

    在调用Stream的终止操作时,会执行AbstractPipeline.evaluate:

    /**
     * Evaluate the pipeline with a terminal operation to produce a result.
     *
     * @param <R> the type of result
     * @param terminalOp the terminal operation to be applied to the pipeline.
     * @return the result
     */
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
    }
    

    最终会根据并行还是串行执行TerminalOp中不同的的evaluate方法。如果是串行执行,接下来在TerminalOp的evaluate方法中会调用wrapAndCopyInto来包装、串联各层Sink,触发pipeline,并获取最终结果。

    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    

    其中wrapSink(包装)实现:

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
    
        // AbstractPipeline.this,最后一层Stage
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            // 从下游向上游遍历,不断包装Sink
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */);
        }
        return (Sink<P_IN>) sink;
    }
    

    wrapSink方法通过下游Stage的“opWrapSink”方法不断将下游Stage的Sink从下游向上游遍历包装,最终得到上文我说的第一个Sink

    Sink包装执行链

    有了第一个Sink,如何执行呢,还记的wrapAndCopyInto中的copyInto吧:

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
    
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            // 不包含短路操作
            
            // 1. begin
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            // 2. 遍历调用 sink.accept
            spliterator.forEachRemaining(wrappedSink);
            // 3. end
            wrappedSink.end();
        }
        else {
            // 包含短路操作
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    
    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍历调用 sink.accept
        //    每一次遍历都询问cancellationRequested结果
        //    如果cancellationRequested为true,则中断遍历
        p.forEachWithCancel(spliterator, wrappedSink);
        // 3. end
        wrappedSink.end();
    }
    

    copyInto会根据不同的情况依次调用:

    • sink.bigin
    • sink.accept(遍历调用,如果包含短路操作,则每次遍历都需要询问cancellationRequested,适时中断遍历)
    • sink.end

    执行结果在哪儿

    每一种TerminalSink中均会提供一个获取最终结果的方法:

    各种TerminalSink获取最终结果图示

    TerminalOp通过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中:

    @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }
    

    Stream并行执行原理

    使用Collection.parallelStreamStream.parallel等方法可以将当前的Stream流标记为并行执行。

    上文提到在调用Stream的终止操作时,会执行AbstractPipeline.evaluate方法,根据paraller标识是执行并行操作还是串行操作:

    ...
    return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
    

    如果被标记为sequential,则会调用TerminalOp.evaluateSequential,evaluateSequential的调用过程上文已经讲述的很清楚。

    如果被标记为parallel,则会调用TerminalOp.evaluateParallel,对于该方法不同的TerminalOp会有不同的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果。

    ForkJoin在《Stream流式编程知识总结》有所说明。默认情况下,ForkJoin的线程数即为机器的CPU核数,如果想自定义Stream并行执行的线程数,可以参考Custom Thread Pools In Java 8 Parallel Streams

    最后

    本文详细讲述了Stream流的实现原理,刚开始研究的时候也是云里雾里,弄懂后才知道原来是“一波三折”,用这个词再合适不过了:

    1. Head包装最初的数据源,它不属于Stream流中的任何操作,并且Stream流中每一个操作都会指向Head,用于将来数据源便捷取出。
    2. 每一个操作都是一个Stage,每一个Stage都有上下游指针,使得每一个Stage进行挂钩,形成双链表。
    3. 每一个Stage都会通过Sink接口协议使得两个Stage之间的操作进行挂钩,上游执行下游。
    4. 终止操作根据“从下游向上游”原则依次包装Sink,最终得到第一个Sink。
    5. 从第一个Sink执行使得整个管道的流动,得到最终结果。

    我是i猩人,总结不易,转载注明出处,喜欢本篇文章的童鞋欢迎点赞、关注哦。

    参考

    • https://segmentfault.com/a/1190000018919146
    • https://www.cnblogs.com/Dorae/p/7779246.html
    • https://segmentfault.com/a/1190000019143092#
    展开全文
  • 本篇继上一篇《Lambda表达式你会吗》又一篇Java8新特性——流式编程,上篇文章中并没有采用Stream例子来装饰Lambda表达式,害怕有同学看不懂,所以在文章末尾留个彩蛋,本篇文章重点讲一下对Java8中流式编程的运用...

    Java8新特性系列:

    本篇继上一篇《Lambda表达式你会吗》又一篇Java8新特性——流式编程,上篇文章中并没有采用Stream例子来装饰Lambda表达式,害怕有同学看不懂,所以在文章末尾留个彩蛋,本篇文章重点讲一下对Java8中流式编程的运用学习。

    什么是Stream

    Stream它并不是一个容器,它只是对容器的功能进行了增强,添加了很多便利的操作,例如查找、过滤、分组、排序等一系列的操作。并且有串行并行两种执行模式,并行模式充分的利用了多核处理器的优势,使用fork/join框架进行了任务拆分,同时提高了执行速度。简而言之,Stream就是提供了一种高效且易于使用的处理数据的方式。

    特点:

    • Stream自己不会存储元素。
    • Stream操作不会改变源对象。相反,他们会返回一个持有结果的新Stream。
    • Stream操作是延迟执行的。它会等到需要结果的时候才执行。也就是执行终端操作的时候。

    图解:
    Stream操作流

    一个Stream的操作就如上图,在一个管道内,分为三个步骤:

    • 第一步是创建Stream,从集合、数组中获取一个流;
    • 第二步是中间操作链,对数据进行处理;
    • 第三步是终端操作,用来执行中间操作链,返回结果;

    为什么需要流式操作

    集合API是Java API中最重要的部分。基本上每一个java程序都离不开集合。尽管很重要,但是现有的集合处理在很多方面都无法满足需要。

    一个原因是,许多其他的语言或者类库以声明的方式来处理特定的数据模型,比如SQL语言,你可以从表中查询,按条件过滤数据,并且以某种形式将数据分组,而不必需要了解查询是如何实现的——数据库帮你做所有的脏活。这样做的好处是你的代码很简洁。很遗憾,Java没有这种好东西,你需要用控制流程自己实现所有数据查询的底层的细节。

    其次是你如何有效地处理包含大量数据的集合。理想情况下,为了加快处理过程,你会利用多核架构。但是并发程序不太好写,而且很容易出错。

    Stream API很好的解决了这两个问题。它抽象出一种叫做流的东西让你以声明的方式处理数据,更重要的是,它还实现了多线程:帮你处理底层诸如线程、锁、条件变量、易变变量等等。

    怎么创建Stream

    常用的Stream有三种创建方式:

    • 集合 Collection.stream()
    • 数组 Arrays.stream
    • 静态方法 Stream.of

    由集合创建

    Java8 中的 Collection 接口被扩展,提供了两个获取流的方法,这两个方法是default方法,也就是说所有实现Collection接口的接口都不需要实现就可以直接使用:

    1. default Stream stream() : 返回一个串行流。
    2. default Stream parallelStream() : 返回一个并行流。
    List<Integer> integerList = new ArrayList<>();
    integerList.add(1);
    integerList.add(2);
    Stream<Integer> stream = integerList.stream();
    Stream<Integer> stream1 = integerList.parallelStream();
    

    由数组创建

    Java8 中的 Arrays 的静态方法 stream() 可以获取数组流:

    int[] array = {1,2,3};
    Stream<Integer> stream = Arrays.stream(array);
    

    由静态方法Stream.of创建

    可以使用静态方法 Stream.of(), 通过显示值 创建一个流。它可以接收任意数量的参数。

    Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
    

    准备数据

    //计算机俱乐部
    private static List<Student> computerClub = Arrays.asList(
            new Student("2015134001", "小明", 15, "1501"),
            new Student("2015134003", "小王", 14, "1503"),
            new Student("2015134006", "小张", 15, "1501"),
            new Student("2015134008", "小梁", 17, "1505")
    );
    //篮球俱乐部
    private static List<Student> basketballClub = Arrays.asList(
            new Student("2015134012", "小c", 13, "1503"),
            new Student("2015134013", "小s", 14, "1503"),
            new Student("2015134015", "小d", 15, "1504"),
            new Student("2015134018", "小y", 16, "1505")
    );
    //乒乓球俱乐部
    private static List<Student> pingpongClub = Arrays.asList(
            new Student("2015134022", "小u", 16, "1502"),
            new Student("2015134021", "小i", 14, "1502"),
            new Student("2015134026", "小m", 17, "1504"),
            new Student("2015134027", "小n", 16, "1504")
    );
    
    private static List<List<Student>> allClubStu = new ArrayList<>();
    allClubStu.add(computerClub);
    allClubStu.add(basketballClub);
    allClubStu.add(pingpongClub);
    

    以上数据用于下边的Stream的中间操作和终止操作实例说明。

    Stream中间操作

    如果Stream只有中间操作是不会执行的,当执行终端操作的时候才会执行中间操作,这种方式称为延迟加载或惰性求值。多个中间操作组成一个中间操作链,只有当执行终端操作的时候才会执行一遍中间操作链,下面看下Stream有哪些中间操作。

    distinct

    distinct: 对于Stream中包含的元素进行去重操作(去重逻辑依赖元素的equals方法),新生成的Stream中没有重复的元素;

    distinct方法示意图

    List<String> list = Arrays.asList("b","b","c","a");
    list.forEach(System.out::print); //bbca
    list.stream().distinct().forEach(System.out::print);//bca
    

    filter

    filter: 对于Stream中包含的元素使用给定的过滤函数进行过滤操作,新生成的Stream只包含符合条件的元素;

    filter方法示意图

    //筛选1501班的学生
    computerClub.stream().filter(e -> e.getClassNum().equals("1501")).forEach(System.out::println);
    //筛选年龄大于15的学生
    List<Student> collect = computerClub.stream().filter(e -> e.getAge() > 15).collect(Collectors.toList());
    

    map

    map: 对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素。

    这个方法有三个对于原始类型的变种方法,分别是:mapToInt,mapToLong和mapToDouble。这三个方法也比较好理解,比如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。之所以会有这样三个变种方法,可以免除自动装箱/拆箱的额外消耗;

    map方法示意图

    //篮球俱乐部所有成员名 + 暂时住上商标^_^,并且获取所有队员名
    List<String> collect1 = basketballClub.stream()
            .map(e -> e.getName() + "^_^")
            .collect(Collectors.toList());
    collect1.forEach(System.out::println);
    //小c^_^^_^
    //小s^_^^_^
    //小d^_^^_^
    //小y^_^^_^
    

    flatMap

    flatMap:和map类似,不同的是其每个元素转换得到的是Stream对象,会把子Stream中的元素压缩到父集合中;

    flatMap方法示意图

    //获取年龄大于15的所有俱乐部成员
    List<Student> collect2 = Stream.of(basketballClub, computerClub, pingpongClub)
            .flatMap(e -> e.stream().filter(s -> s.getAge() > 15))
            .collect(Collectors.toList());
    collect2.forEach(System.out::println);
    
    //用双层list获取所有年龄大于15的俱乐部成员
    List<Student> collect3 = allClubStu.stream()
            .flatMap(e -> e.stream().filter(s -> s.getAge() > 15))
            .collect(Collectors.toList());
    collect3.forEach(System.out::println);
    

    peek

    peek: 生成一个包含原Stream的所有元素的新Stream,同时会提供一个消费函数(Consumer实例),新Stream每个元素被消费的时候都会执行给定的消费函数;

    peek方法示意图

    //篮球俱乐部所有成员名 + 赞助商商标^_^,并且获取所有队员详细内容
    List<Student> collect = basketballClub.stream()
            .peek(e -> e.setName(e.getName() + "^_^"))
            .collect(Collectors.toList());
    collect.forEach(System.out::println);
    //Student{idNum='2015134012', name='小c^_^', age=13, classNum='1503'}
    //Student{idNum='2015134013', name='小s^_^', age=14, classNum='1503'}
    //Student{idNum='2015134015', name='小d^_^', age=15, classNum='1504'}
    //Student{idNum='2015134018', name='小y^_^', age=16, classNum='1505'}
    

    limit

    limit: 对一个Stream进行截断操作,获取其前N个元素,如果原Stream中包含的元素个数小于N,那就获取其所有的元素;

    limit方法示意图

    List<String> list = Arrays.asList("a","b","c");
    //获取list中top2即截断取前两个
    List<String> collect1 = list.stream().limit(2).collect(Collectors.toList());
    collect1.forEach(System.out::print);//ab
    

    skip

    skip: 返回一个丢弃原Stream的前N个元素后剩下元素组成的新Stream,如果原Stream中包含的元素个数小于N,那么返回空Stream;

    skip方法示意图

    List<String> list = Arrays.asList("a","b","c");
    //获取list中top2即截断取前两个
    List<String> collect1 = list.stream().skip(2).collect(Collectors.toList());
    collect1.forEach(System.out::print);//c
    

    sorted

    sorted有两种形式存在:

    1. sorted(Comparator): 指定比较规则进行排序。
    2. sorted(): 产生一个新流,按照自然顺序排序。
    List<String> list = Arrays.asList("b","c","a");
    //获取list中top2即截断取前两个
    List<String> collect1 = list.stream().sorted().collect(Collectors.toList());
    collect1.forEach(System.out::print);//abc
    

    Stream的终端操作

    如果说Stream中间操作返回的是Stream,那么终端操作返回的就是最终转换需要返回的结果。

    汇聚操作:

    • foreach(Consumer c) 遍历操作
    • collect(Collector) 将流转化为其他形式

    其中Collectors具体方法有:

    • toList List 把流中元素收集到List
    • toSet Set 把流中元素收集到Set
    • toCollection Coolection 把流中元素收集到Collection中
    • groupingBy Map<K,List> 根据K属性对流进行分组
    • partitioningBy Map<boolean, List> 根据boolean值进行分组

    栗子:

    //此处只是演示 此类需求直接用List构造器即可
    List<Student> collect = computerClub.stream().collect(Collectors.toList());
    Set<Student> collect1 = pingpongClub.stream().collect(Collectors.toSet());
    
    //注意key必须是唯一的 如果不是唯一的会报错而不是像普通map那样覆盖
    Map<String, String> collect2 = pingpongClub.stream()
            .collect(Collectors.toMap(Student::getIdNum, Student::getName));
    
    //分组 类似于数据库中的group by
    Map<String, List<Student>> collect3 = pingpongClub.stream()
            .collect(Collectors.groupingBy(Student::getClassNum));
    
    //字符串拼接 第一个参数是分隔符 第二个参数是前缀 第三个参数是后缀
    String collect4 = pingpongClub.stream().map(Student::getName).collect(Collectors.joining(",", "【", "】")); //【小u,小i,小m,小n】
    
    //三个俱乐部符合年龄要求的按照班级分组
    Map<String, List<Student>> collect5 = Stream.of(basketballClub, pingpongClub, computerClub)
            .flatMap(e -> e.stream().filter(s -> s.getAge() < 17))
            .collect(Collectors.groupingBy(Student::getClassNum));
    
    //按照是否年龄>16进行分组 key为true和false
    ConcurrentMap<Boolean, List<Student>> collect6 = Stream.of(basketballClub, pingpongClub, computerClub)
            .flatMap(Collection::stream)
            .collect(Collectors.groupingByConcurrent(s -> s.getAge() > 16));
    

    匹配操作

    • booelan allMatch(Predicate) 都符合
    • boolean anyMatch(Predicate) 任一元素符合
    • boolean noneMatch(Predicate) 都不符合
    boolean b = basketballClub.stream().allMatch(e -> e.getAge() < 20);
    boolean b1 = basketballClub.stream().anyMatch(e -> e.getAge() < 20);
    boolean b2 = basketballClub.stream().noneMatch(e -> e.getAge() < 20);
    

    寻找操作

    • findFirst——返回第一个元素
    • findAny——返回当前流中的任意元素
    Optional<Student> first = basketballClub.stream().findFirst();
    if (first.isPresent()) {
        Student student = first.get();
        System.out.println(student);
    }
    
    Optional<Student> any = basketballClub.stream().findAny();
    if (any.isPresent()) {
        Student student2 = any.get();
        System.out.println(student2);
    }
    Optional<Student> any1 = basketballClub.stream().parallel().findAny();
    System.out.println(any1);
    

    计数和极值

    • count 返回流中元素的总个数
    • max(Comparator) 返回流中最大值
    • min(Comparator) 返回流中最小值
    long count = basketballClub.stream().count();
    Optional<Student> max = basketballClub.stream().max(Comparator.comparing(Student::getAge));
    if (max.isPresent()) {
        Student student = max.get();
    }
    Optional<Student> min = basketballClub.stream().min(Comparator.comparingInt(Student::getAge));
    if (min.isPresent()) {
        Student student = min.get();
    }
    

    Fork/Join框架

    上面我们提到过,说Stream的并行模式使用了Fork/Join框架,这里简单说下Fork/Join框架是什么?Fork/Join框架是java7中加入的一个并行任务框架,可以将任务拆分为多个小任务,每个小任务执行完的结果再合并成为一个结果。在任务的执行过程中使用工作窃取(work-stealing)算法,减少线程之间的竞争。

    Fork/Join图解:

    Fork/Join图解

    工作窃取图解:

    工作窃取图解

    什么是工作窃取算法?说白了就是多线程同步执行,当一个线程把自己队列任务完成后去“窃取”其他线程队列任务继续干。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

    最后

    本篇文章大量的图片示例采用了RxJava图示说明,其实RxJava编程思想虽然是响应式编程,但是其操作符转换和流式编程如出一辙,本篇文章主要讲述Stream流式编程的认识和运用,有兴趣的小伙伴可以继续深入了解一下Stream的工作原理。

    我是i猩人,总结不易,转载注明出处,喜欢本篇文章的童鞋欢迎点赞、关注哦。

    展开全文
  • Stream流式编程在JDK 1.8版本中与Lambda表达式一起推出,这一特性标志着JDK 1.8成为Java历史上的又一里程...今天来看一看Stream流式编程在数据分析方面的应用及原理,并且这里会再次带大家回顾一下Lambda表达式的使用。

    Stream流式编程在JDK 1.8版本中与Lambda表达式一起推出,这一特性标志着JDK 1.8成为Java历史上的又一里程碑。之前在公司接手的关于数据分析及展示需求中就会频繁地使用到Stream流式编程。今天来看一看Stream流式编程在数据分析方面的应用及原理,并且这里会再次带大家回顾一下Lambda表达式的使用。

    一、Lambda表达式

    之前写过关于JDK函数式编程的文章,可以先进行了解:
    https://blog.csdn.net/pbrlovejava/article/details/85226974

    1.1、基本使用

    在介绍Stream流式编程之前,需要先了解Lambda表达式的使用及基本原理。

    一般使用Lambda表达式的场景是优化匿名内部类的繁琐声明,以达到简化代码的效果,譬如在JDK 1.7版本,我们需要声明一个线程(Thread)并且包含一个匿名任务(Runnable)时,我们会这样做:

    • JDK 1.7版本
    public static void main(String[] args) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    // 具体的任务执行内容
                }
            };
            Thread thread = new Thread(task);
            thread.start();
        }
    

    这里的task的本质是一个实现了Runnable接口的匿名内部类,最后在new Thread(task)时将其传入构造方法中,以创建一个线程来执行这段任务。

    当然,我们还可以进一步进行简化:

    • JDK 1.7版本(简化)
    public static void main(String[] args) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    // 具体的任务执行内容
                }
            });
            thread.start();
        }
    

    我们可以直接在创建Thread时,直接在构造函数中传入new Runnable并重写run方法,实现具体的任务执行内容。

    此时,若没有Idea的帮助,写出这段代码你至少需要记得:

    1. Thread类构造方法可传入类型
    2. 传入类型需要重写的方法

    但是这种写法很麻烦,我们明明知道这样写不过是为了符合当初的构造定义,但是确实有些脱裤子放屁的意思,此时,Lambda表达式的出现解决了这一问题:

    我们只需要记住匿名内部类需要被重写的方法参数及返回值,剩下的,交给Lambda就好了!

    • JDK 1.8(Lambda)
    public static void main(String[] args) {
            Thread thread = new Thread(()->{
                // 具体的任务执行内容
            });
            thread.start();
        }
    

    多么地简单明了?现在代码看起来变得更易于阅读了。下面再举几个例子来看看:

    • 实现大顶堆(JDK 1.7)
    public static void main(String[] args) {
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y - x;
            }
        });
    }
    
    • 实现大顶堆(JDK 1.8 Lambda)
    public static void main(String[] args) {
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>((x, y) -> y - x);
    }
    

    是不是可以很明显地看出差距了呢,使用Lambda表达式只需要一行就可以声明一个大顶堆,而不需要手动创建比较器、重写比较器方法这两个步骤。

    最后需要注意的是:在使用Lambda表达式去代替匿名接口重写方法时,这个接口有且只有唯一的一个抽象方法,不然Lambda表达式无法判断需要重写哪个方法。

    1.2、实现原理

    看完了基本使用后,现在来了解一下Lambda表达式的实现原理。

    还是上面那段代码,Debug可以发现:
    在这里插入图片描述
    当执行到Lambda表达式的时候,其实是通过字节码技术与反射技术,生成了相应的匿名内部类实现。(因为是在内存中生成,所以只能通过Idea查看运行时状态)

    所以使用Lambda表达式时,相较于普通的匿名内部类方式会产生额外的字节码生成及反射带来的效率损失,但是只要不是特别频繁地、多地的使用,一般察觉不到。

    更深入地了解请阅读:

    https://cloud.tencent.com/developer/article/1328370

    二、Stream流式编程

    Stream流式编程的使用依赖于Lambda表达式,是JDK 1.8的一大特点所在。使用Stream流式编程可以很方便和灵活地处理数据,而不需要通过数据库去做额外的处理,所以被广泛地应用在数据处理相关程序中。

    2.1、Stream API 和 Lambda Expression实现遍历的Demo

    现在假设我们需要获取List< String >中首字母为a的数据,可以有以下两种写法,我们可以发现使用流式编程和Lambda表达式后会更加地凝练:
    获取流-过滤流-遍历流。

    		 String[] arrays = {"a1","a2","b","c"};
            //将arrays转化为List<String>
            List<String> stringList = Arrays.asList(arrays);
    
            //原始的处理方法
            for (String s : stringList) {
                if(s.charAt(0) == 'a'){
                    System.out.println(s);
                }
            }
            
            //Stream API 结合 Lambda Expression
            stringList.stream()
                         .filter(s->s.charAt(0) == 'a')
                         .forEach(s-> System.out.println(s));
    
    
    

    2.2、Stream常用方法

    Stream是在Java 8之后更新的一种流,它不同于io流中的InputStream、OutputStream等,准确地说,这个位于java.util下的Stream和io中的流毫无关系。这里的Stream是数据流和对象流。

    2.2.1、 of(T… values)

    要把List、Set转化为数据流可以使用xxxList.stream()或者使用Stream.of(T…values),T…values代表着数组或者是不定数量的数据,它们会按顺序转换成数据流。

    • 获得Stream的三种方式
    		//1
            Stream<String> stringStream1 = Stream.of(new String[]{"a","b","c"});
            //2
            Stream<String> stringStream2 = Stream.of("a","b","c");
            //3
            Stream<String> stringStream3 = stringList.stream();
    

    2.2.2、filter(Predicate<? super T> predicate)

    filter用以将流按需过滤成新的流,需要传入的参数为一个位于java.util.function下的Predicate接口并重写test方法去进行校验:

    Stream newStream = stringList.stream().filter(new Predicate() {
                @Override
                public boolean test(Object s) {
                    if (s.toString().charAt(0) == 'a') {
                        return true;
                    }
                    return false;
                }
            });
    

    利用Lambda表达式,我们可以将上述代码简化为:

    Stream newStream = stringList.stream().filter(s->s.charAt(0) == 'a');
    

    2.2.3、 forEach(Consumer<? super T> action)

    对此流的每一个元素进行操作,需要传入的参数为Consumer接口并且实现其accept方法:

    stringStream.forEach(new Consumer(){
                @Override
                public void accept(Object s) {
                    System.out.println(s);
                }
            });
    

    结合Lambda表达式:

    stringStream.forEach(s->System.out.println(s));
    

    2.2.4、map(Function<? super T,? extends R> mapper)

    map方法的作用是对Stream进行处理并且返回一个其他对象充当原Stream。

    • 将数据转换为大写
     String[] arrays = {"a1","a2","b","c"};
            //将arrays转化为List<String>
            List<String> stringList = Arrays.asList(arrays);
            List<String> collect = stringList.stream()
                    .map(String::toUpperCase)
                    .collect(Collectors.toList());
            System.out.println(collect);
    

    需要说明的是,map方法需要传入的参数是一个函数式方法,可以使用lambda表达式也可直接使用双冒号表达式(现在可以将双冒号表达式::理解为对象通过::调用方法并且传入当前的数据作为参数);而collect方法则是将经过map处理的流“收集”起来形成新的流,传递参数Collectors.toList()表示以List的形式转化流。

    • 删除末尾的数字
    List<String> collect1 = Stream.of(arrays)
                    .filter(v -> v != null)
                    .map(v -> {
                        if (v.length() == 2) {
                            //删除尾部
                            v = v.substring(0, v.length()-1);
                        }
                        //返回最终结果
                        return v;
                    })
                    .collect(Collectors.toList());
            System.out.println(collect1);
    
    • 将Person对象转化为Student对象
        class Person implements Serializable {
                //编号
                private int id;
                //姓名
                private String name;
                //年龄
                private int age;
                //getter setter...
            }
    
            class Student implements Serializable {
                //学号
                private int schoolId;
                //姓名
                private String name;
                //年龄
                private int age;
                //getter setter...
            }
    
    
            public static void main(String[] args) {
                List<Person> personList = Arrays.asList(new Person(1, "lily", 18)
                    , new Person(2, "arong", 19)
                    , new Person(3, "joke", 20));
            //将PersonList转化为Studentlist
            List<Student> studentList = personList.stream().map(p -> {
                Student student = new Student();
                student.setSchoolId(3111000 + p.getId());
                student.setName(p.getName());
                student.setAge(p.getAge());
                //将转化好的student作为结果返回
                return student;
            }).collect(Collectors.toList());
            
            System.out.println(studentList.toString());
    
            }
        }
    

    三、使用Stream和Lambda表达式进行数据处理

    3.1、基本应用

    现在来看看Stream流式编程在数据处理方面的一些优势。

    假设有这么一个需求,前端需要获取到后端这边提供的基础监控数据用以展示,基础监控数据和MySQL交互的VO已经封装成如下:

    • BasicDataVO
    /**
     * @Auther: ARong
     * @Date: 2020/7/9 5:59 下午
     * @Description: 基础数据VO
     */
    public class BasicDataVO {
        private String name; // 数据名称
        private String type; // 数据类别
        private int count;   // 数据数量
    
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getType() {
            return type;
        }
        public void setType(String type) {
            this.type = type;
        }
        public int getCount() {
            return count;
        }
        public void setCount(int count) {
            this.count = count;
        }
    
        @Override
        public String toString() {
            return "BasicDataVO{" +
                    "name='" + name + '\'' +
                    ", type='" + type + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
    

    现在需要在一个页面中展示如下数据表格:

    1. 数量前三的基础数据(根据数量排序)
    2. 类别为"国内"的基础数据(根据数量排序)
    3. 将类别"海外"的基础数据中的数量进行精度修正并和全量数据一起返回(根据数量进行排序)

    如果按正常的编程逻辑,这3个需求需要查询3次数据库,跑3个不同的SQL以组合成所需的数据集:

    • BasicDataController
    @Controller
    public class BasicDataController {
        @RequestMapping("/getBasicData")
        public Map<String, List<BasicDataVO>> getBasicData() {
            HashMap<String, List<BasicDataVO>> res = new HashMap<String, List<BasicDataVO>>();
            // 数量前三的基础数据(根据数量排序)
            List<BasicDataVO> dataList1 = queryLimit3OrderByCount();
            // 类别为"国内"的基础数据(根据数量排序)
            List<BasicDataVO> dataList2 = queryType1OrderByCount();
            // 将类别"海外"的基础数据中的数量进行精度修正并和全量数据一起返回(根据数量进行排序)
            List<BasicDataVO> dataList3 = queryType2AndFixDataOrderByCount();
            // 数据封装
            res.put("list1", dataList1);
            res.put("list2", dataList2);
            res.put("list3", dataList3);
            return res;
        }
    }
    

    当然这是没有问题的,但是为了3份区别不大的数据进行了3次MySQL连接和查询,这其实是比较浪费资源的,所以另外的方案就是先查询出通用数据,然后在通用数据的基础上使用Steam流式编程与Lambda表达式获取到所需的定制化数据:

    • BasicDataController
    @Controller
    public class BasicDataController {
        @RequestMapping("/getBasicData")
        public Map<String, List<BasicDataVO>> getBasicData() {
            HashMap<String, List<BasicDataVO>> res = new HashMap<String, List<BasicDataVO>>();
            // 获取通用数据
            List<BasicDataVO> dataList = queryBasicData();
            // 数量前三的基础数据(根据数量排序)
            List<BasicDataVO> dataList1 = dataList.stream()
                    .sorted((x, y) -> x.getCount() - y.getCount())
                    .limit(3)
                    .collect(Collectors.toList());
            // 类别为"国内"的基础数据(根据数量排序)
            List<BasicDataVO> dataList2 = dataList.stream()
                    .filter(x -> "国内服务器".equals(x.getType()))
                    .sorted((x, y) -> x.getCount() - y.getCount())
                    .collect(Collectors.toList());
            // 将类别"海外"的基础数据中的数量进行精度修正并和全量数据一起返回(根据数量进行排序)
            // 此处存在浅拷贝问题
            List<BasicDataVO> dataList3 = dataList.stream().map(x -> {
                x.setCount(fixData(x.getName()));
                return x;
            })
            .sorted((x, y) -> x.getCount() - y.getCount())
            .collect(Collectors.toList());
            // 数据封装
            res.put("list1", dataList1);
            res.put("list2", dataList2);
            res.put("list3", dataList3);
            return res;
        }
    }
    

    3.2、基本原理

    Stream流的原理和实现是很复杂的,这里只是作为简单学习和了解,需要深入了解请点击这篇文章,讲的很好:

    https://www.cnblogs.com/CarpenterLee/p/6637118.html

    当使用list.stream()的时候,会返回一个Stream对象,这是因为在JDK 1.8中,开发者们修改了Collection,即集合框架的顶层接口,并在里头新增了一个钩子方法:

    • Collection
    public interface Collection<E> extends Iterable<E> {
    
        default Stream<E> stream() {
                return StreamSupport.stream(spliterator(), false);
        }
    
    }
    

    这个钩子方法就是stream(),然后通过StreamSupport这个类获取到相应的流对象。

    • StreamSupport
     public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
    }
    

    可以看到,StreamSupport会新创建一个ReferencePipeline.Head这个类,看到Head会不会有些眼熟?是的,整个Stream是通过一个双向链表组织起来的,每一个阶段会对应着一个Stream:

    在这里插入图片描述

    展开全文
  • 文章目录前言一、示例二、原理总结 前言 有时候需要将多个流合并到一个流中,这是就需要通过flatMap对流进行扁平化操作。 一、示例 对字符串进行拆分,返回每个字符 List<String> data = Arrays.asList(...

    flatMap可以将流扁平化


    前言

    有时候需要将多个流合并到一个流中,这是就需要通过flatMap对流进行扁平化操作。


    一、示例

    对字符串进行拆分,返回每个字符

    	List<String> data = Arrays.asList("hello", "world"
    展开全文
  • final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { //没有标记停止,并且尝试操作流中下一个元素 do { } while (!sink.cancellationRequested() &...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,248
精华内容 6,499
关键字:

流式编程原理