stream_streamlit - CSDN
stream 订阅
Stream泛指流媒体技术。流媒体实际指的是一种新的媒体传送方式,而非一种新的媒体,是指采用流式传输的方式在Internet播放的媒体格式。可指大河,也可指小河或小溪,指小溪时与brook,creek同义。creek侧重其狭长蜿蜒,缓缓流动,且多流入大河或湖泊。brook侧重发源于山泉。creek和stream都比brook大。stream还可引申表示事物连绵不断。stream还可作动词,意为“流动,飘动”。常用作不及物动词,也可用作及物动词。 展开全文
Stream泛指流媒体技术。流媒体实际指的是一种新的媒体传送方式,而非一种新的媒体,是指采用流式传输的方式在Internet播放的媒体格式。可指大河,也可指小河或小溪,指小溪时与brook,creek同义。creek侧重其狭长蜿蜒,缓缓流动,且多流入大河或湖泊。brook侧重发源于山泉。creek和stream都比brook大。stream还可引申表示事物连绵不断。stream还可作动词,意为“流动,飘动”。常用作不及物动词,也可用作及物动词。
信息
外文名
Stream
基本解释
泛指流媒体技术
词    性
动词,名词
中文名
流媒体技术
stream英语单词
基本释义v. [striːm] ( streams; streamed; streaming )vt. & vi. 流; 移动 flow freely; move continuously and smoothly in one directionvi. 飘扬; 招展 float or wave (in the wind)词语要点1.stream的基本意思是“流动”,指受限制的流动,如通过一定的路线或出口。也可指大量不断地流动。引申可指“飘动”。2.stream既可用作及物动词,也可用作不及物动词。用作及物动词时,可接名词或代词作宾语。3.stream接介词with表示“被…覆盖”。词语搭配~+副词stream torrentially 激流涌进stream back 向后飘动stream in 络绎进入~+介词stream behind 在…后面飘动stream down one's cheeks (眼泪)从脸颊流下stream into the auditorium (人群)络绎不绝进入礼堂stream out of the station (人群)涌出车站stream with 被…覆盖辨析pour, flow, run, stream这组词的共同意思是“流”“流动”。其区别是:1.flow, run, stream和pour都可指液体流动; flow还可指气体流动; pour还可指光线、微粒等倾泻。2.时间上:flow一般指源源不断地、长时间地流动; run既可以是源源不断地、长时间地流动,也可以是流动一段时间; stream和pour多指短时间地流动。3.方向上:指液体流动时, flow是水平流动; run和stream既可水平流动,也可垂直流动; pour是垂直流动。4.流速上:从快到慢依次为pour, stream, run, flow。具体说就是flow是平平稳稳地流动; run比较湍急; stream比run更有力; pour则是“倾泻”。5.flow, stream和pour常用于比喻, run很少用于比喻。例如:The river was flowing quietly.河水静静流着。She let her hair down so that it flew darkly over her shoulders.她让乌黑的头发披散下来,飘垂到肩上。  The river runs through hills and fields.河水流经山冈和田野。The water runs out of the pipe into the bucket.水自管内注入水桶中。Tears were streaming down her face.她脸上热泪滚滚而下。The students streamed into the auditorium.学生们络绎不绝地进入礼堂。下面三句话的意思相同:She poured me a cup of tea.She poured a cup of tea for me.She poured me out a cup of tea.她给我倒了一杯茶。词源<古英语stream(流动)基本释义C 小河,溪流 a small river C 流,一股,一串 flow (of liquid, people, things, etc.)S 水流方向,潮流 current or direction of sth flowing or movingC (按能力分的)班级 class or division of a class into which children of the same age and level of ability are placedC 川流不息 a continuous series of people, things, or events, usually moving in a line or in a certain direction常见搭配动词+~cross a stream 涉过一条小溪tap a stream 引流形容词+~clear〔dancing, quiet, running〕 stream 清澈〔奔腾欢跳,平静,流动〕的溪流rapid〔strong〕 stream 湍急〔强劲〕的水流rushing stream 激流名词+~mountain stream 山涧sun streams 太阳光线介词+~in streams 连续不断,川流不息on stream 进行生产,投入生产up the stream 向〔在〕上游~+介词a stream of light 一缕光线a stream of words 滔滔不绝的话the stream of history 历史潮流词语辨析branch,brook,canal,creek,river,stream,torrent这组词的共同意思是“流水的通道”。其区别是:1.除canal指人工开挖的河流或渠道外,其余各词均指自然形成的水道。2.river和torrent均指流量较大的河流,river可泛指(自然形成的)江河;torrent则特指急流、湍流。这两个词还常用于比喻。3.stream可指大河,也可指小河或小溪,指小溪时与brook,creek同义。creek侧重其狭长蜿蜒,缓缓流动,且多流入大河或湖泊。brook侧重发源于山泉。creek和stream都比brook大。stream还可引申表示事物连绵不断。4.branch指江河的支流。同义词n. brook, course, flow, rush, streamlet [1] 
收起全文
精华内容
参与话题
  • java8学习总结——Stream的理解

    千次阅读 多人点赞 2018-01-09 14:23:10
    Stream初理解  JAVA8中一个重要概念就是流——stream。先来看看流的使用: public class StreamTest01 { public static void main(String[] args) { Stream stream = Stream.of(1,2,3,4); stream.map(i ->...
        Stream初理解

        JAVA8中一个重要概念就是流——stream。先来看看流的使用:

    public class StreamTest01 {
    
        public static void main(String[] args) {
           Stream<Integer> stream = Stream.of(1,2,3,4);
    
           stream.map(i -> i+1).forEach(System.out::println);
        }
    }

         上面的代码是创建了一个包含(1,2,3,4)四个元素的流,并将流中的元素分别加一,然后打印输出。也许这样不太直观。可以再看看下面的写法:

    public class StreamTest01 {
    
        public static void main(String[] args) {
           List<Integer> list = Arrays.asList(1,2,3,4);
           Stream<Integer> stream = list.stream();
           stream.map(i -> i+1).forEach(System.out::println);
        }
    }
    
    上面的这段代码也是将四个元素加一然后输出到控制台。不同的是这里是通过一个集合创建流对象,然后进行操作的。同样的需求,流对象带来的便利性是显而易见,直观、易读且易编写。开发人员不用关心元素是如何取出的,也不用关心操作之间应该如何连接(以前的写法是,我们必须保证加一操作和输出操作之间的执行顺序);开发人员只需要关心每个操作的具体执行逻辑。

           文档将对其的解释是:流是一些聚合操作,可以并发和顺序执行。从这个定义中可以看出流的两个特征:

           1、流是一系列操作的集合。

           2、流可以并发执行,也可以顺序执行。

    既然流是一些操作的集合,那么流就可以将一个或多个操作聚集起来。这里就有两个问题:1、流是如何将这些操作聚集起来的;2、流是如何创建的的,只有知道流的创建过程,才知道流是如何连接操作的。

          上面用到了Stream.of()方法返回了一个流对象。意味着这个方法包含了流的创建过程,先看看他的源代码:

    //Stream接口
    public static<T> Stream<T> of(T... values) {
            return Arrays.stream(values);
    }
    
    //Arrays类
    public static <T> Stream<T> stream(T[] array) {
            return stream(array, 0, array.length);
    }
    
    //Arrays类
    public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
            return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
    }
    
    //StreamSupport类
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
    }
    上面是Stream.of()方法的源代码调用过程,可以看到最后调用了StreamSupport类的stream()方法,在stream方法中创建了一个ReferencePipeline.Head类(这是ReferencePipeline的一个内部类)的对象。接下来再看看Head这个内部类的源代码: 

    //Head类
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {      
            Head(Supplier<? extends Spliterator<?>> source,
                 int sourceFlags, boolean parallel) {
                super(source, sourceFlags, parallel);
            } 
    ……
    }
    
    // ReferencePipeline类
    abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT>  {
        ReferencePipeline(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
    }
    ……
    }
    
    // AbstractPipeline类
    /**
    * Constructor for the head of a stream pipeline.
    *
    * @param source {@code Spliterator} describing the stream source
    * @param sourceFlags the source flags for the stream source, described in
    * {@link StreamOpFlag}
    * @param parallel {@code true} if the pipeline is parallel
    */
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
            AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                         int sourceFlags, boolean parallel) {
                  this.previousStage = null;
                  this.sourceSupplier = source;
                  this.sourceStage = this;
                  this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
                  // The following is an optimization of:
                  // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
                  this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
                  this.depth = 0;
                  this.parallel = parallel;
            }
            ……
    }
    这里只截取了创建Head实例的部分代码,可以看到在Head的构造方法中什么也没做,直接调用了父类的构造方法。最后Head的构造方法调用的是AbstractPipeline类的构造方法。在AbstractPipeline的构造方法执行完成后,流对象就创建完成了。

          但是另外一个问题又出现了。这么云山雾绕的转了一大圈的创建了一个对象,不知道是做什么用的。为什么要这么做,与流的创建又有什么关系?其实,在AbstractPipeline的构造方法我将注释也截取下来了,注释有这么一句话:

    Constructor for the head of a stream pipeline(构造一个流管道的头部).这句话透露了两个信息:1、我们最后构造的是一个流管道,也就是说流对象是以管道的形式表现的;2、这个构造方法仅仅是构造管道的头部,也就是管道的开端,那么流管道的其他部分呢?

          因此,一定有构造流管道其他部分的方法的。这个方法就在AbstractPipeline类的另外一个构造方法里面,看一下AbstractPipeline类的另外一个构造方法:

     /**
     * Constructor for appending an intermediate operation stage onto an
     * existing pipeline.
     *
     * @param previousStage the upstream pipeline stage
     * @param opFlags the operation flags for the new stage, described in
     * {@link StreamOpFlag}
     */
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    	if (previousStage.linkedOrConsumed)
    		throw new IllegalStateException(MSG_STREAM_LINKED);
    	previousStage.linkedOrConsumed = true;
    	previousStage.nextStage = this;
    
    	this.previousStage = previousStage;
    	this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    	this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    	this.sourceStage = previousStage.sourceStage;
    	if (opIsStateful())
    		sourceStage.sourceAnyStateful = true;
    	this.depth = previousStage.depth + 1;
    }
    留意注释部分(Constructor for appending an intermediate operation stage onto an existing pipeline.)。根据注释的解释,这个构造方法构造的对象有两个特征:

    1、是一个中间操作阶段对象(an  intermediate operation stage);

    2、这个中间操作阶段对象会被追加到一个已经存在的管道上。

    AbstractPipeline中有几个属性需要留意一下:

    /**
     * Backlink to the head of the pipeline chain (self if this is the source
     * stage).
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;
    
    /**
     * The "upstream" pipeline, or null if this is the source stage.
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;
    
    /**
     * The next stage in the pipeline, or null if this is the last stage.
     * Effectively final at the point of linking to the next pipeline.
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;
    这三个属性都是AbstractPipeline对象,分别表述pipeline的开端(sourceStage)、上一个操作(previousStage)、下一个操作(nextStage)。再结合AbstractPipeline的两个构造方法看一下:

    //构造方法一:构造管道的开端
    this.previousStage = null;   //因为是开端,没有前一个状态
    this.sourceStage = this;     //因为是开端,所以pipeline的源阶段就是当前对象
    
    //构造方法二:构造流的中间操作
    previousStage.nextStage = this;      //前一个操作的nextStage就是当前对象
    this.previousStage = previousStage;  //当前操作的前一个操作就是previousStage
    this.sourceStage = previousStage.sourceStage;  //无论管道有多长,其源都只会有一个(就像水管无论有多长,其开端只有一个),所以等于 previousStage.sourceStage
    综上所述,我们可以得到流的结构是一种双向链表的管道结构,如下图:


    流将每个操作封装成一个AbstractPipeline对象,然后通过previousStage、nextStage属性将当前操作的前一个操作和后一个操作连接起来,以此来保证流的将操作联合的特性。

            Stream的源
            Stream是一些列操作的集合,其需要一个操作的数据源,这个数据源就是Stream的源。在AbstractPipeline提供了两个构造流开端的方法,如下:
    AbstractPipeline(Supplier<? extends Spliterator<?>> source,int sourceFlags, boolean parallel) {
    	this.previousStage = null;
    	this.sourceSupplier = source;
    	this.sourceStage = this;
    	……
    }
    
     AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {
    	this.previousStage = null;
    	this.sourceSpliterator = source;
    	this.sourceStage = this;
    	……
    }
    上面两个构造方法中有一个变量(source),这个就是外部传入的数据源。在两个构造方法中,这个源分别被赋给了sourceSupplier 、sourceSpliterator两个属性。其实两个属性是一个意思,sourceSupplier是supplier(不接受参数,返回一个值)的一个实例,其返回的也是一个Spliterator对象。只是提供两种不同的源提供方式,但是其提供的源都是一样的。所以官方文档对这两个属性的介绍是:两个属性被使用了其中一个,另外一个必须为null(也就是说这个两个属性是互斥的)。在流使用完成后,这两个属性都必须被设置为null(流使用完了,就不再需要源了)。
      从上面的描述中可以看到source对象是一个Spliterator的实例。来看看Spliterator的源代码:
    public interface Spliterator<T> {
        
        boolean tryAdvance(Consumer<? super T> action);  //消耗式获取元素,元素一旦被取出,分割器将不会拥有这个元素
    
        default void forEachRemaining(Consumer<? super T> action) {   //遍历元素
            do { } while (tryAdvance(action));
        }
     
        Spliterator<T> trySplit();  //分割数据源
       
        default long getExactSizeIfKnown() {
            return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
        }
     
        default boolean hasCharacteristics(int characteristics) {
            return (characteristics() & characteristics) == characteristics;
        }
    
        default Comparator<? super T> getComparator() {
            throw new IllegalStateException();
        } 
    }
    官方文档将Spliterator解释为:是一个能够对源数据进行分区和遍历操作的对象。从其实现类中,可以更好的理解这个接口。Spliterator的实现类分为三类:
          1、ArraySpliterator       (IntArraySpliterator,LongArraySpliterator,DoubleArraySpliterator);
          2、AbstractSpliterator  IntAbstractSpliterator,LongAbstractSpliterator,DoubleAbstractSpliterator
          3、IteratorSpliterator    IntIteratorSpliterator,LongIteratorSpliterator,DoubleIteratorSpliterator
          关于这个三个实现类:
          ArraySpliterator主要面向数组类型的数据源;
          IteratorSpliterator面向的是集合类数据源;
          AbstractSpliterator是给使用者自定义的,使用者可以根据自己需要扩展相应的Spliterator,这个类没有给出数据源类型,需要使用者自定义。
          这个三个类的实现方式很类似,都有一些共同的特点:1、持有一个数据源;2、设置了数据源的大小、切分单位、切分的初始值;3、定义了分割器的特征(这里不做讨论)。从源代码中看看这些特征:
    public static abstract class AbstractSpliterator<T> implements Spliterator<T> {
            static final int BATCH_UNIT = 1 << 10;  // 切分数据源的单位,每次切分的增量。即batch+BATCH_UNIT就是本次切分的长度;这里默认是1024
            static final int MAX_BATCH = 1 << 25;   //允许切分的最大长度,即batch+BATCH_UNIT是当前切分的长度,这个长度不能大于MAX_BATCH。如果大于,则等于MAX_BATCH;
            private final int characteristics;      //分割器的特征
            private long est;            // 数据源的长度(集合的长度)
            private int batch;           // 切分的初始值,创建时一般是0
    		……
    }
    
    static class IteratorSpliterator<T> implements Spliterator<T> {
            static final int BATCH_UNIT = 1 << 10;  // 切分数据源的单位,每次切分的增量。即batch+BATCH_UNIT就是本次切分的长度;这里默认是1024
            static final int MAX_BATCH = 1 << 25;  //允许切分的最大长度,即batch+BATCH_UNIT是当前切分的长度,这个长度不能大于MAX_BATCH。如果大于,则等于MAX_BATCH;
            private final Collection<? extends T> collection; //原始数据
            private Iterator<? extends T> it;
            private final int characteristics;  // 分割器的特征
            private long est;             // 数据源的长度(集合的长度)
            private int batch;            // 切分的初始值,创建时一般是0
    		……
    }
    
    static final class ArraySpliterator<T> implements Spliterator<T> {
            
            private final Object[] array;  //原始数据
            private int index;        // 切分的初始值,创建时一般是0
            private final int fence;  // 栅栏,即数据源的长度(数组的长度)
            private final int characteristics;// 分割器的特征
    		……
    	//注:这里没有给出切分的增量单位(BATCH_UNIT)和允许切分的最大长度(MAX_BATCH),是因为这种类型的分割器在其             trySplite()方法中,通过index和fence计算出来,自然也就不用给值了。
    }
    再通过一个示例感受一下Spliterator:
    public class StreamTest01 {
    
        public static void main(String[] args) {
            String[] strs = new String[]{"a","b","c","d","e","f","g","h","i","j","k"};  //原始数据
            Spliterator<String> spliterator1 = Spliterators.spliterator(strs, 0); //创建分割器
            Spliterator<String> spliterator2 = spliterator1.trySplit(); //调用分割方法,切分数据源
    
            spliterator1.forEachRemaining(i -> System.out.print(i+","));  //打印出"分割器1"拥有的所有元素
            System.out.println();//用于换行
            System.out.println("------------------");
            spliterator2.forEachRemaining(i -> System.out.print(i+","));//打印出"分割器2"拥有的所有元素
        }
    }
    //输出:
    //f,g,h,i,j,k,
    //------------------
    //a,b,c,d,e
    
    
    public class StreamTest01 {
    
        public static void main(String[] args) {
            String[] strs = new String[]{"a","b","c","d","e","f","g","h","i","j","k"};  //原始数据
            Spliterator<String> spliterator1 = Spliterators.spliterator(strs, 0); //创建分割器
            spliterator1.tryAdvance(i -> System.out.print(i+",")); //取出a
            spliterator1.tryAdvance(i -> System.out.print(i+",")); //取出b
            spliterator1.tryAdvance(i -> System.out.print(i+",")); //取出c
            spliterator1.tryAdvance(i -> System.out.print(i+",")); //取出d
        }
    }
    
    //输出:a,b,c,d,
    接下来看看IteratorSpliterator的源代码实现(其他两个实现的思想是一样的,只是代码的写法上有些微的差异):
    static class IteratorSpliterator<T> implements Spliterator<T> {
    
            /**
             *
             * 这个方法的主要作用是,在每次调用时,根据事先设置好的切分单位和相关属性
             * 将原始数据源进行切分为多个数据源,并将切分出来的数据源包装为新的Spliterator
             */
             @Override
             public Spliterator<T> trySplit() {
    
                 Iterator<? extends T> i;
                 long s;
                 if ((i = it) == null) {
                     i = it = collection.iterator();
                     s = est = (long) collection.size();
                 }
                 else
                    s = est;
                 if (s > 1 && i.hasNext()) {   //判断是否还有元素可以进行拆分
                    int n = batch + BATCH_UNIT;  //获取切分长度
                 if (n > s)  //如果切分长度大于元素的个数,则切分长度等于元素个数
                     n = (int) s;
                 if (n > MAX_BATCH)  //如果切分长度大于最大允许切分长度,则切分长度等于最大可切分长度
                     n = MAX_BATCH;
                 Object[] a = new Object[n]; //从这一步开始主要做两件事:1、将元素取出,2、将元素包装为Spliterator
                 int j = 0;
                     do { a[j] = i.next(); } while (++j < n && i.hasNext());
                     batch = j;
                     if (est != Long.MAX_VALUE)
                         est -= j;
                     return new ArraySpliterator<>(a, 0, j, characteristics);
                 }
                 return null;
             }
    
            /**
             * 这个方法是遍历出所有的元素,因为是集合类型数据,所以使用了集合的迭代器
             * 在Spliterator接口中,这个方法会调用tryAdvance()方法,依次取出元素。如:
             *  default void forEachRemaining(Consumer<? super T> action) {do { } while (tryAdvance(action));}
             *  但仅仅只是一种取出元素的标准,如果实现类可以有自己的取出元素的方式,则也可以直接覆盖
             * @param action
             */
            @Override
             public void forEachRemaining(Consumer<? super T> action) {
                 if (action == null) throw new NullPointerException();
                 Iterator<? extends T> i;
                 if ((i = it) == null) {
                     i = it = collection.iterator();
                     est = (long)collection.size();
                 }
                 i.forEachRemaining(action);
             }
    
            /**
             * 这个一个取出元素的方法,每次取出一个元素后,Spliterator将不再拥有这个元素
             * (可能删除了元素,也可能并没有删除,只是Spliterator再也取不到这个元素了)
             * @param action
             * @return
             */
             @Override
             public boolean tryAdvance(Consumer<? super T> action) {
                 if (action == null) throw new NullPointerException();
                     if (it == null) {
                     it = collection.iterator();
                     est = (long) collection.size();
                 }
                 if (it.hasNext()) {
                     action.accept(it.next());
                     return true;
                 }
                 return false;
             }
    
            /**
             * 获取源数据的长度。这个是一个估计值,因为当计算获取数据源的成本很高时,同时又进行拆分;
             * 此时获取数据源的长度,就有可能导致计算出的数据源长度不精确。或者数据源的长度没有限制,或者是
             * 未知的数据源,则无法获取精确地值。
             * 在没有进行分组遍历,且Spliterator具有SIZE特性;此时这个方法获取的值一定等于Spliterator遍历
             * 所有元素的次数
             * @return
             */
             @Override
             public long estimateSize() {
                 if (it == null) {
                     it = collection.iterator();
                     return est = (long)collection.size();
                 }
                 return est;
             }
    
             @Override
             public int characteristics() { return characteristics; }
    
            /**
             * 获取元素的比较器,由子类自定义,也可以直接抛异常
             * @return
             */
             @Override
             public Comparator<? super T> getComparator() {
                 if (hasCharacteristics(Spliterator.SORTED))
                     return null;
                     throw new IllegalStateException();
            }
    }
    通过上面的描述,Stream的数据源是通过一个Spliterator的实例持有一个原始数据源来提供的。

          Stream的中间操作和终止操作

          在上面的内容中,说到流有三个部分:开端、终端、中端。根据上面的描述,流的开端保存了数据的源,提供了一定的数据源的操作方法。而对于中端和终端,则为进行任何的描述。在JDK的文档中,将流定义为一系列操作的集合,同时JDK文档又将这些操作分为两类,即中间操作和终止操作。看一段代码,直观感受一下中间操作和终止操作:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Stream;
    
    public class StreamTest01 {
    	
    	public static void main(String[] args) {
    		  List<Integer> lis = Arrays.asList(1,2,3,4,5,6,7,8,9);
    		  Stream<Integer> stream1 = lis.stream().map(i -> i+1);  //中间操作
    		  Stream<Integer> stream2 = stream1.filter(i -> i > 0);  //中间操作
    		  Stream<Integer> stream3 = stream2.limit(10);  //中间操作
    		  Stream<Integer> stream4 = stream3.distinct(); //中间操作
    		  
    		  //Optional<Integer> result1 = stream4.findAny(); //终止操作
    		  //Optional<Integer> result2 = stream4.findFirst(); //终止操作,两次调用终止操作,会抛异常,流只能被使用一次,可以中间操作,也可以是终止操作
    		  stream4.forEach(i -> System.out.print(i));  //终止操作,没有返回值
    		  
    		  //上面的所有操作有另外的一种写法
    		  lis.stream().map(i -> i+1).filter(i -> i > 0)
    		  	 .limit(10).distinct().forEach(i -> System.out.print(i));
    	} 
    }
    从上面的代码可以看到,中间操作返回的都是Stream对象;终止操作可能有返回值,可能没有;而且返回的值类型不定。

          接下来从源代码角度看一下中间操作和终止操作都是如何运行的。这里选取map方法和foreach方法作为分析对象,其他方法的实现方式类似。 

    //类名:ReferencePipeline;方法:map
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    	Objects.requireNonNull(mapper);  留意StatelessOp
    	return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,  
    								 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
    		@Override
    		Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    			return new Sink.ChainedReference<P_OUT, R>(sink) {
    				@Override
    				public void accept(P_OUT u) {
    					downstream.accept(mapper.apply(u));
    				}
    			};
    		}
    	};
    }
    
    //类名:ReferencePipeline;方法:forEach
    public void forEach(Consumer<? super P_OUT> action) {
    	evaluate(ForEachOps.makeRef(action, false));  //调用了AbstractPipeline的evaluate方法
    }
    
    //类名:AbstractPipeline;方法:evaluate
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {  //留意TerminalOp
    	assert getOutputShape() == terminalOp.inputShape();
    	if (linkedOrConsumed)
    		throw new IllegalStateException(MSG_STREAM_LINKED);
    	linkedOrConsumed = true;
    
    	return isParallel() ? terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))  : terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
    }

    在map方法中创建了StatelessOp对象。在foreach方法中通过ForEachOps.makeRef(action, false)方法创建了一个TerminalOp对象。

         StatelessOp比较简单,StatelessOp继承了AbstractPipeline,并覆写了opWrapSink方法。在opWrapSink方法中创建了一个Sink.ChainedReference对象。StatelessOp继承了AbstractPipeline,这里调用StateleddOp的构造方法,实际上是调用了AbstractPipeline的创建中间操作对象的构造方法(前面提到过AbstractPipeline的构造方法有两个,一个构造流的开端,一个构造中间操作),构造了一个AbstractPipeline对象,这个对象表示流的中间操作。

            TerminalOp对象的创建则相对复杂一些。在这之前需要了解两个接口:TerminalOp和TerminalSink,源代码如下:

    //接口:TerminalOp
    interface TerminalOp<E_IN, R> {
        
        default StreamShape inputShape() { return StreamShape.REFERENCE; }
    
         
        default int getOpFlags() { return 0; }
    
         
        default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                          Spliterator<P_IN> spliterator) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
            return evaluateSequential(helper, spliterator);
        }
    
        
        <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
                                    Spliterator<P_IN> spliterator);
    }
    
    //接口:TerminalSink
    interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }

    在TerminalOp接口中分别定义了顺序执行的方法(evaluateSequential)和并行执行的方法(evaluateParallel);留心一下会发现,并行计算的方法调用了顺序执行的方法。也就是说,默认情况下是没有并行的(因为并行调用了顺序执行的方法);具体的并行的执行方式需要具体的子类去实现。而对于TerminalSink接口,则没有任何的实现,直接继承了Sink接口。

    接下来再看看ForEachOps.makeRef(action, false)的执行内容:

    //类名:ForEachOps ; 方法名:makeRef
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,boolean ordered) {
    	Objects.requireNonNull(action);
    	return new ForEachOp.OfRef<>(action, ordered);
    }
    
    //类名:ForEachOps.OfRef 
    static final class OfRef<T> extends ForEachOp<T> {
    	final Consumer<? super T> consumer;
    
    	OfRef(Consumer<? super T> consumer, boolean ordered) {
    		super(ordered);
    		this.consumer = consumer;
    	}
    
    	@Override
    	public void accept(T t) {
    		consumer.accept(t);
    	}
    }
    
    //类名:ForEachOp
    static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
    	private final boolean ordered;
    
    	protected ForEachOp(boolean ordered) {
    		this.ordered = ordered;
    	}
    	……
    }

    从上面的代码可以看到ForEachOps.makeRef(action, false)创建了一个 ForEachOps.OfRef 对象。ForEachOps实现了TerminalOp和TerminalSink接口,同时也实现了Sink接口。

    无论是StatelessOp还是TerminalOp,最终都关联了Sink接口。其实流的所有操作都是由Sink接口的各种实现类来串联的。看一下Sink接口的代码: 

    interface Sink<T> extends Consumer<T> {
        
        default void begin(long size) {}
      
        default void end() {}
      
        default boolean cancellationRequested() {
            return false;
        }
       
        default void accept(int value) {
            throw new IllegalStateException("called wrong accept method");
        }
       
        default void accept(long value) {
            throw new IllegalStateException("called wrong accept method");
        }
      
        default void accept(double value) {
            throw new IllegalStateException("called wrong accept method");
        }
    	
    	static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedReference(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }
     
        static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedInt(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }
     
        static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedLong(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }
     
        static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
            protected final Sink<? super E_OUT> downstream;
    
            public ChainedDouble(Sink<? super E_OUT> downstream) {
                this.downstream = Objects.requireNonNull(downstream);
            }
    
            @Override
            public void begin(long size) {
                downstream.begin(size);
            }
    
            @Override
            public void end() {
                downstream.end();
            }
    
            @Override
            public boolean cancellationRequested() {
                return downstream.cancellationRequested();
            }
        }	
    }

            这里采用了模板方法模式,规定了Sink方法的调用必须是begin,accept,end的顺序,可以重复调用,但必须保证调用方法的顺序(begin,accept,end)。

            在这个接口中有三个重载的accept方法,却没有accept(Object  value),这是因为Sink接口继承了Consumer接口,在Consumer接口中已经有一个接收Object参数的accept方法了。

         此外,Sink接口中定义了四个实现类(ChainedReference,ChainedInt,ChainedLong,ChainedDouble),这四个实现类中都分别持有一个downstream的Sink对象,这个对象是实现Stream中操作链接的关键。

           前面都是准备阶段,接下来看看Stream是如何将所有的操作链接起来进行的。在调用map方法的时候,留心的话,你就会发现,map方法仅仅只是创建了一个StatelessOp对象,却没有调用任何的执行代码。这其实就是流的惰性计算。任何的中间操作都没有执行任何的计算代码,只有在遇到终止操作时才会触发流的执行。接下来再看看终止操作是如何触发流的执行的,源代码:

    //类名:ReferencePipeline;方法名:forEach
    public void forEach(Consumer<? super P_OUT> action) {
    	evaluate(ForEachOps.makeRef(action, false));
    }
    
    //类名:AbstractPipeline;方法名:evaluate
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    	assert getOutputShape() == terminalOp.inputShape();
    	if (linkedOrConsumed)
    		throw new IllegalStateException(MSG_STREAM_LINKED);
    	linkedOrConsumed = true;
    
    	return isParallel()
    		   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
    		   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    
    //类名:ForEachOp;方法名:evaluateSequential
    public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                               Spliterator<S> spliterator) {
    	return helper.wrapAndCopyInto(this, spliterator).get();
    }
    
    //类名:AbstractPipeline;方法名:wrapAndCopyInto。(AbstractPipeline继承了PipelineHelper)
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    	copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); //这里的copyInto方法触发了执行
    	return sink;
    }
    
    //类名:AbstractPipeline;方法名:wrapSink
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { //这个方法是关键,操作就是在这里进行连接的
    	Objects.requireNonNull(sink);
    
    	for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
    		sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    	}
    	return (Sink<P_IN>) sink;
    }
    在wrapSink中,有一个参数需要解释一下——p.depth:这个参数表示Stream的深度,即有多少个操作需要链接。我们在创建 AbstractPipeline对象时,每进行一次中间操作对象的创建,这个属性值就会加一。如下:

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
    
            this.previousStage = previousStage; //前一个操作
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1; //每构造一次,就会加一
        }
    在wrapSink方法中再次调用了opWrapSink,这个方法就是前面在调用Stream的map方法时,重写的方法。如下:
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { //重写方法
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {  //重写了accept方法,在方法中调用downstream的accept方法
                            downstream.accept(mapper.apply(u)); //经过这里,就可以将所有的操作连接起来
                        }
                    };
                }
            };
        }
    接下里再看看是如何触发这些操作的,触发这些操作的代码在copyInto方法中。代码如下:

     final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
    
    	if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { //判断是否有短路特性
    		wrappedSink.begin(spliterator.getExactSizeIfKnown());
    		spliterator.forEachRemaining(wrappedSink);
    		wrappedSink.end();
    	}
    	else {
    		copyIntoWithCancel(wrappedSink, spliterator); //流有短路特性,则执行
    	}
    }
    当流没有短路特性时,调用了spliterator的forEachRemaining方法,传入了刚才包装的那个sink对象。spliteator的实现类很多,执行形式也有区别,但是在spliterator中都会调用wrapsink的accept方法。这个wrapsink的accept方法中调用downstream.accept方法,这个时候就会触发这个流的所有操作的执行。












    展开全文
  • JDK1.8-Stream()使用详解

    万次阅读 多人点赞 2017-08-06 21:27:18
    为什么需要 Stream Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对大数据实时处理的 ...

    为什么需要 Stream

    Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对大数据实时处理的 Stream。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核时代综合影响的产物。

    什么是聚合操作

    在传统的 J2EE 应用中,Java 代码经常不得不依赖于关系型数据库的聚合操作来完成诸如:

    • 客户每月平均消费金额
    • 最昂贵的在售商品
    • 本周完成的有效订单(排除了无效的)
    • 取十个数据样本作为首页推荐

    这类的操作。

    但在当今这个数据大爆炸的时代,在数据来源多样化、数据海量化的今天,很多时候不得不脱离 RDBMS,或者以底层返回的数据为基础进行更上层的数据统计。而 Java 的集合 API 中,仅仅有极少量的辅助型方法,更多的时候是程序员需要用 Iterator 来遍历集合,完成相关的聚合应用逻辑。这是一种远不够高效、笨拙的方法。在 Java 7 中,如果要发现 type 为 grocery 的所有交易,然后返回以交易值降序排序好的交易 ID 集合,我们需要这样写:

    清单 1. Java 7 的排序、取值实现

    List<Transaction> groceryTransactions = new Arraylist<>();
    for(Transaction t: transactions){
     if(t.getType() == Transaction.GROCERY){
     groceryTransactions.add(t);
     }
    }
    Collections.sort(groceryTransactions, new Comparator(){
     public int compare(Transaction t1, Transaction t2){
     return t2.getValue().compareTo(t1.getValue());
     }
    });
    List<Integer> transactionIds = new ArrayList<>();
    for(Transaction t: groceryTransactions){
     transactionsIds.add(t.getId());
    }

    而在 Java 8 使用 Stream,代码更加简洁易读;而且使用并发模式,程序执行速度更快。

    清单 2. Java 8 的排序、取值实现

    List<Integer> transactionsIds = transactions.parallelStream().
     filter(t -> t.getType() == Transaction.GROCERY).
     sorted(comparing(Transaction::getValue).reversed()).
     map(Transaction::getId).
     collect(toList());

    Stream 总览

    什么是流

    Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

    Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

    而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

    >
    1. 1.0-1.4 中的 java.lang.Thread
    2. 5.0 中的 java.util.concurrent
    02. 6.0 中的 Phasers 等
    10. 7.0 中的 Fork/Join 框架
    1. 8.0 中的 Lambda

    Stream 的另外一大特点是,数据源本身可以是无限的。

    流的构成

    当我们使用一个流的时候,通常包括三个基本步骤:

    获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,如下图所示。

    图 1. 流管道 (Stream Pipeline) 的构成

    有多种方式生成 Stream Source:

    • 从 Collection 和数组

      • Collection.stream()
      • Collection.parallelStream()
      • Arrays.stream(T array) or Stream.of()

      从 BufferedReader

      • java.io.BufferedReader.lines()
    • 静态工厂
      • java.util.stream.IntStream.range()
      • java.nio.file.Files.walk()
    • 自己构建

      • java.util.Spliterator
      • 其它
        • Random.ints()
        • BitSet.stream()
        • Pattern.splitAsStream(java.lang.CharSequence)
        • JarFile.stream()

    流的操作类型分为两种:

    • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
    • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

    在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

    还有一种操作被称为 short-circuiting。用以指:

    • 对于一个intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的Stream,但返回一个有限的新Stream
    • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。

    当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个 short-circuiting 操作是必要非充分条件。

    清单 3. 一个流操作的示例

    int sum = widgets.stream()
    .filter(w -> w.getColor() == RED)
     .mapToInt(w -> w.getWeight())
     .sum();

    stream() 获取当前小物件的 source,filter 和 mapToInt 为 intermediate 操作,进行数据筛选和转换,最后一个 sum() 为 terminal 操作,对符合条件的全部小物件作重量求和。

    流的使用详解

    简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)。

    流的构造与转换

    下面提供最常见的几种构造 Stream 的样例。

    清单 4. 构造流的几种常见方法

    // 1. Individual values
    Stream stream = Stream.of("a", "b", "c");
    // 2. Arrays
    String [] strArray = new String[] {"a", "b", "c"};
    stream = Stream.of(strArray);
    stream = Arrays.stream(strArray);
    // 3. Collections
    List<String> list = Arrays.asList(strArray);
    stream = list.stream();

    需要注意的是,对于基本数值型,目前有三种对应的包装类型 Stream:

    IntStream、LongStream、DoubleStream。当然我们也可以用 Stream<Integer>、Stream<Long> >、Stream<Double>,但是 boxingunboxing 会很耗时,所以特别为这三种基本数值型提供了对应的Stream
    Java 8 中还没有提供其它数值型 Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种 Stream 进行。

    清单 5. 数值流的构造

    IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
    IntStream.range(1, 3).forEach(System.out::println);
    IntStream.rangeClosed(1, 3).forEach(System.out::println);

    清单 6. 流转换为其它数据结构

    // 1. Array
    String[] strArray1 = stream.toArray(String[]::new);
    // 2. Collection
    List<String> list1 = stream.collect(Collectors.toList());
    List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
    Set set1 = stream.collect(Collectors.toSet());
    Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
    // 3. String
    String str = stream.collect(Collectors.joining()).toString();

    一个 Stream 只可以使用一次,上面的代码为了简洁而重复使用了数次。

    流的操作

    接下来,当把一个数据结构包装成 Stream 后,就要开始对里面的元素进行各类操作了。常见的操作可以归类如下。

    • Intermediate
      • map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
    • Terminal
      • forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
    • Short-circuiting
      • anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

    我们下面看一下 Stream 的比较典型用法。

    map/flatMap


    我们先来看 map。如果你熟悉 scala 这类函数式语言,对这个方法应该很了解,它的作用就是把 inputStream 的每一个元素,映射成 outputStream 的另外一个元素。

    清单 7. 转换大写

    List<String> output = wordList.stream().
    map(String::toUpperCase).
    collect(Collectors.toList());

    这段代码把所有的单词转换为大写。

    清单 8. 平方数

    List<Integer> nums = Arrays.asList(1, 2, 3, 4);
    List<Integer> squareNums = nums.stream().
    map(n -> n * n).
    collect(Collectors.toList());

    这段代码生成一个整数 list 的平方数 {1, 4, 9, 16}。

    从上面例子可以看出,map 生成的是个 1:1 映射,每个输入元素,都按照规则转换成为另外一个元素。还有一些场景,是一对多映射关系的,这时需要 flatMap。

    清单 9. 一对多

    Stream<List<Integer>> inputStream = Stream.of(
     Arrays.asList(1),
     Arrays.asList(2, 3),
     Arrays.asList(4, 5, 6)
     );
    Stream<Integer> outputStream = inputStream.
    flatMap((childList) -> childList.stream());

    flatMap 把 input Stream 中的层级结构扁平化,就是将最底层元素抽出来放到一起,最终 output 的新 Stream 里面已经没有 List 了,都是直接的数字。

    filter


    filter 对原始 Stream 进行某项测试,通过测试的元素被留下来生成一个新 Stream。

    清单 10. 留下偶数

    Integer[] sixNums = {1, 2, 3, 4, 5, 6};
    Integer[] evens =
    Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);

    经过条件“被 2 整除”的 filter,剩下的数字为 {2, 4, 6}。

    清单 11. 把单词挑出来

    List<String> output = reader.lines().
     flatMap(line -> Stream.of(line.split(REGEXP))).
     filter(word -> word.length() > 0).
     collect(Collectors.toList());

    这段代码首先把每行的单词用 flatMap 整理到新的 Stream,然后保留长度不为 0 的,就是整篇文章中的全部单词了。

    forEach


    forEach 方法接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式。

    清单 12. 打印姓名(forEach 和 pre-java8 的对比)

    // Java 8
    roster.stream()
     .filter(p -> p.getGender() == Person.Sex.MALE)
     .forEach(p -> System.out.println(p.getName()));
    // Pre-Java 8
    for (Person p : roster) {
     if (p.getGender() == Person.Sex.MALE) {
     System.out.println(p.getName());
     }
    }

    对一个人员集合遍历,找出男性并打印姓名。可以看出来,forEach 是为 Lambda 而设计的,保持了最紧凑的风格。而且 Lambda 表达式本身是可以重用的,非常方便。当需要为多核系统优化时,可以 parallelStream().forEach(),只是此时原有元素的次序没法保证,并行的情况下将改变串行时操作的行为,此时 forEach 本身的实现不需要调整,而 Java8 以前的 for 循环 code 可能需要加入额外的多线程逻辑。

    但一般认为,forEach 和常规 for 循环的差异不涉及到性能,它们仅仅是函数式风格与传统 Java 风格的差别。

    另外一点需要注意,forEach 是 terminal 操作,因此它执行后,Stream 的元素就被“消费”掉了,你无法对一个 Stream 进行两次 terminal 运算。下面的代码是错误的:

    stream.forEach(element -> doOneThing(element));
    stream.forEach(element -> doAnotherThing(element));

    相反,具有相似功能的 intermediate 操作 peek 可以达到上述目的。如下是出现在该 api javadoc 上的一个示例。

    清单 13. peek 对每个元素执行操作并返回一个新的 Stream

    Stream.of("one", "two", "three", "four")
     .filter(e -> e.length() > 3)
     .peek(e -> System.out.println("Filtered value: " + e))
     .map(String::toUpperCase)
     .peek(e -> System.out.println("Mapped value: " + e))
     .collect(Collectors.toList());

    forEach 不能修改自己包含的本地变量值,也不能用 break/return 之类的关键字提前结束循环。

    findFirst


    这是一个 termimal 兼 short-circuiting 操作,它总是返回 Stream 的第一个元素,或者空。

    这里比较重点的是它的返回值类型:Optional。这也是一个模仿 Scala 语言中的概念,作为一个容器,它可能含有某值,或者不包含。使用它的目的是尽可能避免 NullPointerException。

    清单 14. Optional 的两个用例

    String strA = " abcd ", strB = null;
    print(strA);
    print("");
    print(strB);
    getLength(strA);
    getLength("");
    getLength(strB);
    public static void print(String text) {
     // Java 8
     Optional.ofNullable(text).ifPresent(System.out::println);
     // Pre-Java 8
     if (text != null) {
     System.out.println(text);
     }
     }
    public static int getLength(String text) {
     // Java 8
    return Optional.ofNullable(text).map(String::length).orElse(-1);
     // Pre-Java 8
    // return if (text != null) ? text.length() : -1;
     };

    在更复杂的 if (xx != null) 的情况中,使用 Optional 代码的可读性更好,而且它提供的是编译时检查,能极大的降低 NPE 这种 Runtime Exception 对程序的影响,或者迫使程序员更早的在编码阶段处理空值问题,而不是留到运行时再发现和调试。

    Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。还有例如 IntStream.average() 返回 OptionalDouble 等等。

    reduce


    这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于Integer sum = integers.reduce(0, (a, b) -> a+b);Integer sum = integers.reduce(0, Integer::sum);

    也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。

    清单 15. reduce 的用例

    // 字符串连接,concat = "ABCD"
    String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
    // 求最小值,minValue = -3.0
    double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
    // 求和,sumValue = 10, 有起始值
    int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
    // 求和,sumValue = 10, 无起始值
    sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
    // 过滤,字符串连接,concat = "ace"
    concat = Stream.of("a", "B", "c", "D", "e", "F").
     filter(x -> x.compareTo("Z") > 0).
     reduce("", String::concat);

    上面代码例如第一个示例的 reduce(),第一个参数(空白字符)即为起始值,第二个参数(String::concat)为 BinaryOperator。这类有起始值的 reduce() 都返回具体的对象。而对于第四个示例没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional,请留意这个区别。

    limit/skip


    limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素(它是由一个叫 subStream 的方法改名而来)。

    清单 16. limit 和 skip 对运行次数的影响

    public void testLimitAndSkip() {
     List<Person> persons = new ArrayList();
     for (int i = 1; i <= 10000; i++) {
     Person person = new Person(i, "name" + i);
     persons.add(person);
     }
    List<String> personList2 = persons.stream().
    map(Person::getName).limit(10).skip(3).collect(Collectors.toList());
     System.out.println(personList2);
    }
    private class Person {
     public int no;
     private String name;
     public Person (int no, String name) {
     this.no = no;
     this.name = name;
     }
     public String getName() {
     System.out.println(name);
     return name;
     }
    }

    输出结果为:

    name1
    name2
    name3
    name4
    name5
    name6
    name7
    name8
    name9
    name10
    [name4, name5, name6, name7, name8, name9, name10]

    这是一个有 10,000 个元素的 Stream,但在 short-circuiting 操作 limit 和 skip 的作用下,管道中 map 操作指定的 getName() 方法的执行次数为 limit 所限定的 10 次,而最终返回结果在跳过前 3 个元素后只有后面 7 个返回。
    有一种情况是 limit/skip 无法达到 short-circuiting 目的的,就是把它们放在 Stream 的排序操作后,原因跟 sorted 这个 intermediate 操作有关:此时系统并不知道 Stream 排序后的次序如何,所以 sorted 中的操作看上去就像完全没有被 limit 或者 skip 一样。

    清单 17. limit 和 skip 对 sorted 后的运行次数无影响

    List<Person> persons = new ArrayList();
     for (int i = 1; i <= 5; i++) {
     Person person = new Person(i, "name" + i);
     persons.add(person);
     }
    List<Person> personList2 = persons.stream().sorted((p1, p2) -> 
    p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList());
    System.out.println(personList2);

    上面的示例对清单 13 做了微调,首先对 5 个元素的 Stream 排序,然后进行 limit 操作。输出结果为:

    name2
    name1
    name3
    name2
    name4
    name3
    name5
    name4
    [stream.StreamDW$Person@816f27d, stream.StreamDW$Person@87aac27]

    即虽然最后的返回元素数量是 2,但整个管道中的 sorted 表达式执行次数没有像前面例子相应减少。

    最后有一点需要注意的是,对一个 parallel 的 Steam 管道来说,如果其元素是有序的,那么 limit 操作的成本会比较大,因为它的返回对象必须是前 n 个也有一样次序的元素。取而代之的策略是取消元素间的次序,或者不要用 parallel Stream。

    sorted


    对 Stream 的排序通过 sorted 进行,它比数组的排序更强之处在于你可以首先对 Stream 进行各类 map、filter、limit、skip 甚至 distinct 来减少元素数量后,再排序,这能帮助程序明显缩短执行时间。我们对清单 14 进行优化:

    清单 18. 优化:排序前进行 limit 和 skip

    List<Person> persons = new ArrayList();
     for (int i = 1; i <= 5; i++) {
     Person person = new Person(i, "name" + i);
     persons.add(person);
     }
    List<Person> personList2 = persons.stream().limit(2).sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).collect(Collectors.toList());
    System.out.println(personList2);

    结果会简单很多:

    name2
    name1
    [stream.StreamDW$Person@6ce253f1, stream.StreamDW$Person@53d8d10a]

    当然,这种优化是有 business logic 上的局限性的:即不要求排序后再取值。
    min/max/distinct


    min 和 max 的功能也可以通过对 Stream 元素先排序,再 findFirst 来实现,但前者的性能会更好,为 O(n),而 sorted 的成本是 O(n log n)。同时它们作为特殊的 reduce 方法被独立出来也是因为求最大最小值是很常见的操作。

    清单 19. 找出最长一行的长度

    BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log"));
    int longest = br.lines().
     mapToInt(String::length).
     max().
     getAsInt();
    br.close();
    System.out.println(longest);

    下面的例子则使用 distinct 来找出不重复的单词。

    清单 20. 找出全文的单词,转小写,并排序

    List<String> words = br.lines().
     flatMap(line -> Stream.of(line.split(" "))).
     filter(word -> word.length() > 0).
     map(String::toLowerCase).
     distinct().
     sorted().
     collect(Collectors.toList());
    br.close();
    System.out.println(words);

    Match


    Stream 有三个 match 方法,从语义上说:

    • allMatch:Stream 中全部元素符合传入的 predicate,返回 true
    • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true
    • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true
      它们都不是要遍历全部元素才能返回结果。例如 allMatch 只要一个元素不满足条件,就 skip 剩下的所有元素,返回 false。对清单 13 中的 Person 类稍做修改,加入一个 age 属性和 getAge 方法。

    清单 21. 使用 Match

    List<Person> persons = new ArrayList();
    persons.add(new Person(1, "name" + 1, 10));
    persons.add(new Person(2, "name" + 2, 21));
    persons.add(new Person(3, "name" + 3, 34));
    persons.add(new Person(4, "name" + 4, 6));
    persons.add(new Person(5, "name" + 5, 55));
    boolean isAllAdult = persons.stream().
     allMatch(p -> p.getAge() > 18);
    System.out.println("All are adult? " + isAllAdult);
    boolean isThereAnyChild = persons.stream().
     anyMatch(p -> p.getAge() < 12);
    System.out.println("Any child? " + isThereAnyChild);

    输出结果:

    All are adult? false
    Any child? true

    进阶:自己生成流

    Stream.generate


    通过实现 Supplier 接口,你可以自己来控制流的生成。这种情形通常用于随机数、常量的 Stream,或者需要前后元素间维持着某种状态信息的 Stream。把 Supplier 实例传递给 Stream.generate() 生成的 Stream,默认是串行(相对 parallel 而言)但无序的(相对 ordered 而言)。由于它是无限的,在管道中,必须利用 limit 之类的操作限制 Stream 大小。

    清单 22. 生成 10 个随机整数

    Random seed = new Random();
    Supplier<Integer> random = seed::nextInt;
    Stream.generate(random).limit(10).forEach(System.out::println);
    //Another way
    IntStream.generate(() -> (int) (System.nanoTime() % 100)).
    limit(10).forEach(System.out::println);

    Stream.generate() 还接受自己实现的 Supplier。例如在构造海量测试数据的时候,用某种自动的规则给每一个变量赋值;或者依据公式计算 Stream 的每个元素值。这些都是维持状态信息的情形。

    清单 23. 自实现 Supplier

    Stream.generate(new PersonSupplier()).
    limit(10).
    forEach(p -> System.out.println(p.getName() + ", " + p.getAge()));
    private class PersonSupplier implements Supplier<Person> {
     private int index = 0;
     private Random random = new Random();
     @Override
     public Person get() {
     return new Person(index++, "StormTestUser" + index, random.nextInt(100));
     }
    }

    输出结果:

    StormTestUser1, 9
    StormTestUser2, 12
    StormTestUser3, 88
    StormTestUser4, 51
    StormTestUser5, 22
    StormTestUser6, 28
    StormTestUser7, 81
    StormTestUser8, 51
    StormTestUser9, 4
    StormTestUser10, 76

    iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。

    清单 24. 生成一个等差数列

    Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));

    输出结果:

    0 3 6 9 12 15 18 21 24 27

    与 Stream.generate 相仿,在 iterate 时候管道必须有 limit 这样的操作来限制 Stream 大小。
    进阶:用 Collectors 来进行 reduction 操作
    java.util.stream.Collectors 类的主要作用就是辅助进行各类有用的 reduction 操作,例如转变输出为 Collection,把 Stream 元素进行归组。
    groupingBy/partitioningBy

    清单 25. 按照年龄归组

    Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier()).
     limit(100).
     collect(Collectors.groupingBy(Person::getAge));
    Iterator it = personGroups.entrySet().iterator();
    while (it.hasNext()) {
     Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next();
     System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size());
    }

    上面的 code,首先生成 100 人的信息,然后按照年龄归组,相同年龄的人放到同一个 list 中,可以看到如下的输出:

    Age 0 = 2
    Age 1 = 2
    Age 5 = 2
    Age 8 = 1
    Age 9 = 1
    Age 11 = 2
    ……

    清单 26. 按照未成年人和成年人归组

    Map<Boolean, List<Person>> children = Stream.generate(new PersonSupplier()).
     limit(100).
     collect(Collectors.partitioningBy(p -> p.getAge() < 18));
    System.out.println("Children number: " + children.get(true).size());
    System.out.println("Adult number: " + children.get(false).size());

    输出结果:

    Children number: 23 
    Adult number: 77

    在使用条件“年龄小于 18”进行分组后可以看到,不到 18 岁的未成年人是一组,成年人是另外一组。partitioningBy 其实是一种特殊的 groupingBy,它依照条件测试的是否两种结果来构造返回的数据结构,get(true) 和 get(false) 能即为全部的元素对象。

    结束语

    总之,Stream 的特性可以归纳为:

    • 不是数据结构
    • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
    • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
    • 所有 Stream 的操作必须以 lambda 表达式为参数
    • 不支持索引访问
    • 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。不过请参阅下一项。
    • 很容易生成数组或者 List
    • 惰性化
    • 很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
    • Intermediate 操作永远是惰性化的。
    • 并行能力
    • 当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。
    • 可以是无限的
    • 集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。

    参考

    Java 8 中的 Streams API 详解

    展开全文
  • Java 8 stream的详细用法

    万次阅读 多人点赞 2019-01-08 23:12:43
    Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。使用Stream API 对集合数据进行操作,就类似于使用 SQL 执行的数据库查询。也可以...

    一、概述

    Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。使用Stream API 对集合数据进行操作,就类似于使用 SQL 执行的数据库查询。也可以使用 Stream API 来并行执行操作。简而言之,Stream API 提供了一种高效且易于使用的处理数据的方式。

    特点:

            1 . 不是数据结构,不会保存数据。

            2. 不会修改原来的数据源,它会将操作后的数据保存到另外一个对象中。(保留意见:毕竟peek方法可以修改流中元素

            3. 惰性求值,流在中间处理过程中,只是对操作进行了记录,并不会立即执行,需要等到执行终止操作的时候才会进行实际的计算。

    二、分类

        无状态:指元素的处理不受之前元素的影响;

        有状态:指该操作只有拿到所有元素之后才能继续下去。

        非短路操作:指必须处理所有元素才能得到最终结果;

        短路操作:指遇到某些符合条件的元素就可以得到最终结果,如 A || B,只要A为true,则无需判断B的结果。

    三、具体用法

    1. 流的常用创建方法

    1.1 使用Collection下的 stream() 和 parallelStream() 方法

    List<String> list = new ArrayList<>();
    Stream<String> stream = list.stream(); //获取一个顺序流
    Stream<String> parallelStream = list.parallelStream(); //获取一个并行流

    1.2 使用Arrays 中的 stream() 方法,将数组转成流

    Integer[] nums = new Integer[10];
    Stream<Integer> stream = Arrays.stream(nums);

    1.3 使用Stream中的静态方法:of()、iterate()、generate()

    Stream<Integer> stream = Stream.of(1,2,3,4,5,6);
    
    Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(6);
    stream2.forEach(System.out::println); // 0 2 4 6 8 10
    
    Stream<Double> stream3 = Stream.generate(Math::random).limit(2);
    stream3.forEach(System.out::println);
    

    1.4 使用 BufferedReader.lines() 方法,将每行内容转成流

    BufferedReader reader = new BufferedReader(new FileReader("F:\\test_stream.txt"));
    Stream<String> lineStream = reader.lines();
    lineStream.forEach(System.out::println);

    1.5 使用 Pattern.splitAsStream() 方法,将字符串分隔成流

    Pattern pattern = Pattern.compile(",");
    Stream<String> stringStream = pattern.splitAsStream("a,b,c,d");
    stringStream.forEach(System.out::println);

    2. 流的中间操作

    2.1 筛选与切片
            filter:过滤流中的某些元素
            limit(n):获取n个元素
            skip(n):跳过n元素,配合limit(n)可实现分页
            distinct:通过流中元素的 hashCode() 和 equals() 去除重复元素

    Stream<Integer> stream = Stream.of(6, 4, 6, 7, 3, 9, 8, 10, 12, 14, 14);
    
    Stream<Integer> newStream = stream.filter(s -> s > 5) //6 6 7 9 8 10 12 14 14
            .distinct() //6 7 9 8 10 12 14
            .skip(2) //9 8 10 12 14
            .limit(2); //9 8
    newStream.forEach(System.out::println);

    2.2 映射        
            map:接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
            flatMap:接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。

    List<String> list = Arrays.asList("a,b,c", "1,2,3");
    
    //将每个元素转成一个新的且不带逗号的元素
    Stream<String> s1 = list.stream().map(s -> s.replaceAll(",", ""));
    s1.forEach(System.out::println); // abc  123
    
    Stream<String> s3 = list.stream().flatMap(s -> {
        //将每个元素转换成一个stream
        String[] split = s.split(",");
        Stream<String> s2 = Arrays.stream(split);
        return s2;
    });
    s3.forEach(System.out::println); // a b c 1 2 3

    2.3 排序
            sorted():自然排序,流中元素需实现Comparable接口
            sorted(Comparator com):定制排序,自定义Comparator排序器  

    List<String> list = Arrays.asList("aa", "ff", "dd");
    //String 类自身已实现Compareable接口
    list.stream().sorted().forEach(System.out::println);// aa dd ff
    
    Student s1 = new Student("aa", 10);
    Student s2 = new Student("bb", 20);
    Student s3 = new Student("aa", 30);
    Student s4 = new Student("dd", 40);
    List<Student> studentList = Arrays.asList(s1, s2, s3, s4);
    
    //自定义排序:先按姓名升序,姓名相同则按年龄升序
    studentList.stream().sorted(
            (o1, o2) -> {
                if (o1.getName().equals(o2.getName())) {
                    return o1.getAge() - o2.getAge();
                } else {
                    return o1.getName().compareTo(o2.getName());
                }
            }
    ).forEach(System.out::println);

    2.4 消费
            peek:如同于map,能得到流中的每一个元素。但map接收的是一个Function表达式,有返回值;而peek接收的是Consumer表达式,没有返回值。

    Student s1 = new Student("aa", 10);
    Student s2 = new Student("bb", 20);
    List<Student> studentList = Arrays.asList(s1, s2);
    
    studentList.stream()
            .peek(o -> o.setAge(100))
            .forEach(System.out::println);   
    
    //结果:
    Student{name='aa', age=100}
    Student{name='bb', age=100}            

    3. 流的终止操作

    3.1 匹配、聚合操作
            allMatch:接收一个 Predicate 函数,当流中每个元素都符合该断言时才返回true,否则返回false
            noneMatch:接收一个 Predicate 函数,当流中每个元素都不符合该断言时才返回true,否则返回false
            anyMatch:接收一个 Predicate 函数,只要流中有一个元素满足该断言则返回true,否则返回false
            findFirst:返回流中第一个元素
            findAny:返回流中的任意元素
            count:返回流中元素的总个数
            max:返回流中元素最大值
            min:返回流中元素最小值

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
    
    boolean allMatch = list.stream().allMatch(e -> e > 10); //false
    boolean noneMatch = list.stream().noneMatch(e -> e > 10); //true
    boolean anyMatch = list.stream().anyMatch(e -> e > 4);  //true
    
    Integer findFirst = list.stream().findFirst().get(); //1
    Integer findAny = list.stream().findAny().get(); //1
    
    long count = list.stream().count(); //5
    Integer max = list.stream().max(Integer::compareTo).get(); //5
    Integer min = list.stream().min(Integer::compareTo).get(); //1

    3.2 规约操作
            Optional<T> reduce(BinaryOperator<T> accumulator):第一次执行时,accumulator函数的第一个参数为流中的第一个元素,第二个参数为流中元素的第二个元素;第二次执行时,第一个参数为第一次函数执行的结果,第二个参数为流中的第三个元素;依次类推。
            T reduce(T identity, BinaryOperator<T> accumulator):流程跟上面一样,只是第一次执行时,accumulator函数的第一个参数为identity,而第二个参数为流中的第一个元素。
            <U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner):在串行流(stream)中,该方法跟第二个方法一样,即第三个参数combiner不会起作用。在并行流(parallelStream)中,我们知道流被fork join出多个线程进行执行,此时每个线程的执行流程就跟第二个方法reduce(identity,accumulator)一样,而第三个参数combiner函数,则是将每个线程的执行结果当成一个新的流,然后使用第一个方法reduce(accumulator)流程进行规约。

    //经过测试,当元素个数小于24时,并行时线程数等于元素个数,当大于等于24时,并行时线程数为16
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24);
    
    Integer v = list.stream().reduce((x1, x2) -> x1 + x2).get();
    System.out.println(v);   // 300
    
    Integer v1 = list.stream().reduce(10, (x1, x2) -> x1 + x2);
    System.out.println(v1);  //310
    
    Integer v2 = list.stream().reduce(0,
            (x1, x2) -> {
                System.out.println("stream accumulator: x1:" + x1 + "  x2:" + x2);
                return x1 - x2;
            },
            (x1, x2) -> {
                System.out.println("stream combiner: x1:" + x1 + "  x2:" + x2);
                return x1 * x2;
            });
    System.out.println(v2); // -300
    
    Integer v3 = list.parallelStream().reduce(0,
            (x1, x2) -> {
                System.out.println("parallelStream accumulator: x1:" + x1 + "  x2:" + x2);
                return x1 - x2;
            },
            (x1, x2) -> {
                System.out.println("parallelStream combiner: x1:" + x1 + "  x2:" + x2);
                return x1 * x2;
            });
    System.out.println(v3); //197474048

    3.3 收集操作
            collect:接收一个Collector实例,将流中元素收集成另外一个数据结构。
            Collector<T, A, R> 是一个接口,有以下5个抽象方法:
                Supplier<A> supplier():创建一个结果容器A
                BiConsumer<A, T> accumulator():消费型接口,第一个参数为容器A,第二个参数为流中元素T。
                BinaryOperator<A> combiner():函数接口,该参数的作用跟上一个方法(reduce)中的combiner参数一样,将并行流中各                                                                 个子进程的运行结果(accumulator函数操作后的容器A)进行合并。
                Function<A, R> finisher():函数式接口,参数为:容器A,返回类型为:collect方法最终想要的结果R。
                Set<Characteristics> characteristics():返回一个不可变的Set集合,用来表明该Collector的特征。有以下三个特征:
                    CONCURRENT:表示此收集器支持并发。(官方文档还有其他描述,暂时没去探索,故不作过多翻译)
                    UNORDERED:表示该收集操作不会保留流中元素原有的顺序。
                    IDENTITY_FINISH:表示finisher参数只是标识而已,可忽略。
            注:如果对以上函数接口不太理解的话,可参考我另外一篇文章:Java 8 函数式接口

    3.3.1 Collector 工具库:Collectors

    Student s1 = new Student("aa", 10,1);
    Student s2 = new Student("bb", 20,2);
    Student s3 = new Student("cc", 10,3);
    List<Student> list = Arrays.asList(s1, s2, s3);
    
    //装成list
    List<Integer> ageList = list.stream().map(Student::getAge).collect(Collectors.toList()); // [10, 20, 10]
    
    //转成set
    Set<Integer> ageSet = list.stream().map(Student::getAge).collect(Collectors.toSet()); // [20, 10]
    
    //转成map,注:key不能相同,否则报错
    Map<String, Integer> studentMap = list.stream().collect(Collectors.toMap(Student::getName, Student::getAge)); // {cc=10, bb=20, aa=10}
    
    //字符串分隔符连接
    String joinName = list.stream().map(Student::getName).collect(Collectors.joining(",", "(", ")")); // (aa,bb,cc)
    
    //聚合操作
    //1.学生总数
    Long count = list.stream().collect(Collectors.counting()); // 3
    //2.最大年龄 (最小的minBy同理)
    Integer maxAge = list.stream().map(Student::getAge).collect(Collectors.maxBy(Integer::compare)).get(); // 20
    //3.所有人的年龄
    Integer sumAge = list.stream().collect(Collectors.summingInt(Student::getAge)); // 40
    //4.平均年龄
    Double averageAge = list.stream().collect(Collectors.averagingDouble(Student::getAge)); // 13.333333333333334
    // 带上以上所有方法
    DoubleSummaryStatistics statistics = list.stream().collect(Collectors.summarizingDouble(Student::getAge));
    System.out.println("count:" + statistics.getCount() + ",max:" + statistics.getMax() + ",sum:" + statistics.getSum() + ",average:" + statistics.getAverage());
    
    //分组
    Map<Integer, List<Student>> ageMap = list.stream().collect(Collectors.groupingBy(Student::getAge));
    //多重分组,先根据类型分再根据年龄分
    Map<Integer, Map<Integer, List<Student>>> typeAgeMap = list.stream().collect(Collectors.groupingBy(Student::getType, Collectors.groupingBy(Student::getAge)));
    
    //分区
    //分成两部分,一部分大于10岁,一部分小于等于10岁
    Map<Boolean, List<Student>> partMap = list.stream().collect(Collectors.partitioningBy(v -> v.getAge() > 10));
    
    //规约
    Integer allAge = list.stream().map(Student::getAge).collect(Collectors.reducing(Integer::sum)).get(); //40

    3.3.2 Collectors.toList() 解析

    //toList 源码
    public static <T> Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                (left, right) -> {
                    left.addAll(right);
                    return left;
                }, CH_ID);
    }
    
    //为了更好地理解,我们转化一下源码中的lambda表达式
    public <T> Collector<T, ?, List<T>> toList() {
        Supplier<List<T>> supplier = () -> new ArrayList();
        BiConsumer<List<T>, T> accumulator = (list, t) -> list.add(t);
        BinaryOperator<List<T>> combiner = (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
        Function<List<T>, List<T>> finisher = (list) -> list;
        Set<Collector.Characteristics> characteristics = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
    
        return new Collector<T, List<T>, List<T>>() {
            @Override
            public Supplier supplier() {
                return supplier;
            }
    
            @Override
            public BiConsumer accumulator() {
                return accumulator;
            }
    
            @Override
            public BinaryOperator combiner() {
                return combiner;
            }
    
            @Override
            public Function finisher() {
                return finisher;
            }
    
            @Override
            public Set<Characteristics> characteristics() {
                return characteristics;
            }
        };
    
    }

     

     

    展开全文
  • stream介绍,以及lambda表达式的使用

    万次阅读 多人点赞 2018-10-31 16:34:50
    Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。 有多种方式生成 Stream Source: 从 Collection 和数组 Collection....

     

    Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

    有多种方式生成 Stream Source:

    • 从 Collection 和数组

      • Collection.stream()

      • Collection.parallelStream()

      • Arrays.stream(T array) or Stream.of()

    • 从 BufferedReader

      • java.io.BufferedReader.lines()

    • 静态工厂

      • java.util.stream.IntStream.range()

      • java.nio.file.Files.walk()

    • 自己构建

      • java.util.Spliterator

    • 其它

      • Random.ints()

      • BitSet.stream()

      • Pattern.splitAsStream(java.lang.CharSequence)

      • JarFile.stream()

    流(stream)的操作类型分为两种:

    • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

    • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

    在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

    还有一种操作被称为 short-circuiting。用以指:

    • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。

    • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。

    流的使用详解

    简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)

    流的构造与转换

    下面提供最常见的几种构造 Stream 的样例。

    // 1. Individual values
    Stream stream = Stream.of("a", "b", "c");
    // 2. Arrays
    String [] strArray = new String[] {"a", "b", "c"};
    stream = Stream.of(strArray);
    stream = Arrays.stream(strArray);
    // 3. Collections
    List<String> list = Arrays.asList(strArray);
    stream = list.stream();

    需要注意的是,对于基本数值型,目前有三种对应的包装类型 Stream:

    IntStream、LongStream、DoubleStream。当然我们也可以用 Stream<Integer>、Stream<Long> >、Stream<Double>,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。

    数值流的构造

    IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
    IntStream.range(1, 3).forEach(System.out::println);
    IntStream.rangeClosed(1, 3).forEach(System.out::println);

    流转换为其它数据结构

    // 1. Array
    String[] strArray1 = stream.toArray(String[]::new);
    // 2. Collection
    List<String> list1 = stream.collect(Collectors.toList());
    List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
    Set set1 = stream.collect(Collectors.toSet());
    Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
    // 3. String
    String str = stream.collect(Collectors.joining()).toString();

    一个 Stream 只可以使用一次,上面的代码只是示例,为了简洁而重复使用了数次(正常开发只能使用一次)。

    流的操作

    接下来,当把一个数据结构包装成 Stream 后,就要开始对里面的元素进行各类操作了。常见的操作可以归类如下。

    • Intermediate:

    map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

    • Terminal:

    forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

    • Short-circuiting:

    anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

    典型用法示例

    map/flatMap

    //转换大写,wordList为单词集合List<String>类型
    List<String> output = wordList.stream().map(String::toUpperCase).collect(Collectors.toList());

    求平方数

    //这段代码生成一个整数 list 的平方数 {1, 4, 9, 16}。
    List<Integer> nums = Arrays.asList(1, 2, 3, 4);
    List<Integer> squareNums = nums.stream().map(n -> n * n).collect(Collectors.toList());

    从上面例子可以看出,map 生成的是个 1:1 映射,每个输入元素,都按照规则转换成为另外一个元素。还有一些场景,是一对多映射关系的,这时需要 flatMap。

    //将最底层元素抽出来放到一起,最终 output 的新 Stream 里面已经没有 List 了,都是直接的数字。
    Stream<List<Integer>> inputStream = Stream.of(
     Arrays.asList(1),
     Arrays.asList(2, 3),
     Arrays.asList(4, 5, 6)
     );
    Stream<Integer> outputStream = inputStream.flatMap((childList) -> childList.stream());
    List<Integer> list =outputStream.collect(Collectors.toList());
    System.out.println(list.toString());

    filter

    filter 对原始 Stream 进行某项测试,通过测试的元素被留下来生成一个新 Stream。

    //留下偶数,经过条件“被 2 整除”的 filter,剩下的数字为 {2, 4, 6}。
    Integer[] sixNums = {1, 2, 3, 4, 5, 6};
    Integer[] evens =Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);
    //把每行的单词用 flatMap 整理到新的 Stream,然后保留长度不为 0 的,就是整篇文章中的全部单词了。
    //REGEXP为正则表达式,具体逻辑具体分析
    List<String> output = reader.lines().
        flatMap(line -> Stream.of(line.split(REGEXP))).
        filter(word -> word.length() > 0).collect(Collectors.toList());

    forEach

    forEach 方法接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式。

    //打印所有男性姓名,roster为person集合类型为List<Pserson>
    // Java 8
    roster.stream().filter(p -> p.getGender() == Person.Sex.MALE).forEach(p -> System.out.println(p.getName()));
    // Pre-Java 8
    for (Person p : roster) {
        if (p.getGender() == Person.Sex.MALE) {
            System.out.println(p.getName());
        }
    }

    当需要为多核系统优化时,可以 parallelStream().forEach(),只是此时原有元素的次序没法保证,并行的情况下将改变串行时操作的行为,此时 forEach 本身的实现不需要调整,而 Java8 以前的 for 循环 code 可能需要加入额外的多线程逻辑。

    另外一点需要注意,forEach 是 terminal 操作,因此它执行后,Stream 的元素就被“消费”掉了,你无法对一个 Stream 进行两次 terminal 运算。下面代码是错误的

    //错误代码示例,一个stream不可以使用两次
    stream.forEach(element -> doOneThing(element));
    stream.forEach(element -> doAnotherThing(element));

    相反,具有相似功能的 intermediate 操作 peek 可以达到上述目的。如下是出现在该 api javadoc 上的一个示例。

    // peek 对每个元素执行操作并返回一个新的 Stream
    Stream.of("one", "two", "three", "four")
         .filter(e -> e.length() > 3)
         .peek(e -> System.out.println("Filtered value: " + e))
         .map(String::toUpperCase)
         .peek(e -> System.out.println("Mapped value: " + e))
         .collect(Collectors.toList());

    findFirst

    这是一个 termimal 兼 short-circuiting 操作,它总是返回 Stream 的第一个元素,或者空。

    这里比较重点的是它的返回值类型:Optional。这也是一个模仿 Scala 语言中的概念,作为一个容器,它可能含有某值,或者不包含。使用它的目的是尽可能避免 NullPointerException。

    String strA = " abcd ", strB = null;
    print(strA);
    print("");
    print(strB);
    getLength(strA);
    getLength("");
    getLength(strB);
    //输出text不为null的值
    public static void print(String text) {
        // Java 8
         Optional.ofNullable(text).ifPresent(System.out::println);
        // Pre-Java 8
         if (text != null) {
            System.out.println(text);
         }
     }
    //输出text的长度,避免空指针
    public static int getLength(String text) {
        // Java 8
        return Optional.ofNullable(text).map(String::length).orElse(-1);
        // Pre-Java 8
        //return if (text != null) ? text.length() : -1;
    }

    在更复杂的 if (xx != null) 的情况中,使用 Optional 代码的可读性更好,而且它提供的是编译时检查,能极大的降低 NPE 这种 Runtime Exception 对程序的影响,或者迫使程序员更早的在编码阶段处理空值问题,而不是留到运行时再发现和调试。

    Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。还有例如 IntStream.average() 返回 OptionalDouble 等等

    reduce

    这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。也有没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional。

    // reduce用例
    // 字符串连接,concat = "ABCD"
    String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat); 
    // 求最小值,minValue = -3.0
    double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min); 
    // 求和,sumValue = 10, 有起始值
    int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
    // 求和,sumValue = 10, 无起始值,返回Optional,所以有get()方法
    sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
    // 过滤,字符串连接,concat = "ace"
    concat = Stream.of("a", "B", "c", "D", "e", "F")
        .filter(x -> x.compareTo("Z") > 0)
        .reduce("", String::concat);

    limit/skip

    limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素(它是由一个叫 subStream 的方法改名而来)。

    // limit 和 skip 对运行次数的影响
    public void testLimitAndSkip() {
         List<Person> persons = new ArrayList();
         for (int i = 1; i <= 10000; i++) {
             Person person = new Person(i, "name" + i);
             persons.add(person);
        }
        List<String> personList2 = persons.stream()
            .map(Person::getName).limit(10).skip(3)
            .collect(Collectors.toList());
        
        System.out.println(personList2);
    }
    private class Person {
        public int no;
        private String name;
        public Person (int no, String name) {
            this.no = no;
            this.name = name;
        }
         public String getName() {
            System.out.println(name);
            return name;
         }
    }
    //结果
    name1
    name2
    name3
    name4
    name5
    name6
    name7
    name8
    name9
    name10
    [name4, name5, name6, name7, name8, name9, name10]
    //这是一个有 10,000 个元素的 Stream,但在 short-circuiting 操作 limit 和 skip 的作用下,管道中 map 操作指定的 getName() 方法的执行次数为 limit 所限定的 10 次,而最终返回结果在跳过前 3 个元素后只有后面 7 个返回。

    有一种情况是 limit/skip 无法达到 short-circuiting 目的的,就是把它们放在 Stream 的排序操作后,原因跟 sorted 这个 intermediate 操作有关:此时系统并不知道 Stream 排序后的次序如何,所以 sorted 中的操作看上去就像完全没有被 limit 或者 skip 一样。

     List<Person> persons = new ArrayList();
     for (int i = 1; i <= 5; i++) {
         Person person = new Person(i, "name" + i);
         persons.add(person);
     }
    List<Person> personList2 = persons.stream().sorted((p1, p2) -> 
    p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList());
    System.out.println(personList2);
    //结果
    name2
    name1
    name3
    name2
    name4
    name3
    name5
    name4
    [stream.StreamDW$Person@816f27d, stream.StreamDW$Person@87aac27]
    //虽然最后的返回元素数量是 2,但整个管道中的 sorted 表达式执行次数没有像前面例子相应减少。

    sorted

    对 Stream 的排序通过 sorted 进行,它比数组的排序更强之处在于你可以首先对 Stream 进行各类 map、filter、limit、skip 甚至 distinct 来减少元素数量后,再排序,这能帮助程序明显缩短执行时间。

    List<Person> persons = new ArrayList();
     for (int i = 1; i <= 5; i++) {
         Person person = new Person(i, "name" + i);
         persons.add(person);
     }
    List<Person> personList2 = persons.stream().limit(2).sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).collect(Collectors.toList());
    System.out.println(personList2);
    //结果
    name2
    name1
    [stream.StreamDW$Person@6ce253f1, stream.StreamDW$Person@53d8d10a]

    min/max/distinct

    min 和 max 的功能也可以通过对 Stream 元素先排序,再 findFirst 来实现,但前者的性能会更好,为 O(n),而 sorted 的成本是 O(n log n)。同时它们作为特殊的 reduce 方法被独立出来也是因为求最大最小值是很常见的操作。

    //找出最长一行的长度
    BufferedReader br = new BufferedReader(new FileReader("c:\\Service.log"));
    int longest = br.lines().mapToInt(String::length).max().getAsInt();
    br.close();
    System.out.println(longest);
    //找出全文的单词,转小写,并排序,使用 distinct 来找出不重复的单词。单词间只有空格
    List<String> words = br.lines()
        .flatMap(line -> Stream.of(line.split(" ")))
        .filter(word -> word.length() > 0)
        .map(String::toLowerCase)
        .distinct().sorted()
        .collect(Collectors.toList());
    br.close();
    System.out.println(words);
    ​

    Match

    Stream 有三个 match 方法,从语义上说:

    • allMatch:Stream 中全部元素符合传入的 predicate,返回 true

    • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true

    • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true

    它们都不是要遍历全部元素才能返回结果。例如 allMatch 只要一个元素不满足条件,就 skip 剩下的所有元素,返回 false。

    //Match使用示例
    List<Person> persons = new ArrayList();
    persons.add(new Person(1, "name" + 1, 10));
    persons.add(new Person(2, "name" + 2, 21));
    persons.add(new Person(3, "name" + 3, 34));
    persons.add(new Person(4, "name" + 4, 6));
    persons.add(new Person(5, "name" + 5, 55));
    boolean isAllAdult = persons.stream().allMatch(p -> p.getAge() > 18);
    System.out.println("All are adult? " + isAllAdult);
    boolean isThereAnyChild = persons.stream().anyMatch(p -> p.getAge() < 12);
    System.out.println("Any child? " + isThereAnyChild);
    //结果
    All are adult? false
    Any child? true

    自己生成流

    通过实现 Supplier 接口,你可以自己来控制流的生成。这种情形通常用于随机数、常量的 Stream,或者需要前后元素间维持着某种状态信息的 Stream。把 Supplier 实例传递给 Stream.generate() 生成的 Stream,默认是串行(相对 parallel 而言)但无序的(相对 ordered 而言)。由于它是无限的,在管道中,必须利用 limit 之类的操作限制 Stream 大小。

    //生成10个随机数
    Random seed = new Random();
    Supplier<Integer> random = seed::nextInt;
    Stream.generate(random).limit(10).forEach(System.out::println);
    //Another way
    IntStream.generate(() -> (int) (System.nanoTime() % 100)).
    limit(10).forEach(System.out::println);

    Stream.generate() 还接受自己实现的 Supplier。例如在构造海量测试数据的时候,用某种自动的规则给每一个变量赋值;或者依据公式计算 Stream 的每个元素值。这些都是维持状态信息的情形。

    Stream.generate(new PersonSupplier()).
        limit(10).forEach(p -> System.out.println(p.getName() + ", " + p.getAge()));
    private class PersonSupplier implements Supplier<Person> {
         private int index = 0;
         private Random random = new Random();
         @Override
         public Person get() {
            return new Person(index++, "StormTestUser" + index, random.nextInt(100));
         }
    }
    //结果
    StormTestUser1, 9
    StormTestUser2, 12
    StormTestUser3, 88
    StormTestUser4, 51
    StormTestUser5, 22
    StormTestUser6, 28
    StormTestUser7, 81
    StormTestUser8, 51
    StormTestUser9, 4
    StormTestUser10, 76

    Stream.iterate

    iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推

    //生成等差数列
    Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));
    //结果
    0 3 6 9 12 15 18 21 24 27

    Stream.generate 相仿,在 iterate 时候管道必须有 limit 这样的操作来限制 Stream 大小。

    用 Collectors 来进行 reduction 操作

    java.util.stream.Collectors 类的主要作用就是辅助进行各类有用的 reduction 操作,例如转变输出为 Collection,把 Stream 元素进行归组。

    groupingBy/partitioningBy

    //按照年龄归组
    Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier())
        .limit(100).collect(Collectors.groupingBy(Person::getAge));
    Iterator it = personGroups.entrySet().iterator();
    while (it.hasNext()) {
         Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next();
         System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size());
    }
    //上面的 code,首先生成 100 人的信息,然后按照年龄归组,相同年龄的人放到同一个 list 中,如下的输出:
    Age 0 = 2
    Age 1 = 2
    Age 5 = 2
    Age 8 = 1
    Age 9 = 1
    Age 11 = 2
    ……
    Map<Boolean, List<Person>> children = Stream.generate(new PersonSupplier())
        .limit(100).collect(Collectors.partitioningBy(p -> p.getAge() < 18));
    System.out.println("Children number: " + children.get(true).size());
    System.out.println("Adult number: " + children.get(false).size());
    //结果
    Children number: 23 
    Adult number: 77

    Stream 的特性可以归纳为:

    • 不是数据结构

    • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。

    • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。

    • 所有 Stream 的操作必须以 lambda 表达式为参数

    • 不支持索引访问

    • 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。不过请参阅下一项。

    • 很容易生成数组或者 List

    • 惰性化

    • 很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。

    • Intermediate 操作永远是惰性化的。

    • 并行能力

    • 当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。

    • 可以是无限的

      • 集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。

    展开全文
  • java8新特性 lambda Stream map(函数式编程)

    万次阅读 多人点赞 2019-07-29 19:40:22
    java8新特性 lambda Stream map(函数式编程) 牛刀小试:使用Java8新特性获取股票数据https://blog.csdn.net/u014646662/article/details/82936131 Java8实战.pdf 下载:...
  • java8实战四:使用流--Stream

    千次阅读 2017-11-24 14:18:45
    使用流在本章中,你将会看到许多Stream API支持的许多操作.这些操作能让你快速完成许多复杂的查询.如筛选、切片、映射、查找、匹配和归约。 接下来,我们会看到一些特殊的流:数值流,来自文件和数组等多种来源的流,...
  • 物联网之mjpeg-streamer视频图像的显示

    千次阅读 2018-07-30 07:40:01
    Ubuntu 下调试摄像头  1. 安装 xawtv 测试软件 #sudo apt-get install xawtv 2. 执行 xawtv 后面带 usb 摄像头的设备节点 ...#xawtv /dev/video0 ...make menuconfig //这里我将两种摄像头的移植都放上去 ...
  • mjpeg-streamer视频图像的显示

    千次阅读 热门讨论 2018-12-13 15:34:01
    据说人眼在观察景物时,光信号传人大脑神经,需经过一段短暂的时间,光的作用结束后,视觉形象并不立即消失,这种残留的视觉称“后像”,视觉的这一现象则被称为“视觉暂留”。也就是说,只要1s内变动24帧甚至更多的...
  • mjpg-stream的移植

    万次阅读 2016-12-13 23:49:34
    MJPG简介: MJPG是MJPEG的缩写,但是MJPEG还可以表示文件格式扩展名.  MJPEG  全名为 “Motion Joint Photographic Experts Group”,是一种视频编码格式,  Motion JPEG技术常用与闭合电路的电视摄像机的模拟...
  • Java8中的Stream API详解:Stream的背景及使用

    万次阅读 多人点赞 2018-03-15 12:19:34
     Stream是Java8的一大亮点,是对容器对象功能的增强,它专注于对容器对象进行各种非常便利、高效的 聚合操作(aggregate operation)或者大批量数据操作。Stream API借助于同样新出现的Lambda表达式,极大的提高...
  • Java 8系列之Stream的基本语法详解

    万次阅读 多人点赞 2017-02-10 16:31:34
    概述继Java 8系列之Lambda表达式之后,我们来了解StreamStream 是用函数式编程方式在集合类上进行复杂操作的工具,其集成了Java 8中的众多新特性之一的聚合操作,开发者可以更容易地使用Lambda表达式,并且更方便...
  • 深入理解Java8中Stream的实现原理

    万次阅读 2019-02-25 15:41:38
    2.Stream API 二.Stream的实现原理 1.一种直白的实现方式 2.Stream流水线解决方案 1).操作如何记录 2).操作如何叠加 3).叠加之后的操作如何执行 4).执行后的结果在哪里 一.容器执行Lambda表达式的方式 1....
  • Java Stream流之求和

    万次阅读 2019-07-15 17:56:38
    BigDecimal bb =list.stream().map(Plan::getAmount).reduce(BigDecimal.ZERO,BigDecimal::add); int、double、long: double max = list.stream().mapToDouble(User::getHeight).sum(); ...
  • string 转streamstream转string

    万次阅读 2014-05-26 14:55:17
    string test = “Testing 1-2-3″;...// convert string to stream MemoryStream stream = new MemoryStream(); StreamWriter writer = new StreamWriter( stream ); writer.Write( test ); writer.Flush(); // conver
  • stream 与byte[] 转换

    万次阅读 2017-12-27 18:41:44
    stream与byte[] 转换
  • Java 8 – 将 Stream 转换为 List

    万次阅读 2017-05-25 20:35:15
    一个Java 8 示例将向你展示怎样通过Collectors.toList把 一个 Stream 转换为一个 List 。 Java8Example1.java package com.mkyong.java8; import java.util.List; import java.util.stream.Collectors; ...
  • octet-stream文件怎么看

    万次阅读 2016-09-21 21:29:27
    抓到一个报文中出现文件的类型是octet-stream,一脸懵比,好吧,只能查看下,原来是二进制流,还得再细细品味! 借鉴URL: http://tool.oschina.net/commons
  • 注意使用stringstream时的清空操作

    万次阅读 多人点赞 2010-08-18 15:49:00
    在C++中可以使用stringstream来很方便的进行类型转换,字符串串接,不过注意重复使用同一个stringstream对象时要先继续清空,而清空很容易想到是clear方法,而在stringstream中这个方法实际上是清空stringstream的...
  • java stream toArray()

    万次阅读 2017-11-10 15:39:39
    发现 java 使用 stream 时,经常会将 map 后的数据输入到数组中,这时一般在 stream 语句后面 加上 toArray().
  • nginx stream模块初探

    万次阅读 2017-05-05 19:38:59
    前言nginx从1.9.0开始,新增加了一个stream模块,用来实现四层协议的转发、代理或者负载均衡等。这完全就是抢HAproxy份额的节奏,鉴于nginx在7层负载均衡和web service上的成功,和nginx良好的框架,stream模块前景...
1 2 3 4 5 ... 20
收藏数 713,328
精华内容 285,331
关键字:

stream