精华内容
下载资源
问答
  • 在科学计算中使用multiprocessing进行多进程并行计算。 前提:多个方法func1,func2,…,funcN是相互独立的,可以并行计算。 当每个func的运行时间都较长时,利用多进程并行计算才会极大提高运行效率。原因:使用...

    简介

    多进程 vs 多线程
    CPU密集型用多进程;IO密集型用多线程。

    实例1

    在科学计算中使用multiprocessing进行多进程并行计算。

    • 前提:多个方法func1,func2,…,funcN是相互独立的,可以并行计算。
    • 当每个func的运行时间都较长时,利用多进程并行计算才会极大提高运行效率。原因:使用多进程本身会有一定的时间开销。

    实例代码
    注意:通过sleep(t)来模拟控制func运行消耗的时间

    def func1(x):
        print(f'当前进程:{os.getpid()},计算func1中......')
        time.sleep(5)
        return x**2
    
    def func2(x):
        print(f'当前进程:{os.getpid()},计算func2中......')
        time.sleep(5)
        return x**3
        
    def test1():
        """单进程计算
        """
        print('方法1:单进程计算')
        print(f'当前主进程:{os.getpid()}')
        start = time.time()
        a = func1(2)
        b = func2(2)
        c = a + b
        end = time.time()
        t = end - start
        print('c=', c)
        print(f'计算完成,共用时:{t:.2f}s')
    
    
    def test2():
        """利用multiprocessing实现多进程计算
        """
        print('方法2:多进程计算')
        print(f'当前主进程:{os.getpid()}')
        start = time.time()
        pool = Pool()
        result1 = pool.apply_async(func1, args=(2, ))
        result2 = pool.apply_async(func2, args=(2, ))
        pool.close()
        pool.join()
        a = result1.get()
        b = result2.get()
        c = a + b
        end = time.time()
        t = end - start
        print('c=', c)
        print(f'计算完成,共用时:{t:.2f}s')
        
    if __name__ == "__main__":
        test1()
        test2()
    

    结果及分析

    • 方法1计算过程中始终在同一个进程;方法2开始的进程与方法1相同,因为都是从main()入口进去的。然后计算func1和func2时,就分别开了两个新进程。符合预期。
    • 多进程计算确实提高了运行效率,时间缩短了近3s;
    • 理论上func1和func2并行计算,需要5s。实际有7s多,说明使用多进程是有一定固定开销的。因此,单个func运行时间越多,多进程计算节省时间也越多。调节sleep时间,可证实这一结论。
    方法1:单进程计算
    当前主进程:15316
    当前进程:15316,计算func1中......
    当前进程:15316,计算func2中......
    c= 12
    计算完成,共用时:10.00s
    
    方法2:多进程计算
    当前主进程:15316
    当前进程:14996,计算func1中......
    当前进程:16708,计算func2中......
    c= 12
    计算完成,共用时:7.17s
    
    展开全文
  • python语法——使用Pool实现多进程并行 简介 可以使用 Pool来实现多进程并行。 Pool 模块来自于 multiprocessing 模块。 multiprocessing 模块是跨平台版本的多进程模块,像线程一样管理进程,与 threading 很...

    python语法——使用Pool实现多进程并行

    简介

    可以使用 Pool来实现多进程并行。

    Pool 模块来自于 multiprocessing 模块。

        multiprocessing 模块是跨平台版本的多进程模块,像线程一样管理进程,与 threading 很相似,对多核CPU的利用率会比 threading 好的多。
        Pool 类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

    函数
    apply()

    函数原型:apply(func[, args=()[, kwds={}]])

    该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。
    apply_async()

    函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

    与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。
    map()

    函数原型:map(func, iterable[, chunksize=None])

    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。
    map_async()

    函数原型:map_async(func, iterable[, chunksize[, callback]])

    与map用法一致,但是它是非阻塞的。其有关事项见apply_async。

    阻塞与非阻塞的讲解见下面备注。
    close()

    关闭进程池(pool),使其不在接受新的任务。
    terminal()

    结束工作进程,不在处理未处理的任务。
    join()

    主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。
    示例

    比如我想同时让服务器执行多条 hive 命令,可编程如下:

    from multiprocessing import Pool
    import subprocess

    # 定义所有并行语句都回调用的函数
    def run_sh(sh):
        '''
        执行一行shell命令
        '''
        (statusLoad, outputLoad) = subprocess.getstatusoutput(sh)    
        return (statusLoad, outputLoad)

    # 将需要执行的多条语句放入到一个list中
    sh_list = []
    sh_list.append('hive -e "select * from A" > A_result')
    sh_list.append('hive -e "select * from B" > B_result')
    sh_list.append('hive -e "select * from C" > C_result')

    # 开始并行
    pool = Pool(len(sh_list))
    pool.map(run_sh, sh_list) # 表示将 sh_list 每个元素作为参数递给 run_sh
    pool.close() # 将进程池关闭,不再接受新的进程
    pool.join() # 主进程阻塞,只有池中所有进程都完毕了才会通过

    # 开始处理结果文件,此时三个 *_result 文件肯定是存在并且已经写入完毕的

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24

    备注
    1、阻塞与非阻塞的区别

    map() 会使进程阻塞,即通过 map() 开启的多进程都结束之后,这个函数才会有返回结果,否则主进程会一直等待,不会往下进行 。

    map_async() 为非阻塞,即通过 map_async() 开启多进程之后,立刻会返回结果,主进程会继续往下执行。

    注意:

    如果后面调用了 join() 函数,则不管之前用的是 map 还是 map_async,主进程都会等待,直到进程池中所有进程执行完毕,才会继续往下执行。
    2、starmap 函数

    Pool 类中,python 3.X 还引入了 starmap 函数,与 map 的区别在于, starmap 支持将多个参数放入到队列中,不同参数按照顺序以元组形式存放,举例如下:

    from multiprocessing import Pool
    def func(a, b):
        print(a + b)

    if __name__=="__main__":
        args = [(1,2),(3,4),(5,6)]
        pool = Pool(3)
        pool.starmap(func, args)    

        1
        2
        3
        4
        5
        6
        7
        8

    输出

    3
    7
    11

        1
        2
        3

    内存共享问题

    多进程并行有一个特点:多个进程之间并不能共享内存。

    比如一个人写出了以下代码,期望可以对同一个数进行累加:

    from multiprocessing import Pool
    def func(dic, c):
        dic['count'] += c

    if __name__=="__main__":
        d = dict()
        d['count'] = 0
        args = [(d, 1), (d, 2), (d, 3)]
        pool = Pool(3)
        pool.starmap(func, args)   
        pool.close()
        pool.join()
        print(f'dic={d}')

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13

    但是输出为:

    dic={'count': 0}

    不是我们想要的结果。

    这是因为,多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改。
    解决办法

    可以使用 multiprocessing.Manager 来创建对象,这样的对象可以被共享,如:

    from multiprocessing import Pool, Manager
    def func(dic, c):
        dic['count'] += c

    if __name__=="__main__":
        d = Manager().dict()  #生成一个字典,可以在多个进程中传递和共享。
        d['count'] = 0
        args = [(d, 1), (d, 2), (d, 3)]
        pool = Pool(3)
        pool.starmap(func, args)   
        pool.close()
        pool.join()
        print(f'dic={d}')

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13

    输出是我们所期望的:

    dic={'count': 6}

        1

    Manager() 内部有加锁机制,不允许两个进程同时修改一份数据,因为进程的数据是独立的,因此数据是安全的。

    另外,如果只要求并行,不要求必须是多进程,可以使用多线程来实现共享数据。 参照python技巧——使用threadpool实现多线程并行。
    ---------------------
    作者:HappyRocking
    来源:CSDN
    原文:https://blog.csdn.net/HappyRocking/article/details/83856489
    版权声明:本文为博主原创文章,转载请附上博文链接!

    展开全文
  • python 多进程并行多输入函数 map,multiprocessing import的库 from multiprocessing.pool import Pool 需要并行的原始函数 def generate_image_crops(vid_root_path, vid_curated_path,addnum): print(addnum) ...

    python 多进程并行多输入函数 map,multiprocessing

    import的库

    from multiprocessing.pool import Pool
    

    需要并行的原始函数

    def generate_image_crops(vid_root_path, vid_curated_path,addnum):
    	print(addnum)
    	#以下省略具体的函数定义
    

    map并行套路1:新写一个函数(下面的代码没有省略任何的东西)

    def generate_image_crops_mulcpu(args): 
        return generate_image_crops(*args)
    

    放大招,main中并行的套路2

    if __name__ == "__main__":
    	#下面两个是参数的一部分而已,不用管他们
        vid_root_path = "/media/katy/Seagate Exp/track/ILSVRC2015"
        vid_curated_path = "/media/katy/Seagate Exp/track/ILSVRC15-curation"   
        
        pool = Pool(processes=3) #这个要注意!!!!!!,并行几个进程processes就等于几
        parlist = [(vid_root_path, vid_curated_path,1),(vid_root_path, vid_curated_path,2),(vid_root_path, vid_curated_path,3)] #这个也要注意,参数要list的形式,每个()中是一组参数
        pool.map(generate_image_crops_mulcpu,parlist) #这个也是套路了,套路1的函数,+参数list
        pool.close()
        pool.join()
    
    展开全文
  • MPI多进程并行计算矩阵乘法实现

    万次阅读 2014-11-15 22:02:05
    MPI多进程并行计算矩阵乘法实现

          MPI多进程并行计算矩阵乘法实现,对原始矩阵A、B进行初始化算为:

          其主要思想:是把相乘的矩阵按行分解(任务分解),分别分给不同的进程,然后在汇总到一个进程上,在程序上实现则用到了主从模式,人为的把进程分为主进程从进程,主进程负责对原始矩阵初始化赋值,并把数据均匀分发(为了负载均衡)到从进程上进行相乘运算,主要用到的知识是MPI点对点通信的机制。

    具体代码为:

    // YinXing_Matrix_multiplication.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    #include <stdio.h>
    #include  "mpi.h"
    #include <Windows.h>
    #define N 200                     //矩阵的阶数
    #define MASTER 0                  //主进程号
    #define FROM_MASTER 1             //设置消息类型
    #define FROM_WORKER 2             //设置消息类型
    MPI_Status status;
    void main(int argc,char **argv)
    {
        int numtasks,                   //进程总数
             taskid,                    //进程标识
             numworkers,                //从进程数目
             source,                    //消息源
             dest,                      //消息目的地
             nbytes,
             mtype,                     //消息类型
             rows,                     
             averow,extra,offset,       //从进程所分的行数
             i,j,k,t,
             count;
       double A[N][N],B[N][N];
       long long C[N][N];
       long long sumand = 0;           //求的乘积矩阵的所有元素的和
       double starttime,endtime;
       MPI_Init(&argc,&argv);   // 初始化MPI环境
       MPI_Comm_rank(MPI_COMM_WORLD,&taskid);//标识各个MPI进程 ,告诉调用该函数进程的当前进程号
       MPI_Comm_size(MPI_COMM_WORLD,&numtasks);//用来标识相应进程组中有多少个进程
       numworkers = numtasks-1;     //从进程数目
    
    /* 程序采用主从模式,以下为主进程程序 */
       
    if(taskid==MASTER)
    {
    	printf("--------主进程开始对矩阵A、B初始化\n");
        //对A矩阵B矩阵 进行初始化赋值
        for(i=0;i<N;i++)
    	{
    		t = i + 1; 
            for(j=0;j<N;j++)
    	    {
               A[i][j]= t ++ ;
               B[i][j]= 1;
    	    }
    	}
    	printf("--------主进程开始对从进程发送数据\n");
        /*将数据(A矩阵B矩阵)发送到从进程*/
        averow=N/numworkers;   // 每个从进程所得到的行数
        extra=N%numworkers;
        offset=0;
        mtype=FROM_MASTER;
        for(dest=1;dest<=numworkers;dest++)
        {
            rows=(dest<=extra)?  averow+1:averow;
    		//发送标识每个从进程开始寻找数据矩阵的偏移地址
            MPI_Send(&offset,1,MPI_INT,dest,mtype,MPI_COMM_WORLD);
    		//发送每个从进程需要计算的行数
            MPI_Send(&rows,1,MPI_INT,dest,mtype,MPI_COMM_WORLD);
    		//发送每个从进程 count=rows*N 比特流偏移量
            count=rows*N;
    		//发送 矩阵 A
            MPI_Send(&A[offset][0],count,MPI_DOUBLE,dest,mtype,MPI_COMM_WORLD);
             count=N*N;
    		 //发送矩阵 B
            MPI_Send(&B,count,MPI_DOUBLE,dest,mtype,MPI_COMM_WORLD);
            offset=offset+rows;
        }
    	printf("--------主进程等待从进程计算结果\n");
         /*等待接收从进程计算结果*/
    	starttime = MPI_Wtime(); // 矩阵开始计算的时间
        mtype=FROM_WORKER;
        for(i=1;i<=numworkers;i++)
        {
            source=i; // 主进程 接受 从 从进程上发送的数据 ( C 矩阵)
    		printf("主进程接受从进程号为 = %d\n",i);
            MPI_Recv(&offset,1,MPI_INT,source,mtype,MPI_COMM_WORLD,&status);
            MPI_Recv(&rows,1,MPI_INT,source,mtype,MPI_COMM_WORLD,&status);
            count=rows*N;
            MPI_Recv(&C[offset][0],count,MPI_DOUBLE,source,mtype,MPI_COMM_WORLD,&status);
        }
    	endtime =  MPI_Wtime(); // 矩阵 结束计算的时间
    	printf("********两矩阵相乘的时间为 :=%f\n",endtime - starttime);
    
        /*主进程计算 结果矩阵的 所有元素的和 为验证结果*/
    
        for(i=0;i<N;i++)
        {
            for(j=0;j<N;j++)
    			sumand += C[i][j];
        }
    	printf("--------主进程对矩阵C求和 sumand= %lld\n",sumand); 
    }
    /* 从进程 接受数据 并 计算乘积 */
    if(taskid>MASTER)
    {
        mtype=FROM_MASTER;
        source=MASTER;
    	//接受主进程发送的数据偏移值
    	printf("\n++++++++该从进程开始从主进程上接受数据\n");
        MPI_Recv(&offset,1,MPI_INT,source,mtype,MPI_COMM_WORLD,&status);
    	printf("++++++++该从进程的偏移量为:=%d\n",offset);
    	//接收主进程发送到从进程需要计算的 行数
        MPI_Recv(&rows,1,MPI_INT,source,mtype,MPI_COMM_WORLD,&status);
    	printf("++++++++该从进程需要计算的行数为: =%d\n",rows);
    
        count=rows*N;//接受矩阵A
        MPI_Recv(&A,count,MPI_DOUBLE,source,mtype,MPI_COMM_WORLD,&status);
    
        count=N*N;  //接受矩阵B
        MPI_Recv(&B,count,MPI_DOUBLE,source,mtype,MPI_COMM_WORLD,&status);
    
        for(k=0;k<N;k++)  //计算乘积
        for(i=0;i<rows;i++)
        {
            C[i][k]= 0.0;
            for(j=0;j<N;j++)
                C[i][k] = C[i][k] + A[i][j] * B[j][k];
        }
        mtype=FROM_WORKER; // 把从矩阵计算乘积的 结果发送到 主进程上
        MPI_Send(&offset,1,MPI_INT,MASTER,mtype,MPI_COMM_WORLD);
        MPI_Send(&rows,1,MPI_INT,MASTER,mtype,MPI_COMM_WORLD);
        MPI_Send(&C,rows*N,MPI_DOUBLE,MASTER,mtype,MPI_COMM_WORLD);
    }
    MPI_Finalize();
    }
    运行结果如下:


    
    

    由上图我们可以看到:

    开启2进程(1个主进程1个从进程)的时间:0.133171

    开启3进程(1个主进程2个从进程)的时间:0.068259

    开启4进程(1个主进程3个从进程)的时间:0.048743

    相乘的加速比为(2个进程/1个进程):

    0.133171 /0.068259 = 1.950966172958877

    相乘的加速比为(3个进程/1个进程):

    0.133171 /0.048743 = 2.732105122786862

    展开全文
  • 多进程并行处理例子

    千次阅读 2019-07-02 15:26:24
    因为服务器cpu比较多,所以可以进行多进程并行处理任务,定义了48个进程同时跑,单一进程处理一张图片需要3--5分钟,比较耗时。主要任务是从openimage数据集中分割出自己想要的分割数据集。 code: import os ...
  • C#多进程并行

    千次阅读 2017-07-23 18:19:00
    为了并行执行个任务,可以启动进程并行数)。 下面提供两种方法,总任务数10,最大并行数4。 一、方法1 using System; using System.Collections.Generic; using System.Linq; using System.Text; ...
  • PHP 是强大的web开发语言,以至于大家常常忘记PHP 可以用来开发健壮的命令行(CLI)程序以至于daemon程序,而编写daemon程序...本文即是使用QPM的Supervisor::taskFactoryMode()实现多进程并行任务处理程序的一个例子。
  • processpoolexecutor, 多进程, map, submit
  • python多进程并行的简单实现

    千次阅读 2019-10-29 22:37:43
      这两天在跑一个文件处理程序的时候想着利用多核并行处理实现节省时间,然后例程里用了threading.Thread来实现,结果依旧很慢,查看了一下cpu使用情况根本就是在单核上跑啊。查了一下才发现python中线程存在PIL...
  • Python 大文件多进程并行处理小例

    万次阅读 2018-09-04 14:02:33
    下面是一个错误的多进程思路 # coding:utf-8 import multiprocessing as mp filename = "www.geniatech.net" cores = 20 pool = mp.Pool(cores) jobs = [] def work (line) : pass with ...
  • python-multiprocessing 多进程并行计算

    万次阅读 2017-08-13 14:59:33
    python的multiprocessing包是标准库提供的多进程并行计算包,提供了和threading(多线程)相似的API函数,但是相比于threading,将任务分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制。下面我们对...
  • for 循环, 多进程并行加速

    千次阅读 2019-11-05 11:38:06
    ########## for 循环并行,单一参数 ############# import time import multiprocessing def do(i): print(i) time.sleep(2) if __name__ == '__main__': param = [] #假设有100次循环 for i in range(0,.....
  • 可以将数据分为份,然后开个窗口并行执行,这样就能加快UPDATE执行速度 下面是Python全自动主键切片+并行执行脚本,脚本里面是将数据切分为4分,开4个并行进程 from multiprocessing import Pool import pymysql...
  • PHP实现多进程并行执行脚本

    千次阅读 2016-04-21 10:59:52
    由于php的进程是不支持多线程的,有些场景为了方便以及提高性能,可以用php实现多进程以弥补这个不足: #!/usr/bin/env php $cmds=array( array('/apps/bin/launcher.php','charge/promotion_props_stat.php','...
  • linux多进程并行压缩/解压命令pigz

    千次阅读 2019-02-11 15:50:21
    压缩: tar cvf - 目录名 | pigz -9 -p 24 &gt; file.tgz pigz:用法-9是压缩比率比较大,-p是指定cpu的核数。 解压: pigz -d file.tgz 这时候是tar包,那么再用 tar -xvf file.tar 解包。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 215,983
精华内容 86,393
关键字:

多进程并行