-
5 并行数据加载
2017-07-11 22:29:215 并行数据加载 本章简述Greenplum的数据加载特性。 在一个大规模的,TB级的数据仓库上,大量的数据必须在一个较短的维护窗口中加载。Greenplum支持快速的、并行数据加载和外部表功能。管理员可以选择单行错误...5 并行数据加载
本章简述Greenplum的数据加载特性。
在一个大规模的,TB级的数据仓库上,大量的数据必须在一个较短的维护窗口中加载。Greenplum支持快速的、并行数据加载和外部表功能。管理员可以选择单行错误隔离模式的外部表以便将错误的数据过滤到一个单独的表中,同时继续加载正确的格式化的数据行。管理员也可以通过设定加载选项的阈值,以便控制不正确格式化行数导致的Greenplum数据库引擎中止加载操作。
通过与Greenplum数据引擎的并行文件服务器(gpfdist)结合使用外部表,管理员可以实现从Greenplum数据系统最大并行度和负载带宽。
Figure 2: External Tables Using Greenplum ParallelFile Server (gpfdist)
Greenplum的另外一个功能是gpload,运行您在YAML格式的控制文件指定装载任务。你描述在控制文件中的源数据位置,格式,需要转换,参与的主机,数据库的目的地,以及其他细节和gpload执行的负荷。这使您可以描述一个复杂的任务,并在可控,可重复的方式执行。
-
并行数据转换为串行数据的转换器
2017-07-24 12:04:16这篇文章写一下今天早上设计的并行数据到串行数据的转换器,也算是对并行总线和串行总线一个小小的应用,编码过程中也用到了task。 该转换器主要实现的功能是: 1、把并行地址存入寄存器 2、把并行数据存入寄存器 3...这篇文章写一下今天早上设计的并行数据到串行数据的转换器,也算是对并行总线和串行总线一个小小的应用,编码过程中也用到了task。
该转换器主要实现的功能是:
1、把并行地址存入寄存器
2、把并行数据存入寄存器
3、连接串行单总线
4、地址的串行输出
5、数据的串行输出
6、挂起串行单总线
7、给信号源应答
8、让信号源给出下一个操作对象
9、结束写操作
该设计利用嵌套的状态机实现,主状态机分为四个状态:idle,addr_write,data_write,stop,主状态机中会涉及到任务shift8out的调用,该任务主要实现并行数据到串行数据的转换,也是由一个状态机实现。下面给出整个设计的代码:
设计代码:
///并串转换器// module ps_convertor(clk,rst,data,addr,sda,ack); parameter idle=4'b0001,addr_write=4'b0010,data_write=4'b0100,stop=4'b1000;//独热编码 parameter sh8_start=9'b00000_0001; parameter bit6 =9'b00000_0010; parameter bit5 =9'b00000_0100; parameter bit4 =9'b00000_1000; parameter bit3 =9'b00001_0000; parameter bit2 =9'b00010_0000; parameter bit1 =9'b00100_0000; parameter bit0 =9'b01000_0000; parameter sh8_stop =9'b10000_0000; input clk,rst; input [7:0]addr,data; inout sda;//串行总线 output ack;//应答信号将输入测试模块 reg ack; reg link_write;//写开关 reg [7:0] sh8out_buf;//并行总线缓冲器 reg [3:0] mstate;//主状态机的状态寄存器 reg [8:0] sh8_state;//并串转换状态机状态寄存器 reg FF;//标志寄存器,用来表示任务是否完成 assign sda=(link_write)?sh8out_buf[7]:1'hz;//串行总线数据传输 always@(posedge clk) begin if(!rst)//同步复位 begin mstate<=idle; link_write<=0; FF<=0; sh8out_buf<=0; //sh8_state<=sh8_start; ack<=0; end else begin case (mstate) idle: begin mstate<=addr_write; link_write<=0; FF<=0; sh8out_buf<=addr; sh8_state<=sh8_start; ack<=0; end addr_write: begin if(FF==0) begin shift8out; end else begin FF<=0; mstate<=data_write; sh8out_buf<=data; sh8_state<=sh8_start; end end data_write: begin if(FF==0) begin shift8out; end else begin FF<=0; mstate<=stop; ack<=1; end end stop: begin ack<=0; mstate<=idle; end //default: mstate<=idle; endcase end end //并串转换模块 task shift8out; begin case(sh8_state) sh8_start: begin link_write<=1; sh8_state<=bit6; end bit6: begin //link_write<=1; sh8_state<=bit5; sh8out_buf<=sh8out_buf<<1; end bit5: begin sh8_state<=bit4; sh8out_buf<=sh8out_buf<<1; end bit4: begin sh8_state<=bit3; sh8out_buf<=sh8out_buf<<1; end bit3: begin sh8_state<=bit2; sh8out_buf<=sh8out_buf<<1; end bit2: begin sh8_state<=bit1; sh8out_buf<=sh8out_buf<<1; end bit1: begin sh8_state<=bit0; sh8out_buf<=sh8out_buf<<1; end bit0: begin sh8_state<=sh8_stop; sh8out_buf<=sh8out_buf<<1; end sh8_stop: begin FF<=1; sh8_state<=sh8_start; link_write<=0; end endcase end endtask endmodule
测试模块:`timescale 1ns/1ns `define half_cycle 10 module signal; reg clk,rst; reg [7:0]data,addr; wire sda,ack; always #(`half_cycle) clk=~clk; initial begin rst=1; clk=0; data=8'b1010_1010; addr=0; #100 rst=0; #100 rst=1; #(200* `half_cycle)$stop; end always @(posedge ack) begin data=data+1; addr=addr+1; end ps_convertor m(clk,rst,data,addr,sda,ack); endmodule
最终在modelsim中得到的仿真图如下:
-
Java - 并行数据处理和性能
2018-12-11 15:09:24Java - 并行数据处理和性能并行流配置并行流使用的线程池测量流的性能使用更专业的方法正确使用并行流fork/join框架RecursiveTask使用fork/join的最佳实践偷工作Spliterator分割过程Spliterator特征实现自己的...Java - 并行数据处理和性能
并行流
并行流是一个把元素分成多个块的流,每个块用不同的线程处理。可以自动分区,让所有的处理器都忙起来。
假设要写一个方法,接受一个数量n做参数,计算1-n的和。可以这样实现:public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); }
也许可以使用parallel方法,简单地使用并行计算,提高程序性能:
public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
这样,流可能在内部被分成多个块,导致reduction操作可以在不同的块上互不依赖地并行地各自工作。最后,reduction操作组合每个子流的并行reductions的返回值,返回的结果就是整个流的结果。见下面的示意图
实际上,调用parallel方法,流自身不会有任何变化。在内部,设置一个布尔类型的标记,标明你想在并行模式执行操作,接下来的操作都是并行的。
类似地,你也可以使用sequential方法,把并行流转成串行的。你也许认为可以组合这两个方法:stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
但是,最后一次调用parallel或者sequential才会全局地影响管道。上面的例子,管道将被并行地执行。
配置并行流使用的线程池
并行流内部使用ForkJoinPool。默认地,线程数量等于处理器数量(Runtime.getRuntime().availableProcessors())。但是,可以修改系统属性java.util.concurrent.ForkJoinPool.common.parallelism,配置线程数量。
这是全局配置,所以,除非你认为对性能有帮助,否则不要修改。测量流的性能
我们声称并行加法应该比串行的或者自己的迭代方法快。我们可以使用JMH测量一下。这是一个工具,使用基于注解的方法,可以为JVM程序增加
可靠的microbenchmarks。如果使用maven,可以这样引入:<dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> <version>1.21</version> </dependency> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>1.21</version> </dependency>
第一个库是核心实现,第二个包含一个注解处理器,帮助生成JAR文件,通过它可以方便地运行你的benchmark。maven配置里还应该有下面的plugin:
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <finalName>benchmarks</finalName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.openjdk.jmh.Main</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
程序代码如下
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; //测量平均时间 @BenchmarkMode(Mode.AverageTime) //以毫秒为单位,打印benchmark结果 @OutputTimeUnit(TimeUnit.MILLISECONDS) //执行两次,增加可靠性。堆空间是4Gb @Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"}) @State(Scope.Benchmark) public class ParallelStreamBenchmark { private static final long N = 10_000_000L; @Benchmark public long sequentialSum() { return Stream.iterate(1L, i -> i + 1).limit(N) .reduce(0L, Long::sum); } //每次执行benchmark后,执行GC @TearDown(Level.Invocation) public void tearDown() { System.gc(); } }
使用大内存,和每次迭代以后试着GC都是为了尽量减少GC的影响。尽管如此,结果应该再加一些盐。很多因素会影响执行时间,比如你的机器有多少核。
默认地,JMH一般先执行5次热身迭代,这样可以让HotSpot优化代码,然后再执行5次迭代用来计算最终的结果。你可以使用-w和-i命令行参数修改这些配置。
在我的机器上,使用JDK 1.8.0_121, Java HotSpot™ 64-Bit Server VM,执行结果是Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.sequentialSum avgt 10 83.565 ± 1.841 ms/op
你应该期望,使用经典的for循环的迭代版本运行得更快,因为它在更低层(level)工作,而且,更重要的是,它不需要执行原始类型的装箱和拆箱操作。我们测试一下这个方法:
@Benchmark public long iterativeSum() { long result = 0; for (long i = 1L; i <= N; i++) { result += i; } return result; }
执行结果是
Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.iterativeSum avgt 10 6.877 ± 0.068 ms/op
证实了我们的期望:迭代版本比串行流快了10倍。让我们使用并行流试一试:
Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.parallelSum avgt 10 110.157 ± 1.882 ms/op
非常令人失望:并行版本的求和一点都没有发挥多核的优势,比串行版还要慢。为什么会这样?有两个问题混在一起:
- 迭代生成了装箱对象,它们在做加法前,必须拆箱成数字
- 迭代很难划分独立的块来并行地执行
第二点是特别有趣的,不是所有的流都是适合并行处理的。特别是,迭代的流就很难,这是因为,函数的输入依赖上一个函数的结果。见下图:
这意味着,reduction过程并没有像第一张图里所表示的那样执行。reduction开始的时候,还没有整个数字列表,所以没法分块。把流标记为并行的,反而增加了在不同线程上执行的求和要被串行处理的负担。
使用更专业的方法
LongStream.rangeClosed方法使用的是原始long类型,所以不用装箱和拆箱。而且,它生产的数的范围,可以很容易地分成不依赖的块。比如,范围1-20可以被分成1-5、6-10、11-15和16-20。
@Benchmark public long rangedSum() { return LongStream.rangeClosed(1, N) .reduce(0L, Long::sum); }
输出是
Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.rangedSum avgt 10 7.660 ± 1.643 ms/op
可以看出来,比并行流快了很多,仅比经典的for循环慢了一点。LongStream支持并行:
@Benchmark public long parallelRangedSum() { return LongStream.rangeClosed(1, N) .parallel() .reduce(0L, Long::sum); }
输出是
Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.parallelRangedSum avgt 10 4.790 ± 5.142 ms/op
可以发现,并行生效了。甚至比for循环还快了1/3。
正确使用并行流
滥用并行流产生错误的主要原因是使用了改变共享状态的算法。下面是一个通过改变共享的累加器来实现前n个自然数求和的例子:
public long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } public class Accumulator { public long total = 0; public void add(long value) { total += value; } }
这种代码很常见,特别对熟悉命令式编程范式的开发者而言。当你迭代数字列表时,经常这样做:初始化一个累加器,遍历元素,使用累加器相加。
这代码有什么错?它是串行的,失去了并行性。让我们试着使用并行流:public long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
多执行几次,你会发现,每次返回的结果都不一样,而且都不是正确的50000005000000。这是因为多线程累加的时候,total += value并不是原子操作。那么怎样才能写出并行情况下,正确的代码呢?
- 如果有怀疑,就做测试
- 注意装箱问题。Java提供的原始类型流(IntStream、LongStream和DoubleStream)可以避免类似的问题,尽量使用他们
- 有些操作使用并行流性能更差。尤其是像limit和findFirst这种依赖元素顺序的操作,使用并行是非常昂贵的。比如,findAny就比findFirst性能好,因为它跟顺序无关。调用unordered方法,可以把一个有顺序的流变成无顺序的流。比如,如果你需要流的N个元素,而你对前M个感兴趣,在一个无顺序的流上调用limit比有顺序的高效
- 如果数据量不大,不要选择并行流
- 要考虑流的底层数据结构的可分解程度。比如,ArrayList比LinkedList分解起来更高效,因为不遍历就可以分割。使用range工厂增加的原始类型流也很容易分割。可以通过实现自己的Spliterator分割流
- 流的特征,以及中间操作如何修改流的元素,会改变分解过程的性能。比如,一个SIZED流可以被分解成两个相等的部分,并且每个部分可以高效得并行处理,但是,filter会过滤掉任何不满足条件的元素,导致流的size成了未知的
- 考虑结束操作是廉价的还是昂贵的merge步骤(比如,Collector的combiner方法)。如果是昂贵的,组合并行结果的代价会比并行流带来的好处还要高
下面的表格,总结一些流在可分解性方面的并行友好性
源 可分解性 ArrayList 优秀 LinkedList 差 IntStream.range 优秀 Stream.iterate 差 HashSet 好 TreeSet 好 fork/join框架
fork/join框架用来递归地把可并行的任务分解成小任务,然后组合每个子任务的结果,以生成总的结果。它实现了ExecutorService接口,这样所有的子任务都在一个线程池(ForkJoinPool)内工作。
RecursiveTask
要向ForkJoinPool提交任务,你不得不增加RecursiveTask的子类-R是并行任务(以及每个子任务)的返回类型,或者
增加RecursiveAction的子类-当没有返回值的时候。要定义RecursiveTask,需要实现它唯一的抽象方法:protected abstract R compute();
该方法定义分割任务和不能继续被分割时处理一个子任务的算法的逻辑。该方法的实现,经常像下面的伪代码:
if (任务足够小,不再被分) { 顺序执行任务 } else { 把任务分成两个子任务 递归地调用本方法,尽量分割每个子任务 等待所有子任务的完成 组合每个子任务的结果 }
可以发现,这是分治算法的并行实现。我们继续求和的例子,演示怎么使用fork/join框架。首先需要扩展RecursiveTask类:
import java.util.concurrent.RecursiveTask; /** * Created by leishu on 18-12-11. */ public class ForkJoinSumCalculator extends RecursiveTask<Long> { //分割任务的阈值 public static final long THRESHOLD = 10_000; //要被求和的数组 private final long[] numbers; private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } //生成子任务的私有构造器 private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { //子任务的大小 int length = end - start; if (length <= THRESHOLD) { return computeSequentially();//小于阈值,不分割 } //增加第一个子任务 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); //异步执行,新的子任务使用ForkJoinPool的另一个线程 leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); //同步执行第二个子任务,允许递归 Long rightResult = rightTask.compute(); //读取第一个子任务的结果,如果没完成就等待 Long leftResult = leftTask.join(); //组合 return leftResult + rightResult; } //顺序执行 private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
然后写一个方法,执行并行求和:
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); }
执行一下,输出如下
Benchmark Mode Cnt Score Error Units ParallelStreamBenchmark.forkJoinSumB avgt 4 28.458 ± 0.602 ms/op
性能不够好,这是因为在ForkJoinSumCalculator使用的是一个long[]。
使用fork/join的最佳实践
- 调用任务的join方法,会阻塞调用者,直到返回结果。所以,要在两个子任务都启动以后在调用它
- 不要在RecursiveTask内使用ForkJoinPool的invoke方法
- 子任务的fork方法是用来做调度的。在两个子任务上直接调用它似乎是很自然的,但是,在其中一个上调用compute效率更高,因为这样能重用相同的线程
偷工作
任务被分给ForkJoinPool里的线程。每个线程有一个保存任务的双端链表,顺序地执行链表中的任务。如果由于某种原因(比如I/O),一个线程完成了分配给他的全部任务,它会随机地从其他线程选择一个队列,从队列的尾部偷一个任务。这个过程会持续,直到所有的队列都空了为止。所以,要有大量的小任务,而不是几个大任务,这样可以更好地平衡线程的负荷。
Spliterator
Spliterator是Java 8 提供的新接口,意思是“splitable iterator”,用来并行地迭代源中的元素。也许你不用开发自己的Spliterator,但是,理解了它,也就明白了并行流是如何工作的。Java 8已经在Collections框架内提供了Spliterator的默认实现。Collection接口有一个default方法spliterator(),它就返回一个Spliterator对象。我们先看看Spliterator接口的定义:
public interface Spliterator<T> { //用来按顺序消费Spliterator的元素,如果还有元素就返回true boolean tryAdvance(Consumer<? super T> action); //把一些元素分到一个新的Spliterator,以允许他们并行处理 Spliterator<T> trySplit(); //剩余的可被遍历的元素数量估值 long estimateSize(); int characteristics(); }
tryAdvance方法的行为类似于迭代器,用来按顺序消费Spliterator的元素,如果还有元素就返回true。trySplit方法
用来把一些元素分到一个新的Spliterator,以允许他们并行处理。分割过程
把一个流分割成多个部分是一个递归过程,如下图所示。首先,在第一个Spliterator上调用trySplit生成一个新的。然后,在这两个Spliterator上调用trySplit,这样产生四个。一直进行下去,直到该方法返回null,标志着不能再被分割。最后,当所有的trySplit都返回null时,递归过程结束。
分割过程也会受到Spliterator的特征(由characteristics方法声明)的影响。
Spliterator特征
characteristics方法返回一个整数,用来更好地控制和优化Spliterator的用法。
Characteristic 描述 ORDERED 元素是有顺序的(比如List),所以Spliterator使用该顺序做遍历和分区 DISTINCT 对于每对遍历的元素x和y,x.equals(y)返回false SORTED 遍历的元素遵循预定义的排序顺序 SIZED 源的size是已知的(比如set),所以estimatedSize()返回的值是精确的 NON-NULL 元素不会为空 IMMUTABLE 源是不可变的,说明遍历的时候,元素不会被增加、修改和删除 CONCURRENT 源是并发安全的,并发修改的时候,不用任何同步 SUBSIZED Spliterator和接下来产生的Spliterator都是SIZED 实现自己的Spliterator
我们开发一个简单的方法,用来计算字符串中的单词数。
public int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) counter++; lastSpace = false; } } return counter; }
要计算的字符串是但丁的“地域”的第一句
public static final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " che la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
注意,两个单词间的空格数是随机的。执行结果
Found 19 words
使用函数式实现
首先需要把字符串转换成一个流。原始类型int、long和double才有原始的的流,所以,我们使用Stream:
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt);
可以使用reduction计算单词数量。当reduce的时候,你不得不携带由两个变量组成的状态:整数型的总数和布尔型的字符是否是空格。因为Java没有tuples,你得增加一个新类-WordCounter-封装状态:
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } //遍历,累加 public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { //如果上一个字符是空格,而当前的不是,就加1 return lastSpace ? new WordCounter(counter + 1, false) : this; } } //组合,求和 public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
下面是遍历一个新字符时,WordCounter的状态图
然后,我们就可以使用流的reduce方法了
private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); }
我们做一下测试
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words");
执行结果是正确的。
并行的实现
我们修改一下代码
System.out.println("Found " + countWords(stream.parallel()) + " words");
执行结果不是找到19个单词了。因为源字符串在随意的位置被分割,一个字符被多次分割。要解决这个问题,就需要实现自己的Spliterator。
class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; private WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { //消费当前字符 action.accept(string.charAt(currentChar++)); //如果还有字符可被消费,返回true return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; //小于阈值,不再分割 if (currentSize < 10) { return null; } //候选的分割位置是字符串的一半长度 for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { //如果是空格,才分割 if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); //当前位置修改为分割位置 currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
然后,我们做测试
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
这回没问题了。
-
Java8-17-Stream 并行数据处理与性能
2019-03-26 22:12:25文章目录并行数据处理与性能并行流例子将顺序流转换为并行流测量流性能测量对前n个自然数求和的函数的性能流并行没有想象中那么好使用更有针对性的方法正确使用并行流高效使用并行流背后的实现原理拓展阅读参考资料...文章目录
并行数据处理与性能
在前面三章中,我们已经看到了新的 Stream 接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内部迭代能够让原生Java库控制流元素的处理。这种方法让Java程序员无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。
例如,在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。
Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。
在本章中,我们将了解 Stream 接口如何让你不用太费力气就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说,流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的很重要,因为如果你忽视这一方面,就可能因误用而得到意外的(很可能是错的)结果。
我们会特别演示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰是这些错误且无法解释的结果的根源。
因此,我们将会学习如何通过实现和使用你自己的Spliterator 来控制这个划分过程。
并行流
在第4章的笔记中,我们简要地了解到了 Stream 接口可以让你非常方便地处理它的元素:可以通过对收集源调用
parallelStream
方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。
例子
让我们用一个简单的例子来试验一下这个思想。
假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的 BinaryOperator 来归约这个流,如下所示:
public static long sequentialSum(long n) { // 生成自然数无限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n个数 .limit(n) // 对所有数字求和来归纳流 .reduce(0L, Long::sum); }
用更为传统的Java术语来说,这段代码与下面的迭代等价:
public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; }
这似乎是利用并行处理的好机会,特别是n很大的时候。
那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?
根本用不着担心啦。用并行流的话,这问题就简单多了!
将顺序流转换为并行流
我们可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:
public static long parallelSum(long n) { // 生成自然数无限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n个数 .limit(n) // 将流转为并行流 .parallel() // 对所有数字求和来归纳流 .reduce(0L, Long::sum); }
并行流的执行过程:
请注意,在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。
它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。
类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。
请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。
例如,你可以这样做:
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
但最后一次 parallel 或 sequential 调用会影响整个流水线。
在本例中,流水线会并行执行,因为最后调用的是它。
回到我们的数字求和练习,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。
现在你有三个方法,用三种不同的方式(迭代式、顺序归纳和并行归纳)做完全相同的操作,让我们看看谁最快吧!
测量流性能
我们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!
特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。
测量对前n个自然数求和的函数的性能
public static long measurePerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) { fastest = duration; } } return fastest; }
这个方法接受一个函数和一个 long 作为参数。它会对传给方法的 long 应用函数10次,记录每次执行的时间(以毫秒为单位),并返回最短的一次执行时间。
- 流顺序执行
假设你把先前开发的所有方法都放进了一个名为 ParallelStreams 的类,你就可以用这个框架来测试顺序加法器函数对前一千万个自然数求和要用多久:
System.out.println("Sequential sum done in:" + measurePerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
请注意,我们对这个结果应持保留态度。影响执行时间的因素有很多,比如你的电脑支持多少个内核。
你可以在自己的机器上跑一下这些代码。在一台i5 6200U 的笔记本上运行它,输出是这样的:
Sequential sum done in:110 msecs
- for 循环
用传统 for 循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。
如果你试着测量它的性能:
System.out.println("Iterative sum done in:" + measurePerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
将得到:
Iterative sum done in:4 msecs
- 流并行执行
现在我们来对函数的并行版本做测试:
System.out.println("Parallel sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000) + " msecs");
看看会出现什么情况:
Parallel sum done in: 525 msecs
流并行没有想象中那么好
这相当令人失望,求和方法的并行版本比顺序版本要慢很多。
你如何解释这个意外的结果呢?
这里实际上有两个问题:
-
iterate 生成的是装箱的对象,必须拆箱成数字才能求和
-
我们很难把 iterate 分成多个独立块来并行执行。
第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。
具体来说,iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果。
这意味着,在这个特定情况下,归纳进程不是像上图那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。
如果用得不对(比如采用了一个不易并行化的操作,如 iterate ),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的 parallel 操作时,了解背后到底发生了什么是很有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?
我们在第5章中讨论了一个叫 LongStream.rangeClosed 的方法。
这个方法与 iterate 相比有两个优点。
-
LongStream.rangeClosed 直接产生原始类型的 long 数字,没有装箱拆箱的开销。
-
LongStream.rangeClosed 会生成数字范围,很容易拆分为独立的小块。
例如,范围120可分为15、610、1115和16~20。
- 顺序流
让我们先看一下它用于顺序流时的性能如何,看看拆箱的开销到底要不要紧:
public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); }
这一次的输出是:
Ranged sum done in: 5 msecs
这个数值流比前面那个用 iterate 工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。
由此可见,选择适当的数据结构往往比并行化算法更重要。
- 并行流
但要是对这个新版本应用并行流呢?
public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
现在把这个函数传给的测试方法:
System.out.println("Parallel range sum done in:" + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs");
你会得到:
Parallel range sum done in:2 msecs
ps: 百倍的性能提升。
amazing!终于,我们得到了一个比顺序执行更快的并行归纳,因为这一次归纳操作可以像并行流执行图那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。
尽管如此,请记住,并行化并不是没有代价的。
并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。
总而言之,很多情况下不可能或不方便并行化。然而,在使用并行 Stream 加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。
让我们来看一个常见的陷阱。
正确使用并行流
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。
下面是另一种实现对前n个自然数求和的方法,但这会改变一个共享累加器:
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } }
这种代码非常普遍,特别是对那些熟悉指令式编程范式的程序员来说。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。
那这种代码又有什么问题呢?不幸的是,它真的无可救药,因为它在本质上就是顺序的。
每次访问 total 都会出现数据竞争。如果你尝试用同步来修复,那就完全失去并行的意义了。
为了说明这一点,让我们试着把 Stream 变成并行的:
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .parallel() .forEach(accumulator::add); return accumulator.total; }
执行测试方法,并打印每次执行的结果:
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");
你可能会得到类似于下面这种输出:
Result: 9869563545574 Result: 12405006536090 Result: 8268141260766 Result: 11208597038187 Result: 12358062322272 Result: 19218969315182 Result: 11255083226412 Result: 25746147125980 Result: 13327069088874 SideEffect parallel sum done in: 4 msecs
这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000 差很远。
这是由于多个线程在同时访问累加器,执行
total += value
,而这一句虽然看似简单,却不是一个原子操作。问题的根源在于, forEach 中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。
要是你想用并行 Stream 又不想引发类似的意外,就必须避免这种情况。
现在你知道了,共享可变状态会影响并行流以及并行计算。
现在,记住要避免共享可变状态,确保并行 Stream 得到正确的结果。
接下来,我们会看到一些实用建议,你可以由此判断什么时候可以利用并行流来提升性能。
高效使用并行流
一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。
- 测试验证性能
如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
- 留意装箱
留意装箱。自动装箱和拆箱操作会大大降低性能。
Java 8中有原始类型流( IntStream 、LongStream 、 DoubleStream )来避免这种操作,但凡有可能都应该用这些流。
- 特殊的操作本身
有些操作本身在并行流上的性能就比顺序流差。
特别是 limit 和 findFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。
例如, findAny 会比 findFirst 性能好,因为它不一定要按顺序来执行。你总是可以调用 unordered 方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit 可能会比单个有序流(比如数据源是一个 List )更高效。
- 总计算成本
还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
- 较小的数据
对于较小的数据量,选择并行流几乎从来都不是一个好的决定。
并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
要考虑流背后的数据结构是否易于分解。例如, ArrayList 的拆分效率比 LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。
另外,用 range 工厂方法创建的原始类型流也可以快速分解。
- 对于流的操作
流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个 SIZED 流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
- 合并的代价
还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
背后的实现原理
最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。
并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。
拓展阅读
参考资料
《java8 实战》
目录导航
-
并行传输数据和串行传输数据_为什么串行数据传输比并行数据传输快?
2020-09-20 01:53:52并行传输数据和串行传输数据SATA hard drive connections are faster than older PATA hard drive connections and the same can be said for external cabling standards, but this is counter-intuitive: why ... -
74HC165并行数据转串行数据
2012-11-21 00:53:4474HC165 概述 (NXP founded by Philips) ...74HC165是8位并行读取或串行输入移位寄存器,可在末级得到互斥的串行输出(Q7和Q7),当并行读取(PL)输入为低时,从D0到D7口输入的并行数据将被异步地读取进寄存器内。 -
并行数据流转化为一种特殊串行数据流模块的设计
2018-07-14 21:14:38设计两个可综合的电路模块:第一个模块(M1)接受四位并行数据,并将其转化为简化I2C传输格式。sclk为输入主钟,data[3:0]为输入的四位数据,ack为请求发送数据信号(请求后才有数据发送到data[3:0]),数据流用scl... -
并行数据文件系统与计算的高性能集成
2018-10-10 11:55:39在分布式数据存储和计算集成中,并行计算和并行数据访问是基本的优化方法,但是如何能让作业和任务高度并行,在实际应用场景中,由于分布式任务调度和数据本地性没有办法做到完美结合,性能会因此丢失。本文从分布式... -
java8 Stream多线程并行数据处理
2018-11-16 13:13:06Stream多线程并行数据处理 将一个顺序执行的流转变成一个并发的流只要调用 parallel()方法 public static long parallelSum(long n){ return Stream.iterate(1L, i -&amp;amp;amp;amp;gt; i +1).limit(n... -
(8)flink的并行数据流,task与operator chains
2019-09-05 00:13:51文章目录并行数据流One-to-oneRedistributingtask与operator chains 并行数据流 Flink程序的执行具有并行、分布式的特性。在执行过程中,一个 stream 包含一个或多个 stream partition ,而每一个 operator 包含一个... -
如何在TensorFlow中使用并行数据加载,解决视频读取问题
2018-05-14 16:34:04如何在TensorFlow中使用并行数据加载,解决视频读取问题 前言 在TensorFlow中自带有queue和TFrecord以用为异步并行加载数据,以提高整体系统的性能,但是有些情况下,并不需要或者不能用TFrecord,这个时候,... -
Hive和并行数据仓库的比较
2011-12-29 16:22:23看到一篇比较Hive和并行数据仓库的比较文章(http://db3.javaeye.com/blog/807787),写得比较犀利,转载如下:...最近分析和比较了Hive和并行数据仓库的架构,本文记下一些体会。 Hive是架构在Hadoop MapReduce Fram -
基于云计算平台的并行数据挖掘
2012-04-06 21:31:32采用云计算技术,实现海量数据的存储、分析、处理、挖掘,提供高...从系统架构来讲,基于云计算的并行数据挖掘工具平台包括三个层次,依下而上为 分布式计算层;数据挖掘平台层;业务应用层 (1)分布式计算 -
8通道同步并行数据采集PCI模块的设计
2014-05-28 15:55:508通道同步并行数据采集PCI模块的设计 数据采集是自动测试系统的主要功能之一,而在一些应用领域,比如超声、医疗电子中,信号的频率范围不同会要求采样率的不同。有时,为了配合信号处理算法,甚至要求采样率在一定... -
BIEE + Oracle = 并行数据仓库?
2012-06-17 19:45:23最近看了一篇文章,介绍了SQL Server 2008 R2提供的并行数据仓库;看了一下原理,和Greenplunm类似,都是基于massively parallel processing (MPP)架构;主要的思想就是分布和并行。这在Big Data盛行的今天,无疑... -
Hive与并行数据仓库的体系结构比较
2013-01-31 11:33:45最近分析和比较了Hive和并行数据仓库的架构,本文记下一些体会。 Hive是架构在Hadoop MapReduce Framework之上的开源数据分析系统。 Hive具有如下特点: 1. 数据以HDFS文件的形式存储,从而可以很方便的使用... -
标准BT.656并行数据结构
2009-04-23 10:43:00标准BT.656并行数据结构 BT.656并行接口除了传输4:2:2的YCbCr视频数据流外,还有行、列同步所用的控制信号。如图3所示,一帧图像数据由一个625行、每行1 728字节的数据块组成。其中,23~311行是偶数场视频数据,336... -
并行数据转串行数据模块的设计
2017-11-14 19:08:59scl为不断输出的时钟信号,如果scl为高电平时,sda由高变低,穿行数据流开始;如果scl为高电平,sda由低变高,串行数据结束。sda信号的数据值必须在scl高电平之间稳定,在scl低电平时才可以改变,否则的话,立即就... -
基于FPGA并行数据转串行数据(verilog)
2020-09-24 16:24:42//进来的数据和使能打两拍同步一下 always@(posedge clk_50M_i or negedge rst_n_i) if(!rst_n_i)begin data_i_tmp1 ; data_i_tmp2 ; data_en_tmp1 ; data_en_tmp2 ; end else begin data_i_tmp1 ; ... -
USB8616 功能分类: 高速并行数据采集卡
2013-04-13 12:44:33高精度数据采集卡USB8616完成8路电压/ICP速度传感器、加速度传感器、力传感器、扭矩传感器等信号采集,8个通道并行数据采集,内外触发、内外时钟。 应用领域 实时振动噪声分析及数据记录 机械振动在线监测分析 ... -
基于GPU实现的高效的并行数据结构
2011-06-27 21:41:00基于GPU实现的高效的并行数据结构(Implementing Efficient Parallel Data Structures on GPUs)现代的GPU,在计算历史中第一次把数据并行、流式计算平台放入几乎每台台式计算机和笔记本电脑中。一些最近的学术派研究... -
基于FPGA串行数据转并行数据(Verilog)
2020-09-24 17:36:57endmodule 仿真截图: 说明:无意中看到了一些博客上的数据串并转换,逻辑上基本都能实现,但绝大部分都不能用,连基本的信号使能、时序打拍都没有,信号命名也很随便,就简单的实现了移位寄存器,直接复用的价值... -
《云计算》教材试读:并行数据处理MapReduce
2010-06-12 22:21:002.2 并行数据处理MapReduceMapReduce是Google提出的一个软件架构,是一种处理海量数据的并行编程模式,用于大规模数据集(通常大于1TB)的并行运算。“Map(映射)”、“Reduce(化简)”的概念和主要思想,都是从... -
C++高性能的并行数据结构
2018-09-04 20:45:19详见:http://libcds.sourceforge.net./ -
【数据分析】Python使用Dask Dataframes并行数据分析
2018-08-27 16:35:28有时你用Python的Pandas打开一个大数据集,尝试获得一些指标,整个事情只是可怕地冻结。 如果您使用大数据,您知道如果...我不久前发现了这个工具:一种加速Python数据分析的方法,无需获得更好的基础设施或切换语言... -
java8——并行数据处理与性能
2018-03-06 11:09:41前言在Java7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分若干子部分;第二,你要给每个字部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件... -
标准 BT656 并行 数据结构 详解
2015-10-31 23:54:16BT.656并行接口除了传输4:2:2的YCbCr视频数据流外,还有行、列同步所用的控制信号。如 图3所示,一帧图像数据由一个625行、每行1 728字节的数据块组成。其中,23~311行是偶数场视频数据,336~624行是奇数场视频... -
java8------Stream多线程并行数据处理
2019-02-16 10:17:16java8推出steam流概念,像管道一样,将数据比作液体,在管道...1.串行流与并行流在数据量不大的前提下,串行处理的效率要比并行操作高.只有在较大数据前提下,才能显现并行的优势. 2.在对有顺序依赖的操作,如limit,fi...
-
分布式一致性算法之Raft
-
56个民族JS数据,可以直接转换成elementUi的多选器组件直接使用
-
完全格的左右半范数
-
MySQL 多实例安装 及配置主从复制实验环境
-
影响培训效果的因素.pdf
-
Windows Embedded Standard 7 快速入门指南1234.pdf
-
用免疫遗传算法搜索克服龙格现象的全局最优参数序列。
-
物联网基础篇:快速玩转MQTT
-
银行培训效果评估方案 .pdf
-
移动开发技术对比(脑图)
-
讲师训-如何成为一个成功的培训师.ppt
-
mfc中CreateThread(),
-
darkweb2017-top1000.txt
-
【硬核】一线Python程序员实战经验分享(1)
-
mysql 多表连接 聚合函数
-
Docker从入门到精通
-
《专业培训师TTT培训》学员手册.ppt
-
修改jar包package目录结构操作方法(解决不同版本jar包冲突的一种方法)
-
16*64点阵 74HC154 ,74HC595
-
Python启蒙到架构师的核心技术精讲课程