精华内容
下载资源
问答
  • 名称:串行数据转换为并行数据 说明:串行数据由 RXD 发送给串 并转换芯片 74164,TXD 则用于输出移位时钟脉冲,74164 将串行输入的 1 字节转换为并行数据,并将转换的数据通过 8 只 LED 显示出来。本例串口工作...
  • 名称:并行数据转换为串行数据 说 明 : 切 换 连 接 到 并 串 转 换 芯 片74LS165 的拨码开关,该芯片将并行数据以串行方式发送到 8051 的 RXD 引脚,移位脉冲由 TXD 提供,显示在 P0 口。
  • protues串行数据转换为并行数据很经典。
  • ps:用于并行数据结构的并行节库
  • 绍基于C6727B的并行数据传输系统的设计,利用其内部的dMAX模块,可以处理来自外部的2个中断,实现从HPI接口和EMIF接口并行传输数据,在数据传输时无需CPU参与;由于并行数据传输存在优先级和总线仲裁,还介绍了多个...
  • 并行数据挖掘研究

    2013-12-15 23:06:07
    并行数据挖掘自算法的一些探讨,对数据挖掘研究有一些帮助。
  • 为了解决云粒子测量系统中并行数据采集通道之间的信息交互问题,采用LabVIEW中库函数节点、消息队列和状态机相结合的方法,实现了多路不同速率数据的并行采集,试验验证了该并行数据采集方法的可行性.研究结果表明:基于...
  • ad7606-fpga-并行,ad7606并行数据读取,Verilog源码.zip
  • 串行数据转并行数据并行数据转串行数据行转串行代码 module parallel_to_serial(clk,data_in,data_out,state); input clk;input [7:0] data_in;output data_out;output [7:0] state;reg [7:0] state=0;reg data_...
  • 并行数据转换为串行数据_74ls165 由拨马开关控制并行数据状态,通过74LS165转串口输出 在由8位LED灯的亮灭来显示当前的拨码开关的状态 源程序+Proteus的仿真文件 可以帮助你很好的学习哦
  • 为了解决云粒子测量系统中并行数据采集通道之间的信息交互问题,采用LabVIEW中库函数节点、消 息队列和状态机相结合的方法,实现了多路不同速率数据的并行采集,试验验证了该并行数据采集方法的可行性. 研究结果表明:...
  • 多路并行数据采集系统设计;多路并行数据采集系统设计;多路并行数据采集系统设计/
  • 单片机89C51并行数据采集技术;单片机89C51并行数据采集技术
  • 基于Hadoop平台的并行数据挖掘算法工具箱与数据挖掘云.pdf
  • 提出了一款基于Hadoop的并行数据分析系统―――PDM.该系统拥有大量以MapReduce为计算框架的并行数据分析算法,不仅包括传统的ETL、数据挖掘、数据统计和文本分析算法,还引入了基于图理论的SNA(社会网络分析)算法.详细...
  • 数据集解析器 解析OPUS并行数据集以创建可用于NLP的多语言并行语料库
  • 并行数据转换为串行数据的转换器

    万次阅读 2017-07-24 12:04:16
    这篇文章写一下今天早上设计的并行数据到串行数据的转换器,也算是对并行总线和串行总线一个小小的应用,编码过程中也用到了task。 该转换器主要实现的功能是: 1、把并行地址存入寄存器 2、把并行数据存入寄存器 3...

    这篇文章写一下今天早上设计的并行数据到串行数据的转换器,也算是对并行总线和串行总线一个小小的应用,编码过程中也用到了task。

    该转换器主要实现的功能是:

    1、把并行地址存入寄存器

    2、把并行数据存入寄存器

    3、连接串行单总线

    4、地址的串行输出

    5、数据的串行输出

    6、挂起串行单总线

    7、给信号源应答

    8、让信号源给出下一个操作对象

    9、结束写操作

    该设计利用嵌套的状态机实现,主状态机分为四个状态:idle,addr_write,data_write,stop,主状态机中会涉及到任务shift8out的调用,该任务主要实现并行数据到串行数据的转换,也是由一个状态机实现。下面给出整个设计的代码:

    设计代码:

    ///并串转换器//
    module ps_convertor(clk,rst,data,addr,sda,ack);
    parameter idle=4'b0001,addr_write=4'b0010,data_write=4'b0100,stop=4'b1000;//独热编码
    parameter sh8_start=9'b00000_0001;
    parameter bit6     =9'b00000_0010;
    parameter bit5     =9'b00000_0100;
    parameter bit4     =9'b00000_1000;
    parameter bit3     =9'b00001_0000;
    parameter bit2     =9'b00010_0000;
    parameter bit1     =9'b00100_0000;
    parameter bit0     =9'b01000_0000;
    parameter sh8_stop =9'b10000_0000;
    
              
    input clk,rst;
    input [7:0]addr,data;
    inout sda;//串行总线
    output ack;//应答信号将输入测试模块
    
    reg ack;
    
    reg link_write;//写开关
    reg [7:0] sh8out_buf;//并行总线缓冲器
    
    reg [3:0] mstate;//主状态机的状态寄存器
    reg [8:0] sh8_state;//并串转换状态机状态寄存器
    
    reg FF;//标志寄存器,用来表示任务是否完成
    
    assign sda=(link_write)?sh8out_buf[7]:1'hz;//串行总线数据传输
    
    always@(posedge clk)
    begin
     if(!rst)//同步复位
     begin
      mstate<=idle;
      link_write<=0;
      FF<=0;
      sh8out_buf<=0;
      //sh8_state<=sh8_start;
      ack<=0;
     end
     else 
     begin
     case (mstate)
      idle: begin
             mstate<=addr_write;
    		 link_write<=0;
    		 FF<=0;
    		 sh8out_buf<=addr;
    		 sh8_state<=sh8_start;
    		 ack<=0;
            end
      addr_write: 
            begin
    		 if(FF==0)
    		  begin
    		   shift8out;
    		  end
    		 else 
    		  begin
    		   FF<=0;
    		   mstate<=data_write;
    		   sh8out_buf<=data;
    		   sh8_state<=sh8_start;
    		  end
    		end
      data_write:
            begin
    		 if(FF==0)
    		  begin
    		   shift8out;
    		  end
    		 else 
    		  begin
    		   FF<=0;
    		   mstate<=stop;
    		   ack<=1;
    		  end
    		end
      stop: begin
             ack<=0;
    		 mstate<=idle;
            end
      //default: mstate<=idle;       
     endcase
     end
    end
    //并串转换模块
    task shift8out;
     begin
      case(sh8_state)
       sh8_start: begin
                   link_write<=1;
    			   sh8_state<=bit6;
                  end
       bit6:      begin
                   //link_write<=1;
                   sh8_state<=bit5;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit5:      begin
                   sh8_state<=bit4;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit4:      begin
                   sh8_state<=bit3;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit3:      begin
                   sh8_state<=bit2;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit2:      begin
                   sh8_state<=bit1;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit1:      begin
                   sh8_state<=bit0;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       bit0:      begin
                   sh8_state<=sh8_stop;
    			   sh8out_buf<=sh8out_buf<<1;
                  end
       sh8_stop: begin 
                  FF<=1;
    			  sh8_state<=sh8_start;
    			  link_write<=0;
    			 end
    			  
      endcase
     end
    endtask
    
    endmodule
    

    测试模块:

    `timescale 1ns/1ns
    `define half_cycle 10
    module signal;
    reg clk,rst;
    reg [7:0]data,addr;
    wire sda,ack;
    always #(`half_cycle) clk=~clk;
    initial 
    begin
      rst=1;
      clk=0;
      data=8'b1010_1010;
      addr=0;
      #100 rst=0;
      #100 rst=1;
      #(200* `half_cycle)$stop;
    end
    
    always @(posedge ack)
    begin
     data=data+1;
     addr=addr+1;
    end
     ps_convertor m(clk,rst,data,addr,sda,ack);
    endmodule
    


    最终在modelsim中得到的仿真图如下:



    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    展开全文
  • 电法勘探中并行数据采集与传统数据采集效率的比较研究
  • 包含:微软Big-Data-and-PDW-Solution并行数据仓库解决方案.pdf 和微软大数据库解决方案手册.pdf 面对快速增长的数据量,企业需要具备切合实际、及时的业务洞察力。 通过使用 SQL Server 中的并行数据仓库及其大规模...
  • 目前并行数据库的研究已经进入了实际应用阶段,而数据仓库的大数据量处理更需要并行处理能力的支持。针对数据仓库的特点,提出了一种可操作的并行数据划分方法和物理存储方案,同时对基于该种数据存储的数据操作做...
  • 并行传输数据和串行传输数据SATA hard drive connections are faster than older PATA hard drive connections and the same can be said for external cabling standards, but this is counter-intuitive: why ...
    并行传输数据和串行传输数据

    并行传输数据和串行传输数据

    SATA hard drive connections are faster than older PATA hard drive connections and the same can be said for external cabling standards, but this is counter-intuitive: why wouldn’t the parallel transmission be faster?

    SATA硬盘驱动器的连接速度比旧的PATA硬盘驱动器连接要快,对于外部电缆连接标准也可以这么说,但这是违反直觉的:为什么并行传输不会更快?

    Today’s Question & Answer session comes to us courtesy of SuperUser—a subdivision of Stack Exchange, a community-driven grouping of Q&A web sites.

    今天的“问答”环节由SuperUser提供,它是Stack Exchange的一个分支,该社区是由社区驱动的Q&A网站分组。

    问题 (The Question)

    SuperUser reader Modest is curious about the data transfer rates of parallel and serial connections:

    SuperUser阅读器Modest对并行和串行连接的数据传输速率感到好奇:

    Intuitively, you would think that parallel data transmission should be faster than serial data transmission; in parallel you are transferring many bits at the same time, whereas in serial you are doing one bit at a time.

    凭直觉,您会认为并行数据传输应该比串行数据传输快; 并行操作是同时传输许多位,而串行操作是一次传输一位。

    So what makes SATA interfaces faster than PATA, PCI-e devices faster than PCI, and serial ports faster than parallel?

    那么,什么使SATA接口比PATA更快,PCI-e设备比PCI更快,串行端口比并行更快?

    While it’s easy to fall into the reasoning that SATA is newer than PATA, there must be a more concrete mechanism at work than just age.

    虽然很容易得出SATA比PATA更新的理由,但在工作中必须有一种比年龄更具体的机制。

    答案 (The Answer)

    SuperUser contributor Mpy offers some insight into the nature of the transmission types:

    超级用户贡献者Mpy对传输类型的性质提供了一些见解:

    You cannot formulate it this way.

    您不能以此方式制定。

    Serial transmission is slower than parallel transmission given the same signal frequency. With a parallel transmission you can transfer one word per cycle (e.g. 1 byte = 8 bits) but with a serial transmission only a fraction of it (e.g. 1 bit).

    相同的信号频率下,串行传输比并行传输要慢 在并行传输中,您可以每个周期传输一个字(例如1字节= 8位),而在串行传输中,仅传输其一小部分(例如1位)。

    The reason modern devices use serial transmission is the following:

    现代设备使用串行传输的原因如下:

    • You cannot increase the signal frequency for a parallel transmission without limit, because, by design, all signals from the transmitter need to arrive at the receiver at the same time. This cannot be guaranteed for high frequencies, as you cannot guarantee that the signal transit time is equal for all signal lines (think of different paths on the mainboard). The higher the frequency, the more tiny differences matter. Hence the receiver has to wait until all signal lines are settled — obviously, waiting lowers the transfer rate.

      您可以在不增加信号频率并行传输无极限,因为按照设计,从发射器需要的所有信号在接收器在同一时间到达。 这不能保证在高频下使用,因为您不能保证所有信号线的信号传输时间都相等(请考虑主板上的不同路径)。 频率越高,差异越小。 因此,接收器必须等到所有信号线都建立好之后,显然,等待会降低传输速率。

    • Another good point (from this post) is that one needs to consider crosstalk with parallel signal lines. The higher the frequency, the more pronounced crosstalk gets and with it the higher the probability of a corrupted word and the need to retransmit it. [1]

      另一个好处(来自本文)是,需要考虑与并行信号线的串扰。 频率越高,串扰越明显,随之而来的单词损坏和重传的可能性也越高。 [1]

    So, even if you transfer less data per cycle with a serial transmission, you can go to much higher frequencies which results in a higher net transfer rate.

    因此,即使您使用串行传输每个周期传输较少的数据,您也可以使用更高的频率,从而导致更高的净传输速率。

    [1] This also explains why UDMA-Cables (Parallel ATA with increased transfer speed) had twice as many wires as pins. Every second wire was grounded to reduce crosstalk.

    [1]这也解释了为什么UDMA电缆(具有更高传输速度的并行ATA)的导线数是引脚的两倍。 每隔两根导线接地,以减少串扰。

    Scott Chamberlain echoes Myp’s answer and expands upon the economics of design:

    斯科特·张伯伦(Scott Chamberlain)回应了Myp的回答,并扩展了设计经济学:

    The problem is synchronization.

    问题是同步。

    When you send in parallel you must measure all of the lines at the exact same moment, as you go faster the size of the window for that moment gets smaller and smaller, eventually it can get so small that some of the wires may still be stabilizing while others are finished before you ran out of time.

    并行发送时,必须在同一时刻测量所有线路,因为随着速度的加快,该时刻的窗口尺寸会越来越小,最终可能会变得很小,以至于某些电线可能仍在稳定而其他人则在您没时间用完之前就完成了。

    By sending in serial you no longer need to worry about all of the lines stabilizing, just one line. And it is more cost efficient to make one line stabilize 10 times faster than to add 10 lines at the same speed.

    通过串行发送,您无需担心所有线路都稳定下来,只需担心一条线路。 而且,使一条线的稳定速度比以相同速度添加10条线的速度快10倍,具有更高的成本效益。

    Some things like PCI Express do the best of both worlds, they do a parallel set of serial connections (the 16x port on your motherboard has 16 serial connections). By doing that each line does not need to be in perfect sync with the other lines, just as long as the controller at the other end can reorder the “packets” of data as they come in using the correct order.

    诸如PCI Express之类的东西在两全其美方面发挥了最大作用,它们完成了一组并行的串行连接(主板上的16x端口具有16个串行连接)。 通过这样做,只要另一端的控制器可以按照正确的顺序对数据的“数据包”进行重新排序,就不必与其他行完美同步。

    The How Stuff Works page for PCI-Express does a very good explination in depth on how PCI Express in serial can be faster than PCI or PCI-X in parallel.

    PCI-Express的“工作原理”页面对串行PCI Express如何比并行PCI或PCI-X更快提供了很好的深度解释。

    TL;DR Version: It is easier to make a single connection go 16 times faster than 8 connections go 2 times faster once you get to very high frequencies.

    TL; DR版本:一旦到达非常高的频率,使单个连接的传输速度比8个连接的传输速度快2倍就容易了。



    Have something to add to the explanation? Sound off in the the comments. Want to read more answers from other tech-savvy Stack Exchange users? Check out the full discussion thread here.

    有什么补充说明吗? 在评论中听起来不对。 是否想从其他精通Stack Exchange的用户那里获得更多答案? 在此处查看完整的讨论线程

    翻译自: https://www.howtogeek.com/171947/why-is-serial-data-transmission-faster-than-parallel-data-transmission/

    并行传输数据和串行传输数据

    展开全文
  • Google云计算原理-并行数据处理模型MapReduce中文PPT
  • 根据标准模板库函数建立交通网络仿真的共享并行数据结构, 并利用SQL 数据库技术进行并行数据结构的存取, 以减少对内存资源的占用, 方便各仿真单元对网络数据的同步访问。最后以24 个交叉口组成的交通网络为例进行...
  • 如何在TensorFlow中使用并行数据加载,解决视频读取问题 前言 在TensorFlow中自带有queue和TFrecord以用为异步并行加载数据,以提高整体系统的性能,但是有些情况下,并不需要或者不能用TFrecord,这个时候,...

    前言

    在TensorFlow中自带有queue和TFrecord以用为异步并行加载数据,以提高整体系统的性能,但是有些情况下,并不需要或者不能用TFrecord,这个时候,可以手动写一个简单的并行加载数据的框架,可以大大提高系统的性能。 如有谬误,请联系指正。转载请注明出处。

    ∇ \nabla 联系方式:

    e-mail: FesianXu@gmail.com

    QQ: 973926198

    github: https://github.com/FesianXu

    知乎专栏: 计算机视觉/计算机图形理论与应用

    微信公众号
    qrcode
    code: https://github.com/FesianXu/Parallel-DataLoader-in-TensorFlow


    为什么需要并行数据加载

    在很多深度学习应用中,特别是涉及到图片甚至是视频的处理的时候,经常需要解码图片和视频格式以将其加载到内存中去,本文以解码avi视频为例子,这个过程通常是很慢的,有时候解码一个批次的视频(如128个视频)甚至会需要3秒钟,相比而言,在GPU比如GTX 1080Ti中对模型进行训练反而不需要那么久,如果采取读一个批次的视频,然后再进行训练这种串行的训练策略,那么整个系统的瓶颈将会受限于整个系统的IO能力和解码能力,倒反而不是网络训练了,这样就有本末倒置之嫌了。解决视频解码慢也可以通过预先将视频解码为图片,然后在训练的过程中读取图片,这会省去解码视频的时间,但是这样大大增大了对硬盘的需求。以前我做过一个实验,6400个avi视频,总大小约为4GB,经过预解码为图片后,体积增加了约10倍!可想而知,如果这个视频量更大,硬盘是很难承受的(比如NTU RGB-D数据集有约50000个RGB高清视频)。于是,这个时候,我们可以考虑并行地加载数据,解码视频。

    系统模型

    接下来我们的讨论将基于假设:

    1. 我们的网络训练是在高性能GPU上完成的,视频解码是在CPU上完成的,于是单次数据读取时间大于单次网络训练时间,既是 t 网 络 训 练 < t 数 据 加 载 t_{网络训练} < t_{数据加载} t<t
    2. 称数据加载为生产者,网络训练为消费者。

    串行的数据加载模型

    serial

    串行数据加载如上图描述,数据加载的工作在CPU上完成,网络训练在GPU上完成,这个时候,很容易观察到在数据加载的时候,GPU是空闲的,而在GPU训练的时候,CPU又是空闲的,因此无论是CPU还是GPU都没有得到充分利用。

    并行的数据加载模型

    parallel

    并行数据加载模型如上图所示,在这个模型中,我们首先需要维护一个全局的FIFO队列,这个队列用于保存视频解码过程中的每一个批次,同时需要产生多个DataLoader线程用于解码数据和将数据入队,最后需要一个主线程,用于模型训练同时负责数据出队。当全局队列为空的时候,出队和计算线程将会被阻塞,直到数据加载线程将数据入队后为止;当全局队列为慢的时候,入队和解码线程将会被阻塞,直到计算线程出队使用了数据为止。这样就构成了一个并行数据加载的模型了。

    实现

    整个过程可以在python中简单实现,我们需要定义一个FIFOqueue类,用于保存数据,如:

    import queue
    class FIFOQueue(object):
      __max_len = None
      __queue = None
      def __init__(self, max_len=5):
        if self.__max_len is None:
          self.__max_len = max_len
        else:
          if self.__max_len is not max_len:
            raise ValueError('The FIFOQueue has been declared yet and max_len is not same!')
    
        if self.__queue is None:
          self.__queue = queue.Queue(maxsize=max_len)
      def enqueue(self, item):
        '''
        put a batch into queue. If the queue is full, then it will be blocked and wait until the queue is not full.
        :param item: a batch with the format of (data_batch, data_label)
        :return: None
        '''
        self.__queue.put(item)
      def dequeue(self):
        '''
        pop a batch from queue. If the queue is empty then it will be blocked till the queue is not empty.
        :return: the batch with the format of (data_batch, data_label)
        '''
        item = self.__queue.get()
        return item
      def max_len(self):
        return self.__max_len
      def get_len(self):
        return self.__queue.qsize()
    

    可以发现只是对queue的简单封装。

    在最主要的Train类中,如:

    import FIFOqueue as queue
    import threading
    
    class Train(object):
      _train_global_queue = None
      _val_global_queue = None
      _test_global_queue = None
      _threads = []
    
      def __init__(self,
                   main_task,
                   batch_size=32,
                   train_yield=None,
                   val_yield=None,
                   test_yield=None,
                   max_nthread=10,
                   max_len=10):
        self._train_global_queue = queue.FIFOQueue(max_len=max_len) if train_yield is not None else None
        self._val_global_queue = queue.FIFOQueue(max_len=max_len) if val_yield is not None else None
        self._test_global_queue = queue.FIFOQueue(max_len=max_len) if test_yield is not None else None
        # init the global queue and maintain them
        train_threads = [threading.Thread(target=self._data_enqueue,
                         args=(train_yield, batch_size, task_id, 'train_data_load', self._train_global_queue))
        for task_id in range(max_nthread)]
    
        def wrapper_main_task(fn):
          while True:
            fn(self._train_global_queue.dequeue())
    
        self._threads += train_threads
        self._threads += [threading.Thread(target=wrapper_main_task, args=([main_task]))]
    
      def _data_enqueue(self, fn, batch_size, task_id, task_type, queue_h):
        print('here begin the data loading with task_id %d with type %s' % (task_id, task_type))
        while True:
          item = fn()
          item['task_id'] = task_id
          item['task_type'] = task_type
          queue_h.enqueue(item=item)
    
      def start(self):
        for each_t in self._threads:
          each_t.start()
    

    我们实现了刚才说是的并行加载的过程,其中需要注意几点:

    1. _data_enqueue是用于将数据入队列的,其中fn为数据生成器,需要用户自行重写传入。
    2. wrapper_main_task是用于封装主任务的,并且在使得可以在主任务中出队,利用数据。

    具体的使用过程请参考github上的代码,我已经开源到github上了。

    实验效果

    在符合我们的假设的情况下,我们利用time.sleep对生产者和消费者进行模拟,其中消费者延时0.2秒,生产者延时1秒。(在生产者中还进行了简单的文本读取,作为实际例子。)

    可以观察到在单线程的时候其cpu使用率仅为5.3%,在实际print中看到也是数据生成的很慢:
    s_res

    而在开了10个生产者线程之后,cpu使用率变为29.8%,提高了接近6倍。
    p_res

    在开了30个生产者线程之后,cpu使用率变为119.3%提高了20多倍。
    p_res_2

    当然也不是说开越多生产者越好,这个是与具体的任务有关的,但是只要是符合基本假设,都可以有较大幅度的系统提升。

    展开全文
  • Java8-17-Stream 并行数据处理与性能

    万次阅读 2019-03-26 22:12:25
    文章目录并行数据处理与性能并行流例子将顺序流转换为并行流测量流性能测量对前n个自然数求和的函数的性能流并行没有想象中那么好使用更有针对性的方法正确使用并行流高效使用并行流背后的实现原理拓展阅读参考资料...

    并行数据处理与性能

    在前面三章中,我们已经看到了新的 Stream 接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内部迭代能够让原生Java库控制流元素的处理。这种方法让Java程序员无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。

    例如,在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。

    Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。

    在本章中,我们将了解 Stream 接口如何让你不用太费力气就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说,流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的很重要,因为如果你忽视这一方面,就可能因误用而得到意外的(很可能是错的)结果。

    我们会特别演示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰是这些错误且无法解释的结果的根源。

    因此,我们将会学习如何通过实现和使用你自己的Spliterator 来控制这个划分过程。

    并行流

    在第4章的笔记中,我们简要地了解到了 Stream 接口可以让你非常方便地处理它的元素:可以通过对收集源调用 parallelStream 方法来把集合转换为并行流。

    并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。

    例子

    让我们用一个简单的例子来试验一下这个思想。

    假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的 BinaryOperator 来归约这个流,如下所示:

    public static long sequentialSum(long n) {
        // 生成自然数无限流
        return Stream.iterate(1L, i -> i + 1)
                // 限制到前n个数
                .limit(n)
                // 对所有数字求和来归纳流
                .reduce(0L, Long::sum);
    }
    

    用更为传统的Java术语来说,这段代码与下面的迭代等价:

    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }
    

    这似乎是利用并行处理的好机会,特别是n很大的时候。

    那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?

    根本用不着担心啦。用并行流的话,这问题就简单多了!

    将顺序流转换为并行流

    我们可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:

    public static long parallelSum(long n) {
        // 生成自然数无限流
        return Stream.iterate(1L, i -> i + 1)
                // 限制到前n个数
                .limit(n)
                // 将流转为并行流
                .parallel()
                // 对所有数字求和来归纳流
                .reduce(0L, Long::sum);
    }
    

    并行流的执行过程:

    并行流的执行过程

    请注意,在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。

    它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。

    类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。

    请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。

    例如,你可以这样做:

    stream.parallel()
            .filter(...)
            .sequential()
            .map(...)
            .parallel()
            .reduce();
    

    但最后一次 parallel 或 sequential 调用会影响整个流水线。

    在本例中,流水线会并行执行,因为最后调用的是它。

    回到我们的数字求和练习,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。

    现在你有三个方法,用三种不同的方式(迭代式、顺序归纳和并行归纳)做完全相同的操作,让我们看看谁最快吧!

    测量流性能

    我们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!

    特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。

    测量对前n个自然数求和的函数的性能

    public static long measurePerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) {
                fastest = duration;
            }
        }
        return fastest;
    }
    

    这个方法接受一个函数和一个 long 作为参数。它会对传给方法的 long 应用函数10次,记录每次执行的时间(以毫秒为单位),并返回最短的一次执行时间。

    • 流顺序执行

    假设你把先前开发的所有方法都放进了一个名为 ParallelStreams 的类,你就可以用这个框架来测试顺序加法器函数对前一千万个自然数求和要用多久:

    System.out.println("Sequential sum done in:" + measurePerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
    

    请注意,我们对这个结果应持保留态度。影响执行时间的因素有很多,比如你的电脑支持多少个内核。

    你可以在自己的机器上跑一下这些代码。在一台i5 6200U 的笔记本上运行它,输出是这样的:

    Sequential sum done in:110 msecs
    
    • for 循环

    用传统 for 循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。

    如果你试着测量它的性能:

    System.out.println("Iterative sum done in:" + measurePerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
    

    将得到:

    Iterative sum done in:4 msecs
    
    • 流并行执行

    现在我们来对函数的并行版本做测试:

    System.out.println("Parallel sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000) + " msecs");
    

    看看会出现什么情况:

    Parallel sum done in: 525 msecs
    

    流并行没有想象中那么好

    这相当令人失望,求和方法的并行版本比顺序版本要慢很多。

    你如何解释这个意外的结果呢?

    这里实际上有两个问题:

    1. iterate 生成的是装箱的对象,必须拆箱成数字才能求和

    2. 我们很难把 iterate 分成多个独立块来并行执行。

    第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。

    具体来说,iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果。

    这意味着,在这个特定情况下,归纳进程不是像上图那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

    这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。

    如果用得不对(比如采用了一个不易并行化的操作,如 iterate ),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的 parallel 操作时,了解背后到底发生了什么是很有必要的。

    使用更有针对性的方法

    那到底要怎么利用多核处理器,用流来高效地并行求和呢?

    我们在第5章中讨论了一个叫 LongStream.rangeClosed 的方法。

    这个方法与 iterate 相比有两个优点。

    1. LongStream.rangeClosed 直接产生原始类型的 long 数字,没有装箱拆箱的开销。

    2. LongStream.rangeClosed 会生成数字范围,很容易拆分为独立的小块。

    例如,范围120可分为15、610、1115和16~20。

    • 顺序流

    让我们先看一下它用于顺序流时的性能如何,看看拆箱的开销到底要不要紧:

    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n)
                .reduce(0L, Long::sum);
    }
    

    这一次的输出是:

    Ranged sum done in: 5 msecs
    

    这个数值流比前面那个用 iterate 工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。

    由此可见,选择适当的数据结构往往比并行化算法更重要。

    • 并行流

    但要是对这个新版本应用并行流呢?

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n)
                .parallel()
                .reduce(0L, Long::sum);
    }
    

    现在把这个函数传给的测试方法:

    System.out.println("Parallel range sum done in:" + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs");
    

    你会得到:

    Parallel range sum done in:2 msecs
    

    ps: 百倍的性能提升。

    amazing!终于,我们得到了一个比顺序执行更快的并行归纳,因为这一次归纳操作可以像并行流执行图那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。

    尽管如此,请记住,并行化并不是没有代价的。

    并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。

    总而言之,很多情况下不可能或不方便并行化。然而,在使用并行 Stream 加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。

    让我们来看一个常见的陷阱。

    正确使用并行流

    错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

    下面是另一种实现对前n个自然数求和的方法,但这会改变一个共享累加器:

    public static long sideEffectSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n)
                .forEach(accumulator::add);
        return accumulator.total;
    }
    
    public static class Accumulator {
        private long total = 0;
    
        public void add(long value) {
            total += value;
        }
    }
    

    这种代码非常普遍,特别是对那些熟悉指令式编程范式的程序员来说。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。

    那这种代码又有什么问题呢?不幸的是,它真的无可救药,因为它在本质上就是顺序的。

    每次访问 total 都会出现数据竞争。如果你尝试用同步来修复,那就完全失去并行的意义了。

    为了说明这一点,让我们试着把 Stream 变成并行的:

    public static long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n)
                .parallel()
                .forEach(accumulator::add);
        return accumulator.total;
    }
    

    执行测试方法,并打印每次执行的结果:

    System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");
    

    你可能会得到类似于下面这种输出:

    Result: 9869563545574
    Result: 12405006536090
    Result: 8268141260766
    Result: 11208597038187
    Result: 12358062322272
    Result: 19218969315182
    Result: 11255083226412
    Result: 25746147125980
    Result: 13327069088874
    SideEffect parallel sum done in: 4 msecs
    

    这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000 差很远。

    这是由于多个线程在同时访问累加器,执行 total += value,而这一句虽然看似简单,却不是一个原子操作。

    问题的根源在于, forEach 中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。

    要是你想用并行 Stream 又不想引发类似的意外,就必须避免这种情况。

    现在你知道了,共享可变状态会影响并行流以及并行计算。

    现在,记住要避免共享可变状态,确保并行 Stream 得到正确的结果。

    接下来,我们会看到一些实用建议,你可以由此判断什么时候可以利用并行流来提升性能。

    高效使用并行流

    一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。

    • 测试验证性能

    如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。

    • 留意装箱

    留意装箱。自动装箱和拆箱操作会大大降低性能。

    Java 8中有原始类型流( IntStream 、LongStream 、 DoubleStream )来避免这种操作,但凡有可能都应该用这些流。

    • 特殊的操作本身

    有些操作本身在并行流上的性能就比顺序流差。

    特别是 limit 和 findFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。

    例如, findAny 会比 findFirst 性能好,因为它不一定要按顺序来执行。你总是可以调用 unordered 方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit 可能会比单个有序流(比如数据源是一个 List )更高效。

    • 总计算成本

    还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。

    • 较小的数据

    对于较小的数据量,选择并行流几乎从来都不是一个好的决定。

    并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
    要考虑流背后的数据结构是否易于分解。

    例如, ArrayList 的拆分效率比 LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。

    另外,用 range 工厂方法创建的原始类型流也可以快速分解。

    • 对于流的操作

    流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个 SIZED 流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。

    • 合并的代价

    还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

    背后的实现原理

    最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。

    并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。

    拓展阅读

    Fork/Join 框架

    参考资料

    《java8 实战》

    【Java8实战】开始使用流

    JDK8 实战系列

    目录导航

    目录导航

    展开全文
  • 74HC165并行数据转串行数据

    千次阅读 2012-11-21 00:53:44
    74HC165 概述 (NXP founded by Philips) ...74HC165是8位并行读取或串行输入移位寄存器,可在末级得到互斥的串行输出(Q7和Q7),当并行读取(PL)输入为低时,从D0到D7口输入的并行数据将被异步地读取进寄存器内。
  • 时钟分配芯片在高速并行数据采集中的应用.pdf
  • java8 Stream多线程并行数据处理

    千次阅读 2018-11-16 13:13:06
    Stream多线程并行数据处理 将一个顺序执行的流转变成一个并发的流只要调用 parallel()方法 public static long parallelSum(long n){ return Stream.iterate(1L, i -&amp;amp;amp;amp;amp;gt; i +1).limit(n...
  • "并口"是指8位数据同时通过并行线传输。这样数据传输速度大大提高,但并行传输线路长度受到限制。"长线"是相对于数据的传输速度而言的。例如,数据传输速率为9 600 b/s时,20 m的电缆即可认为是长线。增加传输线的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 592,911
精华内容 237,164
关键字:

并行数据