精华内容
下载资源
问答
  • 主要介绍了Hadoop多Job并行处理的实例详解的相关资料,希望通过本文能帮助到大家,需要的朋友可以参考下
  • 本篇文章主要介绍了Python中使用多进程来实现并行处理的方法小结,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • 给大家详细介绍了JavaScript中的并行处理,一直以来,JavaScript 都并没有多线程的能力,而单线程在让开发者方便的同时,也使它一直不能处理计算量复杂的场景,唯一方法就是让服务端去做。下来通过这篇文章我们一...
  • 并行计算是提高处理速度最有效的技术之一,图像并行处理技术为提高图像处理效率提供了广阔的空间。图像并行处理包括并行算法和多处理器并行硬件系统,图像处理并行算法的执行效率依赖于多处理器系统的硬件结构。通常...
  • DM7大规模并行处理MPP

    2019-08-30 15:20:10
    DM7大规模并行处理MPP
  • 提出了一种基于流处理器的图像灰度变换并行处理系统。该系统采用Strom-1系列SP16HP-G220流数字信号处理器构建硬件平台,根据流处理器体系结构特点建立了适应图像灰度变换并行处理的流程序模型,并对图像灰度变换进行...
  • Linux下BASH多进程并行处理的实现代码,需要的朋友可以参考下
  • 分布式并行计算:就是利用一组由多种不同结构功能组成的、网络互连的计算机同时解决一个大型综合问题的计算。因为这种环境,分布式并行计算具有计算成本低、计算性能最佳,满足不断增长的计算需求、开发方便等优点。
  • Akka自举 实验Akka进行并行处理
  • 在对SPGD控制算法分析的基础上, 充分提取和发掘算法内在的并发性, 采用流水线和并行处理技术, 设计并实现了基于现场可编程门阵列(FPGA)加数字信号处理器(DSP)的单指令流多数据流(SIMD)结构实时并行处理机, 实现了...
  • 基于Spark的转录组大数据并行处理方法.pdf
  • CUDA和OpenCV图像并行处理方法研究,希望对研究并行计算的朋友们有帮助!
  • 多进程并行处理例子

    千次阅读 2019-07-02 15:26:24
    因为服务器cpu比较多,所以可以进行多进程的并行处理任务,定义了48个进程同时跑,单一进程处理一张图片需要3--5分钟,比较耗时。主要任务是从openimage数据集中分割出自己想要的分割数据集。 code: import os ...

    因为服务器cpu比较多,所以可以进行多进程的并行处理任务,定义了48个进程同时跑,单一进程处理一张图片需要3--5分钟,比较耗时。主要任务是从openimage数据集中分割出自己想要的分割数据集。

    code:

    import os 
    import cv2
    import csv
    import numpy as np
    from multiprocessing import Pool
    import time
    train_list = ['train_00', 'train_01', 'train_02', 'train_03', 'train_04', 'train_05', 'train_06', 'train_07', 'train_08']
    txt_path = '/home/test_list.txt'
    csv_path = '/home/open-image/v5/test-annotations-bbox.csv'
    
    txt_list = []
    csv_list = []
    img_count = 0
    for i in open(txt_path):
        txt_list.append(i[:-1])
    with open(csv_path,'r') as fp:
        csv_list = fp.readlines()
    def compute_iou(rect1,rect2):
        S_rect1 = (rect1[3] - rect1[1]) * (rect1[2] - rect1[0])
        S_rect2 = (rect2[3] - rect2[1]) * (rect2[2] - rect2[0])
        sum_area = S_rect1 +S_rect2
        left_line = max(rect1[0], rect2[0])
        right_line = min(rect1[2], rect2[2])
        top_line = max(rect1[1], rect2[1])
        bottom_line = min(rect1[3], rect2[3])
        if(left_line >= right_line or top_line >= bottom_line):
            return 0
        else:
            intersect = (right_line - left_line) * (bottom_line - top_line)
            return intersect / (sum_area - intersect)
    def get_index(im_array):
        im_h = im_array.shape[0]
        im_w = im_array.shape[1]
        xx_array = []
        yy_array = []
        for hh in range(im_h):
            for ww in range(im_w):
                if(im_array[hh][ww]!=0):
                    xx_array.append(ww)
                    yy_array.append(hh)
        return np.min(xx_array),np.min(yy_array),np.max(xx_array),np.max(yy_array)
    
    def cpr_box(str_0, img_box, im_height, im_width):
        for ii in range(1,len(csv_list)):
            csv_line = csv_list[ii]
            csv_line = csv_line[:-1]
            cpr_str = csv_line.split(',')[0]+csv_line.split(',')[2]
    
            xmin = csv_line.split(',')[4]
            xmax = csv_line.split(',')[5]
            ymin = csv_line.split(',')[6]
            ymax = csv_line.split(',')[7]
    
            xmin = float(xmin)*im_width
            xmax = float(xmax)*im_width
            ymin = float(ymin)*im_height
            ymax = float(ymax)*im_height
    
            search_box = (xmin,ymin,xmax,ymax)
            iou = compute_iou(search_box,img_box)
            if(cpr_str == str_0 and iou > 0.35):
                return xmin,ymin,xmax,ymax
        return None
    def check_path(image_id):
        for ch in train_list:
            ppath = '/home/public/openimage/'+ch+'/'+image_id+'.jpg'
            if(os.path.exists(ppath) == True):
                return ppath
        return None
    def Processing_Task(TaskID):
        for pro_num in range(TaskID*48, TaskID*48+47):
            time_start = time.time()
            pro_im_path = txt_list[pro_num]
            img = cv2.imread(pro_im_path)
            img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img_x_min, img_y_min, img_x_max, img_y_max = get_index(img_gray)
            img_box = (img_x_min, img_y_min, img_x_max, img_y_max)
            im_height = img.shape[0]
            im_width = img.shape[1]
            im_id_cls_hash = pro_im_path.split('/')[-1]
            im_id = im_id_cls_hash.split('_')[0]
            search_word = im_id + '/m/04_sv'
            cpr_set = cpr_box(search_word,img_box,im_height,im_width)
            if(cpr_set!=None):
                ori_img_path = check_path(im_id)
                if(ori_img_path!=None):
                    x_min, y_min, x_max, y_max = cpr_set[0], cpr_set[1], cpr_set[2], cpr_set[3]
                    ori_im = cv2.imread(ori_img_path)
                    im_ori_height = ori_im.shape[0]
                    im_ori_width = ori_im.shape[1]
                    x_ori_min = im_ori_width*x_min/im_width
                    y_ori_min = im_ori_height*y_min/im_height
                    x_ori_max = im_ori_width*x_max/im_width
                    y_ori_max = im_ori_height*y_max/im_height
    
                    tmp_im = cv2.getRectSubPix(img, (int(x_max - x_min), int(y_max - ymin)), (int((x_min + x_max)/2), int((y_min + y_max)/2)))
                    tmp_im_ori = cv2.getRectSubPix(img, (int(x_ori_max - x_ori_min), int(y_ori_max - y_ori_min)), (int((x_ori_min + x_ori_max)/2), int((y_ori_min + y_ori_max)/2)))
                    width_1 = tmp_im.shape[1]
                    width_2 = tmp_im_ori.shape[1]
                    width_min = np.min([width_1, width_2])
                    im_resize = cv2.resize(tmp_im,(width_min,int(width_min * im_height/im_width)))
                    im_ori_resize = cv2.resize(tmp_im_ori,(width_min, int(width_min * im_height/im_width)))
                    cv2.imwrite('/home/img/{:09d}.jpg'.format(pro_num),im_ori_resize)
                    cv2.imwrite('/home/msk/{:09d}.jpg'.format(pro_num),im_resize)
            time_end = time.time()
            print('TaskID %d costs %0.2f'%(TaskID,time_end - time_start))
    
    
    if __name__=='__main__':
        print('Start....')
        p = Pool(48)
        for po in range(48):
            p.apply_async(Processing_Task, args = (po,))
        p.close()
        p.join()
        

    这样就很节省时间。如果用单进程去做,大概有12000张图片,最多需要1000个小时,加上以后实际测得一张图片0.2min总共需要40个小时,这效率很可观

    展开全文
  • 并行处理就是利用多个CPU和I/O资源来执行单个数据库操作。尽管现在每个主要的数据库供应商都声称可以提供并行处理能力,但每个供应商所提供的体系结构其实存在关键的差异。本文讨论Oracle9i并行处理的体系结构,并...
  • 为了满足海量视频数据的处理,提出了一种利用非透明桥连接多个CPU桥连的硬件架构及其并行处理方法,实现视频的并行处理。本系统突破了单个高性能CPU的计算能力,大大提高了海量视频信号的处理能力;而且该处理方法...
  • Python并行处理

    万次阅读 2017-11-02 14:37:36
    原文:Parallel Processing in Python 作者:Frank Hofmann 翻译:Diwei 简介当你在机器上启动某个程序时,它只是在自己的“bubble”...例如,这个所谓的进程环境包括该进程使用的内存页,处理该进程打开的文件,用

    原文:Parallel Processing in Python
    作者:Frank Hofmann
    翻译:Diwei

    简介

    当你在机器上启动某个程序时,它只是在自己的“bubble”里面运行,这个气泡的作用就是用来将同一时刻运行的所有程序进行分离。这个“bubble”也可以称之为进程,包含了管理该程序调用所需要的一切。

    例如,这个所谓的进程环境包括该进程使用的内存页,处理该进程打开的文件,用户和组的访问权限,以及它的整个命令行调用,包括给定的参数。

    此信息保存在UNIX/Linux系统的流程文件系统中,该系统是一个虚拟文件系统,可通过/proc目录进行访问。条目都已经根据进程ID排过序了,该ID是每个进程的唯一标识符。示例1显示了具有进程ID#177的任意选择的进程。

    示例1:可用于进程的信息



    构建程序代码以及数据

    程序越复杂,就越有助于将其分成较小的模块。不仅仅源代码是这样,在机器上执行的代码也同样适用于这条规则。该规则的典型范例就是使用子进程并行执行。这背后的想法就是:

    • 单个进程包含了可以单独运行的代码段
    • 某些代码段可以同时运行,因此原则上允许并行
    • 使用现代处理器和操作系统的特性,例如可以使用处理器的所有核心,这样就可以减少程序的总执行时间
    • 减少程序/代码的复杂性,并将工作外包专门的代理

    使用子进程需要重新考虑程序的执行方式,从线性到并行。它类似于将公司的工作视角从普通员工转变为经理——你必须关注谁在做什么,某个步骤需要多长时间,以及中间结果之间的依赖关系。

    这有利于将代码分割成更小的部分,这些更小的部分可以由专门用于此任务的代理执行。如果还没有想清楚,试想一下数据集的构造原理,它也是同样的道理,这样就可以由单个代理进行有效的处理。但是这也引出了一些问题:

    • 为什么要将代码并行化?落实到具体案例中或者在努力的过程中,思考这个问题有意义吗?
    • 程序是否打算只运行一次,还是会定期运行在类似的数据集上?
    • 能把算法分成几个单独的执行步骤吗?
    • 数据是否允许并行化?如果不允许,那么数据组织将以何种方式进行调整?
    • 计算的中间结果是否相互依赖?
    • 需要对硬件进行调整吗?
    • 在硬件或算法中是否存在瓶颈,如何避免或者最小化这些因素的影响?
    • 并行化的其他副作用有哪些?

    可能的用例就是主进程,以及后台运行的等待被激活的守护进程(主/从)。此外,这可能是启动按需运行的工作进程的一个主要过程。在实践中,主要的过程是一个馈线过程,它控制两个或多个被馈送数据部分的代理,并在给定的部分进行计算。

    请记住,由于操作系统所需要的子进程的开销,并行操作既昂贵又耗时。与以线性方式运行两个或多个任务相比,在并行的情况下,根据您的用例,可以在每个子过程中节省25%到30%的时间。例如,如果在系列中执行了两项消耗5秒的任务,那么总共需要10秒的时间,并且在并行化的情况下,在多核机器上平均需要8秒。有3秒是用于各种开销,即这部分是无法压缩和优化的,所以速度提高是有极限的。

    运行与Python并行的函数

    Python提供了四种可能的处理方式。首先可以使用multiprocessing模块并行执行功能。第二,进程的替代方法是线程。从技术上讲,这些都是轻量级的进程,不在本文的范围之内。想了解更加详细的内容,可以看看Python的线程模块。第三,可以使用os模块的system()方法或subprocess模块提供的方法调用外部程序,然后收集结果。

    multiprocessing模块涵盖了一系列方法来处理并行执行例程。这包括进程,代理池,队列以及管道。

    清单1使用了五个代理程序池,同时处理三个值的块。对于代理的数量和对chunksize的值都是任意选择的,用于演示目的。根据处理器中核心的数量来调整这些值。

    Pool.map()方法需要三个参数 - 在数据集的每个元素上调用的函数,数据集本身和chunksize。在清单1中,我们使用square函数,并计算给定整数值的平方。此外,chunksize不是必须的。如果未明确设置,则默认chunksize为1。

    请注意,代理商的执行订单不能保证,但结果集的顺序是正确的。它根据原始数据集的元素的顺序包含平方值。

    清单1:并行运行函数



    运行此代码应该产生以下输出:



    注意:我们将使用Python 3作为这些例子。

    使用队列运行多个函数

    作为数据结构,队列是非常普遍的,并且以多种方式存在。 它被组织为先进先出(FIFO)或先进先出(LIFO)/堆栈,以及有和没有优先级(优先级队列)。 数据结构被实现为具有固定数量条目的数组,或作为包含可变数量的单个元素的列表。

    列表2.1-2.7中,我们使用FIFO队列。 它被实现为已经由来自multiprocessing模块的相应类提供的列表。此外,time模块被加载并用于模拟工作负载。

    清单2.1:要使用的模块



    接下来,定义一个worker函数(清单2.2)。 该函数实际上代表代理,需要三个参数。进程名称指示它是哪个进程,tasksresults都指向相应的队列。

    在工作函数里面是一个while循环。tasksresults都是在主程序中定义的队列。tasks.get()从要处理的任务队列中返回当前任务。小于0的任务值退出while循环,返回值为-1。任何其他任务值都将执行一个计算(平方),并返回此值。将值返回到主程序实现为result.put()。这将在results队列的末尾添加计算值。

    清单2.2:worker函数



    下一步是主循环(参见清单2.3)。首先,定义了进程间通信(IPC)的经理。接下来,添加两个队列,一个保留任务,另一个用于结果。

    清单2.3:IPC和队列



    完成此设置后,我们定义一个具有四个工作进程(代理)的进程池。我们使用类multiprocessing.Pool(),并创建一个它的实例。 接下来,我们定义一个空的进程列表(见清单2.4)。

    清单2.4:定义一个进程池



    作为以下步骤,我们启动了四个工作进程(代理)。 为了简单起见,它们被命名为“P0”到“P3”。使用multiprocessing.Pool()完成创建四个工作进程。这将它们中的每一个连接到worker功能以及任务和结果队列。 最后,我们在进程列表的末尾添加新初始化的进程,并使用new_process.start()启动新进程(参见清单2.5)。

    清单2.5:准备worker进程



    我们的工作进程正在等待工作。我们定义一个任务列表,在我们的例子中是任意选择的整数。这些值将使用tasks.put()添加到任务列表中。每个工作进程等待任务,并从任务列表中选择下一个可用任务。 这由队列本身处理(见清单2.6)。

    清单2.6:准备任务队列



    过了一会儿,我们希望我们的代理完成。 每个工作进程对值为-1的任务做出反应。 它将此值解释为终止信号,此后死亡。 这就是为什么我们在任务队列中放置尽可能多的-1,因为我们有进程运行。 在死机之前,终止的进程会在结果队列中放置-1。 这意味着是代理正在终止的主循环的确认信号。

    在主循环中,我们从该队列读取,并计数-1。 一旦我们计算了我们有过程的终止确认数量,主循环就会退出。 否则,我们从队列中输出计算结果。

    清单2.7:结果的终止和输出



    示例2显示了Python程序的输出。 运行程序不止一次,您可能会注意到,工作进程启动的顺序与从队列中选择任务的进程本身不可预测。 但是,一旦完成结果队列的元素的顺序与任务队列的元素的顺序相匹配。

    示例2



    注意:如前所述,由于执行顺序不可预测,您的输出可能与上面显示的输出不一致。

    使用os.system()方法

    system()方法是os模块的一部分,它允许在与Python程序的单独进程中执行外部命令行程序。system()方法是一个阻塞调用,你必须等到调用完成并返回。 作为UNIX / Linux拜物教徒,您知道可以在后台运行命令,并将计算结果写入重定向到这样的文件的输出流(参见示例3):

    示例3:带有输出重定向的命令



    在Python程序中,您只需简单地封装此调用,如下所示:

    清单3:使用os模块进行简单的系统调用



    此系统调用创建一个与当前Python程序并行运行的进程。 获取结果可能会变得有点棘手,因为这个调用可能会在你的Python程序结束后终止 - 你永远都不会知道。

    使用这种方法比我描述的先前方法要贵得多。 首先,开销要大得多(进程切换),其次,它将数据写入物理内存,比如一个需要更长时间的磁盘。 虽然这是一个更好的选择,你的内存有限(像RAM),而是可以将大量输出数据写入固态磁盘。

    使用子进程模块

    该模块旨在替换os.system()os.spawn()调用。子过程的想法是简化产卵过程,通过管道和信号与他们进行通信,并收集他们生成的输出包括错误消息。

    从Python 3.5开始,子进程包含方法subprocess.run()来启动一个外部命令,它是底层subprocess.Popen()类的包装器。 作为示例,我们启动UNIX/Linux命令df -h,以查找机器的/ home分区上仍然有多少磁盘空间。在Python程序中,您可以执行如下所示的调用(清单4)。

    清单4:运行外部命令的基本示例



    这是基本的调用,非常类似于在终端中执行的命令df -h / home。请注意,参数被分隔为列表而不是单个字符串。输出将与示例4相似。与此模块的官方Python文档相比,除了调用的返回值之外,它将调用结果输出到stdout

    示例4显示了我们的呼叫的输出。输出的最后一行显示命令的成功执行。调用subprocess.run()返回一个类CompletedProcess的实例,它有两个名为args(命令行参数)的属性和returncode(命令的返回值)。

    示例4:运行清单4中的Python脚本



    要抑制输出到stdout,并捕获输出和返回值进行进一步的评估,subprocess.run()的调用必须稍作修改。没有进一步修改,subprocess.run()将执行的命令的输出发送到stdout,这是底层Python进程的输出通道。 要获取输出,我们必须更改此值,并将输出通道设置为预定义值subprocess.PIPE。清单5显示了如何做到这一点。

    清单5:抓取管道中的输出



    如前所述,subprocess.run()返回一个类CompletedProcess的实例。在清单5中,这个实例是一个简单命名为output的变量。该命令的返回码保存在属性output.returncode中,打印到stdout的输出可以在属性output.stdout中找到。 请注意,这不包括处理错误消息,因为我们没有更改输出渠道。

    结论

    由于现在的硬件已经很厉害了,因此也给并行处理提供了绝佳的机会。Python也使得用户即使在非常复杂的级别,也可以访问这些方法。正如在multiprocessingsubprocess模块之前看到的那样,可以让你很轻松的对该主题有很深入的了解。

    展开全文
  • 介绍了一种基于多DSP的并行处理系统设计与实现,以及其在分布式雷达组网航迹融合中的实际应用。重点介绍了该系统由1块系统主板和4块TS201处理板卡组成的原理和结构,即系统内主板与处理板卡的板级并行设计、单块板卡...
  • Java7之前并行处理数据集合是一件很麻烦的事情:第一步:明确把包含数据的数据结构分成若干子部分;第二步,为每一个子部分分配一个线程;第三步,你需要恰当的时候对他们进行同步来避免不出现竞争的条件;第四步,...

    Stream接口可以让你以声明的方式处理数据。同时,用内部迭代取代外部迭代能够让原生Java库控制流元素的处理。


    Java7之前并行处理数据集合是一件很麻烦的事情:第一步:明确把包含数据的数据结构分成若干子部分;第二步,为每一个子部分分配一个线程;第三步,你需要恰当的时候对他们进行同步来避免不出现竞争的条件;第四步,等待所有线程完成,然后把这些部分结果合并起来。

    在本文中你将看到,如何不花大力气来实现对数据集的并行处理。它允许你声明性地将顺序流变为并行流。流是如何在幕后利用JAVA7引入的分支/合并框架

    对收集源调用parallelStream方法把集合转换为并行流。并行流就是把内容分为多个块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让他们都忙起来。

    package ExeriseAboutSreeam;
    
    import java.util.stream.Stream;
    /*高斯小时候发明了从1+2+3+...+100的简化公式,其实就是今天我们熟悉的等差数列求和问题
     * 现在我们用计算机编程来实现给定任何n,求1+2+3+...+n的和
     */
    public class TheSum {
    	//传统方法实现---外部迭代
    	public static long iterativeSum(long n) {
    		long result = 0;
    		for(long i =1L; i<=result;i++) {
    			result += i;
    		}
    		return result;
    	}
    	
    	//利用流处理来实现----内部迭代
    	public static long sequentialSum(long n) {
    		return Stream.iterate(1L,i -> i+1)
    				.limit(n)
    				.reduce(0L,Long::sum);
    	}
    	
    	//等一等,能不能提高一下计算的效率,假设您的电脑是多核的,怎么利用这太神机?
    	public static long parallelSum(long n) {
    		//利用并行流
    		return Stream.iterate(1L, i -> i+1)
    				.limit(n)
    				.parallel()
    				.reduce(0L, Long ::sum);
    	}
    }

    如果,看不懂后面两种实现方法,请自行补习JAVA8关于Lambda表达式和流处理的相关内容。

    现在,初步体会到并行流的强大作用,可以将数据集划分成若干块,然后利用多核进行多线程同步计算。这样将分配线程、合并结果等工作都抛给了计算机已有的框架。


    效果如何?我们来测试一下,使用并行方式是不是更快?

    package ExeriseAboutSreeam;
    
    import java.util.function.Function;
    import java.util.stream.LongStream;
    import java.util.stream.Stream;
    /*高斯小时候发明了从1+2+3+...+100的简化公式,其实就是今天我们熟悉的等差数列求和问题
     * 现在我们用计算机编程来实现给定任何n,求1+2+3+...+n的和
     */
    public class TheSum {
    	//传统方法实现---外部迭代
    	public static long iterativeSum(long n) {
    		long result = 0;
    		for(long i =1L; i<=result;i++) {
    			result += i;
    		}
    		return result;
    	}
    	
    	//利用流处理来实现----内部迭代
    	public static long sequentialSum(long n) {
    		return Stream.iterate(1L,i -> i+1)
    				.limit(n)
    				.reduce(0L,Long::sum);
    	}
    	
    	//等一等,能不能提高一下计算的效率,假设您的电脑是多核的,怎么利用这台神机?
    	public static long parallelSum(long n) {
    		//利用并行流
    		return Stream.iterate(1L, i -> i+1)
    				.limit(n)
    				.parallel()
    				.reduce(0L, Long ::sum);
    	}
    	//换一个数据结构
    	public static long parallelRangedSum(long n) {
    		return LongStream.rangeClosed(1,n)
    				.parallel()
    				.reduce(0L, Long ::sum);
    				
    	}
    	
    	
    	//测试模块---测流量性能
    	public long measureSumPerf(Function<Long, Long> adder, long n) {
    		long fastest = Long.MAX_VALUE;
    		for(int i =0; i<10 ; i++) {
    			long start = System.nanoTime();
    			long sum = adder.apply(n);
    			long duration = (System.nanoTime() - start) /1_000_000;
    			//System.out.println("结果:" + sum);
    			if(duration < fastest) fastest = duration;
    		}
    		return fastest;
    	}
    	
    	//主函数,测试时间;
    	public static void main(String[] args) {
    		TheSum theSum = new TheSum();
    		System.out.println("方法1测试结果:" + 
    		theSum.measureSumPerf(TheSum::iterativeSum, 10_000_000) + "msecs");
    		
    		System.out.println("方法2测试结果:" + 
    		theSum.measureSumPerf(TheSum::sequentialSum, 10_000_000) + "msecs");
    		
    		System.out.println("方法3测试结果:" + 
    		theSum.measureSumPerf(TheSum::parallelSum, 10_000_000) + "msecs");
    		
    		System.out.println("方法4测试结果:" + 
    		theSum.measureSumPerf(TheSum::parallelRangedSum, 10_000_000) + "msecs");
    		
    	}
    		
    }
    

    测试结果:

    方法1测试结果:0msecs
    方法2测试结果:101msecs
    方法3测试结果:104msecs
    方法4测试结果:2msecs

    结果好像和预想的不一致,利用流和并行并没有比传统的外部迭代方式快。为什么会出现这种情况?其实iterator是不适合做 并行处理的,因为它的每一次迭代都要用前一次的结果,加上装箱和拆箱的损耗,反而更耗时。方法四利用了别的数据结构,代码的时间复杂度明显降了下来。看来要正确使用并行流也不是一件容易的事情,用不好的话,反而弄巧成拙,代码运行起来更慢。

    如何正确使用并行流?

    待更新.......

















    展开全文
  • 并行处理实验报告:基于MPI实现的矩阵乘法的性能分析。里面包含MPI实现代码,稠密矩阵以及稀疏矩阵的加速比分析
  • 云计算(Cloud Computing ):是分布式处理(Distributed Computing)、并行处理(Parallel Computing)和网格计算(Grid Computing)的发展,或者说是这些计算机科学概念的商业实现。是指基于互联网的超级计算模式--即把...
  • 关于C#中的并行处理

    千次阅读 2019-06-16 22:23:16
    然而,当算法不可再优化时,我们就该考虑能否合理的将数据分割成若干个子集,然后去做并行处理。下面是我通过将Excel中的两个Sheet页的数据作比较的例子,目的是得到,A表中有,B表中没有的数据。(两表中各有10000...

          当我们需要处理大量的数据时,为了能够提高程序的处理速度,我们的做法通常是尽可能的优化算法。然而,当算法不可再优化时,我们就该考虑能否合理的将数据分割成若干个子集,然后去做并行处理。下面是我通过将Excel中的两个Sheet页的数据作比较的例子,目的是得到,A表中有,B表中没有的数据。(两表中各有10000行数据)

           

        class Program
        {
            static void Main(string[] args)
            {
                TableCompare tc = new TableCompare();
                ExcelHelper excelHelper = new         
                ExcelHelper(@"C:\Users\soul\Desktop\test.xlsx");
                DataTable tb1 = excelHelper.ExcelToDataTable("sheet1", true);
                DataTable tb2 = excelHelper.ExcelToDataTable("sheet2", true);
                Stopwatch stopwatch = new Stopwatch();
                stopwatch.Start();
    
                DataTable retTb = tc.SingleThreadCompare(tb1, tb2);
                //DataTable retTb = tc.MultiThreadCompare(tb1, tb2);
                //DataTable retTb = tc.ParallelCompare(tb1, tb2);
    
                stopwatch.Stop();
                TimeSpan timeSpan = stopwatch.Elapsed;
                PrintDataTable(retTb);
                Console.WriteLine(timeSpan.TotalSeconds + "秒");
                Console.ReadKey();
            }
    
            static void PrintDataTable(DataTable table)
            {
                for (int i = 0; i < table.Rows.Count; i++)
                {
                    for (int j = 0; j < table.Columns.Count; j++)
                    {
                        Console.Write(table.Rows[i][j] + "\t");
                    }
                    Console.WriteLine();
                }
            }
        }
    class TableCompare
        {
    
            private DataTable retTb;
    
            public DataTable SingleThreadCompare(DataTable tb1, DataTable tb2)
            {
                retTb = tb1.Clone();
    
                for (int i = 0; i < tb1.Rows.Count; i++)
                {
                    bool isExist = true;
                    for (int j = 0; j < tb2.Rows.Count; j++)
                    {
                        bool isEquals = true;
                        for (int x = 0; x < tb2.Columns.Count; x++)
                        {
                            if (Convert.ToString(tb1.Rows[i][x]) != Convert.ToString(tb2.Rows[j][x]))
                            {
                                isEquals = false;
                            }
                        }
                        if (isEquals)
                        {
                            isExist = false;
                        }
                    }
                    if (isExist)
                    {
                        retTb.ImportRow(tb1.Rows[i]);
                    }
                }
                return retTb;
            }
    
            public DataTable MultiThreadCompare(DataTable tb1, DataTable tb2)
            {
                retTb = tb1.Clone();
                int workCount = tb1.Rows.Count / 2;
                bool IsComplete1 = false;
                bool IsComplete2 = false;
                Thread t1 = new Thread(() => Run(0, workCount, tb1, tb2, ref IsComplete1));
                Thread t2 = new Thread(() => Run(workCount, tb1.Rows.Count, tb1, tb2, ref IsComplete2));
                t1.Start();
                t2.Start();
                while (!IsComplete1 && !IsComplete2){ }
                return retTb;
            }
    
            private void Run(int begin, int workCount, DataTable tb1, DataTable tb2, ref bool IsComplete)
            {
                for (int i = begin; i < workCount; i++)
                {
                    bool isExist = true;
                    for (int j = 0; j < tb2.Rows.Count; j++)
                    {
                        bool isEquals = true;
                        for (int x = 0; x < tb2.Columns.Count; x++)
                        {
                            if (Convert.ToString(tb1.Rows[i][x]) != Convert.ToString(tb2.Rows[j][x]))
                            {
                                isEquals = false;
                            }
                        }
                        if (isEquals)
                        {
                            isExist = false;
                        }
                    }
                    if (isExist)
                    {
                        lock (retTb)
                        {
                            retTb.ImportRow(tb1.Rows[i]);
                        }
                    }
                }
                IsComplete = true;
            }
    
            public DataTable ParallelCompare(DataTable tb1, DataTable tb2)
            {
                retTb = tb1.Clone();
    
                Parallel.For(0, tb1.Rows.Count, i =>
                {
                    bool isExist = true;
                    for (int j = 0; j < tb2.Rows.Count; j++)
                    {
                        bool isEquals = true;
                        for (int x = 0; x < tb2.Columns.Count; x++)
                        {
                            if (Convert.ToString(tb1.Rows[i][x]) != Convert.ToString(tb2.Rows[j][x]))
                            {
                                isEquals = false;
                            }
                        }
                        if (isEquals)
                        {
                            isExist = false;
                        }
                    }
                    if (isExist)
                    {
                        retTb.ImportRow(tb1.Rows[i]);
                    }
                });
                return retTb;
            }
        }

    SingleThreadCompare只使用一个线程做处理,即在主线程中进行处理。

    运行结果如下所示:

           这种处理方式,运行速度较慢,很难让人满意。

           为了提高运行速度,决定采用多线程的方式来处理,即MultiThreadCompare方法,这里我只使用了两个子线程,两个子线程各做一半的数据处理,得到如下的效果:

           速度提升了将近一半,结果是喜人的,不过,在.Net Framework 4.0 以上版本提供了一个并行处理框架Parallel,使用此框架则无需手动创建子线程,即可做并行处理。ParallelCompare方法的处理结果如下:

    并行处理后,速度得到了大量的提升。

    展开全文
  • 用于单机多核大数据结构的python并行处理的实用函数。 避免对只读大数据结构进行不必要的复制。 它包括: 1. map功能的并行版本。 它旨在以内存高效的方式在具有多个内核的单台机器上执行无限并行化的任务。 在...
  • 针对频繁出现的数据冗余、数据复用效率低下等问题,将列存储方式结合并行处理机制对数据复用策略进行优化。构建了基于MapReduce的数据复用并行化处理模型,利用改进型CSM模式匹配算法结合数据挖掘过程中的数据筛选...
  • 针对宽带扩频信号码片码率高达150 Mcps以上,传统扩频快捕处理算法无法适应的技术难题,提出了一种基于信号并行处理技术的快捕算法。算法通过并行NCO(数控振荡器)生成本地伪码、载波,实现600 MHz的等效采样匹配,...
  • 并行处理仿真 第一种解决方案是在线程之间使用共享内存进行多线程合并排序,并通过多进程合并排序来显示差异 其他两个问题是现实世界中两种不同类型的问题,它们被建模为具有并行处理的计算机程序,需要谨慎使用线程...
  • Java多线程并行处理任务的实现

    万次阅读 2019-04-20 21:08:02
    Java多线程并行处理任务的实现 在实际项目开发的过程中,遇到过需要处理一个由多个子任务组成的任务的问题.顺序处理起来会造成响应时间超长,用户体验不好的问题.我想到一个解决方案,即使用多线程并行处理子任务.思路...
  • 针对上述问题,在遥感影像剖分面片数据模型的研究基础上,提出了剖分面片模板并行计算模式,设计并实现了一种面向剖分面片模板的遥感影像并行处理方法。该方法基于MPI(message passing interface)与OpenMP(open ...
  • MATLAB并行处理

    千次阅读 2019-09-03 23:15:38
    NumWorkers代表worker的个数,这里开启了4个并行池; “IdleTimeout”指定了池定时关闭的时长(30min) “SpmdEnabled”指定池是否可以运行 SPMD 代码 single-program-multiple-data(SPMD) 最常见的并行技术风格 %...
  • 安装gem install parallel并行并行运行任何代码以并行处理(>使用所有CPU)或线程(>加速阻止操作)。 最适合于地图缩减或例如并行下载/上传。 安装gem install parallel用法#2个CPU->在2个进程中工作(a,b + c)...
  • Oracle Parallel 并行处理

    2011-07-07 17:21:41
    oracle中的并行处理,还是值得一看的!

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 516,668
精华内容 206,667
关键字:

并行处理