精华内容
下载资源
问答
  • 并发是用来描述并行执行的方式(看上去一起发生的事件,例如目前操作系统的多任务调度程序,看上去桌面上有许多程序在同时运行.),并发是形容词....并行有数据并行任务并行性数据并行性指对许多数据执行相...

    并发是用来描述并行执行的方式(看上去一起发生的事件,例如目前操作系统的多任务调度程序,看上去桌面上有许多程序在同时运行.),并发是形容词.

    并行是指通过并发将一个操作分解成一组粒度更细的工作单元,并且这些工作单元可以在不同的处理器内核上运行.并行是动词,指必须有两个以上的事件发生.

    工作单元,以粒度可分为最小到单一CPU指令,大到函数或系统任务.

    并行有数据并行性与任务并行性

    数据并行性指对许多数据执行相同的并行操作,比如对一组数的每个数进行加法操作.

    任务并行性指对相同的一组数据执行不同的并行操作,比如一个执行加法操作,而另一个执行乘法操作.

    优秀的多核微处理器:AMD Multicore Opteron、Intel Core2 Duo

    并行编程的麻烦

    内存一致模型简称内存模型

    缓存一致性(cache coherency) 将缓存与主内存数据进行同步

    内存一致性模型(memory consistency model)

    CPU可以对内存的读取或写入操作重新排序(乱序执行Out of Order),而导致内存一致性的产生.就是指CPU中实际内存访问操作与程序代码中内存访问操作的不一致性程度.

    CPU的硬件架构决定了内存模型的强度,强度越高写代码就会更容易.

    x86/64架构的CPU有较强的内存一致性模型,而安腾系列处理器Itanium(IA-64)是较弱的模型.它为了提高性能(不必根踪缓存行的状态)不会主动刷新缓存,而是提供指令操作缓存刷新到主内存.

    还有一些软件实现的内存模型,比如java虚拟机和.NET CLR,它们都是比较强的内存模型,而且不考虑具体的硬件实现.

    解决办法:内存栅栏(memory fence)

    内存栅栏可以阻止目标架构对指令进行重排.利用语言平台的特性或原子方法可以实现内存栅栏,比如.net中的Interlocked类方法,c++中的volatile变量.

    附录一些并行库:

    Single Unix Specification的POSIX线程库.

    Intel的TBB线程库.

    Standard Template Adaptive Parallel Library,STAPL:标准模板适配并行库.

    最后并行编程库必须得到操作系统的支持,如果系统没有相应功能api,上面那些并行库也就一无是处了.

    展开全文
  • 开始之前,我们先澄清两个概念,「多核」指的是有效利用 CPU 的多核提高程序执行效率,「并行」和「并发」一字之差,但其实是两个完全不同的概念,「并发」一般是由 CPU 内核通过时间片或者中断来控制的,遇到 IO ...

    开始之前,我们先澄清两个概念,「多核」指的是有效利用 CPU 的多核提高程序执行效率,「并行」和「并发」一字之差,但其实是两个完全不同的概念,「并发」一般是由 CPU 内核通过时间片或者中断来控制的,遇到 IO 阻塞或者时间片用完时会交出线程的使用权,从而实现在一个内核上处理多个任务,而「并行」则是多个处理器或者多核处理器同时执行多个任务,同一时间有多个任务在调度,因此,一个内核是无法实现并行的,因为同一时间只有一个任务在调度。

    多进程、多线程以及协程显然都是属于「并发」范畴的,可以实现程序的并发执行,至于是否支持「并行」,则要看程序运行系统是否是多核,以及编写程序的语言是否可以利用 CPU 的多核特性。

    下面我们以 goroutine 为例,来演示如何在 Go 语言中通过协程有效利用「多核」实现程序的「并行」执行,具体实现的话就是根据系统 CPU 核心数量来分配等值的子协程数,让所有协程分配到每个内核去并行执行。要查看系统核心数,以 MacOS 为例, 可以通过 sysctl hw 命令分别查看物理 CPU 和逻辑 CPU 核心数:

    e9baa576d544556e1fac41f48f31b46d.png

    我的系统物理 CPU 核心数是 4 个,逻辑 CPU 核心数是 8 个,所谓物理 CPU 核心数指的是真正插在物理插槽上 CPU 的核心数,逻辑 CPU 核心数指的是结合 CPU 多核以及超线程技术得到的 CPU 核心数,最终核心数以逻辑 CPU 核心数为准。

    此外,你也可以在 Go 语言中通过调用 runtime.NumCPU() 方法获取 CPU 核心数。

    接下来,我们来模拟一个可以并行的计算任务:启动多个子协程,子协程数量和 CPU 核心数保持一致,以便充分利用多核并行运算,每个子协程计算分给它的那部分计算任务,最后将不同子协程的计算结果再做一次累加,这样就可以得到所有数据的计算总和。我们编写对应的示例文件 parallel.go

    package mainimport (    "fmt"    "runtime"    "time")func sum(seq int, ch chan int) {    defer close(ch)    sum := 0    for i := 1; i <= 10000000; i++ {        sum += i    }    fmt.Printf("子协程%d运算结果:%d\n", seq, sum)    ch }func main()  {    // 启动时间    start := time.Now()    // 最大 CPU 核心数    cpus := runtime.NumCPU()    runtime.GOMAXPROCS(cpus)    chs := make([]chan int, cpus)    for i := 0; i < len(chs); i++ {        chs[i] = make(chan int, 1)        go sum(i, chs[i])    }    sum := 0    for _, ch := range chs {        res :=         sum += res    }    // 结束时间    end := time.Now()    // 打印耗时    fmt.Printf("最终运算结果: %d, 执行耗时(s): %f\n", sum, end.Sub(start).Seconds())}    

    这里我们通过 runtime.NumCPU() 获取逻辑 CPU 核心数,然后通过 runtime.GOMAXPROCS() 方法设置程序运行时可以使用的最大核心数,这里设置为和系统 CPU 核心数一致,然后初始化一个通道数组,数量和 CPU 核心数保持一致,以便充分利用多核实现并行计算,接下来就是依次启动子协程进行计算,并在子协程中计算完成后将结果数据发送到通道中,最后在主协程中接收这些通道数据并进行再次累加,作为最终计算结果打印出来,同时计算程序运行时间作为性能的考量依据。

    此时,我们运行 parallel.go,得到的结果如下:

    e6b9f750c3f3848e89ddbc25a1e520bb.png

    然后我们修改 runtime.GOMAXPROCS() 方法中传入的 CPU 核心数为 1,再次运行 parallel.go,得到的结果如下:

    b897d44bb9148ba58b279dec478c6bd7.png

    可以看到使用多核比单核整体运行速度快了4倍左右,查看系统 CPU 监控也能看到所有内核都被打满,这在 CPU 密集型计算中带来的性能提升还是非常显著的,不过对于 IO 密集型计算可能没有这么显著,甚至有可能比单核低,因为 CPU 核心之间的切换也是需要时间成本的,所以 IO 密集型计算并不推荐使用这种机制,什么是 IO 密集型计算?比如数据库连接、网络请求等。

    另外,需要注意的是,目前 Go 语言默认就是支持多核的,所以如果上述示例代码中没有显式设置 runtime.GOMAXPROCS(cpus) 这行代码,编译器也会利用多核 CPU 来执行代码,其结果是运行耗时和设置多核是一样的。

    推荐阅读

    • Go 语言并发编程系列(八)— 通道类型篇:错误和异常处理


    喜欢本文的朋友,欢迎关注“Go语言中文网”:

    5d3797d90c13e145a50999777d790103.png

    Go语言中文网启用微信学习交流群,欢迎加微信:274768166

    展开全文
  • python中的并行由于cpython中的gil的存在我们可以暂时不奢望能在cpython中使用多线程利用多核资源进行并行计算了,因此我们在python中可以利用多进程的方式充分利用多核资源。 python中我们可以使用很多方式进行多...

    o55g08d9dv.jpg广告关闭

    腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元!

    python中的并行由于cpython中的gil的存在我们可以暂时不奢望能在cpython中使用多线程利用多核资源进行并行计算了,因此我们在python中可以利用多进程的方式充分利用多核资源。 python中我们可以使用很多方式进行多进程编程,例如os.fork()来创建进程或者通过multiprocessing模块来更方便的创建进程和进程池等。 在上...

    2lro7lxmuy.jpeg

    github.compytlab —前言并行计算是使用并行计算机来减少单个计算问题所需要的时间,我们可以通过利用编程语言显式的说明计算中的不同部分如何再不同的处理器上同时执行来设计我们的并行程序,最终达到大幅度提升程序效率的目的。 众所周知,python中的gil限制了python多线程并行对多核cpu的利用,但是我们仍然可以...

    yd72o0iz1n.jpeg

    译者:caspar译文:https:segmentfault.coma1190000000414339 原文:https:medium.combuilding-things-on-the-internet40e9b2b36148python在程序并行化方面多少有些声名狼藉。 撇开技术上的问题,例如线程的实现和 gil,我觉得错误的教学指导才是主要问题。 常见的经典 python 多线程、多进程教程多显得偏重...

    目录何为并行和并发python有哪些相关的模块该如何选择合适的模块cpu-bound和io-bound问题threading、asyncio和multiprocessing优劣抉择结论何为并行和并发在文章开始之前先看看来自 stackoverflow 的一篇回答是如何解释并行和并发的。 (https:stackoverflow.comquestions1050222what-is-the-difference-between-concu...

    通过下面的for循环,每一个使用ray需要0.84秒,使用python多处理需要7.5秒,使用串行python需要24秒(在48个物理核上)。 这一性能差异解释了为什么可以在...相反,python multiprocessing并没有提供一种自然的方法来并行化python类,因此用户经常需要在map调用之间传递相关的状态。 这种策略在实践中很难实现...

    7cb8fbo2r3.png

    消息传递指的是并行执行的各个进程拥有自己独立的堆栈和代码段,作为互不相关的多个程序独立执行,进程之间的信息交互完全通过显示地调用通信函数来完成。 mpi4py是构建在mpi之上的python非官方库,使得python的数据可以在进程之间进行传递。 2.mpi执行模型并行程序是指一组独立、同一的处理过程; 所有的进程包含...

    我对python中的并行处理很陌生。 下面有一段代码,它遍历所有目录并解压缩所有tar.gz文件。 然而,这需要相当长的时间。 import tarfileimport gzipimport os def unziptar(path): for root, dirs,files in os.walk(path): for i in files:fullpath = os.path.join(root, i) if i.endswith(tar.gz): print extracting...

    python并行计算简单实现multiprocessing包是python中的多进程管理包. pool(num)类提供一个进程池,然后在多个核中执行这些进程,其中默认参数num是当前机器cpu的核数.pool.map(func, iterable) 2个参数,第一个参数是函数, 第二个参数是需要可迭代的变量,作为参数传递到func如果func含有的参数多于一个,可以利用functo...

    mo1y84nyp5.png

    python 在程序并行化方面多少有些声名狼藉。 撇开技术上的问题,例如线程的实现和 gil,我觉得错误的教学指导才是主要问题。 常见的经典 python 多线程、多进程教程多显得偏重。 而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。 传统的例子简单搜索下python 多线程教程,不难发现几乎所有的教程都给出涉及类...

    由于gil(global interpreter lock, 全局解释锁)的存在,使用多线程并不会真正意义上实现并发,使用多进程可以通过子进程的形式同时运行多个解释器,而它们的gil是独立的,这样就可以是python程序充分利用多核cpu进行并行计算。 3. future类一般由executor.submit()创建,将可调用对象封装为异步执行。 future是一种...

    6.4 本文使用的 pypy 版本为 5.9. 0-beta0,兼容 python 3.5 语法本文使用的 jython 版本为 2. 7. 0,兼容 python 2.7 语法若无特殊说明,作语言解时...这一对宏允许你在自定义的 c 扩展中释放 gil,从而可以重新利用多核的优势。 沿用上面的例子,自定义的 c 扩展函数好比是流水线上一个特殊的物品...

    向我们展示了通过numba模块加速,使python的数学计算时间下降4-5个数量级。 本文,edward将从硬件层面着眼,和读者一起学习python如何调用多cpu实现并行计算,从而缩短生物信息分析时间。 全文共 2756字 0图预计阅读时间:15 分钟面向人群:1-8岁生物信息学开发者关键字:python 并行计算01多进程效果通过两个例子...

    先占式多工法(pre-emptive multitasking):操作系统知道每个线程,并且可以随时中断该线程后运行别的线程,即对线程进行切换。 线程的切换可以发生在单个python语句里,在任何时候都可能需要进行任务切换。 多核cpu的并行,通过多进程,python创建新的进程(一般来说电脑几核就开几个进程)。 每一个进程可以被看做...

    多线程基础概念并行与并发并行:同时处理多个任务,必须在多核环境下一段时间内同时处理多个任务,单核也可以并发并发手段线程:内核空间的调度进程:内核空间的调度协程:用户空间的调度线程可以允许程序在同一进程空间中并发运行多个操作。 本次主要介绍python标准库中的多线程模块threading。 threading模块线程...

    详情请看下一篇博文python 性能的优化计算密集型当然我们可以使用jit,分布式编程,python 调用c编程来优化性能,但是要充分利用计算机的核数,可以通过concurrent.futures模块来实现,其在实现提高并行计算能力时时通过多进程实现。 该concurrent.futures模块提供了一个用于异步执行callables的高级接口。 可以使用...

    cpython解释器的问题,jpython 就不会# 对于io密集型 没什么区别,只要io时会切换即可# 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此... # 即不能通过物理核心数增加速度,不能实现(并行)# =====# 多线程socket可以input# import socket# from threading import thread# def chat(conn)...

    qfwkoz2j6z.jpeg

    here’s how you can get a 2–6x speed-up on your data pre-processing withpython最近在 towards data science 上看到一篇文章,如何用 python 进行并行处理,觉得非常有帮助,因此介绍给大家,用我的风格对文章做了编译。 ----数据的预处理,是机器学习非常重要的一环。 尽管 python 提供了很多让人欲罢不能的库...

    本文主要用到python标准库concurrent.futures提供的并发执行功能,类似于进程池的用法,在多核或多cpu平台能够大幅度提高处理速度。 from concurrent.futures import processpoolexecutorprimes = def isprime(n):if n%2 == 0: return false for i in range(3, int(n**0.5)+1, 2): if n%i == 0:return false return ...

    所以很多人说python的线程是假线程,并能利用多核,并不能真正并行。 之所以感觉到线程并行,是因为线程上下文不断切换的缘故。 python 3.2开始使用新的gil。 新的gil实现中用一个固定的超时时间来指示当前的线程放弃全局锁。 在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁...

    gil确保任何时候都只有一个python线程执行。 gil最大的问题就是python的多线程程序并不能利用多核cpu的优势。 但process pools能解决这个问题! 因为我们在运行单独的python实例,每个实例都有自己的gil。 这样你就有了真正的并行处理的python代码! 不要害怕并行处理! 有了concurrent.futures库,python可以让你简...

    展开全文
  • CELL处理器、Intel Core 2处理器的并行计算基本原理 Cell处理器并行编程基本技术:CELL处理器 pThread并行编程技术: Intel多核处理器、AMD多核处理器、SMP服务器 MPI并行编程技术:SMP多核服务器、CLUSTER、MPP...
  • JDK 7 中的 Fork/Join 模式 轻松实现多核时代的并行计算 甘 志 (ganzhi@cn.ibm.com),软件工程师, IBM 中国软件实验室(CSDL BJ)戴 晓君 (daixiaoj@cn.ibm.com),软件工程师, IBM 中国软件实验室(CSDL BJ)2007 年 ...

    JDK 7 中的 Fork/Join 模式 轻松实现多核时代的并行计算

     

    (ganzhi@cn.ibm.com), 软件工程师, IBM 中国软件实验室(CSDL BJ
    晓君 (daixiaoj@cn.ibm.com),
    软件工程师, IBM 中国软件实验室(CSDL BJ

    2007 8 23

     

    随着多核时代的来临,软件开发人员不得不开始关注并行编程领域。而 JDK 7 中将会加入的 Fork/Join 模式是处理并行编程的一个经典的方法。虽然不能解决所有的问题,但是在它的适用范围之内,能够轻松的利用多个 CPU 提供的计算资源来协作完成一个复杂的计算任务。通过利用 Fork/Join 模式,我们能够更加顺畅的过渡到多核的时代。本文将介绍使用 JDK 7 Fork/Join 模式的方法和其他相关改进。阅读本文之后,读者将能够独立地在软件开发中使用 Fork/Join 模式来改进程序的性能。

    介绍

    随着多核芯片逐渐成为主流,大多数软件开发人员不可避免地需要了解并行编程的知识。而同时,主流程序语言正在将越来越多的并行特性合并到标准库或者语言本身之中。我们可以看到,JDK 在这方面同样走在潮流的前方。在 JDK 标准版 5 中,由 Doug Lea 提供的并行框架成为了标准库的一部分(JSR-166)。随后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了标准库中(JSR-166x)。直到今天,尽管 Java SE 7 还没有正式发布,一些并行相关的新特性已经出现在 JSR-166y 中:

    1Fork/Join 模式;

    2TransferQueue,它继承自 BlockingQueue 并能在队列满时阻塞生产者

    3ArrayTasks/ListTasks,用于并行执行某些数组/列表相关任务的类;

    4IntTasks/LongTasks/DoubleTasks,用于并行处理数字类型数组的工具类,提供了排序、查找、求和、求最小值、求最大值等功能;

    其中,对 Fork/Join 模式的支持可能是对开发并行软件来说最通用的新特性。在 JSR-166y 中,Doug Lea实现ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 时就大量的用到了 Fork/Join 模式。读者还需要注意一点,因为 JDK 7 还没有正式发布,因此本文涉及到的功能和发布版本有可能不一样。

    Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。 1 给出了一个 Fork/Join 模式的示意图,位于图上部的 Task 依赖于位于其下的 Task 的执行,只有当所有的子任务都完成之后,调用者才能获得 Task 0 的返回结果。


    1. Fork/Join 模式示意图
    图 1.  Fork/Join 模式示意图

    可以说,Fork/Join 模式能够解决很多种类的并行问题。通过使用 Doug Lea 提供的 Fork/Join 框架,软件开发人员只需要关注任务的划分和中间结果的组合就能充分利用并行平台的优良性能。其他和并行相关的诸多难于处理的问题,例如负载平衡、同步等, 都可以由框架采用统一的方式解决。这样,我们就能够轻松地获得并行的好处而避免了并行编程的困难且容易出错的缺点。

     

     

    使用 Fork/Join 模式

    在开始尝试 Fork/Join 模式之前,我们需要从 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下载 JSR-166y 的源代码,并且我们还需要安装最新版本的 JDK 6(下载网址请参阅 参考资源)。Fork/Join 模式的使用方式非常直观。首先,我们需要编写一个 ForkJoinTask 来完成子任务的分割、中间结果的合并等工作。随后,我们将这个 ForkJoinTask 交给 ForkJoinPool 来完成应用的执行。

    通 常我们并不直接继承 ForkJoinTask,它包含了太多的抽象方法。针对特定的问题,我们可以选择 ForkJoinTask 的不同子类来完成任务。RecursiveAction ForkJoinTask 的一个子类,它代表了一类最简单的 ForkJoinTask:不需要返回值,当子任务都执行完毕之后,不需要进行中间结果的组合。如果我们从 RecursiveAction 开始继承,那么我们只需要重载 protected void compute() 方法。下面,我们来看看怎么为快速排序算法建立一个 ForkJoinTask 的子类:


    清单 1. ForkJoinTask 的子类

    class SortTask extends RecursiveAction {

    final long[] array;

    final int lo;

    final int hi;

    private int THRESHOLD = 30;

     

    public SortTask(long[] array) {

    this.array = array;

    this.lo = 0;

    this.hi = array.length - 1;

    }

     

    public SortTask(long[] array, int lo, int hi) {

    this.array = array;

    this.lo = lo;

    this.hi = hi;

    }

     

    protected void compute() {

    if (hi - lo < THRESHOLD)

    sequentiallySort(array, lo, hi);

    else {

    int pivot = partition(array, lo, hi);

    coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array,

    pivot + 1, hi));

    }

    }

     

    private int partition(long[] array, int lo, int hi) {

    long x = array[hi];

    int i = lo - 1;

    for (int j = lo; j < hi; j++) {

    if (array[j] <= x) {

    i++;

    swap(array, i, j);

    }

    }

    swap(array, i + 1, hi);

    return i + 1;

    }

     

    private void swap(long[] array, int i, int j) {

    if (i != j) {

    long temp = array[i];

    array[i] = array[j];

    array[j] = temp;

    }

    }

     

    private void sequentiallySort(long[] array, int lo, int hi) {

    Arrays.sort(array, lo, hi + 1);

    }

    }

    清单 1 中,SortTask 首先通过 partition() 方法将数组分成两个部分。随后,两个子任务将被生成并分别排序数组的两个部分。当子任务足够小时,再将其分割为更小的任务反而引起性能的降低。因此,这里我们使用一个 THRESHOLD,限定在子任务规模较小时,使用直接排序,而不是再将其分割成为更小的任务。其中,我们用到了 RecursiveAction 提供的方法 coInvoke()。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。coInvoke() 的参数还可以是任务的数组。

    现在剩下的工作就是将 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默认建立具有与 CPU 可使用线程数相等线程个数的线程池。我们在一个 JUnit test 方法中将 SortTask 提交给一个新建的 ForkJoinPool


    清单 2. 新建的 ForkJoinPool

    @Test

    public void testSort() throws Exception {

    ForkJoinTask sort = new SortTask(array);

    ForkJoinPool fjpool = new ForkJoinPool();

    fjpool.submit(sort);

    fjpool.shutdown();

     

    fjpool.awaitTermination(30, TimeUnit.SECONDS);

     

    assertTrue(checkSorted(array));

    }

     

    在上面的代码中,我们用到了 ForkJoinPool 提供的如下函数:

    submit():将 ForkJoinTask 类的对象提交给 ForkJoinPoolForkJoinPool 将立刻开始执行 ForkJoinTask

    shutdown():执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow() 方法。

    awaitTermination():阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束。

    并行快速排序的完整代码如下所示:


    清单 3. 并行快速排序的完整代码

    package tests;

     

    import static org.junit.Assert.*;

     

    import java.util.Arrays;

    import java.util.Random;

    import java.util.concurrent.TimeUnit;

     

    import jsr166y.forkjoin.ForkJoinPool;

    import jsr166y.forkjoin.ForkJoinTask;

    import jsr166y.forkjoin.RecursiveAction;

     

    import org.junit.Before;

    import org.junit.Test;

     

    class SortTask extends RecursiveAction {

    final long[] array;

    final int lo;

    final int hi;

    private int THRESHOLD = 0; //For demo only

     

    public SortTask(long[] array) {

    this.array = array;

    this.lo = 0;

    this.hi = array.length - 1;

    }

     

    public SortTask(long[] array, int lo, int hi) {

    this.array = array;

    this.lo = lo;

    this.hi = hi;

    }

     

    protected void compute() {

    if (hi - lo < THRESHOLD)

    sequentiallySort(array, lo, hi);

    else {

    int pivot = partition(array, lo, hi);

    System.out.println("/npivot = " + pivot + ", low = " + lo + ", high = " + hi);

    System.out.println("array" + Arrays.toString(array));

    coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array,

    pivot + 1, hi));

    }

    }

     

    private int partition(long[] array, int lo, int hi) {

    long x = array[hi];

    int i = lo - 1;

    for (int j = lo; j < hi; j++) {

    if (array[j] <= x) {

    i++;

    swap(array, i, j);

    }

    }

    swap(array, i + 1, hi);

    return i + 1;

    }

     

    private void swap(long[] array, int i, int j) {

    if (i != j) {

    long temp = array[i];

    array[i] = array[j];

    array[j] = temp;

    }

    }

     

    private void sequentiallySort(long[] array, int lo, int hi) {

    Arrays.sort(array, lo, hi + 1);

    }

    }

     

    public class TestForkJoinSimple {

    private static final int NARRAY = 16; //For demo only

    long[] array = new long[NARRAY];

    Random rand = new Random();

     

    @Before

    public void setUp() {

    for (int i = 0; i < array.length; i++) {

    array[i] = rand.nextLong()%100; //For demo only

    }

    System.out.println("Initial Array: " + Arrays.toString(array));

    }

     

    @Test

    public void testSort() throws Exception {

    ForkJoinTask sort = new SortTask(array);

    ForkJoinPool fjpool = new ForkJoinPool();

    fjpool.submit(sort);

    fjpool.shutdown();

     

    fjpool.awaitTermination(30, TimeUnit.SECONDS);

     

    assertTrue(checkSorted(array));

    }

     

    boolean checkSorted(long[] a) {

    for (int i = 0; i < a.length - 1; i++) {

    if (a[i] > (a[i + 1])) {

    return false;

    }

    }

    return true;

    }

    }

     

    运行以上代码,我们可以得到以下结果:

    Initial Array: [46, -12, 74, -67, 76, -13, -91, -96]

     

    pivot = 0, low = 0, high = 7

    array[-96, -12, 74, -67, 76, -13, -91, 46]

     

    pivot = 5, low = 1, high = 7

    array[-96, -12, -67, -13, -91, 46, 76, 74]

     

    pivot = 1, low = 1, high = 4

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     

    pivot = 4, low = 2, high = 4

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     

    pivot = 3, low = 2, high = 3

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     

    pivot = 2, low = 2, high = 2

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     

    pivot = 6, low = 6, high = 7

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     

    pivot = 7, low = 7, high = 7

    array[-96, -91, -67, -13, -12, 46, 74, 76]

     


     

    Fork/Join 模式高级特性

    使用 RecursiveTask

    除了 RecursiveActionFork/Join 框架还提供了其他 ForkJoinTask 子类:带有返回值的 RecursiveTask,使用 finish() 方法显式中止的 AsyncAction LinkedAsyncAction,以及可使用 TaskBarrier 为每个任务设置不同中止条件的 CyclicAction

    RecursiveTask 继承的子类同样需要重载 protected void compute() 方法。与 RecursiveAction 稍有不同的是,它可使用泛型指定一个返回值的类型。下面,我们来看看如何使用 RecursiveTask 的子类。


    清单 4. RecursiveTask 的子类

    class Fibonacci extends RecursiveTask<Integer> {

    final int n;

     

    Fibonacci(int n) {

    this.n = n;

    }

     

    private int compute(int small) {

    final int[] results = { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 };

    return results[small];

    }

     

    public Integer compute() {

    if (n <= 10) {

    return compute(n);

    }

    Fibonacci f1 = new Fibonacci(n - 1);

    Fibonacci f2 = new Fibonacci(n - 2);

    f1.fork();

    f2.fork();

    return f1.join() + f2.join();

    }

    }

     

    清单 4 中, Fibonacci 的返回值为 Integer 类型。其 compute() 函数首先建立两个子任务,启动子任务执行,阻塞以等待子任务的结果返回,相加后得到最终结果。同样,当子任务足够小时,通过查表得到其结果,以减小因过多地分割任务引起的性能降低。其中,我们用到了 RecursiveTask 提供的方法 fork() join()。它们分别表示:子任务的异步执行和阻塞等待结果完成。

    现在剩下的工作就是将 Fibonacci 提交到 ForkJoinPool 了,我们在一个 JUnit test 方法中作了如下处理:


    清单 5. Fibonacci 提交到 ForkJoinPool

    @Test

    public void testFibonacci() throws InterruptedException, ExecutionException {

    ForkJoinTask<Integer> fjt = new Fibonacci(45);

    ForkJoinPool fjpool = new ForkJoinPool();

    Future<Integer> result = fjpool.submit(fjt);

     

    // do something

    System.out.println(result.get());

    }

     

    使用 CyclicAction 来处理循环任务

    CyclicAction 的用法稍微复杂一些。如果一个复杂任务需要几个线程协作完成,并且线程之间需要在某个点等待所有其他线程到达,那么我们就能方便的用 CyclicAction TaskBarrier 来完成。图 2 描述了使用 CyclicAction TaskBarrier 的一个典型场景。


    2. 使用 CyclicAction TaskBarrier 执行多线程任务
    图 2. 使用 CyclicAction 和 TaskBarrier 执行多线程任务

    继承自 CyclicAction 的子类需要 TaskBarrier 为每个任务设置不同的中止条件。从 CyclicAction 继承的子类需要重载 protected void compute() 方法,定义在 barrier 的每个步骤需要执行的动作。compute() 方法将被反复执行直到 barrier isTerminated() 方法返回 TrueTaskBarrier 的行为类似于 CyclicBarrier。下面,我们来看看如何使用 CyclicAction 的子类。


    清单 6. 使用 CyclicAction 的子类

    class ConcurrentPrint extends RecursiveAction {

    protected void compute() {

    TaskBarrier b = new TaskBarrier() {

    protected boolean terminate(int cycle, int registeredParties) {

    System.out.println("Cycle is " + cycle + ";"

    + registeredParties + " parties");

    return cycle >= 10;

    }

    };

    int n = 3;

    CyclicAction[] actions = new CyclicAction[n];

    for (int i = 0; i < n; ++i) {

    final int index = i;

    actions[i] = new CyclicAction(b) {

    protected void compute() {

    System.out.println("I'm working " + getCycle() + " "

    + index);

    try {

    Thread.sleep(500);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    };

    }

    for (int i = 0; i < n; ++i)

    actions[i].fork();

    for (int i = 0; i < n; ++i)

    actions[i].join();

    }

    }

     

    清单 6 中,CyclicAction[] 数组建立了三个任务,打印各自的工作次数和序号。而在 b.terminate() 方法中,我们设置的中止条件表示重复 10 次计算后中止。现在剩下的工作就是将 ConcurrentPrint 提交到 ForkJoinPool 了。我们可以在 ForkJoinPool 的构造函数中指定需要的线程数目,例如 ForkJoinPool(4) 就表明线程池包含 4 个线程。我们在一个 JUnit test 方法中运行 ConcurrentPrint 的这个循环任务:


    清单 7. 运行 ConcurrentPrint 循环任务

    @Test

    public void testBarrier () throws InterruptedException, ExecutionException {

    ForkJoinTask fjt = new ConcurrentPrint();

    ForkJoinPool fjpool = new ForkJoinPool(4);

    fjpool.submit(fjt);

    fjpool.shutdown();

    }

     

    RecursiveTask CyclicAction 两个例子的完整代码如下所示:


    清单 8. RecursiveTask CyclicAction 两个例子的完整代码

    package tests;

     

    import java.util.concurrent.ExecutionException;

    import java.util.concurrent.Future;

     

    import jsr166y.forkjoin.CyclicAction;

    import jsr166y.forkjoin.ForkJoinPool;

    import jsr166y.forkjoin.ForkJoinTask;

    import jsr166y.forkjoin.RecursiveAction;

    import jsr166y.forkjoin.RecursiveTask;

    import jsr166y.forkjoin.TaskBarrier;

     

    import org.junit.Test;

     

    class Fibonacci extends RecursiveTask<Integer> {

    final int n;

     

    Fibonacci(int n) {

    this.n = n;

    }

     

    private int compute(int small) {

    final int[] results = { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 };

    return results[small];

    }

     

    public Integer compute() {

    if (n <= 10) {

    return compute(n);

    }

    Fibonacci f1 = new Fibonacci(n - 1);

    Fibonacci f2 = new Fibonacci(n - 2);

    System.out.println("fork new thread for " + (n - 1));

    f1.fork();

    System.out.println("fork new thread for " + (n - 2));

    f2.fork();

    return f1.join() + f2.join();

    }

    }

     

    class ConcurrentPrint extends RecursiveAction {

    protected void compute() {

    TaskBarrier b = new TaskBarrier() {

    protected boolean terminate(int cycle, int registeredParties) {

    System.out.println("Cycle is " + cycle + ";"

    + registeredParties + " parties");

    return cycle >= 10;

    }

    };

    int n = 3;

    CyclicAction[] actions = new CyclicAction[n];

    for (int i = 0; i < n; ++i) {

    final int index = i;

    actions[i] = new CyclicAction(b) {

    protected void compute() {

    System.out.println("I'm working " + getCycle() + " "

    + index);

    try {

    Thread.sleep(500);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    };

    }

    for (int i = 0; i < n; ++i)

    actions[i].fork();

    for (int i = 0; i < n; ++i)

    actions[i].join();

    }

    }

     

    public class TestForkJoin {

    @Test

    public void testBarrier () throws InterruptedException, ExecutionException {

    System.out.println("/ntesting Task Barrier ...");

    ForkJoinTask fjt = new ConcurrentPrint();

    ForkJoinPool fjpool = new ForkJoinPool(4);

    fjpool.submit(fjt);

    fjpool.shutdown();

    }

     

    @Test

    public void testFibonacci () throws InterruptedException, ExecutionException {

    System.out.println("/ntesting Fibonacci ...");

    final int num = 14; //For demo only

    ForkJoinTask<Integer> fjt = new Fibonacci(num);

    ForkJoinPool fjpool = new ForkJoinPool();

    Future<Integer> result = fjpool.submit(fjt);

     

    // do something

    System.out.println("Fibonacci(" + num + ") = " + result.get());

    }

    }

     

    运行以上代码,我们可以得到以下结果:

    testing Task Barrier ...

    I'm working 0 2

    I'm working 0 0

    I'm working 0 1

    Cycle is 0; 3 parties

    I'm working 1 2

    I'm working 1 0

    I'm working 1 1

    Cycle is 1; 3 parties

    I'm working 2 0

    I'm working 2 1

    I'm working 2 2

    Cycle is 2; 3 parties

    I'm working 3 0

    I'm working 3 2

    I'm working 3 1

    Cycle is 3; 3 parties

    I'm working 4 2

    I'm working 4 0

    I'm working 4 1

    Cycle is 4; 3 parties

    I'm working 5 1

    I'm working 5 0

    I'm working 5 2

    Cycle is 5; 3 parties

    I'm working 6 0

    I'm working 6 2

    I'm working 6 1

    Cycle is 6; 3 parties

    I'm working 7 2

    I'm working 7 0

    I'm working 7 1

    Cycle is 7; 3 parties

    I'm working 8 1

    I'm working 8 0

    I'm working 8 2

    Cycle is 8; 3 parties

    I'm working 9 0

    I'm working 9 2

     

    testing Fibonacci ...

    fork new thread for 13

    fork new thread for 12

    fork new thread for 11

    fork new thread for 10

    fork new thread for 12

    fork new thread for 11

    fork new thread for 10

    fork new thread for 9

    fork new thread for 10

    fork new thread for 9

    fork new thread for 11

    fork new thread for 10

    fork new thread for 10

    fork new thread for 9

    Fibonacci(14) = 610

     

     

    结论

    从 以上的例子中可以看到,通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。

    在实际工作中利用 Fork/Join 模式,可以充分享受多核平台为应用带来的免费午餐。

     

    参考资料

    学习

    阅读文章The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software:了解为什么从现在开始每个严肃的软件工作者都应该了解并行编程方法。

    阅读 Doug Lea 的文章A Java Fork/Join Framework:了解 Fork/Join 模式的实现机制和执行性能。

    阅读 developerWorks 文章驯服 Tiger:并发集合:了解如何使用并行 Collection 库。

    阅读 developerWorks 文章Java 理论与实践:非阻塞算法简介:介绍了 JDK 5 在并行方面的重要增强以及在 JDK5 平台上如何实现非阻塞算法的一般介绍。

    书籍Java Concurrency in Practice:介绍了大量的并行编程技巧、反模式、可行的解决方案等,它对于 JDK 5 中的新特性也有详尽的介绍。


    获得产品和技术

    访问 Doug Lea JSR 166 站点获得最新的源代码。

    Sun 公司 网站下载 Java SE 6

    作者简介

     

     

    甘 志,IBM 中国软件实验室(CSDL BJChina Emerging Technology Institute 成员,主要研究方向为 UMLModel driven development SOA。他在上海交通大学计算机系获得网络安全方向博士学位,期间发表了多篇论文和技术书籍。你可以通过 ganzhi@cn.ibm.com 联系他。

     

     

    戴晓君,IBM 中国软件实验室(CSDL BJChina Emerging Technology Institute 成员,主要研究方向为并行编程和敏捷软件开发方法。他在中国科学院软件研究所获得计算机软件与理论硕士学位。你可以通过 daixiaoj@cn.ibm.com 联系他。

     

     

     

     

     

     

     

     

     

    展开全文
  • 详细介绍多核程序设计理论以及理论模型,并提供详细编程实例
  • 并研究了TBB 结合MPI在SMP 集群系统上实现高效的混合并行计算应用的法。最终发现TBB 在多核编程方面有显著的优势。T T B 和MPI的结合, 又为多核处理器结点集群提供了并行层次化结构, 大大优化集群的性能。
  • 多核与异步并行

    2017-01-21 16:17:03
    多核与异步并行 我们在设计多线程程序时往往有很多性能指标,例如低延迟(latency),高吞吐量(throughput),高响应度(responsiveness)等。随着多核处理器上CPU核数的日益增加,如何高
  • Horde 提供了一组简单易用的消息传递接口和事件驱动 (event-driven) 编程模型,用以帮助程序员表达算法逻辑中潜在的并行性,将计算分解底层硬件结构去耦合,从而简化编写并行程序的复杂度,灵活地在不同的底层结构...
  • 并行程序设计 目前并行程序设计的状况是:①并行软件的发展落后于并行硬件;②和串行系统的应用软件相比,现今...②串行程序设计仅有一个普遍被接受的冯*诺依曼模型,而并行计算模型虽有好多,但没有一个被共同认可;
  • 进程线程概念,MFC的进程和线程编程,.Net下的进程和线程编程,Java的进程和线程编程,并行计算,并行编程,VC++本地多核编程
  • 并行多核编程技术 1

    千次阅读 2013-10-31 16:25:34
    本系列文章将会对.NET 4中的并行编程技术(也称之为多核编程技术)以及应用作全面的介绍。  本篇文章的议题如下:  1. 并行编程和多线程编程的区别。   2. 并行编程技术的利弊   3. 何时采用并行编程  1....
  • 多核编程PPT

    2018-03-27 23:55:01
    多核处理器环境下的并行计算,讲义,胶片。详细描述。
  • OpenMP是一个业界的标准,很早以前就有了,只是近一段时间才逐渐热起来。我们可以在C/C++和... 在并行计算中用处很大。 官方网站www.openmp.org。 下面是一个详细介绍的链接。http://topic.csdn.net/u/20080512
  • 并行计算简介和多核CPU编程Demo

    千次阅读 2014-08-26 11:18:21
    tag:多线程,并行计算,OpenMP,多核编程,工作线程池 ( 2008.01.19 更新 鉴于读者反映代码阅读困难,重新改写了文章和实现,使文章更易读 ) ( 2007.09.04 更新 把用事件控制的线程启动更新为临界区的实现 ) ...
  • 多核编程与单核多线程编程

    千次阅读 2018-04-11 11:31:38
    多核编程与单核多线程编程的区别 1. 锁竞争 单核中,如果单个线程取得锁的控制权,则会获得CPU的运行时间,其它等待获取锁的线程就会阻塞。使用了锁,影响的只是加锁和解锁的耗时,CPU始终运行。 多核中,若2个...
  • 2、做并行计算、数据挖掘,机器学习等一般都要用的numpy,这个在Windows版本上安装有点问题,安装比较麻烦,建议在linux上搭建环境 3、安装openmpi,(不太好装)这个在网上目前还没有比较好的快捷安装方法,一般是...
  • 这个分类的知识来自周伟明先生的《多核计算与程序设计》一书,在此把书中对我个人帮助较大的部分整理出来,以便更容易的在工作中加以应用。 一,多核编程与单核多线程的区别 1, 锁竞争导致的串行化的区别...
  • 多核异构并行计算OpenMP---并行控制并行控制指令parallel功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样 并行控制 指令parallel /*File:parallel.cpp*/ #include<stdio.h> #include<omp.h>...
  • Linux多核编程

    2012-06-18 13:49:48
    Linux 多核 编程 基于linux 实现了多线程 并行计算(实验二 Linux多线程编程
  • 虚拟化通过在最高层次上实现并行机制,提供了利用多核或者多处理器系统的方式 对于软件开发人员来说,如何利用多个核心增加单个应用程序的吞吐量或者速度是一个问题。 采用并行机制提高单个任务的性能 并行可以完成...
  • 并行计算技术为主流计算带来了巨大变化,目前大多数PC,笔记本电脑甚至笔记本电脑都采用多处理器芯片,最多包含四个处理器。标准组件越来越多地最初设计用于高速图形处理的GPU(图形处理单元)和FPGA(可编程...
  • 多核处理器 并行计算 程序代码code,能够更加轻松上手学习并行编程技术
  • 作者 | Yan Gu来源 | 转载自知乎用户Yan Gu【导读】随着计算机技术的发展,毫无疑问现代计算机的处理速度和计算能力也越来越强。然而细心的同学们可能早已注意到,从2005年起,单核的 CPU 性能就没有显著的提升了。...
  • 多核并行编程的背景 在摩尔定律失效之前,提升处理器性能通过主频提升...要充分发挥多核丰富的计算资源优势,多核下的并行编程就不可避免,Linux kernel就是一典型的多核并行编程场景。但多核下的并行编程却挑战多...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 26,256
精华内容 10,502
关键字:

多核编程与并行计算