精华内容
下载资源
问答
  • 前 言 对于正在编程程序员,编程模型可被看作一台虚拟机,并可通过编程语言和函数库实现。如果某个编程模型成为计算机科学研究中热点,...针对特定编程模型,同时具有上述四种特性中一种或两种相对容易,...

    screenshot

    前  言

    对于正在编程的程序员,编程模型可被看作一台虚拟机,并可通过编程语言和函数库实现。如果某个编程模型成为计算机科学研究中的热点,需要具有以下特性:高效性(易于描述各种抽象算法)、移植性(兼容各种硬件计算平台)、高性能(高效均衡地利用硬件平台的计算能力)、通用性(广泛地描述各种算法)。针对特定的编程模型,同时具有上述四种特性中的一种或两种相对容易,但同时具备这四种特性几乎是不可能的。特定的编程模型无法同时具备四种特性的主要原因在于编程模型的多样性,因此需要根据具体的科学应用,选择不同的编程模型特性。
    随着并行计算技术的发展,计算机科学领域的专家将研究重点转移到能够适应高性能并行计算和超级计算系统的编程模型设计方向。并行编程模型包含执行模型(选择代码执行路径)和内存模型(管理计算节点间和节点内的数据流)。多核计算需要并发计算和移动数据,这增加了程序运行结果和性能的不确定性,导致并行编程模型变得更加复杂。
    从技术上分析,编程模型和编程系统间存在一定区别。编程模型是一种编程方式,例如采用大量同步或者隐含编译器协助的并行化方式,而编程系统指程序员编写程序时实际使用的系统抽象接口。随着时间推移,编程模型和编程系统间的区别逐渐变得模糊。目前,编程模型既是一种编程方式,也是模型实例化过程中所使用的系统抽象接口。
    与通用的编程模型设计不同,在大多数并行系统中,程序开发人员往往不采用单一的并行编程模型。不同的开发人员会选择不同层次的虚拟化方式,并在高效性、移植性、高性能和通用性四种编程模型特性中选择不同的组合。针对面向终端的程序开发应用,具体研究领域的科学家通常倾向于选择更高效和高级别的编程模型,即使该编程模型只能针对特定的算法而缺乏通用性。针对编程语言和函数库,程序开发者一般更倾向于选择高性能和低级别的编程模型,即使该编程模型具有较高的使用难度。然而,针对面向终端的程序应用以及编程语言和函数库开发,上述编程模型选择并非是绝对的,可根据实际的开发应用情况进行调整。
    关于本书
    本书对当今高性能计算以及超级计算系统上的几种最主要的并行编程模型进行了概述。书中包含多种并行编程模型,它们拥有不同的生产效率、可移植性、性能以及表达范围。因此,读者可以学习和理解每种编程模型提供了哪些折中。
    第1章讨论了消息传递接口(MPI)。MPI是当今面向分布式内存计算的最重要的并行编程模型。该章对MPI最常用的功能进行了概述,并涉及MPI标准的第三个主要版本 ——MPI-3。
    第2~5章从低层次的运行时库到高层次的编程模型,对单边通信模型进行了讨论。第2章介绍了全局地址空间网络(GASNet),它是一种低层次的编程模型,用于多种分区全局地址空间(PGAS)模型的一种通用可移植运行时系统。第3章讨论了OpenSHMEM单边通信库,它用于向用户直接呈现本地硬件通信功能。OpenSHMEM通过扩展库的形式模拟了许多PGAS模型的功能,这样做的好处是不依赖于语言扩展及相应的编译器支持。第4章提供了Unified Parallel C(UPC)编程模型的概述。UPC是基于C语言的PGAS模型,它为全局地址空间内存的创建与管理提供了相应的语言扩展及库接口。第5章介绍了全局数组(GA),与OpenSHMEM类似,GA是另一种基于库的单边通信模型。但GA基于多维数组提供了更高层次的抽象,以方便用户编程。
    第6章讨论了Chapel。它是一种高生产率编程模型,支持以任务并行及数据并行两种方式对应用进行描述。Chapel同时也具有一级语言概念,可对局部性进行描述与推理,这与它支持的并行特性是互不相关的。
    第7~11章展示了面向任务的编程模型,它们允许用户以任务的方式描述计算及数据单元,并允许运行时系统来管理计算以及必要的数据移动。第7章对Charm++编程模型进行了讨论。Charm++提供了一种依赖于工作过分解的抽象模型,以在可用的计算单元间动态地管理任务。第8章深入讨论了异步动态负载均衡(ADLB)库,它提供了另一种面向任务的工作共享方法,并以MPI作为低层次的通信模型。第9章讨论了可扩展任务对象集合(Scioto)编程模型,它依赖于类似PGAS的单边通信框架来实现基于工作窃取的负载均衡。第10章描述了Swift,它是一种高层次的脚本语言,允许用户使用高层次语义对计算进行描述,并在内部将其翻译成其他面向任务的编程模型,如ADLB。第11章描述了并行集(CnC),它是一种高层次的声明式模型,允许用户将应用描述为由相互通信的内核构成的图。
    第12~16章展示了面向节点内并行的编程模型,涉及的硬件环境包括多核架构、加速器以及两者同时存在的情况。第12章讨论了OpenMP。OpenMP是当今科学计算领域最重要的节点内并行编程模型。该章介绍了OpenMP的进化历程以及核心特性,并涉及OpenMP 4.0。第13章讨论了Cilk Plus编程模型,它是一种对C及C++语言的并行扩展,用于在现代共享内存多核机器上开发规则以及非规则并行。第14章讨论了Intel TBB(Threading Building Block),它是一个基于C++模板类实现的库。与Cilk Plus类似,TBB支持共享内存多核架构上的并行执行。第15章讨论了NVIDIA提供的CUDA(Compute Unified Device Architecture)编程模型。CUDA通过单指令多线程块运行方式来支持NVIDIA图形处理单元上的并行计算。尽管CUDA是NVIDIA设备上的专有编程模型,但CUDA在并行编程社区中具有广泛影响力,并在应用中得到广泛使用,因此第15章对CUDA进行了讨论。第16章描述了OpenCL(Open Computing Language)模型,它提供了一个低层次的、平台无关的编程模型,可以在不同异构架构上进行编程,其中包括图形处理单元。
    本书对不同编程模型的讲解方式在其他书籍中是很少见的。尤其是通过使用说明的方式来展示材料,而不是以更正式的类似于研究论文的方式展示。本书不是一个致力于详细描述每个编程模型语法及语义的参考手册。本书的目标是描述使用这些模型进行并行编程的通用方法,以及每种方法所实现的目标。不过,本书提供了一些模型所提供的核心接口的语法及语义定义,我们将这些定义作为编程模型所提供抽象的例子。提供这些定义的目的是提高内容的可读性。这些定义并不一定是最重要的或最常用的接口,而只是作为例子说明如何使用该编程模型。

    目录

    1.1 引言
    1.2 MPI基础
    1.3 点对点通信
    1.4 数据类型
    1.5 非阻塞式通信
    1.6 聚合通信
    1.7 单边通信
    1.8 并行I/O
    1.9 其他特性
    1.10 MPI开发心得
    1.11 总结
    2.1 研究背景与动机
    2.2 GASNet概述
    2.2.1 相关术语
    2.2.2 线程
    2.2.3 API组织
    2.3 核心API
    2.3.1 开始和结束
    2.3.2 段信息
    2.3.3 屏障
    2.3.4 锁与中断
    2.3.5 活动消息
    2.3.6 活动消息进程
    2.3.7 活动消息规则与约束
    2.3.8 出错代码
    2.4.1 GASNet段
    2.4.2 排序与内存模型
    2.4.3 阻塞与非阻塞
    2.4.4 批量与单个
    2.4.5 寄存器–内存与远程memset操作
    2.4.6 扩展API总结
    2.5 附加内容
    2.5.1 GASNet工具
    2.5.2 可移植平台头文件
    2.6.1 编译和运行示例
    2.6.2 Hello World示例
    2.6.3 AM Ping-Pong示例
    2.6.4 AM Ring示例
    2.6.5 MCS Locks示例
    2.7 未来方向
    3.1 引言
    3.2 设计理念和根据
    3.3 OpenSHMEM存储模型
    3.4.1 初始化和查询
    3.4.2 分配和释放
    3.4.3 关于分配和对称堆的说明
    3.5 远程内存访问:put和get
    3.5.1 RMA函数语义
    3.5.2 RMA函数使用
    3.6 排序和同步
    3.6.1 全局同步屏障
    3.6.2 fence和quiet:RMA操作排序
    3.6.3 
    3.6.4 wait和wait_until
    3.7 集合操作
    3.7.1 选择集合参与者
    3.7.2 同步数组和工作数组
    3.7.3 非全局同步屏障
    3.7.4 广播
    3.7.5 收集
    3.7.6 归约
    3.8 原子内存操作
    3.8.1 原子加和递增
    3.8.2 原子取–加和取–递增
    3.8.3 原子交换和条件交换
    3.9 未来方向

    展开全文
  • 并行计算的类型需求 计算密集型需求(Compute-Intensive) 数据密集型需求(Data-Intensive) 网络密集型需求(Net-Intensive) 第一篇 并行计算硬件基础 一. 并行计算机系统及其模型 1. 计算机系统互联 (1)...

    并行计算共有四个模块:

    1. 并行计算机体系结构
    2. 并行计算算法
    3. 并行程序设计
    4. 并行计算性能评估

    并行计算的三种类型需求

    1. 计算密集型需求(Compute-Intensive)
    2. 数据密集型需求(Data-Intensive)
    3. 网络密集型需求(Net-Intensive)

    第一篇 并行计算硬件基础

    一. 并行计算机系统及其模型

    1. 计算机系统互联

    (1)系统互联
    在多处理机、多计算机或分布式系统中,不同组成部分(CPU/存储模块、I/O设备、网络接口等)都要通过互联网络彼此连接起来。
    (2)静态互联网络
    (3)动态互联网络
    (4)标准互联网络

    2. 并行计算机体系结构

    SIMD(Signal-Instruction Multiple-Data)单指令数据流
    MIMD(Multiple-Instruction Multiple - Data)多指令多数据流
    MIMD包括
    (1)PVP(Parallel Vector Processor)并行向量处理机
    (2)SMP(Symmetric Multiprocessor)对称多处理机
    (3)MPP(Massively Parallel Processor)大规模并行处理机
    (4)COW(Cluster of Workstations)工作站机群
    (5)DSM(Distributed Shared Memory)分布式共享存储多处理机

    二. 并行计算机系统介绍

    并行计算发展历史出现了不同类型的并行机,包括向量机、SIMD计算机和MIMD计算机。目前向量机和SIMD已经退出历史舞台,MIMD类型的并行机占据主导地位。包括对称多处理机、大规模并行处理机以及机群系统。
    注:SIMD(Signal-Instruction Multiple-Data)单指令数据流
    MIMD(Multiple-Instruction Multiple - Data)多指令多数据流
    本章从以下几个部分介绍:

    共享存储多处理机系统;
    分布式存储多计算机系统;
    机群系统;

    1. 共享存储多处理机系统

    共享存储的对称多处理机SMP(Symmetric Multiprocessor)结构在现今的并行服务器中几乎普遍采用。
    结构特性
    (1)对称性:系统中任何处理器可访问任何存储单元和I/O设备;
    (2)单地址空间
    (3)高速缓存及其一致性:多级高速缓存可支持数据局部性,而其一致性可由硬件来增强。
    (4)低通信延迟
    问题:
    (1)欠可靠:总线,存储器或OS失效都会造成系统崩溃
    (2)可观的延迟
    (3)慢速增加的带宽
    (4)不可扩放性

    2. 分布式存储多计算机系统

    分布式存储的大规模并行处理机MPP(Massively Parallel Processor)。由多个处理器组成的大型计算机系统。

    3. 机群系统

    机群(Cluster)主要指两种类型:一是构筑高端大规模并行处理系统MPP机群;二是有LAN互连而成的工作站机群COW。

    三. 并行计算性能评测

    1. 机器级
    2. 算法级
    3. 程序级

    第二篇 并行算法的设计

    本篇主要研究如何设计并行算法
    并行算法的设计基础
    并行算法的一般设计策略:三种并行化法、全新法、借用法
    并行算法的基本设计技术
    并行算法的一般设计过程

    第四章 并行算法的设计基础

    任何并行算法的设计都是基于一种特定的并行计算模型,而并行计算模型是从各种具体并行机中抽象出来的。并行计算模型一般可以分为抽象计算模型和实用计算模型。

    1.并行算法基础知识

    (1)基本概念
    同步算法:算法的诸进程的执行必须相互等待的一类并行算法
    异步算法:算法的诸进程的执行不必互相等待的一类并行算法
    分布算法:是指由通信链路连接的多个场点或节点,协同完成问题求解的一类并行算法
    (2)并行算法的复杂性度量
    分析并行算法时,通常要分析如下几个指标。
    运行时间 包括计算时间和同步时间
    处理器数 求解给定问题所用的处理器数目
    并行计算的成本 并行算法运行时间与其所需的处理器数乘积
    总运算量 并行算法所完成的总的操作数量
    (3)同步
    同步(Synchronization)是在时间上强使各执行进程在某一点必须互相等待。
    (4)通信
    通信(Communication)是在空间上对个并发执行的进程施行数据交换。

    2. 并行计算模型

    所谓计算模型实际上就是硬件和软件之间的一种桥梁,使用它能够设计分析算法,在其上高级语言能被有效地编译且能够用硬件来实现。
    在串行计算时冯 诺依曼机就是理想的串行计算模型,但是在并行计算时,还没有一个类似于冯诺依曼机的真正通用并行计算模型。目前流行的计算模型要么过于简单,抽象(PRAM);要么过于专用(如互联网模型和VLSI模型)
    (1)PRAM模型(Parallel Random Access Machine)并行随机取存器。也称之为共享存储的SIMD模型。
    (2)异步PRAM模型
    (3)BSP模型(Bulk Synchronous Parallel)
    (4)logP模型

    对BSP和logP的对比:
    BSP把所有的计算和通信视为一个整体行为而不是一个单独的进程和通信的个体行为。采用各进程延迟通信的办法,将诸消息组合成一个尽可能大的通信实体施行选路传输,这就是所谓的整体大同步。简化了算法的设计和分析,但是牺牲了运行时间,因为延迟通信意味着所有进程均必须等待最慢者。改进方法就是采用自己同步,慢的放到一个子集,快的放到一个子集。就变成了logP

    第五章 并行算法的一般策略

    设计并行算法一般有三章策略
    (1)检测和开拓现有串行算法中的固有并行性而直接将其并行化
    (2)从问题描述本身出发,根据问题的固有属性从头开始设计一个全新的并行算法,通常算法最高效
    (3)借用已有的并行算法使之可求解新的一类问题

    1. 串行算法的直接并行化

    科学和工程问题中的数值计算问题,依据其数值分析数学原理而产生了很多广泛使用的串行算法,在设计这类问题时大都采用串行算法直接并行化的办法,这样如算法的稳定性、收敛性等复杂问题均无需考虑

    2. 从问题描述重新开始设计并行算法

    对于有些算法恐难直接进行并行化,此时从问题描述出发寻求新的途径设计新的并行算法。

    3. 借用法

    所谓借用法是指借用已知的某类问题求解算法来求解另一类问题。

    第六章 并行算法的基本设计技术

    本章介绍一些基本的设计技术可供参考使用。大思路就是将一原始问题分成若干个部分,然后各部分由相应的处理器同时执行。
    最基本的设计技术:均匀划分技术、方根划分、对数划分、功能划分技术。
    流水线技术是并行处理最基本的技术之一。

    第七章 并行算法的一般设计流程

    1. 任务划分Partitioning
    2. 通信分析Communication
    3. 任务组合Agglomeration
    4. 处理器映射Mapping
      PCAM
      划分:将整个计算分解成一些小的任务,其目的是尽可能开拓并行执行的机会
      通信:确定诸任务中所需交换的数据和协调诸任务的执行,由此可检测上述划分的合理性
      组合:按xngneng要求和实现代价来考察前两个阶段的结果,必要时可将一些小的任务组成更大的任务以提高性能或减少通信开销。
      映射:将任务分配到一个处理器上,其目的是最小化全局执行时间和通信成本以及最大化处理器利用率。
    展开全文
  • 在分析LOD内在并行基础上,利用通用的并行编程环境OpenMP对其进行线程化,提出了一基于叉树网格划分的并行简化算法,并在四核计算机上应用Intel parallel amplifier分析器按函数查看性能变化,对比优化前后...
  • 1. 设计目的、意义(功能描述) 蒙特·卡罗方法(Monte ...学习并行计算的历程不会因为完成本次大作业而停止,我们是为了用知识武装大脑而学习,通过学习充实自己的生活,要努力学习,争取以后能够完成规模更大的程序。
  • 1. 设计目的、意义(功能描述) 蒙特·卡罗方法(Monte ...学习并行计算的历程不会因为完成本次大作业而停止,我们是为了用知识武装大脑而学习,通过学习充实自己的生活,要努力学习,争取以后能够完成规模更大的程序。
  • GPU硬件及其编程模型Brook+特性,本文提出了四种面向GPU的并行优化策 略,包括平衡线程并行性和局部性、分支消除、开发指令级并行和提高访存带宽 利用率。本文选取了矩阵乘、LU分解和Mgrid三个重要科学计算程序...
  • 其实并行计算的软件技术早已存在了几十年,然而其原来主要服务于高性能计算一类的应用,所以并行化编程一直也都为阳春白雪的光环笼罩。现在谈到多核编程,讨论较多的是各种软件或者并行编程模型的使用;对于初学者而...

    多核计算平台的普及化使得并行(Parallel)或者并发(Concurrent)程序设计(这里不妨称它们为并行化程序设计)成为一种编程技术主流。其实并行计算的软件技术早已存在了几十年,然而其原来主要服务于高性能计算一类的应用,所以并行化编程一直也都为阳春白雪的光环笼罩。现在谈到多核编程,讨论较多的是各种软件或者并行编程模型的使用;对于初学者而言却仍可能难以循其径而入。

    其实,并行化的程序设计是有章可循的。按照开发流程的顺序,可以把并行化程序设计分为以下四个阶段:
    1. 可行算法(解决方案)的描述与分析
    2. 工作分解(Decomposition)——依赖性和同步与通信开销分析
    3. 选择编程(实现)模型
    4. 性能检查及优化

    在设计的初始阶段,开发者应当针对要解决的问题先找到一个可行的解决方案或者算法。比如,排序问题的解决方案有气泡排序,快速排序,二叉树排序等已知可行的方法可以作为并行化的基础算法。而在进行具体的并行化设计之前,有一个很重要的分析(或者评估)要做,那就是并行化的必要性分析;即,应该估计一下目标问题的计算量。如果需要解决的问题计算量并不是很大,比如只需要对30个整数进行排序,即使采用传统串行程序也不会占用太多时间,那么就可能没有为之设计并行化程序的必要,因为并行化也是要付出其它计算的代价的。另外,基础算法的选择也很有讲究。有些算法本身就不具备太多的并行性,比如图论算法中最小生成树/Minimum Spanning Tree(MST)的Kruskal算法;而有些算法则具有很好的可扩展性(Scalability),比如MST的Boruvka算法 (关于MST算法并行化的例子可参见ISN学术社区课件)。为确定算法的并行性,一般需要借助一些理论和工具的帮助。Amdal's law是大多数设计者所采用的估计并行化加速比上限的定理;而Gustafson's law则是分析并行程序可扩展性的有力理论指导。而为了能够使用这些定理给出指导,还需要一些软件工具(Profiling Tools)的辅助,从而确定理论所需的一些参数(如串行程序的并行量p)。提供这一种功能的常用的工具有Intel性能分析器Vtune,Windows里面的PerfMon等。对于理论和软件工具的使用可以参照学术社区的课件。基础算法的确定需要开发者结合下一步进行综合考虑,反复尝试几次。

    当候选的基础算法确定后,开发人员就需要进行一个并行化编程中非常重要的步骤——工作分解(Decomposition)。分解是对基础算法分析,将之分解为若干相对独立的部分(或者操作);进而可以通过后面一步(选择适当编程模型)把这些相对独立的部分分配到多个执行(处理)单元执行。工作分解的手段一般可以分为任务分解(Task Decomposition)和数据分解(Data Decomposition)。任务分解就是把一个算法按照操作的相关性(依赖性)分解为若干可以同时执行的子任务,比如下图中的函数h,q和r或者s是可以并行的。
    数据分解则是将一个比较大的待处理的数据集,如数组分割为若干子集,从而可以的不同部分的成员实施同时的运算或操作。此外还有一种常用的工作分解方法是流水线(Pipeline)方式,其基本原理是仿照生产流水线的操作,把一个大任务分解为若干紧密相连的阶段,从而提高每个阶段工作单元的运行效率。学术社区的在线课件有对工作分解的具体讲解,这里就不再赘述。对于分解方法的应用,需要开发者进行大量的分析实践,积累经验,提高分解的正确性和效率;另一方面,也还是有一些通用的经验可以借鉴。比如,算法中有循环体的,一般意味着可能可以采用数据分解,将不同的迭代(如果它们之间没什么依赖性的话)划分为不同的执行子集,分配给不同的线程或者进程执行;类似的,过程和函数是程序员把通用操作打包,从而便于阅读和维护有效手段,这也就暗示了函数体也是很好的数据分解的候选人;而在媒体流处理这类应用中,流水线是常用的分解手段。有心的读者可能注意到我们前面介绍分解方法是多次提到相关性或者依赖性(Dependence)。这是在进行工作分解时要面对的一个重要指标,因为分解部分之间的依赖性直接影响它们的可并行性。比如下图中,我们使用依赖性图(Dependence Graph)进行分析,就会发现各个循环迭代中对于变量a[i]具有读写依赖,那么这个循环体是不能被通过数据分解而并行化的,从而这种分解方案是不可行的(当然也有一些依赖性通过优化手段可以去除,由于篇幅的缘故,关于依赖性分析请参照学术社区的课件)。

    应当指出的是,大多数的应用(算法)是不可能分解为完全独立的部分,这些部分多少都需要一定的依赖和协调来完成工作。分解所要达到的目的应该是把基础算法分解为相互依赖性最小的若干部分,因为各个部分的协调和同步(Synchronizations)与通信(Communications)是需要额外开销的,这是并行化所要付出的开销。开发者进行并行化编程应当是使并行化的收益大于开销(要使用商人的哲学)。工作分解的依赖性分析可以帮助我们确定资源(如变量,数据结构)的性质,比如是共享还是私有等,从而确定同步和通信的对象,以及适用手段(如采用信号量还是互斥等)。

    当完成了基础算法的工作分解后,一个并行化的算法(解决方案)也就具备雏形了。这时候,设计者要选用一种(对于有经验者也可以采用多种混合)编程模型来实现并行化的算法。由于关于这方面的介绍已经汗牛充栋,所以这里不过多重复,只是稍微分享自己的经验。一般对于初学者和希望快速并行化的开发者,可以适用类似OpenMP一类隐式线程化实现(Implicit Threading);对于有有经验者或者硬核派,可以适用Windows的Win32 API,P-threads API或者适用于分布式平台的MPI,这类显性(Explicit Threading)进行相对底层的(如线程创建、销毁已经同步)控制;如果想使用既提供高层(隐性)并行化,又提供低层(显性)并行化手段的编程模型,Java和Intel的Threading Building Blocks(TBB)会是很好的选择。

    经过前面三步,开发者应该已经有了一个能够工作的并行化程序。此时,应该问自己的问题是,并行化的效果足够吗?能够满足需求吗?如果答案是否定的,就需要问自己还有足够的资源(人力,时间)来作进一步的优化吗?如果答案是肯定的,那么可以重复前面三个步骤,更进一步地发掘并行性,降低同步与通信的开销,从而获得性能的提升。

    转载于:https://www.cnblogs.com/me115/archive/2011/01/24/1943189.html

    展开全文
  • 原文地址:Java并发的四种风味:Thread、Executor、ForkJoin和Actor 这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好的几种解决方法,Executor服务、Fork/Join 框架以及计算中的Actor...

    原文地址:Java并发的四种风味:Thread、Executor、ForkJoin和Actor

    这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好的几种解决方法,Executor服务、Fork/Join 框架以及计算中的Actor模型

    Java并发编程的4种风格:Threads,Executors,Fork/Join和Actors
    这里写图片描述

    我们生活在一个事情并行发生的世界。自然地,我们编写的程序也反映了这个特点,它们可以并发的执行。当然除了Python代码(译者注:链接里面讲述了Python的全局解释器锁,解释了原因),不过你仍然可以使用Jython在JVM上运行你的程序,来利用多处理器电脑的强大能力。

    然而,并发程序的复杂程度远远超出了人类大脑的处理能力。相比较而言,我们简直弱爆了:我们生来就不是为了思考多线程程序、评估并发访问有限资源以及预测哪里会发生错误或者瓶颈。

    面对这些困难,人类已经总结了不少并发计算的解决方案和模型。这些模型强调问题的不同部分,当我们实现并行计算时,可以根据问题做出不同的选择。

    在这篇文章中,我将会用对同一个问题,用不同的代码来实现并发的解决方案;然后讨论这些方案有哪些好的地方,有哪些缺陷,可能会有什么样的陷阱在等着你。

    我们将介绍下面几种并发处理和异步代码的方式:

    • 裸线程
    • Executors和Services
    • ForkJoin框架和并行流
    • Actor模型

    为了更加有趣一些,我没有仅仅通过一些代码来说明这些方法,而是使用了一个共同的任务,因此每一节中的代码差不多都是等价的。另外,这些代码仅仅是展示用的,初始化的代码并没有写出来,并且它们也不是产品级的软件示例。


    任务

    任务:实现一个方法,它接收一条消息和一组字符串作为参数,这些字符串与某个搜索引擎的查询页面对应。对每个字符串,这个方法发出一个http请求来查询消息,并返回第一条可用的结果,越快越好。

    如果有错误发生,抛出一个异常或者返回空都是可以的。我只是尝试避免为了等待结果而出现无限循环。

    简单说明:这次我不会真正深入到多线程如何通讯的细节,或者深入到Java内存模型。如果你迫切地想了解这些,你可以看我前面的文章利用JCStress测试并发

    为了后面的代码值只关注于并发编程,这里提供两个类:

    • 一个接口IFlavorDemo,用于规定并发查询方法getFirstResult
    • 一个工具类EngineUtils,用于模拟搜索引擎列表搜索方法

    EngineUtils.java

    /**
     * 搜索引擎工具类
     * Created by 韩超 on 2018/3/6.
     */
    public class EngineUtils {
        private final static Logger LOGGER = Logger.getLogger(EngineUtils.class);
    
        //搜索引擎列表
        private static List<String> engineList;
    
        static {
            engineList = new ArrayList<>();
            engineList.add("百度");
            engineList.add("Google");
            engineList.add("必应");
            engineList.add("搜狗");
            engineList.add("Redis");
            engineList.add("Solr");
        }
    
        /**
         * <p>Title: 模拟一个搜索引擎进行一次问题查询</p>
         * @author 韩超 2018/3/6 11:20
         */
        public static String searchByEngine(String question,String engine) throws InterruptedException {
            //获取随机的时间间隔
            int interval = RandomUtils.nextInt(1,5000);
            LOGGER.info("搜索引擎[" + engine + "]正在查询,预计用时" + interval + "毫秒...");
            //当前线程休眠指定时间,模拟搜索引擎用时
            Thread.sleep(interval);
            return "通过搜索引擎[" + engine + "],首先查到关于(" + question + ")问题的结果,用时 = " + interval + "毫秒!";
        }
    
        public static List<String> getEngineList() {
            return engineList;
        }
    
        public static void setEngineList(List<String> engineList) {
            EngineUtils.engineList = engineList;
        }
    }

    IFlavorDemo.java

    /**
     * Created by 韩超 on 2018/3/6.
     */
    public interface IFlavorDemo {
        String getFirstResult(String question, List<String > engines);
    }

    那么,让我们从最直接、最核心的方式来在JVM上实现并发:手动管理裸线程。

    方法1:使用“原汁原味”的裸线程

    解放你的代码,回归自然,使用裸线程!线程是并发最基本的单元Java线程本质上被映射到操作系统线程,并且每个线程对象对应着一个计算机底层线程

    自然地,JVM管理着线程的生存期,而且只要你不需要线程间通讯,你也不需要关注线程调度。

    每个线程有自己的栈空间,它占用了JVM进程空间的指定一部分。

    线程的接口相当简明,你只需要提供一个Runnable,调用.start()开始计算。没有现成的API来结束线程,你需要自己来实现,通过类似boolean类型的标记来通讯。

    在下面的例子中,我们对每个被查询的搜索引擎,创建了一个线程。查询的结果被设置到AtomicReference,它不需要锁或者其他机制来保证只出现一次写操作。开始吧!

    代码:

    /**
     * <p>并发四种口味-01 裸线程</p>
     *
     * @author hanchao 2018/3/5 21:53
     **/
    public class FlavorThreadsDemo implements IFlavorDemo {
        private static final Logger LOGGER = Logger.getLogger(FlavorThreadsDemo.class);
    
        /**
         * 通过多个搜索引擎查询多个条件,并返回第一条查询结果
         *
         * @param question 查询问题
         * @param engines  查询条件数组
         * @return 最先查出的结果
         * @author hanchao 2018/3/5 22:05
         */
        @Override
        public String getFirstResult(String question, List<String> engines) {
            //将存放查询的数据类型设置为"Atomic"类型,保证原子性
            AtomicReference<String> result = new AtomicReference<String>();
            LOGGER.info("通过裸线程进行并发编程,自己控制现场数量:" + engines.size());
    
            //使用原子变量去测试裸线程创建是否有序
            AtomicInteger count = new AtomicInteger(1);
    
            //针对每一个搜索引擎,都开启一个线程进行查询
            for (String engine : engines) {
                //通过java8提供的lambda表达式创建线程
                new Thread(
                        () -> {
                            try {
                                //调用某种搜索引擎进行搜索
                                result.compareAndSet(null, EngineUtils.searchByEngine(question, engine));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                ).start();//通过.start()启动线程
                LOGGER.info("为搜索引擎[" + engine + "]创建" + count + "个线程...");
                count.getAndIncrement();
            }
            //无限循环,直至result有值为止
            while (result.get() == null) ;
            //返回搜索结果
            return result.get();
        }
    
        /**
         * <p>创建一组搜索引擎,对同一话题进行查询,并获取第一个查到的结果。</p>
         *
         * @author hanchao 2018/3/5 22:47
         **/
        public static void main(String[] args) {
            //通过工具类获取搜索引擎列表
            List<String> engines = EngineUtils.getEngineList();
            //通过 裸线程 进行并发查询,获取最先查到的答案
            String result = new FlavorThreadsDemo().getFirstResult("正则表达式", engines);
            //打印结果
            LOGGER.info(result);
        }
    }

    结果:

    
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:29 - 通过裸线程进行并发编程,自己控制现场数量:6
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[百度]创建1个线程...
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[Google]创建2个线程...
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[必应]创建3个线程...
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[搜狗]创建4个线程...
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[Redis]创建5个线程...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[Redis]正在查询,预计用时2484毫秒...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[Google]正在查询,预计用时419毫秒...
    2018-03-06 22:32:46 INFO  FlavorThreadsDemo:47 - 为搜索引擎[Solr]创建6个线程...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[百度]正在查询,预计用时2093毫秒...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[搜狗]正在查询,预计用时4568毫秒...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[必应]正在查询,预计用时1022毫秒...
    2018-03-06 22:32:46 INFO  EngineUtils:36 - 搜索引擎[Solr]正在查询,预计用时3937毫秒...
    2018-03-06 22:32:47 INFO  FlavorThreadsDemo:67 - 通过搜索引擎[Google],首先查到关于(正则表达式)问题的结果,用时 = 419毫秒!

    使用裸线程的主要优点是,你很接近并发计算的操作系统/硬件模型。并且这个模型非常简单:多个线程运行,通过共享内存通讯,就是这样

    自己管理线程的最大劣势是,你很容易过分的关注线程的数量。线程是很昂贵的对象,创建它们需要耗费大量的内存和时间。这是一个矛盾,线程太少,你不能获得良好的并发性;线程太多,将很可能导致内存问题,调度也变得更复杂。

    然而,如果你需要一个快速和简单的解决方案,你绝对可以使用这个方法,不要犹豫。

    方法2:认真对待Executor和CompletionService

    另一个选择是使用API来管理一组线程。幸运的是,JVM为我们提供了这样的功能,就是Executor接口。Executor接口的定义非常简单:

    public interface Executor {
        void execute(Runnable command);
    }

    Executor接口隐藏了如何处理Runnable的细节。它仅仅说,“开发者!你只不过是一袋肉,给我任务,我会处理它!”

    更酷的是,Executors类提供了一组方法,能够创建拥有完善配置的线程池和executor。我们将使用newFixedThreadPool(),它创建预定义数量的线程,并不允许线程数量超过这个预定义值。这意味着,如果所有的线程都被使用的话,提交的命令将会被放到一个队列中等待;当然这是由executor来管理的。

    在它的上层,有ExecutorService管理executor的生命周期,以及CompletionService会抽象掉更多细节,作为已完成任务的队列。得益于此,我们不必担心只会得到第一个结果。

    代码:

    /**
     * 并发四种口味-02 Executor
     * Created by 韩超 on 2018/3/6.
     */
    public class FlavorExecutorsDemo implements IFlavorDemo  {
        private final static Logger LOGGER = Logger.getLogger(FlavorExecutorsDemo.class);
    
        /**
         * <p>Title: 通过多个搜索引擎查询多个条件,并返回第一条查询结果</p>
         *
         * @param question 问题
         * @param engines  搜索引擎列表
         * @author 韩超 2018/3/6 10:07
         */
        @Override
        public String getFirstResult(String question, List<String> engines){
            //将查询结果放在"Atomic"变量中,保证原子性
            AtomicReference<String> result = new AtomicReference<String>();
    
            //通过Executors.newFixedThreadPool(size)创建固定大小的线程池,只能运行size数量的线程,其余线程等待
            //创建ExecutorService线程池,此线程池能够主动控制线程池的运行、关闭和终止
            ExecutorService service = Executors.newFixedThreadPool(3);
            LOGGER.info("通过Executors创建固定大小的线程池,线程池大小:3,当前线程数:" + Thread.activeCount() + "线程池最大线程数:" + (Thread.activeCount() + 3));
            try{
                //使用原子变量去测试 线程池提交服务 的是否有序
                AtomicInteger count = new AtomicInteger();
    
                //针对每一个搜索引擎,都调用一次service的submit()方法
                for (String engine : engines) {
                    //lambda,通过service.submit()设置业务代码
                    service.submit(
                            () -> {
                                LOGGER.info("为搜索引擎[" + engine + "]进行第" + count + "次服务提交...当前活跃线程数:" + Thread.activeCount());
                                count.getAndIncrement();
                                //调用某种搜索引擎进行搜索,并将搜索结果通过CAS方式放到result中
                                result.compareAndSet(null, EngineUtils.searchByEngine(question, engine));
                                return result;
                            }
                    );
                }
                //当result取不到值时,证明还没有搜索引擎获取查出结果,通过while的无限循环进行等待
                while (null == result.get()) ;
            }finally {
                //记得要手动关闭ExecutorService线程池
                service.shutdown();
            }
    
            return result.get();
        }
    
        /**
         * <p>Title: 创建一组搜索引擎,对同一话题进行查询,并获取第一个查到的结果。</p>
         *
         * @author 韩超 2018/3/6 10:05
         */
        public static void main(String[] args) throws InterruptedException {
            //通过工具类获取搜索引擎列表
            List<String> engines = EngineUtils.getEngineList();
            //通过 executor 进行并发查询,获取最先查到的答案
            String result = new FlavorExecutorsDemo().getFirstResult("如何使用筷子?", engines);
            //打印结果
            LOGGER.info(result);
        }
    }

    结果:

    2018-03-06 22:33:24 INFO  FlavorExecutorsDemo:33 - 通过Executors创建固定大小的线程池,线程池大小:3,当前线程数:2线程池最大线程数:5
    2018-03-06 22:33:24 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[Google]进行第0次服务提交...当前活跃线程数:5
    2018-03-06 22:33:24 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[百度]进行第0次服务提交...当前活跃线程数:5
    2018-03-06 22:33:24 INFO  EngineUtils:36 - 搜索引擎[百度]正在查询,预计用时3251毫秒...
    2018-03-06 22:33:24 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[必应]进行第0次服务提交...当前活跃线程数:5
    2018-03-06 22:33:24 INFO  EngineUtils:36 - 搜索引擎[Google]正在查询,预计用时1589毫秒...
    2018-03-06 22:33:24 INFO  EngineUtils:36 - 搜索引擎[必应]正在查询,预计用时4199毫秒...
    2018-03-06 22:33:25 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[搜狗]进行第3次服务提交...当前活跃线程数:5
    2018-03-06 22:33:25 INFO  FlavorExecutorsDemo:72 - 通过搜索引擎[Google],首先查到关于(如何使用筷子?)问题的结果,用时 = 1589毫秒!
    2018-03-06 22:33:25 INFO  EngineUtils:36 - 搜索引擎[搜狗]正在查询,预计用时1340毫秒...
    2018-03-06 22:33:27 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[Redis]进行第4次服务提交...当前活跃线程数:5
    2018-03-06 22:33:27 INFO  EngineUtils:36 - 搜索引擎[Redis]正在查询,预计用时4383毫秒...
    2018-03-06 22:33:27 INFO  FlavorExecutorsDemo:43 - 为搜索引擎[Solr]进行第5次服务提交...当前活跃线程数:5
    2018-03-06 22:33:27 INFO  EngineUtils:36 - 搜索引擎[Solr]正在查询,预计用时4173毫秒...

    如果你需要精确的控制程序产生的线程数量,以及它们的精确行为,那么executor和executor服务将是正确的选择。例如,需要仔细考虑的一个重要问题是,当所有线程都在忙于做其他事情时,需要什么样的策略?增加线程数量或者不做数量限制?把任务放入到队列等待?如果队列也满了呢?无限制的增加队列大小?

    感谢JDK,已经有很多配置项回答了这些问题,并且有着直观的名字,例如上面的Executors.newFixedThreadPool(4)。

    线程和服务的生命周期也可以通过选项来配置,使资源可以在恰当的时间关闭。唯一的不便之处是,对新手来说,配置选项稍微有一些复杂和抽象。然而,在并发编程方面,你几乎找不到更简单的了。

    总之,对于大型系统,我个人认为使用executor最合适

    方法3:通过并行流,使用ForkJoinPool (FJP)

    Java 8中加入了并行流,从此我们有了一个并行处理集合的简单方法。它和lambda一起,构成了并发计算的一个强大工具。

    如果你打算运用这种方法,那么有几点需要注意。首先,你必须掌握一些函数编程的概念,它实际上更有优势。其次,你很难知道并行流实际上是否使用了超过一个线程,这要由流的具体实现来决定。如果你无法控制流的数据源,你就无法确定它做了什么。

    另外,你需要记住,默认情况下是通过ForkJoinPool.commonPool()实现并行的。这个通用池由JVM来管理,并且被JVM进程内的所有线程共享。这简化了配置项,因此你不用担心。

    代码:

    /**
     * 并发四种口味-03 Fork/Join框架
     * Created by 韩超 on 2018/3/6.
     */
    public class FlavorParallelDemo implements IFlavorDemo {
        private final static Logger LOGGER = Logger.getLogger(FlavorParallelDemo.class);
    
        /**
         * 通过多个搜索引擎查询多个条件,并返回第一条查询结果
         *
         * @param question 查询问题
         * @param engines  查询条件数组
         * @return 最先查出的结果
         * @author hanchao 2018/3/5 22:05
         */
        @Override
        public String getFirstResult(String question, List<String> engines) {
            LOGGER.info("使用默认并行流进行并发编程,默认划分的子任务数 = CPU内核数(4)");
            //使用原子变量去测试任务划分是否有序
            AtomicInteger count = new AtomicInteger();
    
            //用list.stream.parallel()开启并行流进行并发编程
            Optional<String> result = engines.stream().parallel().map(
                    (engine) -> {
                        try {
                            LOGGER.info("CPU划分了第" + count + "个子任务....");
                            count.getAndIncrement();
                            return EngineUtils.searchByEngine(question, engine);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
            ).findAny();//任何一个子任务完成都可以结束
            return result.get();
        }
    
        /**
         * <p>Title: 创建一组搜索引擎,对同一话题进行查询,并获取第一个查到的结果。</p>
         *
         * @author 韩超 2018/3/6 11:53
         */
        public static void main(String[] args) {
            //通过工具类获取搜索引擎列表
            List<String> engines = EngineUtils.getEngineList();
            //通过 并行操作 进行并发查询,获取最先查到的答案
            String result = new FlavorParallelDemo().getFirstResult("正则表达式", engines);
            //打印结果
            LOGGER.info(result);
        }
    }

    结果:

    2018-03-06 22:33:44 INFO  FlavorParallelDemo:26 - 使用默认并行流进行并发编程,默认划分的子任务数 = CPU内核数(42018-03-06 22:33:44 INFO  FlavorParallelDemo:34 - CPU划分了第0个子任务....
    2018-03-06 22:33:44 INFO  FlavorParallelDemo:34 - CPU划分了第0个子任务....
    2018-03-06 22:33:44 INFO  FlavorParallelDemo:34 - CPU划分了第0个子任务....
    2018-03-06 22:33:44 INFO  FlavorParallelDemo:34 - CPU划分了第0个子任务....
    2018-03-06 22:33:44 INFO  EngineUtils:36 - 搜索引擎[百度]正在查询,预计用时2545毫秒...
    2018-03-06 22:33:44 INFO  EngineUtils:36 - 搜索引擎[搜狗]正在查询,预计用时3158毫秒...
    2018-03-06 22:33:44 INFO  EngineUtils:36 - 搜索引擎[Google]正在查询,预计用时1317毫秒...
    2018-03-06 22:33:44 INFO  EngineUtils:36 - 搜索引擎[必应]正在查询,预计用时2503毫秒...
    2018-03-06 22:33:47 INFO  FlavorParallelDemo:57 - 通过搜索引擎[Google],首先查到关于(正则表达式)问题的结果,用时 = 1317毫秒!

    看上面的并行流(parallelStream)例子,我们不关心单独的任务在哪里完成,由谁完成。然而,这也意味着,你的应用程序中可能存在一些停滞的任务,而你却无法知道。在另一篇关于并行流的文章中,我详细地描述了这个问题。并且有一个变通的解决方案,虽然它并不是世界上最直观的方案。

    ForkJoin是一个很好的框架,由比我更聪明的人来编写和预先配置。因此当我需要写一个包含并行处理的小型程序时,ForkJoin是我的第一选择。

    ForkJoin最大的缺点是,你必须预见到它可能产生的并发症。如果对JVM没有整体上的深入了解,这很难做到。这只能来自于经验。

    方法4:雇用一个Actor

    Actor模型是对我们本文中所探讨的方法的一个奇怪的补充。JDK中没有actor的实现;因此你必须引用一些实现了actor的库。

    简短地说,在actor模型中,你把一切都看做是一个actor。一个actor是一个计算实体,就像上面第一个例子中的线程,它可以从其他actor那里接收消息,因为一切都是actor。

    在应答消息时,它可以给其他actor发送消息,或者创建新的actor并与之交互,或者只改变自己的内部状态。

    相当简单,但这是一个非常强大的概念。生命周期和消息传递由你的框架来管理,你只需要指定计算单元是什么就可以了。另外,actor模型强调避免全局状态,这会带来很多便利。你可以应用监督策略,例如免费重试,更简单的分布式系统设计,错误容忍度等等。

    下面是一个使用Akka Actors的例子。Akka Actors有Java接口,是最流行的JVM Actor库之一。实际上,它也有Scala接口,并且是Scala目前默认的actor库。Scala曾经在内部实现了actor。不少JVM语言都实现了actor,比如Fantom。这些说明了Actor模型已经被广泛接受,并被看做是对语言非常有价值的补充。

    代码:

    /**
     * Created by 韩超 on 2018/3/6.
     */
    public class FlavorActorDemo implements IFlavorDemo {
        private final static Logger LOGGER = Logger.getLogger(FlavorActorDemo.class);
    
        /**
         * <p>Title: 定义查询条件类,用于传递消息</p>
         *
         * @author 韩超 2018/3/6 16:16
         */
        static class QueryTerms {
            /**
             * 问题
             */
            private String question;
            /**
             * 搜索引擎
             */
            private String engine;
    
            public String getQuestion() {
                return question;
            }
    
            public void setQuestion(String question) {
                this.question = question;
            }
    
            public String getEngine() {
                return engine;
            }
    
            public void setEngine(String engine) {
                this.engine = engine;
            }
    
            public QueryTerms(String question, String engin) {
                this.question = question;
                this.engine = engin;
            }
        }
    
        /**
         * <p>Title: 定义查询结果类,用于消息传递</p>
         *
         * @author 韩超 2018/3/6 16:17
         */
        static class QueryResult {
            /**
             * 查询结果
             */
            private String result;
    
            public QueryResult(String result) {
                this.result = result;
            }
    
            public String getResult() {
                return result;
            }
    
            public void setResult(String result) {
                this.result = result;
            }
        }
    
        /**
         * <p>Title:搜索引擎Actor </br>
         * 继承UntypedAbstractActor成为一个Actor</p>
         *
         * @author 韩超 2018/3/6 14:42
         */
        static class SearchEngineAcotr extends UntypedAbstractActor {
    
            /**
             * <p>Title: Actor都需要重写消息接收处理方法</p>
             *
             * @author 韩超 2018/3/6 14:42
             */
            @Override
            public void onReceive(Object message) throws Throwable {
                //如果消息是指定的类型Message,则进行处理,否则不处理
                if (message instanceof QueryTerms) {
                    //通过工具类进行一次搜索引擎查询
                    String result = EngineUtils.searchByEngine(((QueryTerms) message).getQuestion(), ((QueryTerms) message).getEngine());
                    //通过getSender().tell(result,actor)将actor的 处理结果[result] 发送消息的发送者[getSender()]
                    //通过getSender获取消息的发送方
                    //通过getSelf()获取当前Actor
                    getSender().tell(new QueryResult(result), getSelf());
                } else {
                    unhandled(message);
                }
            }
        }
    
        /**
         * <p>Title: 问题查询器Actor</br>
         * 继承自UntypedAbstractActor</p>
         *
         * @author 韩超 2018/3/6 16:31
         */
        static class QuestionQuerier extends UntypedAbstractActor {
            /**
             * 搜索引擎列表
             */
            private List<String> engines;
            /**
             * 搜索结果
             */
            private AtomicReference<String> result;
            /**
             * 问题
             */
            private String question;
    
            public QuestionQuerier(String question, List<String> engines, AtomicReference<String> result) {
                this.question = question;
                this.engines = engines;
                this.result = result;
            }
    
            /**
             * <p>Title: Actor都需要重写消息接收处理方法</p>
             *
             * @author 韩超 2018/3/6 16:35
             */
            @Override
            public void onReceive(Object message) throws Throwable {
                //如果收到查询结果,则对查询结果进行处理
                if (message instanceof QueryResult) {//如果消息是指定的类型Result,则进行处理,否则不处理
                    //通过CAS设置原子引用的值
                    result.compareAndSet(null, ((QueryResult) message).getResult());
                    //如果已经查询到了结果,则停止Actor
                    //通过getContext()获取ActorSystem的上下文环境
                    //通过getContext().stop(self())停止当前Actor
                    getContext().stop(self());
                } else {//如果没有收到处理结果,则创建搜索引擎Actor进行查询
    
                    //使用原子变量去测试Actor的创建是否有序
                    AtomicInteger count = new AtomicInteger(1);
    
                    //针对每一个搜索引擎,都创建一个Actor
                    for (String engine : engines) {
                        LOGGER.info("为" + engine + "创建第" + count + "个搜索引擎Actor....");
                        count.getAndIncrement();
    
                        //通过actorOf(Props,name)创建Actor
                        //通过Props.create(Actor.class)创建Props
                        ActorRef fetcher = this.getContext().actorOf(Props.create(SearchEngineAcotr.class), "fetcher-" + engine.hashCode());
                        //创建查询条件
                        QueryTerms msg = new QueryTerms(question, engine);
                        //将查询条件告诉Actor
                        fetcher.tell(msg, self());
                    }
                }
            }
        }
    
        /**
         * 通过多个搜索引擎查询多个条件,并返回第一条查询结果
         *
         * @param question 查询问题
         * @param engines  查询条件数组
         * @return 最先查出的结果
         * @author 韩超 2018/3/6 16:44
         */
        @Override
        public String getFirstResult(String question, List<String> engines) {
            //创建一个Actor系统
            ActorSystem system = ActorSystem.create("searchByEngines");
            //创建一个原子引用用于保存查询结果
            AtomicReference<String> result = new AtomicReference<>();
            //通过静态方法,调用Props的构造器,创建Props对象
            Props props = Props.create(QuestionQuerier.class, question, engines, result);
            //通过system.actorOf(props,name)创建一个 问题查询器Actor
            final ActorRef querier = system.actorOf(props, "master");
            //告诉问题查询器开始查询
            querier.tell(new Object(), ActorRef.noSender());
    
            //通过while无限循环 等待actor进行查询,知道产生结果
            while (null == result.get()) ;
            //关闭 Actor系统
            system.terminate();
            //返回结果
            return result.get();
        }
    
        /**
         * <p>Title: </p>
         *
         * @author 韩超 2018/3/6 14:15
         */
        public static void main(String[] args) {
            //通过工具类获取搜索引擎列表
            List<String> engines = EngineUtils.getEngineList();
            //通过 Actor 进行并发查询,获取最先查到的答案
            String result = new FlavorActorDemo().getFirstResult("今天你吃了吗?", engines);
            //打印结果
            LOGGER.info(result);
    
        }
    }

    结果:

    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为百度创建第1个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为Google创建第2个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[百度]正在查询,预计用时4894毫秒...
    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为必应创建第3个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[Google]正在查询,预计用时3258毫秒...
    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为搜狗创建第4个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[必应]正在查询,预计用时76毫秒...
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[搜狗]正在查询,预计用时1869毫秒...
    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为Redis创建第5个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  FlavorActorDemo:157 - 为Solr创建第6个搜索引擎Actor....
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[Redis]正在查询,预计用时3403毫秒...
    2018-03-06 22:34:18 INFO  EngineUtils:36 - 搜索引擎[Solr]正在查询,预计用时1165毫秒...
    2018-03-06 22:34:18 INFO  FlavorActorDemo:212 - 通过搜索引擎[必应],首先查到关于(今天你吃了吗?)问题的结果,用时 = 76毫秒!
    [INFO] [03/06/2018 22:34:23.797] [searchByEngines-akka.actor.default-dispatcher-7] [akka://searchByEngines/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://searchByEngines/user/master/fetcher-2582786#-1241077001] to Actor[akka://searchByEngines/user/master#-316368035] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    
    

    Akka actor在内部使用ForkJoin框架来处理工作。这里的代码很冗长,不要担心,大部分代码是消息类QueryItems(查询条件类)QueryResult(查询结果类)的定义,然后是两个不同的actor:QuestionQuerier(问题查询器)用来组织所有的搜索引擎,而SearchEngineActor(搜索引擎Actor)用来从给定的URL获取结果。这里代码行比较多是因为我不愿意把很多东西写在同一行上。Actor模型的强大之处来自于Props对象的接口,通过接口我们可以为actor定义特定的选择模式,定制的邮箱地址等。结果系统也是可配置的,只包含了很少的活动件。这是一个很好的迹象!

    使用Actor模型的一个劣势是,它要求你避免全局状态,因此你必须小心的设计你的应用程序,而这可能会使项目迁移变得很复杂。同时,它也有不少优点,因此学习一些新的范例和使用新的库是完全值得的


    总结

    这篇文章中我们讨论了在Java应用中添加并行的几种不同方法。从我们自己管理Java线程开始,我们逐渐地发现更高级的解决方案,执行不同的executor服务ForkJoin框架actor计算模型

    不知道当你面临真实问题时该如何选择?它们都有各自的优缺点,你需要在直观和易用性、配置和增加/减少机器性能等方面做出选择。


    参考文献

    [1] CompletionService/ExecutorCompletionService/线程池/concurrent包
    [2] 深入浅出parallelStream
    [3] Spring与Akka的集成
    [4] akka(tell,ask,send)
    [5] Akka2使用探索4(Actors)
    [6] akka学习教程(四) actor生命周期
    [7] Akka 通过Props实例创建Actor

    展开全文
  • 第九天 - MapReduce计算模型 - 案例 第九天 - MapReduce计算模型 - 案例 一、概念 二、流程 三、案例一 - WordCount ...MapReduce是一编程模型,用于大规模数据集的并行运算。能自动完成计算任务的并行...
  • 四种基本神经网络

    千次阅读 2020-11-30 11:30:33
    神经网络是机器学习中种模型,是一种模仿动物神经网络行为特征,进行分布式并行信息处理算法数学模型。这种网络依靠系统复杂程度,通过调整内部大量节点之间相互连接关系,从而达到处理信息目的。 ...
  • Opencl是一典型异构架构,可以很好实施并发性,为了简化并行计算复杂度以及兼容各个芯片差异性,opencl将其抽象为四大模型(Platform model, Execution model,Memory model 以及Programming model),它是整个...
  • 神经网络也是机器学习种模型,是一种模仿动物神经网络行为特征,进行分布式并行信息处理算法数学模型。。这种网络依靠系统复杂程度,通过调整内部大量节点之间相互连接关系,从而达到处理信息目的。 ...
  • MapReduce将整个并行计算过程抽象到两个函数 Map(映射):对一些独立元素组成的的列表每一个元素进行指定操作,可以高度并行。 Reduce(化简):对一个列表元素进行合并。 一个简单MapReduce程序只需要指定...
  • 从自己管理Java线程,到各种更好几解决方法,Executor服务、ForkJoin 框架以及计算Actor模型。  Java并发编程4风格:Threads,Executors,ForkJoin和Actors  我们生活在一个事情并行发生世界。自然...
  • Java并发的四种风味

    2015-08-07 09:21:16
    刚看到这篇文章,讨论了Java应用中并行处理多种方法,其中有JDK8的并行...从自己管理Java线程,到各种更好几解决方法,Executor服务、ForkJoin 框架以及计算Actor模型。 Java并发编程4风格:Thread...
  • 给出了上述并行计算过程复杂度定理,在理论上定性分析了算法计算复杂度;最后,以北京市为实验区,对所提城市道路拥堵分析算法性能进行了验证。实验结果表明,所提方法可实现城市道路拥堵情况快速有效检测分析...
  • Reduction是一广泛使用计算模型,特别是在并行计算领域。简单地来说,Reduction就是一系列划分(Partition)和汇总(Summarize)操作集合:对输入数据分块,对每一个分块汇总,然后再将汇总后数据视为新...
  • 通过搭建的Hadoop分布式计算平台对不同样本数据集分别进行10次准确性实验和效率实验,结果表明:a)聚类的平均准确率在实验所采用的四种UCI标准数据集上,相比原始K-means聚类算法和基于粒子群优化算法改进的K-means聚类...

空空如也

空空如也

1 2 3 4 5 6
收藏数 114
精华内容 45
关键字:

并行计算的四种模型