精华内容
下载资源
问答
  • 设计模式.net并行编程.pdf 设计模式.net并行编程.pdf 设计模式.net并行编程.pdf
  • 并行编程模式

    2018-06-26 05:14:00
    并行编程模式
  • 基于Linux并行编程模式的研究与实现.pdf
  • 介绍了并行编程的多种设计模式、并行模式语言
  • 并行设计模式能够降低并行编程的难度与复杂度。针对科学与工程计算领域大量耗时的数据密集型应用的计算,提出了一种能够适用于阵列数据划分应用的FJRR并行设计模式;鉴于开发者更习惯串行编程,在FJRR模式中提出并...
  • 并行编程入门

    千次阅读 2016-10-14 20:10:38
    1. 并行编程简介 2. MapReduce 2.1 MapReduce简介 2.2 MapReduce框架 2.3 Hadoop介绍 2.4 Hadoop基本类 2.5 Hadoop编程实例1.并行编程简介1.1.并行编程作用,用途商业用途,科学计算,大数据分析1.2....

    目录

    1. 并行编程简介
    2. MapReduce
      2.1 MapReduce简介
      2.2 MapReduce框架
      2.3 Hadoop介绍
      2.4 Hadoop基本类
      2.5 Hadoop编程实例

    #1.并行编程简介
    ##1.1.并行编程作用,用途
    商业用途,科学计算,大数据分析

    ##1.2.并行编程兴起原因
    目前的串行编程的局限性
    使用的流水线等隐式并行模式的局限性
    硬件的发展
    ##1.3.并行算法设计原则步骤
    a.分析问题
    b.分解问题
            其中分解方法有:
            数据分解
            递归分解
            探测性分解
            推测性分解
            混合分解
    c.根据分解方法,产生任务
    d.将任务映射到处理器上
    e.要注意的问题:
            减少任务之间的交互(任务粒度,任务的依赖)
            负载均衡(静态均衡和动态均衡)

    ###1.4并行算法模型
            数据并行模型
            任务图模型
            工作池模型(任一个任务可映射到任一个处理器上)
            主-从模型
    ###

    ##1.5.基本通信操作(单端口,双向)
    对于不同的操作,对于不同的设备模型,有如下几种组合:
    a.一对多广播及多对一归约(一传到二,然后二传到4)
    环或线性阵列:
    格网:
    超立方体:
    b.多对多广播 及 多对多归约
    环或线性阵列:
    格网:
    超立方体:
    d.全归约 及 前缀和
    环或线性阵列:
    格网:
    超立方体:
    e.散发及 收集归约
    环或线性阵列:
    格网:
    超立方体:
    f.循环移位
    环或线性阵列:
    格网:
    超立方体:

    ##1.6.解析建模
    模型需要考虑的因素有:

    • 开销分析
    • 性能度量
              执行时间,加速比,总并行开销,效率,成本
    • 粒度影响
    • 系统可扩展性
      ###1.7使用消息传递模式编程
      使用MPI API 进行编程
    MPI,消息传递接口
    int MPI_Init(int *argc,char ***grgv)
    
    int MPI_Finalize()
    
    int MPI_Comm_size(MPI_Comm comm,int * size):用size返回comm域中进程数目
    
    int MPI_Comm_rank(MPI_Comm comm,int * rank):用rank返回comm域中进程等级(0-size-1)
    
    int MPI_Send(void buf,int count,MPI_Datatype datatype,int dest,int tag,MPI_comm comm);
    
    int MPI_Recv(void buf,int count,MPI_Datatype datatype,int source,int tag,MPI_comm comm,MPI_status status);
    
    int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) 
    

    ###

    #2.MapReduce
    参考文献:
    http://www.open-open.com/lib/view/open1328763069203.html
    http://www.wnt.com.cn/html/news/tophome/top_xytd/top_xytd_jswz/bbs_service/20130711/111140562.html
    http://blog.csdn.net/geekcome/article/details/9024419
    http://www.cnblogs.com/biyeymyhjob/archive/2012/08/12/2633608.html
    http://www.educity.cn/wenda/578905.html
    http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/#ibm-pcon
    http://wiki.apache.org/hadoop/
    http://hadoop.apache.org/
    http://research.google.com/archive/mapreduce-osdi04.pdf

    IBM Hadoop分布式并行编程系列:
    第一部分:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/
    第二部分:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/
    第三部分:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop3/
    http://www.educity.cn/wenda/578905.html
    http://datalife.iteye.com/blog/930318

    ##2.1MapReduce简介
            MapReduce 是 Google 公司的核心计算模型,它将复杂的运行于大规模集群上的并行计算过程高度的抽象到了两个函数,Map 和 Reduce, 这是一个令人惊讶的简单却又威力巨大的模型。适合用 MapReduce 来处理的数据集(或任务)有一个基本要求: 待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理
    这里写图片描述

            计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 <key, value> 对转换成另一个或一批 <key, value> 对输出。
    这里写图片描述
            以一个计算文本文件中每个单词出现的次数的程序为例,<k1,v1> 可以是 <行在文件中的偏移位置, 文件中的一行>,经 Map 函数映射之后,形成一批中间结果 <单词,出现次数>, 而 Reduce 函数则可以对中间结果进行处理,将相同单词的出现次数进行累加,得到每个单词的总的出现次数。
    基于 MapReduce 计算模型编写分布式并行程序非常简单,程序员的主要编码工作就是实现 Map 和 Reduce 函数,其它的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通信等,均由 MapReduce 框架(比如 Hadoop )负责处理,程序员完全不用操心。

    ##2.2 MapReduce框架
    ### 2.2.1运行架构图
    如下:这里写图片描述

    ### 2.2.流程分析
    1.在客户端启动一个作业。
    2.向JobTracker请求一个Job ID。
    3.将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息(输入划分信息?)。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。
    4.JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度(这里是不是很像微机中的进程调度呢,呵呵),当作业调度器根据自己的调度算法调度到该作业时,**会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。**这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。
    5.TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。
    以上是在客户端、JobTracker、TaskTracker的层次来分析MapReduce的工作原理的,下面我们再细致一点,从map任务和reduce任务的层次来分析分析吧。
    ### 2.2.2.Map、Reduce任务中Shuffle和排序的过程
    流程分析:
    这里写图片描述

    Map端:
            1.每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

            2.在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可能少的数据写入到磁盘。

            3.当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

            4.将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

    到这里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,如果我们这样看:一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个对数据洗牌的过程呢?呵呵。

    Reduce端:
            1.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。
            2.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。
            3.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数

    ##2.3 Hadoop简介
    ###2.3.1 Hadoop是什么
    MapReduce的一个开源实现
    ### 2.3.2数据分布存储
            Hadoop 中的分布式文件系统 HDFS 由一个管理结点 ( NameNode )和N个数据结点 ( DataNode )组成,每个结点均是一台普通的计算机。在使用上同我们熟悉的单机上的文件系统非常类似,一样可以建目录,创建,复制,删除文件,查看文件内容等。但其底层实现上是把文件切割成 Block,然后这些 Block 分散地存储于不同的 DataNode 上,每个 Block 还可以复制数份存储于不同的 DataNode 上,达到容错容灾之目的。NameNode 则是整个 HDFS 的核心,它通过维护一些数据结构,记录了每一个文件被切割成了多少个 Block,这些 Block 可以从哪些 DataNode 中获得,各个 DataNode 的状态等重要信息。
    ###2.3.3 分布式并行计算
            Hadoop 中有一个作为主控的 JobTracker,用于调度和管理其它的 TaskTracker, JobTracker 可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上,即 DataNode 既是数据存储结点,也是计算结点。 JobTracker 将 Map 任务和 Reduce 任务分发给空闲的 TaskTracker, 让这些任务并行运行,并负责监控任务的运行情况。如果某一个 TaskTracker 出故障了,JobTracker 会将其负责的任务转交给另一个空闲的 TaskTracker 重新运行。
    ###2.3.4 本地计算
            数据存储在哪一台计算机上,就由这台计算机进行这部分数据的计算,这样可以减少数据在网络上的传输,降低对网络带宽的需求。在 Hadoop 这样的基于集群的分布式并行系统中,计算结点可以很方便地扩充,而因它所能够提供的计算能力近乎是无限的,但是由是数据需要在不同的计算机之间流动,故网络带宽变成了瓶颈,是非常宝贵的,“本地计算”是最有效的一种节约网络带宽的手段,业界把这形容为“移动计算比移动数据更经济”。

    这里写图片描述

    ###2.3.5 任务粒度

             把原始大数据集切割成小数据集时,通常让小数据集小于或等于 HDFS 中一个 Block 的大小(缺省是 64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。有 M 个小数据集待处理,就启动 M 个 Map 任务,注意这 M 个 Map 任务分布于 N 台计算机上并行运行,Reduce 任务的数量 R 则可由用户指定。
    ###2.3.6 Partition
            把 Map 任务输出的中间结果按 key 的范围划分成 R 份( R 是预先定义的 Reduce 任务的个数),划分时通常使用 hash 函数如: hash(key) mod R,这样可以保证某一段范围内的 key,一定是由一个 Reduce 任务来处理,可以简化 Reduce 的过程。
    ###2.3.7 Combine
            在 partition 之前,还可以对中间结果先做 combine,即将中间结果中有相同 key的 <key, value> 对合并成一对。combine 的过程与 Reduce 的过程类似,很多情况下就可以直接使用 Reduce 函数,但 combine 是作为 Map 任务的一部分,在执行完 Map 函数后紧接着执行的。Combine 能够减少中间结果中 <key, value> 对的数目,从而减少网络流量。

    ###2.3.8 Reduce 任务从 Map 任务结点取中间结果
            Map 任务的中间结果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盘。中间结果文件的位置会通知主控 JobTracker, JobTracker 再通知 Reduce 任务到哪一个 DataNode 上去取中间结果。注意所有的 Map 任务产生中间结果均按其 Key 用同一个 Hash 函数划分成了 R 份,R 个 Reduce 任务各自负责一段 Key 区间。每个 Reduce 需要向许多个 Map 任务结点取得落在其负责的 Key 区间内的中间结果,然后执行 Reduce 函数,形成一个最终的结果文件。
    ###2.3.9 任务管道
            有 R 个 Reduce 任务,就会有 R 个最终结果,很多情况下这 R 个最终结果并不需要合并成一个最终结果。因为这 R 个最终结果又可以做为另一个计算任务的输入,开始另一个并行计算任务
    ###2.3.10

    ##2.4Hadoop基本类
    ###2.4. 1 InputFormat类
    该类的作用是将输入的文件和数据分割成许多小的split文件,并将split的每个行通过LineRecorderReader解析成<Key,Value>,通过job.setInputFromatClass()函数来设置,默认的情况为类TextInputFormat,其中Key默认为字符偏移量,value是该行的值。

    ###2.4.2.Map类
    根据输入的<Key,Value>对生成中间结果,默认的情况下使用Mapper类,该类将输入的<Key,Value>对原封不动的作为中间按结果输出,通过job.setMapperClass()实现。实现Map函数。

    ###2.4.3.Combine类
    实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数

    ###2.4.4.Partitioner类
    该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数
    ###2.4.5.Reducer类
    将中间结果合并,得到中间结果。通过job.setReduceCalss()方法进行设置,默认使用Reducer类,实现reduce方法。
    ###2.4. 6.OutPutFormat类
    该类负责输出结果的格式。可以通过job.setOutputFormatClass()方法进行设置。默认使用TextOUtputFormat类,得到<Key,value>对。

    hadoop主要是上面的六个类进行mapreduce操作,使用默认的类,处理的数据和文本的能力很有限,具体的项目中,用户通过改写这六个类(重载六个类),完成项目的需求。说实话,我刚开始学的时候,我怀疑过Mapreudce处理数据功能,随着学习深入,真的很钦佩mapreduce的设计,基本就二个函数,通过重载,可以完成所有你想完成的工作

    ##2.5Hadoop编程实例
    ###2.5.1 环境搭建
    Cygwin 安装配置

    1. 下载Cygwin安装文件
    2. 运行安装文件,选择一个下载站点,继续
    3. 选择要安装的程序,默认是不安装某些组件,需要手动选择
            Net Category下的:openssh,openssl
            BaseCategory下的:sed (若需要Eclipse,必须sed)
            Devel Category下的:subversion(建议安装)
    4. 等待下载并完成安装,之后,设置环境变量,把 C:/cygwin/bin;C:/cygwin/usr/bin 加入到系统环境变量的Path中
    5. 打开cygwin,输入 ssh-host-config
        当询问if privilege separation should be used 时输入 no . 
        当询问if sshd should be installed as a service 时输入yes . 
        当询问about the value of CYGWIN environment variable enter 时输入 ntsec .
        其余询问均输入 no
    ps:如果电脑上没有 有密码的帐号,配置会不成功。此时,应该创建一个windows 带密码的帐号,也可以通过该配置界面创建一个
    6. 打开 控制面板-》管理-》服务 启动名为 CYGWIN sshd 的服务,亦可在cygwin中输入 cygrunsrv --start sshd 启动sshd,
        输入cygrunsrv --stop sshd停止sshd
    7. 打开cygwin,输入 ssh-keygen,当询问要filenames 和 pass phrases 的时候都点回车,接受默认的值
    8. 命令结束后输入 cd ~/.ssh 转到.ssh目录,输入 ls –l 应该包含两个文件:id_rsa.pub 和 id_rsa
    9. 在第8步的窗口(当前目录在.ssh)中输入 cat id_rsa.pub >> authorized_keys
    10. 输入 ssh localhost 启动SSH
    

    PS:对于window64位系统,开始我安装的是对应的64位的Cygwin,但是配置不成功,会出现如下错误。删除后,重新安装32位的cygwin后,就好了
    这里写图片描述
    在cygwin中开启停用删除服务的命令:
    开启服务: $ net start 服务名
    停止服务: $ net stop 服务名
    删除服务: $ cygrunsrv -R 服务名
    cygwin自带的命令:
    检查所有安装的软件的版本号: $ cygcheck -c
    检查当前Cygwin的版本号: $ cygcheck -c cygwin
    cygwin编译搭建hadoop环境需要安装的软件包:
    1.openssh
    2.openssl
    3.sed
    4.zlib
    4.tcp_wrappers
    5.diffutils
    6.vim
    7.subversion
    cygwin没有自动卸载功能,需要手动操作3个步骤如下:
    1.停止服务: $ net stop 服务名
    2.删除服务: $ cygrunsrv -R 服务名
    3.删除cygwin文件

    伪分布模式配置
    可以把伪分布模式看作是只有一个节点的集群,在这个集群中,这个节点既是Master,也是Slave,既是NameNode,也是DataNode,既是JobTracker,也是TaskTracker
    这种模式也是在一台单机上运行,但用不同的 Java 进程模仿分布式运行中的各类结点 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),请注意分布式运行中的这几个结点的区别:
    从分布式存储的角度来说,集群中的结点由一个 NameNode 和若干个 DataNode 组成, 另有一个 Secondary NameNode 作为 NameNode 的备份。 从分布式应用的角度来说,集群中的结点由一个 JobTracker 和若干个 TaskTracker 组成,JobTracker 负责任务的调度,TaskTracker 负责并行执行任务。TaskTracker 必须运行在 DataNode 上,这样便于数据的本地计算。JobTracker 和 NameNode 则无须在同一台机器上

    hadoop配置文件详解、安装及相关操作
    http://blog.csdn.net/lin_fs/article/details/7349497
    http://blog.csdn.net/ruby97/article/details/7423088
    注意事项:
    对于配置文件的更改,我开始是直接在window下打开修改的,结果出现如下错误:
    这里写图片描述
    原因:在windows下打开修改后,会更改文件的编码方式,使得文件在linux环境下读取错误问题
    解决办法:
    1.使用utraedit工具打开后,转换成Linux编码方式
    2.重新操作,在复制后,不在window下修改,直接在Linux中用vim编辑器打开修改(需要了解vim命令)
    改完之后,重新操作,结果如下:
    这里写图片描述

    关于错误:ipc.Client: Retrying connect to server: localhost/127.0.0.1:9000. Already tried 0 time(s).的错误。hadoop安装完成
    用jps命令,也看不不到namenode的进程, 必须再用命令hadoop namenode format格式化后,才能再使用
    原因是:hadoop默认配置是把一些tmp文件放在/tmp目录下,重启系统后,tmp目录下的东西被清除,所以报错
    解决方法:在conf/core-site.xml (0.19.2版本的为conf/hadoop-site.xml)中增加以下内容

       <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/log/hadoop/tmp</value>
       <description>A base for other temporary directories</description>
       </property>
       重启hadoop后,格式化namenode即可 
    

    测试配置是否成功
    浏览器下查看Hadoop系统情况的地址。
    http://127.0.0.1:50070/ HDFS情况
    http://127.0.0.1:50060/ Task Tracker 情况
    http://127.0.0.1:50030/ Job Tracker-Map/Reduce Administration
    这里写图片描述

    ###2.5. 2 hadoop命令 1. 格式化工作空间 bin/hadoop namenode –format
    1. 启动hdfs
      进入hadoop目录,在bin/下面有很多启动脚本,可以根据自己的需要来启动。

    三、Hadoop hdfs 整合
    可按如下步骤删除和更改hdfs不需要的文件:
    1.将hadoop-core-1.0.0.jar 移动到lib目录下。
    2. 将ibexec目录下的文件移动到bin目录下。
    3. 删除除bin、lib、conf、logs之外的所有目录和文件。
    4. 如果需要修改日志存储路径,则需要在conf/hadoop-env.sh文件中增加:
    export HADOOP_LOG_DIR=/home/xxxx/xxxx即可。
    四、HDFS文件操作
    Hadoop使用的是HDFS,能够实现的功能和我们使用的磁盘系统类似。并且支持通配符,如*。

    1. 查看文件列表
      查看hdfs中/user/admin/hdfs目录下的文件:bin/hadoop fs -ls /user/admin/hdfs
      查看hdfs中/user/admin/hdfs目录下的所有文件(包括子目录下的文件):bin/hadoop fs -lsr /user/admin/hdfs

    2. 创建文件目录
      新建一个叫做newDir的新目录:bin/hadoop fs -mkdir /user/admin/hdfs/newDir

    3. 删除hdfs中/user/admin/hdfs目录下一个名叫needDelete的文件: bin/hadoop fs -rm /user/admin/hdfs/needDelete
      删除hdfs中/user/admin/hdfs目录以及该目录下的所有文件:bin/hadoop fs -rmr /user/admin/hdfs

    4. 上传文件
      上传一个本机/home/admin/newFile的文件到hdfs中/user/admin/hdfs目录下
      sh bin/hadoop fs –put /home/admin/newFile /user/admin/hdfs/

    5. 下载文件
      下载hdfs中/user/admin/hdfs目录下的newFile文件到本机/home/admin/newFile中
      执行sh bin/hadoop fs –get /user/admin/hdfs/newFile /home/admin/newFile

    6. 查看hdfs中/user/admin/hdfs目录下的newFile文件
      bin/hadoop fs –cat /home/admin/newFile

    7.学习各种 HDFS 命令的使用:bin/hadoop dfs –help 可以

    ###2.5. 3 运行 wordcount 应用
    1.将本地文件系统上的 ./test-in 目录拷到 HDFS 的根目录上,目录名改为 input
    $ bin/hadoop dfs -put test input

    2.查看执行结果,将文件从 HDFS 拷到本地文件系统中再查看:
    $ bin/hadoop jar hadoop-0.20.0-examples.jar wordcount input output
    $ bin/hadoop dfs -get output output
    $ cat output/*
    也可以直接查看
    $ bin/hadoop dfs -cat output/*
    $ bin/stop-all.sh #停止 hadoop 进程

    ###2.5.4 eclipse编程环境搭建
    http://www.cnblogs.com/flyoung2008/archive/2011/12/09/2281400.html

    展开全文
  • MPI并行编程入门中国科学院计算机网络信息中心超级计算中心参考材料张林波 清华大学出版社 莫则尧 科学出版社 都志辉 清华大学出版社消息传递平台MPI• 什么是MPI (Message Passing Interface )– 是函数库规范,而...

    MPI并行编程入门

    中国科学院计算机网络信息中心

    超级计算中心

    参考材料

    张林波 清华大学出版社 莫则尧 科学出版社 都志辉 清华大学出版社

    消息传递平台MPI

    • 什么是MPI (Message Passing Interface )

    – 是函数库规范,而不是并行语言;操作如同库函数调用

    – 是一种标准和规范,而非某个对它的具体实现(MPICH等),

    与编程语言无关

    – 是一种消息传递编程模型,并成为这类编程模型的代表

    • What is the message?

    DATA+ENVELOPE

    • MPI的目标

    – 较高的通信性能

    – 较好的程序可移植性

    – 强大的功能

    消息传递平台MPI

    • MPI的产生

    – 1992-1994年,MPI 1.1版本问世

    – 1995-1997年,MPI 2.0版本出现

    • 扩充并行I/O

    • 远程存储访问

    • 动态进程管理等

    • MPI的语言绑定

    – Fortran (科学与工程计算)

    – C (系统和应用程序开发)

    • 主要的MPI实现

    – 并行机厂商提供

    – 高校、科研部门

    • MPICH (/mpi/mpich )

    • LAMMPI (/)

    消息传递平台MPI

    • MPI程序编译与运行

    – 程序编译

    C: %mpicc -o mpiprog mpisrc.c

    Fortran 77: %mpif77 -o mpiprog mpisrc.f

    – 程序运行

    %mpirun -np 4 mpiprog

    •程序执行过程中不能动态改变进程的个数

    •申请的进程数np与实际处理器个数无关

    MPI基础知识

    • 进程与消息传递

    • MPI重要概念

    • MPI函数一般形式

    • MPI原始数据类型

    • MPI程序基本结构

    • MPI几个基本函数

    • 并行编程模式

    进程与消息传递

    • 单个进程(process )

    – 进程与程序相联,程序一旦在操作系统中运行即成为进程。

    进程拥有独立的执行环境(内存、寄存器、程序计数器等)

    ,是操作系统中独立存在的可执行的基本程序单位

    – 串行应用程序编译形成的可执行代码,分为“指令”和“数据”两

    个部分,并在程序执行时“独立地申请和占有” 内存空间,且

    所有计算均局限于该内存空间。

    进程1 进程2

    内存

    进程与消息传递

    • 单机内多个进程

    – 多个进程可同时存在于单机内同一操作系统。操作系统负责

    调度分时共享处理机资源(CPU、内存、存储、外设等)

    – 进程间相互独立(内存空间不相交)。在操作系统调度下各

    自独立地运行,例如多个串行应用程序在同一台计算机运行

    – 进程间可以相互交换信息。例如数据交换、同步等待,消息

    是这些交换信息的基本单位,消息传递是指这些信息在进程

    间的相互交换,是实现进程间通信的唯一方式

    展开全文
  • 根据该模式数值计算的特点提出了一种并行求解三对角方程组的新方法,相对于传统算法编程简单而且并行效率更高;负载平衡是并行程序性能优化首先要解决的问题,以水格点的个数作为任务分解的标准,实现了较好的负载...
  • .Net并行编程高级教程--Parallel http://www.cnblogs.com/stoneniqiu/p/4857021.html 一直觉得自己对并发了解不够深入,特别是看了《代码整洁之道》觉得自己有必要好好学学并发编程, 因为性能也是衡量代码整洁...

    .Net并行编程高级教程--Parallel

    http://www.cnblogs.com/stoneniqiu/p/4857021.html


    一直觉得自己对并发了解不够深入,特别是看了《代码整洁之道》觉得自己有必要好好学学并发编程,

    因为性能也是衡量代码整洁的一大标准。而且在《失控》这本书中也多次提到并发,不管是计算机还是

    生物都并发处理着各种事物。人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现

    那个事情。所以好奇心驱使下学习并发。便有了此文。

    一、理解硬件线程和软件线程

         多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指

    令能够同时并行运行。硬件线程也称为逻辑内核,一个物理内核可以使用超线程技术提供多个硬件线程

    。所以一个硬件线程并不代表一个物理内核;Windows中每个运行的程序都是一个进程,每一个进程都会

    创建并运行一个或多个线程,这些线程称为软件线程。硬件线程就像是一条泳道,而软件线程就是在其

    中游泳的人。

    二、并行场合

        .Net Framework4 引入了新的Task Parallel Library(任务并行库,TPL),它支持数据并行、任务

    并行和流水线。让开发人员应付不同的并行场合。

    数据并行:有大量数据需要处理,并且必须对每一份数据执行同样的操作。比如通过256bit的密钥对100

    个Unicode字符串进行AES算法加密。
    任务并行:通过任务并发运行不同的操作。例如生成文件散列码,加密字符串,创建缩略图。
    流水线:这是任务并行和数据并行的结合体。
        TPL引入了System.Threading.Tasks ,主类是Task,这个类表示一个异步的并发的操作,然而我们不

    一定要使用Task类的实例,可以使用Parallel静态类。它提供了Parallel.Invoke, Parallel.For 

    Parallel.Forecah 三个方法。

    三、Parallel.Invoke

         试图让很多方法并行运行的最简单的方法就是使用Parallel类的Invoke方法。例如有四个方法:

    WatchMovie
    HaveDinner
    ReadBook
    WriteBlog
        通过下面的代码就可以使用并行。

     System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
      这段代码会创建指向每一个方法的委托。Invoke方法接受一个Action的参数组。

    1
    public static void Invoke(params Action[] actions);
      用lambda表达式或匿名委托可以达到同样的效果。

    System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => 
    ReadBook(), delegate() { WriteBlog(); });
     1.没有特定的执行顺序。

       Parallel.Invoke方法只有在4个方法全部完成之后才会返回。它至少需要4个硬件线程才足以让这4个
    方法并发运行。但并不保证这4个方法能够同时启动运行,如果一个或者多个内核处于繁忙状态,那么底
    层的调度逻辑可能会延迟某些方法的初始化执行。

    给方法加上延时,就可以看到必须等待最长的方法执行完成才回到主方法。

    这样会造成很多逻辑内核处于长时间闲置状态。

    四、Parallel.For

    Parallel.For为固定数目的独立For循环迭代提供了负载均衡 (即将工作分发到不同的任务中执行,这样
    所有的任务在大部分时间都可以保持繁忙) 的并行执行。从而能尽可能地充分利用所有的可用的内核。
    我们比较下下面两个方法,一个使用For循环,一个使用Parallel.For  都是生成密钥在转换为十六进制
    字符串。

     private static void GenerateAESKeys()
            {
                var sw = Stopwatch.StartNew();
                for (int i = 0; i < NUM_AES_KEYS; i++)
                {
                    var aesM = new AesManaged();
                    aesM.GenerateKey();
                    byte[] result = aesM.Key;
                    string hexStr = ConverToHexString(result);
                }
                Console.WriteLine("AES:"+sw.Elapsed.ToString());
            }
    
     private static void ParallelGenerateAESKeys()
            {
                var sw = Stopwatch.StartNew();
                System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) =>
                {
                    var aesM = new AesManaged();
                    aesM.GenerateKey();
                    byte[] result = aesM.Key;
                    string hexStr = ConverToHexString(result);
                });
                Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString());
            }
    
      private static int NUM_AES_KEYS = 100000;
            static void Main(string[] args)
            {
                Console.WriteLine("执行"+NUM_AES_KEYS+"次:");
                GenerateAESKeys();
                ParallelGenerateAESKeys();
                Console.ReadKey();
            }



    执行1000000次

    这里并行的时间是串行的一半。
     
    五、Parallel.ForEach

    在Parallel.For中,有时候对既有循环进行优化可能会是一个非常复杂的任务。Parallel.ForEach为固

    定数目的独立For Each循环迭代提供了负载均衡的并行执行,且支持自定义分区器,让使用者可以完全

    掌握数据分发。实质就是将所有要处理的数据区分为多个部分,然后并行运行这些串行循环。

    修改上面的代码:

      System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range =>
                {
                    var aesM = new AesManaged();
                    Console.WriteLine("AES Range({0},{1} 循环开始时间:

    {2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);

                    for (int i = range.Item1; i < range.Item2; i++)
                    {
                        aesM.GenerateKey();
                        byte[] result = aesM.Key;
                        string hexStr = ConverToHexString(result);
                    }
                    Console.WriteLine("AES:"+sw.Elapsed.ToString());
                });

    从执行结果可以看出,分了13个段执行的。

    第二次执行还是13个段。速度上稍微有差异。开始没有指定分区数,Partitioner.Create使用的是内置

    默认值。

    而且我们发现这些分区并不是同时执行的,大致是分了三个时间段执行。而且执行顺序是不同的。总的

    时间和Parallel.For的方法差不多。

     public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, 

    Action<TSource> body)
    Parallel.ForEach方法定义了source和Body两个参数。source是指分区器。提供了分解为多个分区的数

    据源。body是要调用的委托。它接受每一个已定义的分区作为参数。一共有20多个重载,在上面的例子

    中,分区的类型为Tuple<int,int>,是一个二元组类型。此外,返回一个ParallelLoopResult的值。

    Partitioner.Create 创建分区是根据逻辑内核数及其他因素决定。

     public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int 
    toExclusive)
        {
          int num = 3;
          if (toExclusive <= fromInclusive)
            throw new ArgumentOutOfRangeException("toExclusive");
          int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * 
    num);
          if (rangeSize == 0)
            rangeSize = 1;
          return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, 
    toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);
        }

    因此我们可以修改分区数目,rangesize大致为250000左右。也就是说我的逻辑内核是4.

       var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
       System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 
    1,rangesize), range =>
    再次执行:

    分区变成了四个,时间上没有多大差别(第一个时间是串行时间)。我们看见这四个分区几乎是同时执

    行的。大部分情况下,TPL在幕后使用的负载均衡机制都是非常高效的,然而对分区的控制便于使用者对

    自己的工作负载进行分析,来改进整体的性能。

    Parallel.ForEach也能对IEnumerable<int>集合进行重构。Enumerable.Range生产了序列化的数目。但

    这样就没有上面的分区效果。

     private static void ParallelForEachGenerateMD5HasHes()
            {
                var sw = Stopwatch.StartNew();
                System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), 

    number =>
                {
                    var md5M = MD5.Create();
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                });
                Console.WriteLine("MD5:"+sw.Elapsed.ToString());
            }

    六、从循环中退出

    和串行运行中的break不同,ParallelLoopState 提供了两个方法用于停止Parallel.For 和 

    Parallel.ForEach的执行。

    Break:让循环在执行了当前迭代后尽快停止执行。比如执行到100了,那么循环会处理掉所有小于100的

    迭代。
    Stop:让循环尽快停止执行。如果执行到了100的迭代,那不能保证处理完所有小于100的迭代。
    修改上面的方法:执行3秒后退出。

      private static void ParallelLoopResult(ParallelLoopResult loopResult)
            {
                string text;
                if (loopResult.IsCompleted)
                {
                    text = "循环完成";
                }
                else
                {
                    if (loopResult.LowestBreakIteration.HasValue)
                    {
                        text = "Break终止";
                    }
                    else
                    {
                        text = "Stop 终止";
                    }
                }
                Console.WriteLine(text);
            }

            private static void ParallelForEachGenerateMD5HasHesBreak()
            {
                var sw = Stopwatch.StartNew();
                var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, 
    NUM_AES_KEYS), (int number,ParallelLoopState loopState) =>
                {
                    var md5M = MD5.Create();
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    if (sw.Elapsed.Seconds > 3)
                    {
                        loopState.Stop();
                    }
                });
                ParallelLoopResult(loopresult);
                Console.WriteLine("MD5:" + sw.Elapsed);
            }

    七、捕捉并行循环中发生的异常。

      当并行迭代中调用的委托抛出异常,这个异常没有在委托中被捕获到时,就会变成一组异常,新的

    System.AggregateException负责处理这一组异常。

     private static void ParallelForEachGenerateMD5HasHesException()
            {
                var sw = Stopwatch.StartNew();
                var loopresult = new ParallelLoopResult();
                try
                {
                    loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, 
    NUM_AES_KEYS), (number, loopState) =>
                    {
                        var md5M = MD5.Create();
                        byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number);
                        byte[] result = md5M.ComputeHash(data);
                        string hexString = ConverToHexString(result);
                        if (sw.Elapsed.Seconds > 3)
                        {
                            throw new TimeoutException("执行超过三秒");
                        }
                    });
                }
                catch (AggregateException ex)
                {
                    foreach (var innerEx in  ex.InnerExceptions)
                    {
                        Console.WriteLine(innerEx.ToString());
                    }
                }
               
                ParallelLoopResult(loopresult);
                Console.WriteLine("MD5:" + sw.Elapsed);
            }


    结果:

     异常出现了好几次。

     八、指定并行度。

    TPL的方法总会试图利用所有可用的逻辑内核来实现最好的结果,但有时候你并不希望在并行循环中使用

    所有的内核。比如你需要留出一个不参与并行计算的内核,来创建能够响应用户的应用程序,而且这个

    内核需要帮助你运行代码中的其他部分。这个时候一种好的解决方法就是指定最大并行度。

    这需要创建一个ParallelOptions的实例,设置MaxDegreeOfParallelism的值。

     private static void ParallelMaxDegree(int maxDegree)
            {
                var parallelOptions = new ParallelOptions();
                parallelOptions.MaxDegreeOfParallelism = maxDegree;

                var sw = Stopwatch.StartNew();
                System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int 

    i) =>
                {
                    var aesM = new AesManaged();
                    aesM.GenerateKey();
                    byte[] result = aesM.Key;
                    string hexStr = ConverToHexString(result);
                });
                Console.WriteLine("AES:" + sw.Elapsed.ToString());
            }

    调用:如果在四核微处理器上运行,那么将使用3个内核。

     ParallelMaxDegree(Environment.ProcessorCount - 1);

    时间上大致慢了点(第一次Parallel.For 3.18s),但可以腾出一个内核来处理其他的事情。

    小结:这次学习了Parallel相关方法以及如何退出并行循环和捕获异常、设置并行度,还有并行相关的

    知识。园子里也有类似的博客。但作为自己知识的管理,在这里梳理一遍。
    ========

    C#并行编程-Task

    http://www.cnblogs.com/lonelyxmas/p/3959303.html


    菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。

    任务简介

    TPL引入新的基于任务的编程模型,通过这种编程模型可以发挥多核的功效,提升应用程序的性能,不需

    要编写底层复杂且重量级的线程代码。

    但需要注意:任务并不是线程(任务运行的时候需要使用线程,但并不是说任务取代了线程,任务代码

    是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并没有

    一对一的关系。)

    创建一个新的任务时,调度器(调度器依赖于底层的线程池引擎)会使用工作窃取队列找到一个最合适

    的线程,然后将任务加入队列,任务所包含的代码会在一个线程中运行。如图:

    System.Threading.Tasks.Task

    一个Task表示一个异步操作,Task提供了很多方法和属性,通过这些方法和属性能够对Task的执行进行

    控制,并且能够获得其状态信息。

    Task的创建和执行都是独立的,因此可以对关联操作的执行拥有完全的控制权。

    使用Parallel.For、Parallel.ForEach的循环迭代的并行执行,TPL会在后台创建

    System.Threading.Tasks.Task的实例。

    使用Parallel.Invoke时,TPL也会创建与调用的委托数目一致的System.Threading.Tasks.Task的实例。

    注意项

    程序中添加很多异步的操作作为Task实例加载的时候,为了充分利用运行时所有可用的逻辑内核,任务

    调度器会尝试的并行的运行这些任务,也会尝试在所有的可用内核上对工作进行负载均衡。

    但在实际的编码过程当中,并不是所有的代码片段都能够方便的用任务来运行,因为任务会带来额外的

    开销,尽管这种开销比添加线程所带来的开销要小,但是仍然需要将这个开销考虑在内。

    Task状态与生命周期

    一个Task实例只会完成其生命周期一次,当Task到达它的3种肯呢过的最终状态之一是,就无法回到之前

    的任何状态

    下面贴代码,详解见注释,方便大家理解Task的状态:

    使用任务来对代码进行并行化

    使用Parallel.Invoke可以并行加载多个方法,使用Task实例也能完成同样的工作,下面贴代码:

    等待任务完成Task.WaitAll
    Task.WaitAll 方法,这个方法是同步执行的,在Task作为参数被接受,所有Task结束其执行前,主线程

    不会继续执行下一条指令,下面贴代码

    Task.WaitAll 限定等待时长

    如图10毫秒没有完成任务,则输出了****

    通过取消标记取消任务

    通过取消标记来中断Task实例的执行。 CancellationTokenSource,CancellationToken下的IsCanceled

    属性标志当前是否已经被取消,取消任务,任务也不一定会马上取消,下面贴代码:

    Task异常处理 当很多任务并行运行的时候,可能会并行发生很多异常。Task实例能够处理一组一组的异

    常,这些异常有System.AggregateException类处理

    Task返回值  Task<TResult>

    通过延续串联多个任务

    ContinueWith:创建一个目标Task完成时,异步执行的延续程序,await,如代码所示:

    TaskContinuationOptions

    TaskContinuationOptions参数,可以控制延续另一个任的任务调度和执行的可选行为。下面看代码:

    TaskContinuationOptions 属性有很多,如下所示

     关于并行编程中的Task就写到这,如有问题,请指正。
    ========

    C# 并行编程 之 并发集合 (.Net Framework 4.0)

    http://blog.csdn.net/wangzhiyu1980/article/details/45497907


    此文为个人学习《C#并行编程高级教程》的笔记,总结并调试了一些文章中的代码示例。 在以后开发过

    程中可以加以运用。

    对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问。经常要做的就是对一些队

    列进行加锁-解锁,然后执行类似插入,删除等等互斥操作。 .NetFramework 4.0 中提供了一些封装好

    的支持并行操作数据容器,可以减少并行编程的复杂程度。

    基本信息
    .NetFramework中并行集合的名字空间: System.Collections.Concurrent

    并行容器:

    ConcurrentQueue
    ConcurrentStack
    ConcurrentBag : 一个无序的数据结构集,当不需要考虑顺序时非常有用。
    BlockingCollection : 与经典的阻塞队列数据结构类似
    ConcurrentDictionary

    这些集合在某种程度上使用了无锁技术(CAS Compare-and-Swap和内存屏障 Memory Barrier),与加互斥

    锁相比获得了性能的提升。但在串行程序中,最好不用这些集合,它们必然会影响性能。

    关于CAS: 
    http://www.tuicool.com/articles/zuui6z
    http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml
    关于内存屏障
    http://en.wikipedia.org/wiki/Memory_barrier


    用法与示例
    ConcurrentQueue
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。


    Enqueue:在队尾插入元素
    TryDequeue:尝试删除队头元素,并通过out参数返回
    TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。


    程序示例:

    using System;  
    using System.Text;  
      
    using System.Threading.Tasks;  
    using System.Collections.Concurrent;  
      
    namespace Sample4_1_concurrent_queue  
    {  
        class Program  
        {  
            internal static ConcurrentQueue<int> _TestQueue;  
      
            class ThreadWork1  // producer  
            {  
                public ThreadWork1()  
                { }  
      
                public void run()  
                {  
                    System.Console.WriteLine("ThreadWork1 run { ");  
                    for (int i = 0; i < 100; i++)  
                    {  
                        System.Console.WriteLine("ThreadWork1 producer: " + i);  
                        _TestQueue.Enqueue(i);  
                    }  
                    System.Console.WriteLine("ThreadWork1 run } ");  
                }  
            }  
      
            class ThreadWork2  // consumer  
            {  
                public ThreadWork2()  
                { }  
      
                public void run()  
                {  
                    int i = 0;  
                    bool IsDequeuue = false;  
                    System.Console.WriteLine("ThreadWork2 run { ");  
                    for (; ; )  
                    {  
                        IsDequeuue = _TestQueue.TryDequeue(out i);  
                        if (IsDequeuue)  
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "=====");    
                        if (i == 99)  
                            break;  
                    }  
                    System.Console.WriteLine("ThreadWork2 run } ");  
                }  
            }  
      
            static void StartT1()  
            {  
                ThreadWork1 work1 = new ThreadWork1();  
                work1.run();  
            }  
      
            static void StartT2()  
            {  
                ThreadWork2 work2 = new ThreadWork2();  
                work2.run();  
            }  
            static void Main(string[] args)  
            {  
                Task t1 = new Task(() => StartT1());  
                Task t2 = new Task(() => StartT2());  
      
                _TestQueue = new ConcurrentQueue<int>();  
      
                Console.WriteLine("Sample 3-1 Main {");  
      
                Console.WriteLine("Main t1 t2 started {");  
                t1.Start();  
                t2.Start();  
                Console.WriteLine("Main t1 t2 started }");  
      
                Console.WriteLine("Main wait t1 t2 end {");  
                Task.WaitAll(t1, t2);  
                Console.WriteLine("Main wait t1 t2 end }");  
      
                Console.WriteLine("Sample 3-1 Main }");    
                Console.ReadKey();  
            }  
        }  
    }



    ConcurrentStack
    其完全无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作。

    Push:向栈顶插入元素
    TryPop:从栈顶弹出元素,并且通过out 参数返回
    TryPeek:返回栈顶元素,但不弹出。

    程序示例:

    using System;  
    using System.Text;  
      
    using System.Threading.Tasks;  
    using System.Collections.Concurrent;  
      
    namespace Sample4_2_concurrent_stack  
    {  
        class Program  
        {  
            internal static ConcurrentStack<int> _TestStack;  
      
            class ThreadWork1  // producer  
            {  
                public ThreadWork1()  
                { }  
      
                public void run()  
                {  
                    System.Console.WriteLine("ThreadWork1 run { ");  
                    for (int i = 0; i < 100; i++)  
                    {  
                        System.Console.WriteLine("ThreadWork1 producer: " + i);  
                        _TestStack.Push(i);  
                    }  
                    System.Console.WriteLine("ThreadWork1 run } ");  
                }  
            }  
      
            class ThreadWork2  // consumer  
            {  
                public ThreadWork2()  
                { }  
      
                public void run()  
                {  
                    int i = 0;  
                    bool IsDequeuue = false;  
                    System.Console.WriteLine("ThreadWork2 run { ");  
                    for (; ; )  
                    {  
                        IsDequeuue = _TestStack.TryPop(out i);  
                        if (IsDequeuue)  
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   


    =====" + i);  
      
                        if (i == 99)  
                            break;  
                    }  
                    System.Console.WriteLine("ThreadWork2 run } ");  
                }  
            }  
      
            static void StartT1()  
            {  
                ThreadWork1 work1 = new ThreadWork1();  
                work1.run();  
            }  
      
            static void StartT2()  
            {  
                ThreadWork2 work2 = new ThreadWork2();  
                work2.run();  
            }  
            static void Main(string[] args)  
            {  
                Task t1 = new Task(() => StartT1());  
                Task t2 = new Task(() => StartT2());  
      
                _TestStack = new ConcurrentStack<int>();  
      
                Console.WriteLine("Sample 4-1 Main {");  
      
                Console.WriteLine("Main t1 t2 started {");  
                t1.Start();  
                t2.Start();  
                Console.WriteLine("Main t1 t2 started }");  
      
                Console.WriteLine("Main wait t1 t2 end {");  
                Task.WaitAll(t1, t2);  
                Console.WriteLine("Main wait t1 t2 end }");  
      
                Console.WriteLine("Sample 4-1 Main }");  
      
                Console.ReadKey();  
            }  
        }  
    }  


    测试中一个有趣的现象:


    虽然生产者已经在栈中插入值已经到了25,但消费者第一个出栈的居然是4,而不是25。很像是出错了。


    但仔细想想入栈,出栈和打印语句是两个部分,而且并不是原子操作,出现这种现象应该也算正常。


    Sample 3-1 Main {
    Main t1 t2 started {
    Main t1 t2 started }
    Main wait t1 t2 end {
    ThreadWork1 run {
    ThreadWork1 producer: 0
    ThreadWork2 run {
    ThreadWork1 producer: 1
    ThreadWork1 producer: 2
    ThreadWork1 producer: 3
    ThreadWork1 producer: 4
    ThreadWork1 producer: 5
    ThreadWork1 producer: 6
    ThreadWork1 producer: 7
    ThreadWork1 producer: 8
    ThreadWork1 producer: 9
    ThreadWork1 producer: 10
    ThreadWork1 producer: 11
    ThreadWork1 producer: 12
    ThreadWork1 producer: 13
    ThreadWork1 producer: 14
    ThreadWork1 producer: 15
    ThreadWork1 producer: 16
    ThreadWork1 producer: 17
    ThreadWork1 producer: 18
    ThreadWork1 producer: 19
    ThreadWork1 producer: 20
    ThreadWork1 producer: 21
    ThreadWork1 producer: 22
    ThreadWork1 producer: 23
    ThreadWork1 producer: 24
    ThreadWork1 producer: 25
    ThreadWork2 consumer: 16   =====4
    ThreadWork2 consumer: 625   =====25
    ThreadWork2 consumer: 576   =====24
    ThreadWork2 consumer: 529   =====23
    ThreadWork1 producer: 26
    ThreadWork1 producer: 27
    ThreadWork1 producer: 28


    ConcurrentBag
    一个无序的集合,程序可以向其中插入元素,或删除元素。
    在同一个线程中向集合插入,删除元素的效率很高。


     Add:向集合中插入元素
     TryTake:从集合中取出元素并删除
     TryPeek:从集合中取出元素,但不删除该元素。


    程序示例:


    [csharp] view plain copy 在CODE上查看代码片派生到我的代码片
    using System;  
    using System.Text;  
      
    using System.Threading.Tasks;  
    using System.Collections.Concurrent;  
      
    namespace Sample4_3_concurrent_bag  
    {  
        class Program  
        {  
            internal static ConcurrentBag<int> _TestBag;  
      
            class ThreadWork1  // producer  
            {  
                public ThreadWork1()  
                { }  
      
                public void run()  
                {  
                    System.Console.WriteLine("ThreadWork1 run { ");  
                    for (int i = 0; i < 100; i++)  
                    {  
                        System.Console.WriteLine("ThreadWork1 producer: " + i);  
                        _TestBag.Add(i);  
                    }  
                    System.Console.WriteLine("ThreadWork1 run } ");  
                }  
            }  
      
            class ThreadWork2  // consumer  
            {  
                public ThreadWork2()  
                { }  
      
                public void run()  
                {  
                    int i = 0;  
                    int nCnt = 0;  
                    bool IsDequeuue = false;  
                    System.Console.WriteLine("ThreadWork2 run { ");  
                    for (;;)  
                    {  
                        IsDequeuue = _TestBag.TryTake(out i);  
                        if (IsDequeuue)  
                        {  
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   


    =====" + i);  
                            nCnt++;  
                        }  
      
                        if (nCnt == 99)  
                            break;  
                    }  
                    System.Console.WriteLine("ThreadWork2 run } ");  
                }  
            }  
      
            static void StartT1()  
            {  
                ThreadWork1 work1 = new ThreadWork1();  
                work1.run();  
            }  
      
            static void StartT2()  
            {  
                ThreadWork2 work2 = new ThreadWork2();  
                work2.run();  
            }  
            static void Main(string[] args)  
            {  
                Task t1 = new Task(() => StartT1());  
                Task t2 = new Task(() => StartT2());  
      
                _TestBag = new ConcurrentBag<int>();  
      
                Console.WriteLine("Sample 4-3 Main {");  
      
                Console.WriteLine("Main t1 t2 started {");  
                t1.Start();  
                t2.Start();  
                Console.WriteLine("Main t1 t2 started }");  
      
                Console.WriteLine("Main wait t1 t2 end {");  
                Task.WaitAll(t1, t2);  
                Console.WriteLine("Main wait t1 t2 end }");  
      
                Console.WriteLine("Sample 4-3 Main }");  
      
                Console.ReadKey();  
            }  
        }  
    }  


    BlockingCollection
    一个支持界限和阻塞的容器


    Add :向容器中插入元素
    TryTake:从容器中取出元素并删除
    TryPeek:从容器中取出元素,但不删除。
    CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。
    IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。


    示例程序:


    程序中,消费者线程完全使用  while (!_TestBCollection.IsCompleted) 作为退出运行的判断条件。
    在Worker1中,有两条语句被注释掉了,当i 为50时设置CompleteAdding,但当继续向其中插入元素时,


    系统抛出异常,提示无法再继续插入。


    [csharp] view plain copy 在CODE上查看代码片派生到我的代码片
    using System;  
    using System.Text;  
      
    using System.Threading.Tasks;  
    using System.Collections.Concurrent;  
      
    namespace Sample4_4_concurrent_bag  
    {  
        class Program  
        {  
            internal static BlockingCollection<int> _TestBCollection;  
      
            class ThreadWork1  // producer  
            {  
                public ThreadWork1()  
                { }  
      
                public void run()  
                {  
                    System.Console.WriteLine("ThreadWork1 run { ");  
                    for (int i = 0; i < 100; i++)  
                    {  
                        System.Console.WriteLine("ThreadWork1 producer: " + i);  
                        _TestBCollection.Add(i);  
                        //if (i == 50)  
                        //    _TestBCollection.CompleteAdding();  
                    }  
                    _TestBCollection.CompleteAdding();  
      
                    System.Console.WriteLine("ThreadWork1 run } ");  
                }  
            }  
      
            class ThreadWork2  // consumer  
            {  
                public ThreadWork2()  
                { }  
      
                public void run()  
                {  
                    int i = 0;  
                    int nCnt = 0;  
                    bool IsDequeuue = false;  
                    System.Console.WriteLine("ThreadWork2 run { ");  
                    while (!_TestBCollection.IsCompleted)  
                    {  
                        IsDequeuue = _TestBCollection.TryTake(out i);  
                        if (IsDequeuue)  
                        {  
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   


    =====" + i);  
                            nCnt++;  
                        }  
                    }  
                    System.Console.WriteLine("ThreadWork2 run } ");  
                }  
            }  
      
            static void StartT1()  
            {  
                ThreadWork1 work1 = new ThreadWork1();  
                work1.run();  
            }  
      
            static void StartT2()  
            {  
                ThreadWork2 work2 = new ThreadWork2();  
                work2.run();  
            }  
            static void Main(string[] args)  
            {  
                Task t1 = new Task(() => StartT1());  
                Task t2 = new Task(() => StartT2());  
      
                _TestBCollection = new BlockingCollection<int>();  
      
                Console.WriteLine("Sample 4-4 Main {");  
      
                Console.WriteLine("Main t1 t2 started {");  
                t1.Start();  
                t2.Start();  
                Console.WriteLine("Main t1 t2 started }");  
      
                Console.WriteLine("Main wait t1 t2 end {");  
                Task.WaitAll(t1, t2);  
                Console.WriteLine("Main wait t1 t2 end }");  
      
                Console.WriteLine("Sample 4-4 Main }");  
      
                Console.ReadKey();  
            }  
        }  
    }  


    当然可以尝试在Work1中注释掉 CompleteAdding 语句,此时Work2陷入循环无法退出。


    ConcurrentDictionary
    对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。


    AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。
    GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值



    TryAdd:尝试在容器中添加新的键和值。
    TryGetValue:尝试根据指定的键获得值。
    TryRemove:尝试删除指定的键。
    TryUpdate:有条件的更新当前键所对应的值。
    GetEnumerator:返回一个能够遍历整个容器的枚举器。


    程序示例:


    [csharp] view plain copy 在CODE上查看代码片派生到我的代码片
    using System;  
    using System.Text;  
      
    using System.Threading.Tasks;  
    using System.Collections.Concurrent;  
        
    namespace Sample4_5_concurrent_dictionary  
    {  
        class Program  
        {  
            internal static ConcurrentDictionary<int, int> _TestDictionary;  
      
            class ThreadWork1  // producer  
            {  
                public ThreadWork1()  
                { }  
      
                public void run()  
                {  
                    System.Console.WriteLine("ThreadWork1 run { ");  
                    for (int i = 0; i < 100; i++)  
                    {  
                        System.Console.WriteLine("ThreadWork1 producer: " + i);  
                        _TestDictionary.TryAdd(i, i);  
                    }  
      
                    System.Console.WriteLine("ThreadWork1 run } ");  
                }  
            }  
      
            class ThreadWork2  // consumer  
            {  
                public ThreadWork2()  
                { }  
      
                public void run()  
                {  
                    int i = 0, nCnt = 0;  
                    int nValue = 0;  
                    bool IsOk = false;  
                    System.Console.WriteLine("ThreadWork2 run { ");  
                    while (nCnt < 100)  
                    {  
                        IsOk = _TestDictionary.TryGetValue(i, out nValue);  
                        if (IsOk)  
                        {  
                            System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   


    =====" + i);  
                            nValue = nValue * nValue;  
                            _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return 


    value = nValue; });  
                            nCnt++;  
                            i++;  
                        }  
                    }  
                    System.Console.WriteLine("ThreadWork2 run } ");  
                }  
            }  
      
            static void StartT1()  
            {  
                ThreadWork1 work1 = new ThreadWork1();  
                work1.run();  
            }  
      
            static void StartT2()  
            {  
                ThreadWork2 work2 = new ThreadWork2();  
                work2.run();  
            }  
            static void Main(string[] args)  
            {  
                Task t1 = new Task(() => StartT1());  
                Task t2 = new Task(() => StartT2());  
                bool bIsNext = true;  
                int  nValue = 0;  
      
                _TestDictionary = new ConcurrentDictionary<int, int>();  
      
                Console.WriteLine("Sample 4-5 Main {");  
      
                Console.WriteLine("Main t1 t2 started {");  
                t1.Start();  
                t2.Start();  
                Console.WriteLine("Main t1 t2 started }");  
      
                Console.WriteLine("Main wait t1 t2 end {");  
                Task.WaitAll(t1, t2);  
                Console.WriteLine("Main wait t1 t2 end }");  
      
                foreach (var pair in _TestDictionary)  
                {  
                    Console.WriteLine(pair.Key + " : " + pair.Value);  
                }    
                


    System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>>   
                    enumer = _TestDictionary.GetEnumerator();  
      
                while (bIsNext)  
                {  
                    bIsNext = enumer.MoveNext();  
                    Console.WriteLine("Key: " + enumer.Current.Key +  
                                      "  Value: " + enumer.Current.Value);  
      
                    _TestDictionary.TryRemove(enumer.Current.Key, out nValue);  
                }  
      
                Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count);  
      
                Console.WriteLine("Sample 4-5 Main }");  
      
                Console.ReadKey();  
            }  
        }  

    ========

    C#并行编程-PLINQ:声明式数据并行

    http://www.cnblogs.com/woxpp/p/3951096.html
    目录


    C#并行编程-相关概念


    C#并行编程-Parallel


    C#并行编程-Task


    C#并行编程-并发集合


    C#并行编程-线程同步原语


    C#并行编程-PLINQ:声明式数据并行


    背景


    通过LINQ可以方便的查询并处理不同的数据源,使用Parallel LINQ (PLINQ)来充分获得并行化所带来的


    优势。


    PLINQ不仅实现了完整的LINQ操作符,而且还添加了一些用于执行并行的操作符,与对应的LINQ相比,通


    过PLINQ可以获得明显的加速,但是具体的加速效果还要取决于具体的场景,不过在并行化的情况下一段


    会加速。


    如果一个查询涉及到大量的计算和内存密集型操作,而且顺序并不重要,那么加速会非常明显,然而,


    如果顺序很重要,那么加速就会受到影响。


    AsParallel() 启用查询的并行化


    下面贴代码,看下效果,详情见注释:


     View Code


    当前模拟的数据量比较少,数据量越多,采用并行化查询的效果越明显


    AsOrdered()与orderby
    AsOrdered:保留查询的结果按源序列排序,在并行查询中,多条数据会被分在多个区域中进行查询,查


    询后再将多个区的数据结果合并到一个结果集中并按源序列顺序返回。


    orderby:将返回的结果集按指定顺序进行排序


    下面贴代码方便大家理解:


    在PLINQ查询中,AsOrdered()和orderby子句都会降低运行速度,所以如果顺序并不是必须的,那么在请


    求特定顺序的结果之前,将加速效果与串行执行的性能进行比较是非常重要的。


    指定执行模式 WithExecutionMode


    对串行化代码进行并行化,会带来一定的额外开销,Plinq查询执行并行化也是如此,在默认情况下,执


    行PLINQ查询的时候,.NET机制会尽量避免高开销的并行化算法,这些算法有可能会将执行的性能降低到


    地狱串行执行的性能。


    .NET会根据查询的形态做出决策,并不开了数据集大小和委托执行的时间,不过也可以强制并行执行,


    而不用考虑执行引擎分析的结果,可以调用WithExecutionMode方法来进行设置。、


    下面贴代码,方便大家理解


    通过PLINQ执行归约操作


    PLINQ可以简化对一个序列或者一个组中所有成员应用一个函数的过程,这个过程称之为归约操作,如在


    PLINQ查询中使用类似于Average,Max,Min,Sum之类的聚合函数就可以充分利用并行所带来好处。


    并行执行的规约和串行执行的规约的执行结果可能会不同,因为在操作不能同时满足可交换和可传递的


    情况下产生摄入,在每次执行的时候,序列或组中的元素在不同并行任务中分布可能也会有区别,因而


    在这种操作的情况下可能会产生不同的最终结果,因此,一定要通过对于的串行版本来兴义原始的数据


    源,这样才能帮助PLINQ获得最优的执行结果。


    下面贴代码:


    如上述代码所示


    在LINQ版本中,该方法会返回一个 IEumerable<int>,即调用 Eumerable.Range方法生成指定范围整数


    序列的结果,
    在PLINQ版本中,该方法会返回一个 ParallelQuery<int>,即调用并行版本中


    System.Linq.ParallelEumerable的ParallelEumerable.Range方法,通过这种方法得到的结果序列也是


    并行序列,可以再PLINQ中并行运行。


    如果想对特定数据源进行LINQ查询时,可以定义为  private IEquatable<int> products


    如果想对特定数据源进行PLINQ查询时,可以定义为 private ParallelQuery<int> products


    并发PLINQ任务


    如代码所示tk1,tk2,tk3三个任务,tk2,tk3任务的运行需要基于tk1任务的结果,因此,参数中指定了


    TaskContinuationOptions.OnlyOnRanToCompletion,通过这种方式,每个被串联的任务都会等待之前的


    任务完成之后才开始执行,tk2,tk3在tk1执行完成后,这两个任务的PLINQ查询可以并行运行,并将会可


    能地使用多个逻辑内核。


    取消PLINQ WithCancellation


    通过WithCancellation取消当前PLINQ正在执行的查询操作,代码如下:


    指定查询时所需的并行度 WithDegreeOfParallelism


    默认情况下,PLINQ总是会试图利用所有的可用逻辑内核达到最佳性能,在程序中我们可以利用


    WithDegreeOfParallelism方法指定一个不同最大并行度。


    下面贴代码:


    好处:如果计算机有8个可用的逻辑内核,PLINQ查询最多运行4个并发任务,这样可用使用


    Parallel.Invoke 加载多个带有不同并行度的PLINQ查询,有一些PLINQ查询的可扩展性有限,因此这些


    选项可用让您充分利用额外的内核。


    使用ForAll 并行遍历结果


    下面贴代码:


    ForAll是并行,foreach是串行,如果需要以特定的顺序处理数据,那么必须使用上述串行循环或方法。


    WithMergeOptions


    通过WithMergeOptions扩展方法提示PLINQ应该优先使用哪种方式合并并行结果片段,如下:


    下面贴代码查看下差异:


    需要注意的是:每一个选项都有其优点和缺点,因此一定奥测量显示第一个结果的时间以及完成整个查


    询所需要的时间,这点很重要 。


    使用PLINQ执行MapReduce算法 ILookup IGrouping


    mapreduce ,也称为Map/reduce 或者Map&Reduce ,是一种非常流行的框架,能够充分利用并行化处理巨


    大的数据集,MapReduce的基本思想非常简单:将数据处理问题分解为以下两个独立且可以并行执行的操


    作:


    映射(Map)-对数据源进行操作,为每一个数据项计算出一个键值对。运行的结果是一个键值对的集合


    ,根据键进行分组。


    规约(Reduce)-对映射操作产生的根据键进行分组的所有键值对进行操作,对每一个组执行归约操作,


    这个操作可以返回一个或多个值。


    下面贴代码,方便大家理解,但是该案列所展示的并不是一个纯粹的MapReduce算法实现:


    关于PLINQ:声明式数据并行就写到这,主要是PLINQ下的查询注意项和查询调优的一些扩展方法。如有问


    题,欢迎指正。
    ========
    展开全文
  • 并行编程模型,Lustre

    2015-12-27 16:31:31
    模,要完成这些目标,需要选择合适的并行编程模型,并行编程模型与并行计算机体系 结构紧密相关,目前主流的并行编程模型包括共享内存编程模型和分布式内存编程模型。 共享内存体系架构(如NUMA,SMP 等)下的...

    1      并行编程模型

    现代高性能计算机通过将单个任务并行化,降低任务处理时间、提升计算精度或扩大规

    模,要完成这些目标,需要选择合适的并行编程模型,并行编程模型与并行计算机体系

    结构紧密相关,目前主流的并行编程模型包括共享内存编程模型和分布式内存编程模型。

    共享内存体系架构(如NUMASMP 等)下的编程模型主要是共享内存编程模型,共

    享内存编程模型具有单一地址空间,编程方法简单,但可移植性和可扩展性较差的特点,

    目前使用较多的是OpenMPOpenMP 是基于线程级的并行化,它通过在串行脚本内添

    pragma 制导语句来指导并行化。

    分布式内存体系架构(如MMPCluster 等)下的编程模型是分布式内存编程模型,其

    中较常用的是消息传递编程模型,具有多地址空间,编程较复杂,可移植性和可扩展性

    较好的特点,目前使用较多的是MPIMessage Passing Interface),以函数库的形式被程

    序调用,MPI 是基于进程级的并行,自1993 2 月发布了MPI-1.0 标准后,至今已发

    展了20 年,并于2012 9 月发布了最新的MPI-3.0 标准,基于MPI 标准,业界有多种

    开源和商用的MPI 实现,如开源的有MPICHMVAPICHOpenMPI 等,商用的有Intel

    MPIPlatform MPI 等等。

    当前高性能集群具有多层次结构,集群系统兼具了共享内存体系结构和分布式内存体系

    结构的特点,这就导致了共享内存编程模型和分布式内存编程模型相结合的混合编程模

    型出现,在节点内部使用共享内存编程模型,而节点间则采用分布式内存编程模型,即

    OpenMP+MPI 的模式,其优势在于结合了OpenMP 线程级的细粒度并行(如循环并行)

    MPI进程级的粗粒度并行(如区域分解),在很多情况下,其性能要优于单纯的OpenMP

    程序或MPI 程序。

    计算机通过文件系统管理、存储数据,而信息爆炸时代中人们可以获取的数据成指数倍

    的增长,单纯通过增加硬盘个数来扩展计算机文件系统的存储容量的方式,在容量大小、

    容量增长速度、数据备份、数据安全等方面的表现都差强人意。

    分布式文件系统(Distributed File System)可以有效解决数据的存储和管理难题:将固

    定于某个地点的某个文件系统,扩展到任意多个地点/多个文件系统,众多的节点组成一

    个文件系统网络。每个节点可以分布在不同的地点,通过网络进行节点间的通信和数据

    传输。人们在使用分布式文件系统时,无需关心数据是存储在哪个节点上、或者是从哪

    个节点从获取的,只需要像使用本地文件系统一样管理和存储文件系统中的数据。分布

    式文件系统的设计基于客户机/服务器模式。一个典型的网络可能包括多个供多用户访问

    的服务器。

    当前比较流行的分布式文件系统包括:LustreHadoopMogileFSFreeNASFastDFS

    NFSOpenAFSMooseFSpNFS、以及GoogleFS

    1.1      Lustre

    Lustre 体系结构是一个为集群设计的存储体系结构。其核心组件是运行在Linux 操作系

    统上、支持标准的POSIX* UNIX 文件系统接口、并遵循GPL2.0 许可的Lustre 文件系

    统。据IDC 的统计,Lustre 是在HPC 领域应用最广的文件系统,世界上最快的50 个超

    算网站有60%都使用Lustre

    注:由于Lustre 结构的特点,每两台设备间存在备份关系,所以有时称为集群文件系统,而对于I/O

    的处理模式来说,Lustre 业务是并行处理的所以也可称作并行文件系统,而对于业务的处理来说,

    Lustre I/O 通常会分发给不同的I/O 节点处理,所以也可称之为分布式文件系统,以上三种称

    谓只是分类角度不同,没有对错之分。

    在集群计算机里,计算机与磁盘间数据交换的提升无法跟上微处理器和内存增长的速度,

    从而也拖累了应用程序的性能。Lustre 文件系统很好的解决了这一问题,它可为数以万

    计的客户端系统服务,提供PB 级的存储容量和百吉比特字节每秒(GB/秒)的I/O

    吐量。

    Lustre 文件系统基于C/S 模式设计,其对象存储的特性(客户端与磁盘上的文件分离,

    存储的升级和变更不影响客户端的使用),极大的降低了企业购买存储设备的成本,并

    改变企业购买存储的方式。分布式和并行化的工作方式,使Lustre 的用户不必一次性购

    买价格高昂的高端存储来满足性能需求,而可以通过低端存储和I/O 节点的动态扩充来

    满足业务需求的增长。同时客户为每个集群应用建立一个文件系统,也可转换为使用全

    局统一命名的文件系统,客户可以为每个文件配置条带等文件系统属性,从而减少了集

    群间的数据拷贝、简化的集群的管理,聚集了服务器和集群的存储容量,减少了资源的

    浪费,更易于性能动态的扩展。

    Lustre 文件系统支持多种高性能、低时延的网络,例如基于OFEDOpenFabrics Enterprise

    Distribution)的InfiniBand。在多种不同的RDMA 网络间还可以通过Lustre routing 进行

    桥接。保证了Lustre 可以获得充足的网络带宽。

    Lustre 文件系统通过共享存储分区的方式支持OSTObject storage target)的AA

    Active-Active)故障切换,实现了底层故障对应用透明,MMPMultiple mount protection

    技术的使用为可能导致文件系统故障的系统错误提供了完整的保护。

    组件

    Lustre 文件系统的主要组件有:MDSMDTOSSOSTClient。各个组件间的链接

    关系如图2-7 所示。

    MDSMetadata Server):MDS 负责管理Lustre 文件系统的文件名、目录、权限、文件

    结构等元数据信息,MDS 生成的元数据存储在一个或者多个MDT 上,并为每个Client

    提供服务。MDS 可以有多个,但只有一个为主MDS,其余MDS 工作在备份模式。

    MDTMetadata Target):每个文件系统都有一个MDTMDT 可以是MDS 本地硬盘(只

    有一个MDS 时)、也可以是远端存储的一个LUN 设备。一个MDT 可以通过同时映射

    于华为OceanStor V3 融合存储部署HPC 分布式文件系统参考架构

    载最佳实践

    给两台主机,供多个MDS 进行访问,但同一时刻只能有一个MDS 进行访问,通过这

    种方式可以实现MDS 的高可用性。

    OSSObject Storage Servers):OSS Client 提供文件I/O 服务,客户端从MDS 获取元

    数据信息后,从OSS 访问文件数据,文件数据最终存储在与OSS 相连的OST 上。通常

    每个OSS 会配置2~8 OST,每个OST 最大16TB

    OSTObject Storage Target):用户文件存储在一个或者多个对象中,每个对象对应一个

    独立的OST,每个文件可以存储在一个OST 上,也可以跨越多个OST 进行存储。一个

    OST 可以通过同时映射给两台主机实现OSS 的高可用性。

    Lustre ClientLustre 客户端是指装有Lustre 客户端软件并可以挂载Lustre 文件系统的计

    算节点、虚拟化节点或者桌面节点。在高性能集群中,通常是计算节点、管理节点或登

    陆节点。

    2-7 Lustre 组件组网示意图

    文件存储

    Lustre 文件系统使用FIDLustre file identifiers)作为文件或者对象的标识。而文件数据

    OST 中存储位置的信息被存储在MDT 上,形式为FID 的一个扩展属性,称之为layout

    EAextended attribute)。

    假设文件是一个数据文件(非目录和链接符),MDT 上存储着这个文件在OST 上的存

    放位置。如果MDT 指向的是单个OST 对象,则整个数据文件都存储在这个OST 对象

    上,如果MDT 文件指向多个对象,文件则被使用RAID0 的方式条带化到各个对象。如

    下图所示,当一个客户端需要读写数据文件时,它首先需要从MDT 处获取layout EA

    的信息,由layout EA 信息可以得知具体的数据被存放在哪些OST 上,然后即可是读写。

    2-8 Layout EA 存储示意图

    Lustre 文件系统高性能的一个主要因素是它可以在多个OST 间通过round-robin 算法进

    行基于数据的分段或者chunk 的条带化,用户可以为每个文件配置条带数目、大小、应

    用于哪个OST。当访问单一文件时,条带化可以使文件系统的性能大于单个OST 的带

    宽。条带化还用于单个OST 没有足够的空间存放整个文件时,使用多个OST 同时存储

    数据。

    举例说明文件在Lustre 中的存储方式,文件C stripe_size 比文件A stripe_size 大,

    文件C 允许更多的数据存放在单个分条。文件A stripe_count 3,即数据跨越三个

    对象存储,文件B 和文件C stripe_count 1。在第一次下发I/O 时,文件A stripe_size

    被存放在了OST1OSTOST3 上。第二次下发I/O 时,文件A、文件B、文件C、同

    时写入,根据均衡算法,最终文件A 存储了两份到OST1 上,文件B 存储在了OST2 上,

    文件C 存储在了OST3 上,而整体来看,OST1OST2OST3 存储的数据是一样多的。

    2-9 Lustre 文件存储示意图

    由上面的例子也可以看出来,Lustre 文件系统中的文件大小不受限于单个OST 的大小,

    文件可以被条带化,跨越多个对象(最多200 个)来进行存储,使用ldiskfs 时每个对象

    最大可以到16TB,总文件大小最大可以达31.25PB,使用zfs 时可以最大到256PB,总

    文件大小最大可以达8EB

    文件读写

    Lustre 文件系统中,一个文件读写主要经历以下流程:

    l Lustre 客户端向MDT 发送file open 的请求;

    l MDT 收到请求后查找存储这个文件的对象的layout EA 信息;

    l MDT layout EA 信息通过FID 返回给Lustre 客户端;

    l 客户端直接找到相应的OSS,并提交I/O 请求;

    l OSS 找到对应的OST 进行文件读写,并返回给Lustre 客户端。

    2-10 Lustre 文件读写示意图

    1.2      Lustre I/O性能特点与最佳

    1 Lustre概述
    Lustre是面向集群的存储架构,它是基于Linux平台的开源集群(并行)文件系统,提供与POSIX兼容的文件系统接口。Lustre两个最大特征是高扩展性和高性能,能够支持数万客户端系统、PB级存储容量、数百GB的聚合I/O吞吐量。LustreScale-Out存储架构,借助强大的横向扩展能力,通过增加服务器即可方便扩展系统总存储容量和性能。Lustre的集群和并行架构,非常适合众多客户端并发进行大文件读写的场合,但目前对于小文件应用非常不适用,尤其是海量小文件应用LOSFLots Of Small Files)。Lustre广泛应用于各种环境,目前部署最多的为高性能计算HPC,世界超级计算机TOP 10中的70%TOP 30中的50%TOP 100中的40%均部署了Lustre。另外,Lustre在石油、天然气、制造、富媒体、金融等行业领域也被大量部署应用。

     

    2 Lustre Stripe
    Lustre采用对象存储技术,将大文件分片并以类似RAID0的方式分散存储在多个OST上,一个文件对应多个OST上的对象。Lustre系统中,每个文件对应MDT上的一个元数据文件,inode以扩展属性记录了数据分片布局信息,包括stripe_count(对象数), stripe_size(分片大小), stripe_offset(起始OST)以及每个OST对象信息。当客户数据端访问文件时,首先从MDS请求文件元数据并获得分片布局信息(stripe layout),然后直接与多个OST同时交互进行并发读写。Lustre这种数据分片策略,提高了多用户访问的并发度和聚合I/O带宽,这是Lustre获得高性能的主要因素。再者,Stripe还能够使得Lustre可以存储超大文件,突破单一OST对文件大小的限制。当然,数据分片策略同时也会带来负面影响,比如增加系统负载和数据风险。

     

    LustreOST数量可以达到数千,但是出于复杂性、性能、实际存储需求等考虑,目前设计实现中将单个文件对象数限制为160个。对于EXT4后端文件系统,单个文件最大可达2TB,因此Lustre单个文件最大可以达到320TB。那么,Lustre如何在可用OST集合中选择合适的OST呢?目前有两种选择算法,即Round-Robin和随机加权算法,这两种算法调度的依据是,任意两个OST剩余存储容量相差是否超过20%的阈值。一般在系统使用之初,直接使用Round-Robin算法以顺序轮转方式选择OST,这种算法非常高效。随着文件数据量的增加,一旦达到20%的阈值,Lustre将启用随机加权算法选择OSTLustre维护着一个剩余空间的优先列表,采用随机算法在此列表中选择OST,这种算法会产生开销并影响性能。如果任意两个OST剩余存储容量相差重新降到20%阈值之内,则重新启用Round-Robin算法选择OSTLustre在创建文件时就按照分片模式并采用OST选择算法,预先创建好文件所需的OST对象。分片模式可以使用lfs setstripe进行设置,或者由系统自动选择缺省模式,文件目录会自动继承父目录的分片模式,但可以进行修改。数据写入后,文件分片模式就不能修改,新加入的OST只会参与新创建的文件目录OST选择调度。Lustre目前还没有实现OST存储空间的自动均衡,需要手工进行数据迁移复制达到均衡的效果。

     

    Lustre缺省情况下,stripe_count = 1, stripe_size = 1MB, stripe_offset = -1,即每个文件仅包含一个OST对象,分片大小为1MB,起始OSTLustre自动选择。实际上这种分片模式就是不对文件进行分片存储,显然不能满足许多应用的存储需求,实际应用时需要在分析数据特点、网络环境、访问行为的基础上进行适当配置。分片不是越多越好,在满足存储需求的前提下,应该使得OST对象数量尽可能少。应用lustre Stripe时,应该考虑如下因素:
    1提供高带宽访问Lustre文件分片并存储于多个OSS,对于单一大文件来说,它可以提供远大于单一OSS提供的聚合I/O带宽。在HPC环境中,成百上千的客户端会同时并发读写同一个文件,当文件很大时,分散与多个OSS能够获得非常高的聚合带宽。Lustre文件系统理论上可以提供2.5 TB/s的带宽,经过验证的带宽达到240 GB/s。当然对于小于1GB的文件来说,分片数量不宜多于4个,更多分片不会带来更高的性能提升,还会引入额外开销。对于小文件,文件大小本身可能小于分片大小,实际上是不作分片,对性能不会有提升。
    2改善性能。如果聚合的客户端带宽超过单个OSS的带宽,文件分片存储策略可以充分利用聚合的OSS带宽,极大提高性能,为应用程序提供高速的数据读写访问。合理的分片数量可以估算,客户端聚合I/O带宽除以单个OSS I/O性能即可得到。
    3提供超大容量文件Lustre后端文件系统采用改进的EXT3文件系统(接近于EXT4),单个文件最大为2TB。如果不进行分片,则单个Lustre文件最大只能为2TBLustre目前分片最多可达到160个,因此文件最大可以达到320TB,这是容量是非常大的,基本上可以满足所有单一文件存储容量的需求。
    4提高存储空间利用率。当Lustre剩余存储空间有限时,每个OSS的剩余空间也就更加有限,这时再写入一个的大文件至单一OSS很大可能会由于空间不足而失败。采用分片策略,写入单个OSS的对象容量会成倍减小,如果OSS数量选择合适,文件仍然可以写入Lustre系统。这使得Lustre存储空间利用更为充分,有效提高了利用率。
    5增加负载Stripe会导致额外的锁和网络操作消耗,比如stat, unlink,虽然这些操作可以并发执行,但仍会对性能产生影响。另外,分片多会造成服务器的开销。设想这样一个情形:Lustre中有100OSS100个客户端,100个文件,每个客户端访问一个文件。如果不分片,则每个客户端仅与一个OSS相互,可以进行顺序I/O读写。如果每个文件分成100片,则每个客户端都需要分别与100OSS进行相交,并发访问时,OSS上的磁盘I/O为随机读写。这些都是额外的负载开销,一定程度上影响性能。
    6增加风险。从概率的角度看,多个OSS发生故障的概率要高出单个OSS许多。文件分片存储于多个OSS上,一个分片不可用就会导致整个文件不可访问,即使其他分片仍然是完好的。因此,分片大大增加了数据发生丢失的风险,需要采用适当的措施进行保护,比如RAID5/6或者Failover

     

    3 Lustre I/O性能特征

    1写性能优于读性能
    Lustre系统中通常写性能会优于读性能。首先,对于写操作,客户端是以异步方式执行的,RPC调用分配以及写入磁盘顺序按到达顺序执行,可以实现聚合写以提高效率。而对于读,请求可能以不同的顺序来自多个客户端,需要大量的磁盘seekread操作,显著影响吞吐量。其次,目前Lustre没有实现OST read cache,仅仅在客户端实现了Readahead。这样的设计也是有充分理由的,每个OST有可能会有大量客户端并发访问,如果进行数据预读,内存消耗将会非常大,而且这个是不可控制的。Writecache是在客户端上实现的,内存占用不会太大并且是可控的。再者,对于TCP/IP网络而言,读会占用更多的CPU资源。读操作,Lustre需要从网络接口缓存进行数据Copy而获得所需数据,而写操作可以通过sendfileZero Copy避免额外的数据复制。

    2大文件性能表现好
    Lustre的元数据与数据分离、数据分片策略、数据缓存和网络设计非常适合大文件顺序I/O访问,大文件应用下性能表现非常好。这些设计着眼于提高数据访问的并行性,实现极大的聚合I/O带宽,这其中关键得益于数据分片设计(具体见上面的分析)。另外,后端改进的EXT3文件系统本身也非常适合大文件I/O

    3小文件性能表现差
    然而,Lustre的设计却非常不利于小文件I/O,尤其是LOSFLots of small files)。Lustre在读写文件前需要与MDS交互,获得相关属性和对象位置信息。与本地文件系统相比,增加了一次额外的网络传输和元数据访问开销,这对于小文件I/O而言,开销是相当大的。对于大量频繁的小文件读写,Lustre客户端Cache作用会失效,命中率大大降低。如果文件小于物理页大小,则还会产生额外的网络通信量,小文件访问越频繁开销越大,对Lustre总体I/O性能影响就越大。OST后端采用改进的EXT3文件系统,它对小文件的读写性能本身就不好,其元数据访问效率不高,磁盘寻址延迟和磁盘碎片问题严重。这也是大多数磁盘文件系统的缺点,Reiserfs是针对小文件设计的文件系统,性能表现要好很多。Lustre的设计决定了它对小文件I/O性能表现差,实际I/O带宽远低于所提供的最大带宽。在4OSS的千兆网络配置下,单一客户端小文件读写性能不到4MB/s

     

    4 Lustre小文件优化
    实际上前面已经提到,Lustre并不适合小文件I/O应用,性能表现非常差。因此,建议不要将Lustre应用于LOSF场合。不过,Lustre操作手册仍然给出了一些针对小文件的优化措施。
    1)通过应用聚合读写提高性能,比如对小文件进行Tar,或创建大文件或通过loopback mount来存储小文件。小文件系统调用开销和额外的I/O开销非常大,应用聚合优化可以显著提高性能。另外,可以使用多节点、多进程/多线程尽可能通过聚合来提高I/O带宽。
    2)应用采用O_DIRECT方式进行直接I/O,读写记录大小设置为4KB,与文件系统保持一致。对输出文件禁用locking,避免客户端之间的竞争。
    3)应用程序尽量保证写连续数据,顺序读写小文件要明显优于随机小文件I/O
    4OST采用SSD或更多的磁盘,提高IOPS来改善小文件性能。创建大容量OST,而非多个小容量OST,减少日志、连接等负载。
    5OST采用RAID 1+0替代RAID 5/6,避免频繁小文件I/O引起的数据校验开销。

    Lustre提供了强大的系统监控与控制接口用于进行性能分析与调优,对于小文件I/O,也可以通过调整一些系统参数进行优化。
    1)禁用所有客户端LNET debug功能:缺省开启多种调试信息,sysctl -w lnet.debug=0,减少系统开销,但发生错误时将无LOG可询。
    2)增加客户端Dirty Cache大小:lctl set_param osc./*.max_dirty_mb=256,缺省为32MB,增大缓存将提升I/O性能,但数据丢失的风险也随之增大。
    3)增加RPC并行数量:echo 32 > /proc/fs/lustre/osc/*-OST000*/max_rpcs_in_flight,缺省为8,提升至32将提高数据和元数据性能。不利之处是如果服务器压力很大,可能反而会影响性能。
    4)控制Lustre stripinglfs setstripe -c 0/1/-1 /path/filename,如果OST对象数大于1,小文件性能会下降,因此将OST对象设置为1
    5)客户端考虑使用本地锁:mount -t lustre -o localflock,如果确定多个进程从同一个客户端进行写文件,则可用localflock代替flock,减少发送到MDSRPC数量。
    6)使用loopback mount文件:创建大Lustre文件,与loop设备关联并创建文件系统,然后将其作为文件系统进行mount。小文件作用其上,则原先大量的MDS元数据操作将转换为OSS读写操作,消除了元数据瓶颈,可以显著提高小文件性能。这种方法应用于scratch空间可行,但对于生产数据应该谨慎使用,因为Lustre目前工作在这种模式下还存在问题。操作方法如下:
     dd if=/dev/zero of=/mnt/lustre/loopback/scratch bs=1048576 count=1024
     losetup /dev/loop0 /mnt/lustre/loopback/scratch
     mkfs -t ext4 /dev/loop0
     mount /dev/loop0 /mnt/losf

     

    5 Lustre I/O最佳实践
    Lustre具有鲜明的I/O特点,并具有非常高的扩展性和大文件I/O性能。如果进行适当的配置和操作,Lustre则会展现更高的性能。下面给出一些Lustre I/O最佳实践,可根据实际应用情况择优实践。
    1)使用单进程读取完整的共享小文件,需要时传输数据至其他进程。
    2)使用单进程访问容量在(1MB, 1GB)之间的小文件,将文件OST对象数设为1
    3)使用单进程访问大于1GB的中等文件,文件OST对象数不超过4个。
    4)远大于1GB的大文件OST对象数应设为>4,这种文件不要采用顺序I/Ofile-per-processI/O访问模式。
    5)限制单个目录下的文件数量,包含大量小文件的目录stripe_count设置为1
    6)小文件存放在单一OST上,单进程文件创建和读写性能会得到提高。
    7)包含大量小文件的目录存放在单一OST上,文件创建性能会提到极大提升。
    8)尽量避免频繁地打开和关闭文件。
    9)仅在必要时使用ls -l,它会与所有相关的OST交互,操作代价很大,尽可能使用lslfs find代替。
    10)考虑使用I/O中间件来改善性能,如ADIOSHDF5MPI-IO

     

    6 Lustre深入阅读
    [1] Luster: http://www.lustre.org
    [2] Lustre 2.0 Operations Manual http://wiki.lustre.org/manual/LustreManual20_HTML/index.html
    [3] Understanding Lustre Internals http://wiki.lustre.org/images/d/da/Understanding_Lustre_Filesystem_Internals.pdf
    [4] Lustre Architecture ftp://ftp.uni-duisburg.de/Linux/filesys/Lustre/lustre.pdf
    [5] Lustre White Paper http://www.raidinc.com/pdf/whitepapers/lustrefilesystem_wp.pdf
    [6] Lustre Sources: https://cds.sun.com/is-bin/INTERSHOP.enfinity/WFS/CDS-CDS_SMI-Site/en_US/-/USD/ViewProductDetail-Start?ProductRef=LUSTRE-185-G-F@CDS-CDS_SMI

    展开全文
  • 异构并行编程模型

    2015-06-26 17:18:00
    异构并行编程模型研究与进展[J].软件学报,2014, 25(7): 1459-1475.http://www.jos.org.cn/1000-9825/4608.html LIU Ying, LÜ Fang, WANG Lei, CHEN Li, CUI Hui-Min, FENG Xiao-Bing. Research on Heter...
  • 从异构并行编程接口与编译/运行时支持系统两个角度总结了异构并行编程模型最新的研究成果,它们为异构架构和上层应用带来的技术挑战提供了相应的解决方案.最后,结合目前的研究现状以及异构系统的发展,提出了异构...
  • 利用并行计算中的OpenMP多核编程环境,采用曙光16核服务器为硬件平台,实现对动态模式识别算法的快速性;同时,以压气机Mansoux模型为应用背景,把确定学习理论的动态模式识别方法应用到压气机旋转失速/喘振的快速...
  • mpi-2.pdf 消息传递并行编程环境MPI.pdf MPI并行程序设计.pdf 并行编程模型研究文档.pdf 消息传递并行编程环境MPI (1).pdf
  • 原文的下载地址:...中文版下载地址:深入理解并行编程V1.0(4.1M) 本书是linux内核大牛paul的力作,和鲁阳同学一起,花了两个月时间进行翻译。 目前没有翻译问答部分,主要是时间不够,也担心不能将这部分...
  • 周末,聊点轻松的话题。 本文通过一个不甚成功的优化自旋锁的半吊子尝试,聊一下操作系统并行的难点,中间的部分,...此外,代码可能不严谨,我个人编程能力有限,所以我可能只是表达想法,如果有编程高手能帮忙完...
  • 6.执行器--并行编程框架 ForkJoin

    千次阅读 2014-08-12 10:37:03
    本文假设您已经了解一般并行编程知识,了解Java concurrent部分如ExecutorService等相关内容。 虽说是Java的ForkJoin并行框架,但不要太在意Java,其中的思想在其它语言环境也是同样适用的。因为并发编程在本质上是...
  • MapReduce入门 什么是mapreduce 首先让我们来重温一下 hadoop 的四大组件: HDFS:分布式存储系统 MapReduce:分布式计算系统 ...Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据...
  • 转自浅墨毛星云:http://blog.csdn.net/poem_qianmo/article/details/53240330...我们知道,游戏行业其实一直很缺一本系统介绍游戏编程进阶技巧的书籍,而《游戏编程模式》的出现,正好弥补了这一点。之前已经有提到过
  • Java并发编程实践 PDF 高清版

    热门讨论 2010-06-25 10:44:59
    本书的读者是那些具有一定Java编程经验的程序员、希望了解Java SE 5,6在线程技术上的改进和新特性的程序员,以及Java和并发编程的爱好者。 目录 代码清单 序 第1章 介绍 1.1 并发的(非常)简短历史 1.2 线程的...
  • 本文假设您已经了解一般并行编程知识,了解Java concurrent部分如ExecutorService等相关内容。 虽说是Java的ForkJoin并行框架,但不要太在意Java,其中的思想在其它语言环境也是同样适用的。因为并发编程在本质上是...
  • 这里的内容主要是都志辉老师《高性能计算之并行编程技术——MPI并行程序设计》 书上有一些代码是FORTAN的,我在学习的过程中,将其都转换成C的代码,便于统一记录。   这章内容分为两个部分:MPI对等模式程序...
  • 这本精品书籍浓墨重彩地描述如何使用c# 4、visual studio 2010和.net framework 4高效地创建基于任务的并行应用程序,详细讲述最新的单指令、多数据流指令和向量化等并行编程技术,介绍现代并行库,讨论如何珠联璧合...
  • 最近在阅读《游戏编程模式》的译本:https://gpp.tkchu.me/学习之余正好看到浅墨大神的总结提炼,看完之后搬运过来,记录自己的学习过程原文:https://blog.csdn.net/poem_qianmo/article/details/53240330书中的19...
  • 3.5.3 安全发布的常用模式 3.5.4 事实不可变对象 3.5.5 可变对象 3.5.6 安全地共享对象 第4章 对象的组合 4.1 设计线程安全的类 4.1.1 收集同步需求 4.1.2 依赖状态的操作 4.1.3 状态的所有权 4.2 实例...
  • 这是一篇超过万字读书笔记,总结了《游戏编程模式》一书中所有章节与内容的知识梗概。 我们知道,游戏行业其实一直很缺一本系统介绍游戏编程进阶技巧的书籍,而《游戏编程模式》得出现,正好弥补了这一点。在这篇...
  • 《大规模并行处理器编程实战》学习,其他章节关注专栏 CUDA C 线程组成线程块,线程块组成线程网格,线程网格就是kernel。一个kernel中的所有线程都执行相同的代码,区别在于不同的线程属于不同的块(有不同的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,060
精华内容 4,424
关键字:

并行编程模式pdf