精华内容
下载资源
问答
  • python 多进程和多线程

    千次阅读 2020-07-03 20:38:18
    本文通过一些具体的例子简单介绍一下python多线程和多进程,后续会写一些进程通信和线程通信的一些文章。 python多线程 python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持...

    个人一直觉得对学习任何知识而言,概念是相当重要的。掌握了概念和原理,细节可以留给实践去推敲。掌握的关键在于理解,通过具体的实例和实际操作来感性的体会概念和原理可以起到很好的效果。本文通过一些具体的例子简单介绍一下python的多线程和多进程,后续会写一些进程通信和线程通信的一些文章。

    python多线程

    python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来均以后者为例。

    创建线程

    python中有两种方式实现线程:

    1. 实例化一个threading.Thread的对象,并传入一个初始化函数对象(initial function )作为线程执行的入口;
    2. 继承threading.Thread,并重写run函数;
    • 方式1:创建threading.Thread对象

    复制代码

    import threading
    import time
    
    def tstart(arg):
        time.sleep(0.5)
        print("%s running...." % arg)
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=tstart, args=('This is thread 1',))
        t2 = threading.Thread(target=tstart, args=('This is thread 2',))
        t1.start()
        t2.start()
        print("This is main function")

    复制代码

    结果:

    View Code

    • 方式2:继承threading.Thread,并重写run

    复制代码

    import threading
    import time
    
    class CustomThread(threading.Thread):
        def __init__(self, thread_name):
            # step 1: call base __init__ function
            super(CustomThread, self).__init__(name=thread_name)
            self._tname = thread_name
    
        def run(self):
            # step 2: overide run function
            time.sleep(0.5)
            print("This is %s running...." % self._tname)
    
    if __name__ == "__main__":
        t1 = CustomThread("thread 1")
        t2 = CustomThread("thread 2")
        t1.start()
        t2.start()
        print("This is main function")

    复制代码

     执行结果同方式1.

    threading.Thread

    上面两种方法本质上都是直接或者间接使用threading.Thread类

    threading.Thread(group=Nonetarget=Nonename=Noneargs=()kwargs={})

    关联上面两种创建线程的方式:

    复制代码

    import threading
    import time
    
    class CustomThread(threading.Thread):
        def __init__(self, thread_name, target = None):
            # step 1: call base __init__ function
            super(CustomThread, self).__init__(name=thread_name, target=target, args = (thread_name,))
            self._tname = thread_name
    
        def run(self):
            # step 2: overide run function
            # time.sleep(0.5)
            # print("This is %s running....@run" % self._tname)
            super(CustomThread, self).run()
    
    def target(arg):
        time.sleep(0.5)
        print("This is %s running....@target" % arg)
    
    if __name__ == "__main__":
        t1 = CustomThread("thread 1", target)
        t2 = CustomThread("thread 2", target)
        t1.start()
        t2.start()
        print("This is main function")

    复制代码

    结果:

    This is main function
    This is thread 1 running....@target
    This is thread 2 running....@target

    上面这段代码说明:

    1. 两种方式创建线程,指定的参数最终都会传给threading.Thread类;
    2. 传给线程的目标函数是在基类Thread的run函数体中被调用的,如果run没有被重写的话。

    threading模块的一些属性和方法可以参照官网,这里重点介绍一下threading.Thread对象的方法

    下面是threading.Thread提供的线程对象方法和属性:

    • start():创建线程后通过start启动线程,等待CPU调度,为run函数执行做准备;
    • run():线程开始执行的入口函数,函数体中会调用用户编写的target函数,或者执行被重载的run函数;
    • join([timeout]):阻塞挂起调用该函数的线程,直到被调用线程执行完成或超时。通常会在主线程中调用该方法,等待其他线程执行完成。
    • name、getName()&setName():线程名称相关的操作;
    • ident:整数类型的线程标识符,线程开始执行前(调用start之前)为None;
    • isAlive()、is_alive():start函数执行之后到run函数执行完之前都为True;
    • daemon、isDaemon()&setDaemon():守护线程相关;

    这些是我们创建线程之后通过线程对象对线程进行管理和获取线程信息的方法。

     多线程执行

     在主线程中创建若线程之后,他们之间没有任何协作和同步,除主线程之外每个线程都是从run开始被执行,直到执行完毕。

    join

    我们可以通过join方法让主线程阻塞,等待其创建的线程执行完成。

    复制代码

    import threading
    import time
    
    def tstart(arg):
        print("%s running....at: %s" % (arg,time.time()))
        time.sleep(1)
        print("%s is finished! at: %s" % (arg,time.time()))
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=tstart, args=('This is thread 1',))
        t1.start()
        t1.join()   # 当前线程阻塞,等待t1线程执行完成
        print("This is main function at:%s" % time.time())

    复制代码

    结果:

    This is thread 1 running....at: 1564906617.43
    This is thread 1 is finished! at: 1564906618.43
    This is main function at:1564906618.43

    如果不加任何限制,当主线程执行完毕之后,当前程序并不会结束,必须等到所有线程都结束之后才能结束当前进程。

    将上面程序中的t1.join()去掉,执行结果如下:

    This is thread 1 running....at: 1564906769.52
    This is main function at:1564906769.52
    This is thread 1 is finished! at: 1564906770.52

    可以通过将创建的线程指定为守护线程(daemon),这样主线程执行完毕之后会立即结束未执行完的线程,然后结束程序。

    deamon守护线程

    复制代码

    import threading
    import time
    
    def tstart(arg):
        print("%s running....at: %s" % (arg,time.time()))
        time.sleep(1)
        print("%s is finished! at: %s" % (arg,time.time()))
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=tstart, args=('This is thread 1',))
        t1.setDaemon(True)
        t1.start()
        # t1.join()   # 当前线程阻塞,等待t1线程执行完成
        print("This is main function at:%s" % time.time())

    复制代码

    结果:

    This is thread 1 running....at: 1564906847.85
    This is main function at:1564906847.85

    python多进程

    相比较于threading模块用于创建python多线程,python提供multiprocessing用于创建多进程。先看一下创建进程的两种方式。

    The multiprocessing package mostly replicates the API of the threading module.  —— python doc

    创建进程

    创建进程的方式和创建线程的方式类似:

    1. 实例化一个multiprocessing.Process的对象,并传入一个初始化函数对象(initial function )作为新建进程执行入口;
    2. 继承multiprocessing.Process,并重写run函数;
    • 方式1:

    复制代码

    from multiprocessing import Process  
    import os, time
    
    def pstart(name):
        # time.sleep(0.1)
        print("Process name: %s, pid: %s "%(name, os.getpid()))
    
    if __name__ == "__main__": 
        subproc = Process(target=pstart, args=('subprocess',))  
        subproc.start()  
        subproc.join()
        print("subprocess pid: %s"%subproc.pid)
        print("current process pid: %s" % os.getpid())

    复制代码

    结果:

    Process name: subprocess, pid: 4888 
    subprocess pid: 4888
    current process pid: 9912
    • 方式2:

    复制代码

    from multiprocessing import Process  
    import os, time
    
    class CustomProcess(Process):
        def __init__(self, p_name, target=None):
            # step 1: call base __init__ function()
            super(CustomProcess, self).__init__(name=p_name, target=target, args=(p_name,))
    
        def run(self):
            # step 2:
            # time.sleep(0.1)
            print("Custom Process name: %s, pid: %s "%(self.name, os.getpid()))
    
    if __name__ == '__main__':
        p1 = CustomProcess("process_1")
        p1.start()
        p1.join()
        print("subprocess pid: %s"%p1.pid)
        print("current process pid: %s" % os.getpid())

    复制代码

    这里可以思考一下,如果像多线程一样,存在一个全局的变量share_data,不同进程同时访问share_data会有问题吗?

    由于每一个进程拥有独立的内存地址空间且互相隔离,因此不同进程看到的share_data是不同的、分别位于不同的地址空间,同时访问不会有问题。这里需要注意一下。

    Subprocess模块

    既然说道了多进程,那就顺便提一下另一种创建进程的方式。

    python提供了Sunprocess模块可以在程序执行过程中,调用外部的程序。

    如我们可以在python程序中打开记事本,打开cmd,或者在某个时间点关机:

    >>> import subprocess
    >>> subprocess.Popen(['cmd'])
    <subprocess.Popen object at 0x0339F550>
    >>> subprocess.Popen(['notepad'])
    <subprocess.Popen object at 0x03262B70>
    >>> subprocess.Popen(['shutdown', '-p'])

    或者使用ping测试一下网络连通性:

    复制代码

    >>> res = subprocess.Popen(['ping', 'www.cnblogs.com'], stdout=subprocess.PIPE).communicate()[0]
    >>> print res
    正在 Ping www.cnblogs.com [101.37.113.127] 具有 32 字节的数据:

    来自 101.37.113.127 的回复: 字节=32 时间=1ms TTL=91
    来自 101.37.113.127 的回复: 字节=32 时间=1ms TTL=91
    来自 101.37.113.127 的回复: 字节=32 时间=1ms TTL=91
    来自 101.37.113.127 的回复: 字节=32 时间=1ms TTL=91

    101.37.113.127 的 Ping 统计信息:
    数据包: 已发送 = 4,已接收 = 4,丢失 = 0 (0% 丢失),
    往返行程的估计时间(以毫秒为单位):
    最短 = 1ms,最长 = 1ms,平均 = 1ms

    复制代码

    python多线程与多进程比较

    先来看两个例子:

    开启两个python线程分别做一亿次加一操作,和单独使用一个线程做一亿次加一操作:

    复制代码

    def tstart(arg):
        var = 0
        for i in xrange(100000000):
            var += 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=tstart, args=('This is thread 1',))
        t2 = threading.Thread(target=tstart, args=('This is thread 2',))
        start_time = time.time()
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print("Two thread cost time: %s" % (time.time() - start_time))
        start_time = time.time()
        tstart("This is thread 0")
        print("Main thread cost time: %s" % (time.time() - start_time))

    复制代码

    结果:

    Two thread cost time: 20.6570000648
    Main thread cost time: 2.52800011635

    上面的例子如果只开启t1和t2两个线程中的一个,那么运行时间和主线程基本一致。这个后面会解释原因。

    使用两个进程进行上面的操作:

    复制代码

    def pstart(arg):
        var = 0
        for i in xrange(100000000):
            var += 1
    
    if __name__ == '__main__':
        p1 = Process(target = pstart, args = ("1", ))
        p2 = Process(target = pstart, args = ("2", ))
        start_time = time.time()
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print("Two process cost time: %s" % (time.time() - start_time))
        start_time = time.time()
        pstart("0")
        print("Current process cost time: %s" % (time.time() - start_time))

    复制代码

    结果:

    Two process cost time: 2.91599988937
    Current process cost time: 2.52400016785

    对比分析

    双进程并行执行和单进程执行相同的运算代码,耗时基本相同,双进程耗时会稍微多一些,可能的原因是进程创建和销毁会进行系统调用,造成额外的时间开销。

    但是对于python线程,双线程并行执行耗时比单线程要高的多,效率相差近10倍。如果将两个并行线程改成串行执行,即:

        t1.start()
        t1.join()
        t2.start()
        t2.join()
        #Two thread cost time: 5.12199997902
        #Main thread cost time: 2.54200005531

    可以看到三个线程串行执行,每一个执行的时间基本相同。

    本质原因双线程是并发执行的,而不是真正的并行执行。原因就在于GIL锁。

    GIL锁

    提起python多线程就不得不提一下GIL(Global Interpreter Lock 全局解释器锁),这是目前占统治地位的python解释器CPython中为了保证数据安全所实现的一种锁。不管进程中有多少线程,只有拿到了GIL锁的线程才可以在CPU上运行,即时是多核处理器。对一个进程而言,不管有多少线程,任一时刻,只会有一个线程在执行。对于CPU密集型的线程,其效率不仅仅不高,反而有可能比较低。python多线程比较适用于IO密集型的程序。对于的确需要并行运行的程序,可以考虑多进程。

    多线程对锁的争夺,CPU对线程的调度,线程之间的切换等均会有时间开销。

    线程与进程区别

    下面简单的比较一下线程与进程

    • 进程是资源分配的基本单位,线程是CPU执行和调度的基本单位;
    • 通信/同步方式:
      • 进程:
        • 通信方式:管道,FIFO,消息队列,信号,共享内存,socket,stream流;
        • 同步方式:PV信号量,管程
      • 线程:
        • 同步方式:互斥锁,递归锁,条件变量,信号量
        • 通信方式:位于同一进程的线程共享进程资源,因此线程间没有类似于进程间用于数据传递的通信方式,线程间的通信主要是用于线程同步。
    • CPU上真正执行的是线程,线程比进程轻量,其切换和调度代价比进程要小;
    • 线程间对于共享的进程数据需要考虑线程安全问题,由于进程之间是隔离的,拥有独立的内存空间资源,相对比较安全,只能通过上面列出的IPC(Inter-Process Communication)进行数据传输;
    • 系统有一个个进程组成,每个进程包含代码段、数据段、堆空间和栈空间,以及操作系统共享部分 ,有等待,就绪和运行三种状态;
    • 一个进程可以包含多个线程,线程之间共享进程的资源(文件描述符、全局变量、堆空间等),寄存器变量和栈空间等是线程私有的;
    • 操作系统中一个进程挂掉不会影响其他进程,如果一个进程中的某个线程挂掉而且OS对线程的支持是多对一模型,那么会导致当前进程挂掉;
    • 如果CPU和系统支持多线程与多进程,多个进程并行执行的同时,每个进程中的线程也可以并行执行,这样才能最大限度的榨取硬件的性能;

    线程和进程的上下文切换

    进程切换过程切换牵涉到非常多的东西,寄存器内容保存到任务状态段TSS,切换页表,堆栈等。简单来说可以分为下面两步:

    1. 页全局目录切换,使CPU到新进程的线性地址空间寻址;
    2. 切换内核态堆栈和硬件上下文,硬件上下文包含CPU寄存器的内容,存放在TSS中;

    线程运行于进程地址空间,切换过程不涉及到空间的变换,只牵涉到第二步;

    使用多线程还是多进程?

    CPU密集型:程序需要占用CPU进行大量的运算和数据处理;

    I/O密集型:程序中需要频繁的进行I/O操作;例如网络中socket数据传输和读取等;

    由于python多线程并不是并行执行,因此较适合与I/O密集型程序,多进程并行执行适用于CPU密集型程序;

    欢迎转载博客文章,转载请标明出处!

    分类: python, 多线程&多进程

    展开全文
  • Python多进程和多线程(跑满CPU)

    万次阅读 多人点赞 2019-05-07 15:06:16
    Python多进程和多线程(跑满CPU) 概念 任务可以理解为进程(process),如打开一个word就是启动一个word进程。在一个word进程之中不只是进行打字输入,还需要拼写检查、打印等子任务,我们可以把进程中的这些子...

    参考资料:

    https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376

    Python多进程和多线程(跑满CPU

    • 概念

    任务可以理解为进程(process),如打开一个word就是启动一个word进程。在一个word进程之中不只是进行打字输入,还需要拼写检查、打印等子任务,我们可以把进程中的这些子任务称为线程(thread)。

    由于每个进程至少要干一件事,那么一个进程至少有一个线程,有时候有的复杂进程有多个线程,在进程中的多个线程是可以同时执行的。多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。

     

    总结一下就是,多任务的实现有3种方式:

    • 多进程模式;

    启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务。

    • 多线程模式

    启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务。

    • 多进程+多线程模式。

     

    启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。同时执行多个任务通常各个任务之间并不是没有关联的,而是需要相互通信和协调,有时,任务1必须暂停等待任务2完成后才能继续执行,有时,任务3和任务4又不能同时执行,所以,多进程和多线程的程序的复杂度要远远高于我们前面写的单进程单线程的程序。

     

    • 多进程

    要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

    子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

    multiprocessing模块提供了一个Process类来代表一个进程对象。

    创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

             multiprocessing模块提供了一个Pool进程池的方式批量创建子进程。

     

    from multiprocessing import Pool
    
    import os, time
    
    
    def long_time_task(name):
    
        print('Run task %s (%s)...' % (name, os.getpid()))
    
        start = time.time()
    
        time.sleep(1)
    
        end = time.time()
    
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__=='__main__':
    
        print('Parent process %s.' % os.getpid())
    
        p = Pool(5)
    
        for i in range(10):
    
            p.apply_async(long_time_task, args=(i,))
    
        print('Waiting for all subprocesses done...')
    
        p.close()
    
        p.join()
    
        print('All subprocesses done.')

    执行结果:

    Parent process 13780.
    
    Waiting for all subprocesses done...
    
    Run task 0 (9208)...
    
    Run task 1 (5052)...
    
    Run task 2 (15320)...
    
    Run task 3 (13604)...
    
    Run task 4 (1408)...
    
    Task 0 runs 1.00 seconds.
    
    Run task 5 (9208)...
    
    Task 1 runs 1.00 seconds.
    
    Run task 6 (5052)...
    
    Task 2 runs 1.00 seconds.
    
    Run task 7 (15320)...
    
    Task 3 runs 1.00 seconds.
    
    Run task 8 (13604)...
    
    Task 4 runs 1.00 seconds.
    
    Run task 9 (1408)...
    
    Task 5 runs 1.00 seconds.
    
    Task 6 runs 1.00 seconds.
    
    Task 7 runs 1.00 seconds.
    
    Task 8 runs 1.00 seconds.
    
    Task 9 runs 1.00 seconds.
    
    All subprocesses done.

    使用Pool对象,Pool中的参数表示调用多少个并行进程进行程序运行,pool的默认容量为CPU的计算核的数量,我们可以这样理解pool池:

    直观上的理解为指定了可以容纳最多多少个进程进行并行运算。而p.apply_async()函数给进程池添加进程任务如上图的Process1, Process2 … ,它的参数包括待运行程序和程序的传入参数。对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

    请注意输出的结果,process 01234是立刻执行的,而process  5要等待前面某个process完成后才执行,这是因为Pool的大小为5,在电脑上默认为CPU的核数。因此,最多同时执行5个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

    p = Pool(10)就可以同时跑10个进程。

     

    • 子进程

    很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。Subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。如果子进程还需要输入,则可以通过communicate()方法输入:

     

    • 进程间的通信

    Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Pythonmultiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

    from multiprocessing import Process, Queue
    
    import os, time, random
    
    
    # 写数据进程执行的代码:
    
    def write(q):
    
        print('Process to write: %s' % os.getpid())
    
        for value in ['A', 'B', 'C']:
    
            print('Put %s to queue...' % value)
    
            q.put(value)
    
            time.sleep(random.random())
    
    
    # 读数据进程执行的代码:
    
    def read(q):
    
        print('Process to read: %s' % os.getpid())
    
        while True:
    
            value = q.get(True)
    
            print('Get %s from queue.' % value)
    
    
    if __name__=='__main__':
    
        # 父进程创建Queue,并传给各个子进程:
    
        q = Queue()
    
        pw = Process(target=write, args=(q,))
    
        pr = Process(target=read, args=(q,))
    
        # 启动子进程pw,写入:
    
        pw.start()
    
        # 启动子进程pr,读取:
    
        pr.start()
    
        # 等待pw结束:
    
        pw.join()
    
        # pr进程里是死循环,无法等待其结束,只能强行终止:
    
        pr.terminate()

     

    运行结果:

    Process to write: 15728
    
    Put A to queue...
    
    Process to read: 15748
    
    Get A from queue.
    
    Put B to queue...
    
    Get B from queue.
    
    Put C to queue...
    
    Get C from queue.

    Pipes如何使用?

     

    • 多线程

    多任务可以由多进程完成,也可以由一个进程内的多线程完成。一个进程至少有一个线程,由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:

    import time, threading
    
    
    # 新线程执行的代码:
    
    def loop():
    
        print('thread %s is running...' % threading.current_thread().name)
    
        n = 0
    
        while n < 5:
    
            n = n + 1
    
            print('thread %s >>> %s' % (threading.current_thread().name, n))
    
            time.sleep(1)
    
        print('thread %s ended.' % threading.current_thread().name)
    
    
    print('thread %s is running...' % threading.current_thread().name)
    
    t = threading.Thread(target=loop, name='LoopThread')
    
    t.start()
    
    t.join()
    
    print('thread %s ended.' % threading.current_thread().name)

    执行结果:

    thread MainThread is running...
    
    thread LoopThread is running...
    
    thread LoopThread >>> 1
    
    thread LoopThread >>> 2
    
    thread LoopThread >>> 3
    
    thread LoopThread >>> 4
    
    thread LoopThread >>> 5
    
    thread LoopThread ended.
    
    thread MainThread ended.

    由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……。在上述程序中我们的主线程是整个程序,子线程执行loop()函数,输出的LoopThread是子线程的名字,输出的1,2,3,… 不是线程的数量。

    Lock:

    多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。(对于变量:进程à复制;线程à共享)

    对于多线程将程序变量改乱了的例子:

    import time, threading
    
    
    # 假定这是你的银行存款:
    
    balance = 0
    
    
    def change_it(n):
    
        # 先存后取,结果应该为0:
    
        global balance
    
        balance = balance + n
    
        balance = balance - n
    
    
    def run_thread(n):
    
        for i in range(100000):
    
            change_it(n)
    
    
    t1 = threading.Thread(target=run_thread, args=(5,))
    
    t2 = threading.Thread(target=run_thread, args=(8,))
    
    t1.start()
    
    t2.start()
    
    t1.join()
    
    t2.join()
    
    print(balance)

    我们定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。原因是因为高级语言的一条语句在CPU执行时是若干条语句,即使一个简单的计算:

    Balance = Balance + n

    都需要分两步计算:

    1. 计算临时变量temp = Balance + n
    2. 将临时变量temp赋值给Balance

    当两个线程正常顺序工作时,并不会造成冲突,但是,如果两个线程中的CPU执行语句不是按照线程顺序执行时有可能会造成冲突:

    初始值 balance = 0

    t1: x1 = balance + 5  # x1 = 0 + 5 = 5

    t2: x2 = balance + 8  # x2 = 0 + 8 = 8

    t2: balance = x2      # balance = 8

    t1: balance = x1      # balance = 5

    t1: x1 = balance - 5  # x1 = 5 - 5 = 0

    t1: balance = x1      # balance = 0

    t2: x2 = balance - 8  # x2 = 0 - 8 = -8

    t2: balance = x2   # balance = -8

    结果 balance = -8

    究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

     

    两个线程同时一存一取,就可能导致余额不对,你肯定不希望你的银行存款莫名其妙地变成了负数,所以,我们必须确保一个线程在修改balance的时候,别的线程一定不能改。

    如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:

    balance = 0
    
    lock = threading.Lock()
    
    
    def run_thread(n):
    
        for i in range(100000):
    
            # 先要获取锁:
    
            lock.acquire()
    
            try:
    
                # 放心地改吧:
    
                change_it(n)
    
            finally:
    
                # 改完了一定要释放锁:
    
                lock.release()

    当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

    获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。

    锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

     

    • 多核CPU

    如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。如果写一个死循环的话,会出现什么情况呢?打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。我们可以监控到一个死循环线程会100%占用一个CPU。如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。试试用Python写个死循环:

    import threading, multiprocessing
    
    
    def loop():
    
        x = 0
    
        while True:
    
            x = x ^ 1
    
    
    for i in range(multiprocessing.cpu_count()):
    
        t = threading.Thread(target=loop)
    
    t.start()

     

    程序:

    进程:

    资源监视器:

     

    启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

     

    • 进程VS.线程

    我们介绍了多进程和多线程,这是实现多任务最常用的两种方式。现在,我们来讨论一下这两种方式的优缺点。

    首先,要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。

    如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。

    如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。

    多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。

    多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。

    多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。

    在Windows下,多线程的效率比多进程要高,所以微软的IIS服务器默认采用多线程模式。由于多线程存在稳定性的问题,IIS的稳定性就不如Apache。为了缓解这个问题,IIS和Apache现在又有多进程+多线程的混合模式,真是把问题越搞越复杂。

    线程切换:

    无论是多进程还是多线程,只要数量一多,效率肯定上不去,为什么呢?我们打个比方,假设你不幸正在准备中考,每天晚上需要做语文、数学、英语、物理、化学这5科的作业,每项作业耗时1小时。

    如果你先花1小时做语文作业,做完了,再花1小时做数学作业,这样,依次全部做完,一共花5小时,这种方式称为单任务模型,或者批处理任务模型。

    假设你打算切换到多任务模型,可以先做1分钟语文,再切换到数学作业,做1分钟,再切换到英语,以此类推,只要切换速度足够快,这种方式就和单核CPU执行多任务是一样的了,以幼儿园小朋友的眼光来看,你就正在同时写5科作业。

    但是,切换作业是有代价的,比如从语文切到数学,要先收拾桌子上的语文书本、钢笔(这叫保存现场),然后,打开数学课本、找出圆规直尺(这叫准备新环境),才能开始做数学作业。操作系统在切换进程或者线程时也是一样的,它需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。

    所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。

     

    计算密集型 vs. IO密集型:

    是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型IO密集型

    计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

    计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

    第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

    IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

    异步IO

    考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。

    现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

    对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

    • 小结

    多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时,又要小心死锁的发生。Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦。

    • 分布式进程

    ThreadProcess中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

    Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

    举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

    原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

    我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

    # task_master.py
    
    
    import random, time, queue
    
    from multiprocessing.managers import BaseManager
    
    
    # 发送任务的队列:
    
    task_queue = queue.Queue()
    
    # 接收结果的队列:
    
    result_queue = queue.Queue()
    
    # 从BaseManager继承的QueueManager:
    
    class QueueManager(BaseManager):
    
        pass
    
    # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
    
    QueueManager.register('get_task_queue', callable=lambda: task_queue)
    
    QueueManager.register('get_result_queue', callable=lambda: result_queue)
    
    # 绑定端口5000, 设置验证码'abc':
    
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    
    # 启动Queue:
    
    manager.start()
    
    # 获得通过网络访问的Queue对象:
    
    task = manager.get_task_queue()
    
    result = manager.get_result_queue()
    
    # 放几个任务进去:
    
    for i in range(10):
    
        n = random.randint(0, 10000)
    
        print('Put task %d...' % n)
    
        task.put(n)
    
    # 从result队列读取结果:
    
    print('Try get results...')
    
    for i in range(10):
    
        r = result.get(timeout=10)
    
        print('Result: %s' % r)
    
    # 关闭:
    
    manager.shutdown()
    
    print('master exit.')
    
    

    请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以):

    # task_worker.py
    
    import time, sys, queue
    
    from multiprocessing.managers import BaseManager
    
    # 创建类似的QueueManager:
    
    class QueueManager(BaseManager):
    
        pass
    
    # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
    
    QueueManager.register('get_task_queue')
    
    QueueManager.register('get_result_queue')
    
    # 连接到服务器,也就是运行task_master.py的机器:
    
    server_addr = '127.0.0.1'
    
    print('Connect to server %s...' % server_addr)
    
    # 端口和验证码注意保持与task_master.py设置的完全一致:
    
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    
    # 从网络连接:
    
    m.connect()
    
    # 获取Queue的对象:
    
    task = m.get_task_queue()
    
    result = m.get_result_queue()
    
    # 从task队列取任务,并把结果写入result队列:
    
    for i in range(10):
    
        try:
    
            n = task.get(timeout=1)
    
            print('run task %d * %d...' % (n, n))
    
            r = '%d * %d = %d' % (n, n, n*n)
    
            time.sleep(1)
    
            result.put(r)
    
        except Queue.Empty:
    
            print('task queue is empty.')
    
    # 处理结束:
    
    print('worker exit.')

    任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

    现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:

    $ python3 task_master.py

    Put task 3411...

    Put task 1605...

    Put task 1398...

    Put task 4729...

    Put task 5300...

    Put task 7471...

    Put task 68...

    Put task 4219...

    Put task 339...

    Put task 7866...

    Try get results...

    task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

    $ python3 task_worker.py

    Connect to server 127.0.0.1...

    run task 3411 * 3411...

    run task 1605 * 1605...

    run task 1398 * 1398...

    run task 4729 * 4729...

    run task 5300 * 5300...

    run task 7471 * 7471...

    run task 68 * 68...

    run task 4219 * 4219...

    run task 339 * 339...

    run task 7866 * 7866...

    worker exit.

    task_worker.py进程结束,在task_master.py进程中会继续打印出结果:

    Result: 3411 * 3411 = 11634921

    Result: 1605 * 1605 = 2576025

    Result: 1398 * 1398 = 1954404

    Result: 4729 * 4729 = 22363441

    Result: 5300 * 5300 = 28090000

    Result: 7471 * 7471 = 55815841

    Result: 68 * 68 = 4624

    Result: 4219 * 4219 = 17799961

    Result: 339 * 339 = 114921

    Result: 7866 * 7866 = 61873956

    这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

    Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:

    Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue

    authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.pyauthkeytask_master.pyauthkey不一致,肯定连接不上。

    小结:

    Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。

    注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

    展开全文
  • python多进程和多线程谁更快

    千次阅读 2019-06-16 11:34:39
    Python IO密集型任务、计算密集型任务,以及多线程多进程 - tsw123 - 博客园 https://www.cnblogs.com/tsw123/p/9504460.html Python IO密集型任务、计算密集型任务,以及多线程多进程选择 对于IO密集型...

     

    Python IO密集型任务、计算密集型任务,以及多线程、多进程 - tsw123 - 博客园
    https://www.cnblogs.com/tsw123/p/9504460.html

     

     

    Python IO密集型任务、计算密集型任务,以及多线程、多进程选择

    对于IO密集型任务:

    • 单进程单线程直接执行用时:10.0333秒
    • 多线程执行用时:4.0156秒
    • 多进程执行用时:5.0182秒

    说明多线程适合IO密集型任务。

     

    对于计算密集型任务

    • 单进程单线程直接执行用时:10.0273秒
    • 多线程执行用时:13.247秒
    • 多进程执行用时:6.8377秒

    说明多进程适合计算密集型任务。

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    95

    96

    97

    98

    99

    100

    101

    102

    103

    104

    105

    106

    #coding=utf-8

    import sys

    import multiprocessing

    import time

    import threading

     

     

    # 定义全局变量Queue

    g_queue = multiprocessing.Queue()

     

    def init_queue():

        print("init g_queue start")

        while not g_queue.empty():

            g_queue.get()

        for _index in range(10):

            g_queue.put(_index)

        print("init g_queue end")

        return

     

    # 定义一个IO密集型任务:利用time.sleep()

    def task_io(task_id):

        print("IOTask[%s] start" % task_id)

        while not g_queue.empty():

            time.sleep(1)

            try:

                data = g_queue.get(block=True, timeout=1)

                print("IOTask[%s] get data: %s" % (task_id, data))

            except Exception as excep:

                print("IOTask[%s] error: %s" % (task_id, str(excep)))

        print("IOTask[%s] end" % task_id)

        return

     

    g_search_list = list(range(10000))

    # 定义一个计算密集型任务:利用一些复杂加减乘除、列表查找等

    def task_cpu(task_id):

        print("CPUTask[%s] start" % task_id)

        while not g_queue.empty():

            count = 0

            for in range(10000):

                count += pow(3*23*2if in g_search_list else 0

            try:

                data = g_queue.get(block=True, timeout=1)

                print("CPUTask[%s] get data: %s" % (task_id, data))

            except Exception as excep:

                print("CPUTask[%s] error: %s" % (task_id, str(excep)))

        print("CPUTask[%s] end" % task_id)

        return task_id

     

    if __name__ == '__main__':

        print("cpu count:", multiprocessing.cpu_count(), "\n")

     

        print(u"========== 直接执行IO密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        task_io(0)

        print(u"结束:", time.time() - time_0, "\n")

     

        print("========== 多线程执行IO密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        thread_list = [threading.Thread(target=task_io, args=(i,)) for in range(10)]

        for in thread_list:

            t.start()

        for in thread_list:

            if t.is_alive():

                t.join()

        print("结束:", time.time() - time_0, "\n")

     

        print("========== 多进程执行IO密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        process_list = [multiprocessing.Process(target=task_io, args=(i,)) for in range(multiprocessing.cpu_count())]

        for in process_list:

            p.start()

        for in process_list:

            if p.is_alive():

                p.join()

        print("结束:", time.time() - time_0, "\n")

     

        print("========== 直接执行CPU密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        task_cpu(0)

        print("结束:", time.time() - time_0, "\n")

     

        print("========== 多线程执行CPU密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        thread_list = [threading.Thread(target=task_cpu, args=(i,)) for in range(10)]

        for in thread_list:

            t.start()

        for in thread_list:

            if t.is_alive():

                t.join()

        print("结束:", time.time() - time_0, "\n")

     

        print("========== 多进程执行cpu密集型任务 ==========")

        init_queue()

        time_0 = time.time()

        process_list = [multiprocessing.Process(target=task_cpu, args=(i,)) for in range(multiprocessing.cpu_count())]

        for in process_list:

            p.start()

        for in process_list:

            if p.is_alive():

                p.join()

        print("结束:", time.time() - time_0, "\n")

    参考:https://zhuanlan.zhihu.com/p/24283040

     

     

     

    =========================================================

    =========================================================

    =========================================================

     

     

     

     

    Python多线程和多进程谁更快? - 张玉宝 - 博客园
    https://www.cnblogs.com/zhangyubao/p/7003535.html

     

    python多进程和多线程谁更快

    • python3.6
    • threading和multiprocessing
    • 四核+三星250G-850-SSD

    自从用多进程和多线程进行编程,一致没搞懂到底谁更快。网上很多都说python多进程更快,因为GIL(全局解释器锁)。但是我在写代码的时候,测试时间却是多线程更快,所以这到底是怎么回事?最近再做分词工作,原来的代码速度太慢,想提速,所以来探求一下有效方法(文末有代码效果图)

    这里先来一张程序的结果图,说明线程和进程谁更快


    一些定义

    并行是指两个或者多个事件在同一时刻发生。并发是指两个或多个事件在同一时间间隔内发生

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个程序的执行实例就是一个进程。


    实现过程

    而python里面的多线程显然得拿到GIL,执行code,最后释放GIL。所以由于GIL,多线程的时候拿不到,实际上,它是并发实现,即多个事件,在同一时间间隔内发生。

    但进程有独立GIL,所以可以并行实现。因此,针对多核CPU,理论上采用多进程更能有效利用资源。


    现实问题

    在网上的教程里面,经常能见到python多线程的身影。比如网络爬虫的教程、端口扫描的教程。

    这里拿端口扫描来说,大家可以用多进程实现下面的脚本,会发现python多进程更快。那么不就是和我们分析相悖了吗?

    import sys,threading
    from socket import *
    
    host = "127.0.0.1" if len(sys.argv)==1 else sys.argv[1]
    portList = [i for i in range(1,1000)]
    scanList = []
    lock = threading.Lock()
    print('Please waiting... From ',host)
    
    
    def scanPort(port):
        try:
            tcp = socket(AF_INET,SOCK_STREAM)
            tcp.connect((host,port))
        except:
            pass
        else:
            if lock.acquire():
                print('[+]port',port,'open')
                lock.release()
        finally:
            tcp.close()
    
    for p in portList:
        t = threading.Thread(target=scanPort,args=(p,))
        scanList.append(t)
    for i in range(len(portList)):
        scanList[i].start()
    for i in range(len(portList)):
        scanList[i].join()

    谁更快

    因为python锁的问题,线程进行锁竞争、切换线程,会消耗资源。所以,大胆猜测一下:

    在CPU密集型任务下,多进程更快,或者说效果更好;而IO密集型,多线程能有效提高效率。


    大家看一下下面的代码:

    import time
    import threading
    import multiprocessing
    
    max_process = 4
    max_thread = max_process
    
    def fun(n,n2):
        #cpu密集型
        for  i in range(0,n):
            for j in range(0,(int)(n*n*n*n2)):
                t = i*j
    
    def thread_main(n2):
        thread_list = []
        for i in range(0,max_thread):
            t = threading.Thread(target=fun,args=(50,n2))
            thread_list.append(t)
    
        start = time.time()
        print(' [+] much thread start')
        for i in thread_list:
            i.start()
        for i in thread_list:
            i.join()
        print(' [-] much thread use ',time.time()-start,'s')
    
    def process_main(n2):
        p = multiprocessing.Pool(max_process)
        for i in range(0,max_process):
            p.apply_async(func = fun,args=(50,n2))
        start = time.time()
        print(' [+] much process start')
        p.close()#关闭进程池
        p.join()#等待所有子进程完毕
        print(' [-] much process use ',time.time()-start,'s')
    
    if __name__=='__main__':
        print("[++]When n=50,n2=0.1:")
        thread_main(0.1)
        process_main(0.1)
        print("[++]When n=50,n2=1:")
        thread_main(1)
        process_main(1)
        print("[++]When n=50,n2=10:")
        thread_main(10)
        process_main(10)


    结果如下:

    可以看出来,当对cpu使用率越来越高的时候(代码循环越多的时候),差距越来越大。验证我们猜想(在CPU密集型任务下,多进程更快,或者说效果更好;而IO密集型,多线程能有效提高效率。)


    CPU和IO密集型

    1. CPU密集型代码(各种循环处理、计数等等)
    2. IO密集型代码(文件处理、网络爬虫等)

    判断方法:

    1. 直接看CPU占用率, 硬盘IO读写速度
    2. 计算较多->CPU;时间等待较多(如网络爬虫)->IO
    3. 请自行百度

    参考

    为什么在Python里推荐使用多进程而不是多线程?
    如何判断进程是IO密集还是CPU密集
    搞定python多线程和多进程

     

     

    展开全文
  • python多进程和多线程看这一篇就够了

    千次阅读 多人点赞 2020-10-10 22:08:35
    脑海中关于进程线程的概念一直很模糊,什么时候该用多进程,什么时候该用多线程总是搞不清楚。同时python因为历史遗留问题存在GIL全局锁,就让人更加困惑。这一篇就完整整理一下python中进程线程的概念实现。 ...

    脑海中关于进程和线程的概念一直很模糊,什么时候该用多进程,什么时候该用多线程总是搞不清楚。同时python因为历史遗留问题存在GIL全局锁,就让人更加困惑。这一篇就完整整理一下python中进程和线程的概念和实现。

    进程和线程

    进程(process)和线程(thread)的区别应该算是个老生常谈的话题。

    这里引用知乎用户的一个高赞回答来深入浅出的解释一下

    看了一遍排在前面的答案,类似”进程是资源分配的最小单位,线程是CPU调度的最小单位“这样的回答感觉太抽象,都不太容易让人理解。
    
    做个简单的比喻:进程=火车,线程=车厢线程在进程下行进(单纯的车厢无法运行)
    一个进程可以包含多个线程(一辆火车可以有多个车厢)
    不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
    同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
    进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
    进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
    进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
    进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-"互斥锁"
    进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”
    
    作者:知乎用户
    链接:https://www.zhihu.com/question/25532384/answer/411179772
    来源:知乎
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    

    这里有几个知识点要重点记录一下

    • 单个CPU在任一时刻只能执行单个线程,只有多核CPU还能真正做到多个线程同时运行
    • 一个进程包含多个线程,这些线程可以分布在多个CPU上
    • 多核CPU同时运行的线程可以属于单个进程或不同进程
    • 所以,在大多数编程语言中因为切换消耗的资源更少,多线程比多进程效率更高

    坏消息,Python是个特例!

    GIL锁

    python始于1991年,创立初期对运算的要求不高,为了解决多线程共享内存的数据安全问题,引入了GIL锁,全称为Global Interpreter Lock,也就是全局解释器锁。

    GIL规定,在一个进程中每次只能有一个线程在运行。这个GIL锁相当于是线程运行的资格证,某个线程想要运行,首先要获得GIL锁,然后遇到IO或者超时的时候释放GIL锁,给其余的线程去竞争,竞争成功的线程获得GIL锁得到下一次运行的机会。

    正是因为有GIL的存在,python的多线程其实是假的,所以才有人说python的多线程非常鸡肋。但是虽然每个进程有一个GIL锁,进程和进程之前还是不受影响的。

    GIL是个历史遗留问题,过去的版本迭代都是以GIL为基础来的,想要去除GIL还真不是一件容易的事,所以我们要做好和GIL长期面对的准备。

    多进程 vs 多线程

    那么是不是意味着python中就只能使用多进程去提高效率,多线程就要被淘汰了呢?

    那也不是的。

    这里分两种情况来讨论,CPU密集型操作IO密集型操作。针对前者,大多数时间花在CPU运算上,所以希望CPU利用的越充分越好,这时候使用多进程是合适的,同时运行的进程数和CPU的核数相同;针对后者,大多数时间花在IO交互的等待上,此时一个CPU和多个CPU是没有太大差别的,反而是线程切换比进程切换要轻量得多,这时候使用多线程是合适的。

    所以有了结论:

    • CPU密集型操作使用多进程比较合适,例如海量运算
    • IO密集型操作使用多线程比较合适,例如爬虫,文件处理,批量ssh操作服务器等等

    代码实现

    下面来详细看看多进程和多线程是如何实现的。

    创建一个函数用来执行

    def func():
        print('process {} starts'.format(os.getpid()))
        time.sleep(2)
        print('process {} ends'.format(os.getpid()))
    

    为了表示一个耗时任务,这个函数会休眠2秒钟,并在开始和结尾处打印该函数执行时候的进程ID。

    多进程

    做为对比,首先来看看顺序执行两边函数的情况

    if __name__ == '__main__':
        print('main process is {}'.format(os.getpid()))
        start_time = time.time()
        ### single process
        func()
        func()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    打印结果如下

    main process is 24308
    process 24308 starts
    process 24308 ends
    process 24308 starts
    process 24308 ends
    total time is 4.001222372055054
    

    可以看到,这里是单个进程先后顺序执行了两遍函数,共耗时约4秒。

    下面来看看多进程的情况

    if __name__ == '__main__':
        print('main process is {}'.format(os.getpid()))
        start_time = time.time()
        ### multiprocess
        from multiprocessing import Process
        p1 = Process(target=func)
        p2 = Process(target=func)
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    从主进程创建新的进程使用的是Process类,该类在实例化时通常接两个参数

    • target - 新的进程执行的函数的函数名
    • args - 函数的参数,元组格式传入

    这里因为func函数没有参数需要传递,所以args没有赋值。

    创建完Process对象以后通过start()方法来启动该进程,同时如果想让某个进程阻塞主进程,可以执行该进程的join()方法。正常情况下创建完子进程以后主进程会继续向下执行直到结束,如果有子进程阻塞了主进程则主进程会等待该子进程执行完以后才向下执行。这里主进程会等待p1和p2两个子进程都执行完毕才计算结束时间。

    打印结果如下

    main process is 33536
    process 25764 starts
    process 11960 starts
    process 25764 ends
    process 11960 ends
    total time is 2.3870742321014404
    

    可以看到,创建的子进程和主进程的进程ID是不一样的,说明此时一共有三个进程在同时跑。最后的用时为2.387秒,几乎降到了顺序执行一半的程度,当然比单个函数执行的时间还是慢了点,说明进程的创建和停止还是需要耗时的。

    进程池

    从上面的例子可以看出,进程的创建和停止都是消耗资源的,所以进程绝不是越多越好。因为单个CPU核某时刻只能执行单个进程,所以最好的情况是将进程数量与CPU核数相等,这样可以最大化利用CPU。

    这时就有一个问题出现了,进程数少还好说,进程数多了的话如何自动去维持一个固定的进程数目呢,这时候就要用到进程池了。进程池就是规定一个可容纳最大进程数目的池子,当池子中进程数目不足时自动添加新进程,从而将同时运行的进程数目维持在一个上限之内。这里的上限就应该是CPU的核数。

    if __name__ == '__main__':
    	from multiprocessing import Process, cpu_count, Pool
        print('main process is {}'.format(os.getpid()))
        print('core number is {}'.format(cpu_count()))  # 8
        start_time = time.time()
    	### multiprocess pool
        p = Pool(8)
        for i in range(14):
            p.apply_async(func)
        p.close()
        p.join()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里我首先利用cpu_count()方法计算了一下我这台电脑的CPU核数,8核,所以进程池的最大进程数目设定为8。

    我电脑物理上是单CPU,4核。但是因为intel有超线程技术,一个核可以当作两个核来跑,所以逻辑上相当于8核

    这里利用Pool类来创建进程池,传递一个参数是最大进程数。利用Pool对象的apply_async()方法往进程池中添加待执行的任务(注意不是进程,只是任务),这里也可以利用map_async(func,iterable)来添加,用来类似于内建的map()方法,不过需要待执行的函数带参数,类似下面这样

    def func(n):
        print('process {} starts'.format(os.getpid()))
        time.sleep(n)
        print('process {} ends'.format(os.getpid()))
        
    ### multiprocess pool
        p = Pool(8)
        # for i in range(14):
        #     p.apply_async(func)
        p.map_async(func,range(14))
        p.close()
        p.join()
    

    然后是close()方法,进程池不再接受新的任务(注意不是进程),以及terminate()方法,关闭主进程,此时未开始的子进程都不会执行了。同样的,想要让进程池去阻塞主进程可以用join()方法。注意join()一定要在close()或者terminate()之后

    上面的程序执行结果如下

    main process is 12860
    core number is 8
    process 11956 starts
    process 34224 starts
    process 10596 starts
    process 20596 starts
    process 27668 starts
    process 15604 starts
    process 10820 starts
    process 16632 starts
    process 11956 ends
    process 11956 starts
    process 34224 ends
    process 34224 starts
    process 10596 ends
    process 10596 starts
    process 20596 ends
    process 20596 starts
    process 27668 ends
    process 27668 starts
    process 15604 ends
    process 15604 starts
    process 10820 ends
    process 16632 ends
    process 11956 ends
    process 34224 ends
    process 10596 ends
    process 20596 ends
    process 27668 ends
    process 15604 ends
    total time is 5.258298635482788
    

    一共14个任务,在最大数目为8的进程池里面至少要执行两轮,同时加上进程启动和停止的消耗,最后用时5.258秒。

    这里顺便补充一下windows和linux如何查看每个cpu核的负载。

    如果是windows系统,在任务管理器中,调到第二个Performance标签,在cpu曲线图上右击鼠标可以更改为逻辑cpu的负载图,如下

    1-windows.png

    如果是linux系统,通过top命令,然后按数字1就可以了,如下

    2-linux.png

    进程间通讯

    前面说到进程间是相互独立的,不共享内存空间,所以在一个进程中声明的变量在另一个进程中是看不到的。这时候就要借助一些工具来在两个进程间进行数据传输了,其中最常见的就是队列了。

    队列(queue)在生产消费者模型中很常见,生产者进程在队列一端写入,消费者进程在队列另一端读取。

    首先创建两个函数,分别扮演生产者和消费者

    def write_to_queue(queue):
        for index in range(5):
            print('write {} to {}'.format(str(index), queue))
            queue.put(index)
            time.sleep(1)
    
    
    def read_from_queue(queue):
        while True:
            result = queue.get(True)
            print('get {} from {}'.format(str(result), queue))
    

    这两个函数都接受一个队列作为参数然后利用put()方法往其中写入或者get()方法来读取。生产者会连续写入5个数字,每次间隔1秒,消费者则会一直尝试读取。

    主程序如下

    if __name__ == '__main__':
    	from multiprocessing import Process, cpu_count, Pool
        print('main process is {}'.format(os.getpid()))
        print('core number is {}'.format(cpu_count()))  # 8
        start_time = time.time()
    	### multiprocess queue
        from multiprocessing import Queue
        queue = Queue()
        pw = Process(target=write_to_queue, args=(queue,))
        pr = Process(target=read_from_queue, args=(queue,))
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    注意这里在创建子进程的时候就用元组的形式传递了参数,如果元组只有一个元素,记住添加逗号,否则会被认为是单个元素而不是元组。同时这里因为消费者是死循环,所以只是将生产者加入了阻塞,生产者进程执行完毕以后停止消费者进程。

    最后打印结果如下

    main process is 28268
    core number is 8
    write 0 to <multiprocessing.queues.Queue object at 0x0000023C6B25BF88>
    get 0 from <multiprocessing.queues.Queue object at 0x000002EF410B1C88>
    write 1 to <multiprocessing.queues.Queue object at 0x0000023C6B25BF88>
    get 1 from <multiprocessing.queues.Queue object at 0x000002EF410B1C88>
    write 2 to <multiprocessing.queues.Queue object at 0x0000023C6B25BF88>
    get 2 from <multiprocessing.queues.Queue object at 0x000002EF410B1C88>
    write 3 to <multiprocessing.queues.Queue object at 0x0000023C6B25BF88>
    get 3 from <multiprocessing.queues.Queue object at 0x000002EF410B1C88>
    write 4 to <multiprocessing.queues.Queue object at 0x0000023C6B25BF88>
    get 4 from <multiprocessing.queues.Queue object at 0x000002EF410B1C88>
    total time is 5.603313446044922
    

    多线程

    首先创建一个函数用于测试

    import threading
    def func2(n):
        print('thread {} starts'.format(threading.current_thread().name))
        time.sleep(2)
        print('thread {} ends'.format(threading.current_thread().name))
        return n
    

    多线程使用的是threading.Thread

    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        ### multithread
        t1 = threading.Thread(target=func2, args=(1,))
        t2 = threading.Thread(target=func2, args=(2,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    基本用法和上面进程的Process差不多,打印的结果如下

    main thread is MainThread
    thread Thread-1 starts
    thread Thread-2 starts
    thread Thread-1 ends
    thread Thread-2 ends
    total time is 2.002077341079712
    

    对比前面多进程的2.38秒,这里还是快了不少的。

    线程池

    和进程一样,通常是使用线程池来完成自动控制线程数量的目的。但是这里就没有一个推荐的上限数量了,毕竟因为GIL的存在不管怎么样每次都只有一个线程在跑。

    同时threading模块是不支持线程池的,python3.4以后官方推出了concurrent.futures模块来统一进程池和线程池的接口,这里关注一下线程池。

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        ### threadpool
        executor = ThreadPoolExecutor(5)
        all_tasks = [executor.submit(func2, i) for i in range(8)]
        wait(all_tasks, return_when=ALL_COMPLETED)
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里利用ThreadPoolExecutor()创建一个线程池,最大上限为5,然后利用submit()方法往线程池中添加任务(注意是任务,不是线程),submit方法会返回一个future对象,注意这里我将创建的任务放进了一个列表中。

    如果要阻塞主线程,不能用join方法了,需要用到wait()方法,该方法接受三个参数,第一个参数是一个future对象的列表,第二个参数是超时时间,这里放空,第三个参数是在什么时候结束阻塞,默认是ALL_COMPLETED表示全部任务结束之后,也可以设定为FIRST_COMPLETED表示第一个任务结束以后。

    打印结果如下

    main thread is MainThread
    thread ThreadPoolExecutor-0_0 starts
    thread ThreadPoolExecutor-0_1 starts
    thread ThreadPoolExecutor-0_2 starts
    thread ThreadPoolExecutor-0_3 starts
    thread ThreadPoolExecutor-0_4 starts
    thread ThreadPoolExecutor-0_0 ends
    thread ThreadPoolExecutor-0_0 starts
    thread ThreadPoolExecutor-0_2 ends
    thread ThreadPoolExecutor-0_2 starts
    thread ThreadPoolExecutor-0_1 ends
    thread ThreadPoolExecutor-0_1 starts
    thread ThreadPoolExecutor-0_3 ends
    thread ThreadPoolExecutor-0_4 ends
    thread ThreadPoolExecutor-0_0 ends
    thread ThreadPoolExecutor-0_2 endsthread ThreadPoolExecutor-0_1 ends
    
    total time is 4.003619432449341
    

    最后的结果也是接近两倍的函数耗时4秒,比进程池快了不止一点点。

    map

    这里需要额外提一下多线程中的map方法。

    多进程中的map_async()方法和多线程中的map()方法除了将任务加入线程池,还会按添加的顺序返回每个线程的执行结果,这个执行结果也很特殊,是一个生成器

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        ### map
        executor = ThreadPoolExecutor(5)
        all_results = executor.map(func2, range(8))  # map返回的是线程执行的结果的生成器对象
        for result in all_results:
            print(result)
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里的all_results是一个生成器,可以通过for循环来按顺序获取每个线程的返回结果。同时值得注意的是map方法并不会阻塞主线程,也没法使用wait方法,只能通过获取生成器的结果来阻塞主线程了。

    执行结果如下

    main thread is MainThread
    thread ThreadPoolExecutor-0_0 starts
    thread ThreadPoolExecutor-0_1 starts
    thread ThreadPoolExecutor-0_2 starts
    thread ThreadPoolExecutor-0_3 starts
    thread ThreadPoolExecutor-0_4 starts
    thread ThreadPoolExecutor-0_0 ends
    thread ThreadPoolExecutor-0_0 starts
    0
    thread ThreadPoolExecutor-0_1 ends
    thread ThreadPoolExecutor-0_1 starts
    thread ThreadPoolExecutor-0_2 ends
    1
    thread ThreadPoolExecutor-0_2 starts
    2
    thread ThreadPoolExecutor-0_3 ends
    3
    thread ThreadPoolExecutor-0_4 ends
    4
    thread ThreadPoolExecutor-0_0 ends
    5
    thread ThreadPoolExecutor-0_1 ends
    6
    thread ThreadPoolExecutor-0_2 ends
    7
    total time is 4.004628419876099
    

    可以看出线程结果是按顺序返回的。

    异步

    想要不用map方法又要异步获取线程的返回值,还可以用as_completed()方法

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        executor = ThreadPoolExecutor(5)
        all_tasks = [executor.submit(func2, i) for i in range(8)]
        for future in as_completed(all_tasks):
            print(future.result())
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    as_completed接受一个任务列表做为参数,返回一个生成器,所以主线程依然会被阻塞,等所有线程执行完毕打印出结果再继续执行主线程。

    打印结果如下

    main thread is MainThread
    thread ThreadPoolExecutor-0_0 starts
    thread ThreadPoolExecutor-0_1 starts
    thread ThreadPoolExecutor-0_2 starts
    thread ThreadPoolExecutor-0_3 starts
    thread ThreadPoolExecutor-0_4 starts
    thread ThreadPoolExecutor-0_0 endsthread ThreadPoolExecutor-0_1 ends
    thread ThreadPoolExecutor-0_1 starts
    1
    thread ThreadPoolExecutor-0_2 ends
    thread ThreadPoolExecutor-0_2 starts
    2
    
    thread ThreadPoolExecutor-0_0 starts
    thread ThreadPoolExecutor-0_3 ends
    thread ThreadPoolExecutor-0_4 ends
    0
    3
    4
    thread ThreadPoolExecutor-0_1 ends
    5
    thread ThreadPoolExecutor-0_0 ends
    7
    thread ThreadPoolExecutor-0_2 ends
    6
    total time is 4.003146648406982
    

    这里的线程结果就不是按照就不是按照添加任务的顺序,而是按照返回的先后顺序打印的。

    所以,想要获取多线程的返回结果,按照添加顺序就用map方法,按照返回的先后顺序就用as_completed方法

    想要更深入了解python中的futures模块,可以参考下面的文章学习下源码分析

    https://www.jianshu.com/p/b9b3d66aa0be

    同时python中还有专门做异步编程的asyncio模块,以后有时间再专门写文章说明。

    线程间通讯

    与多进程的内存独立不同,多线程间可以共享内存,所以同一个变量是可以被多个线程共享的,不需要额外的插件。想要让多个线程能同时操作某变量,要么将该变量作为参数传递到线程中(必须是可变变量,例如list和dict),要么作为全局变量在线程中用global关键字进行声明

    因为有GIL的存在,每次只能有一个线程在对变量进行操作,有人就认为python不需要互斥锁了。但是实际情况却和我们想的相差很远,先看下面这个例子

    def increase(var):
        global total_increase_times
        for i in range(1000000):
            var[0] += 1
            total_increase_times += 1
    
    
    def decrease(var):
        global total_decrease_times
        for i in range(1000000):
            var[0] -= 1
            total_decrease_times += 1
            
            
    if __name__ == '__main__':
        print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        var = [5]
        total_increase_times = 0
        total_decrease_times = 0
        t1 = threading.Thread(target=increase, args=(var,))
        t2 = threading.Thread(target=decrease, args=(var,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(var)
        print('total increase times: {}'.format(str(total_increase_times)))
        print('total decrease times: {}'.format(str(total_decrease_times)))
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里首先定义了两个函数,分别对传进来的list的第一个元素进行加一和减一操作,重复多遍。这里之所以使用list因为要满足可变变量的要求,对于python中变量和传参不熟悉的朋友可以参考另一篇博客《python3中各类变量的内存堆栈分配和函数传参区别实例详解》

    然后在主线程中创建两个子线程分别运行,同时创建两个全局变量total_increase_timestotal_decrease_times分别来统计对变量进行加值和减值的次数,为了防止可能由于操作次数不一致导致的错误。

    打印结果如下

    main thread is MainThread
    [281970]
    total increase times: 1000000
    total decrease times: 1000000
    total time is 0.7370336055755615
    

    很奇怪,对变量值增加和减少同样的次数,最后的结果却和原先的值不一致。而且如果将该程序重复运行多次,每次得到的最终值都不同,有正有负

    这是为什么呢?

    这是因为某些在我们看来是原子操作的,例如+或者-,在python看来不是的。例如执行a+=1操作,在python看来其实是三步:获取a的值,将值加1,将新的值赋给a。在这三步中的任意位置,该线程都有可能被暂停,然后让别的线程先运行。这时候就有可能出现如下的局面

    线程1获取了a的值为10,被暂停
    线程2获取了a的值为10
    线程2将a的值赋值为9,被暂停
    线程1将a的值赋值为11,被暂停
    线程2获取了a的值为11
    ...
    

    这样线程1就将线程2的操作全部覆盖了,这也就是为什么最后的结果有正有负。

    那么如何处理这种情况呢?

    需要用到互斥锁。

    互斥锁

    线程1在操作变量a的时候就给a上一把锁,别的线程看到变量有锁就不会去操作该变量,一直到线程1再次获得GIL之后继续操作将锁释放,别的线程才有机会对该变量进行操作。

    修改下上面的代码

    def increase(var, lock):
        global total_increase_times
        for i in range(1000000):
            if lock.acquire():
                var[0] += 1
                lock.release()
                total_increase_times += 1
    
    
    def decrease(var, lock):
        global total_decrease_times
        for i in range(1000000):
            if lock.acquire():
                var[0] -= 1
                lock.release()
                total_decrease_times += 1
                
                
    if __name__ == '__main__':
        print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        lock = threading.Lock()
        var = [5]
        total_increase_times = 0
        total_decrease_times = 0
        t1 = threading.Thread(target=increase, args=(var, lock))
        t2 = threading.Thread(target=decrease, args=(var, lock))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(var)
        print('total increase times: {}'.format(str(total_increase_times)))
        print('total decrease times: {}'.format(str(total_decrease_times)))
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里创建了一个全局锁lock并传递给两个线程,利用acquire()方法获取锁,如果没有获取到锁该线程会一直卡在这,并不会继续循环,操作完毕用release()方法释放锁。

    打印结果如下

    main thread is MainThread
    [5]
    total increase times: 1000000
    total decrease times: 1000000
    total time is 2.1161584854125977
    

    最终的结果不管执行多少次都没有问题,但是因为前面说的等待锁的过程会造成大量时间的浪费,这里耗时2.116秒比前面的0.737秒要慢了3倍。

    队列

    多线程间通讯也可以用queue,因为queue是对线程安全的,不需要额外加锁了

    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
    if __name__ == '__main__':
    	print('main thread is {}'.format(threading.current_thread().name))
        start_time = time.time()
        ### multithread queue
        from queue import Queue
        queue = Queue()
        tw = threading.Thread(target=write_to_queue, args=(queue,))
        tr = threading.Thread(target=read_from_queue, args=(queue,))
        tr.setDaemon(True)
        tw.start()
        tr.start()
        tw.join()
        end_time = time.time()
        print('total time is {}'.format(str(end_time - start_time)))
    

    这里不能像进程中那样用terminate方法停止一个线程,需要用setDaemon方法。

    打印结果如下

    main thread is MainThread
    write 0 to <queue.Queue object at 0x000001E3DACD21C8>
    get 0 from <queue.Queue object at 0x000001E3DACD21C8>
    write 1 to <queue.Queue object at 0x000001E3DACD21C8>
    get 1 from <queue.Queue object at 0x000001E3DACD21C8>
    write 2 to <queue.Queue object at 0x000001E3DACD21C8>
    get 2 from <queue.Queue object at 0x000001E3DACD21C8>
    write 3 to <queue.Queue object at 0x000001E3DACD21C8>
    get 3 from <queue.Queue object at 0x000001E3DACD21C8>
    write 4 to <queue.Queue object at 0x000001E3DACD21C8>
    get 4 from <queue.Queue object at 0x000001E3DACD21C8>
    total time is 5.00357986831665
    

    扩展

    多进程间的变量共享也可以用类似多线程那样传递变量或者全局变量的方式,限于篇幅这里没有展开说,感兴趣的朋友可以参考知乎上一篇不错的文章https://zhuanlan.zhihu.com/p/68828849

    总结

    总结下文章中涉及的知识点

    • CPU密集型使用多进程,IO密集型使用多线程

    • 查看进程ID和线程ID的命令分别是os.getpid()threading.current_thread()

    • 多进程使用multiprocessing就可以了,通常使用进程池来完成操作,阻塞主进程使用join方法

    • 多线程使用threading模块,线程池使用concurrent.futures模块,同时主线程的阻塞方法有多种

    • 不管多进程还是多线程,生产消费模型都可以用队列来完成,如果要用多线程操作同一变量记得加锁

    我是T型人小付,一位坚持终身学习的互联网从业者。喜欢我的博客欢迎在csdn上关注我,如果有问题欢迎在底下的评论区交流,谢谢。

    展开全文
  • 多线程适用于IO密集型任务,对于科学计算类任务,多线程非但不能提升效率,还有可能因为线程间切换调度而增加时间的消耗 import multiprocessing import time from queue import Queue from threading import ...
  • Python 多进程多线程启动

    万次阅读 2021-02-22 12:52:49
    Python 多进程启动 def main(self, num): """ 多进程启动 ValueError: Pool not running:这个问题的根源在于:pool.close()提前生效,关闭了pool。所以提示pool没有运行。 解决:多层循环的情况下,将pool....
  • C++多线程调用Python多进程

    千次阅读 2016-04-18 22:28:29
    C++、Java等编程想提高效率,很容易想到的就是使用多线程,而在Python中,由于...但是翻遍了C/Python API没找到C语言调用Python多进程的方法。而目前的项目却恰好希望能用C++调用Python多进程。尝试了好多C/Python AP
  • python 多线程 多进程同时运行 多任务要求 python 基础语法 python 文件目录操作 python 模块应用 开发工具 pycharm 实现方法 多任务的实现可以用进程线程来实现 进程—> 线程----> 多任务应用 多进程操作...
  • python多进程和多线程谁更快 python3.6 threadingmultiprocessing 四核+三星250G-850-SSD 自从用多进程和多线程进行编程,一致没搞懂到底谁更快。网上很多都说python多进程更快,因为GIL(全局解释器锁)。但是我...
  • 我在这里写了一些多线程的学习总结,在这里写了一些多进程的学习笔记,在这片文章中,就它们并串行执行的情况进行一下对比。 直接上程序 import multiprocessing as mp # 多进程的模块 import threading as td from ...
  •  # 多进程和多线程的区别在于,对于同一个变量,多进程是每个进程都有一份自己的拷贝,而多线程则是共享这个变量。  # 多线程使用不当有一定数据风险,应该为此加锁  # 因为 python 解释器带有全局锁 GIL,...
  • 最近几天在用python写一个从kafka取数据判断有没有明文手机号的需求,在这其中遇到很多坑,坑也跟python多线程多进程有关。 我在实现从kafka取数据的时候,需求还要求一分钟发送钉钉从kafka取出的明文手机号,所以...
  • 一、Python多进程多线程 关于python多进程多线程的相关基础知识,在我之前的博客有写过,并且就关于python多线程的GIL锁问题,也在我的一篇博客中有相关的解释。 为什么python多线程在面对IO密集型任务的时候会产生...
  • Python多进程多线程测试

    万次阅读 2019-09-17 17:18:35
    今天在工作中遇到爬虫效率问题,在此处记录多进程多线程测试脚本 #!/usr/bin/env python # -*- coding: utf-8 -*- __author__ = 'Seven' from concurrent.futures import ThreadPoolExecutor, ...
  • 小编我今天就来尝试下用一文总结下Python多进程和多线程的概念和区别, 并详细介绍如何使用python的multiprocess和threading模块进行多线程和多进程编程。   重要知识点 - 什么是进程(proces...
  • 搞定python多线程和多进程

    千次阅读 2017-07-10 13:14:11
    一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发线程,每条线程并行执行不同的任务。一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。1.1.2 线程的工作方式...
  • python 多进程并发与多线程并发总结

    千次阅读 2017-05-22 18:41:27
    本文对Python支持的几种并发方式进行简单的总结。 Python支持的并发分为多线程并发与多进程并发(异步IO本文不涉及)。概念上来说,多进程并发即运行多个独立的程序,优势在于并发处理...Python多线程和多进程的支持
  • IO密集型: 推荐使用多进程...为什么Python多线程在IO密集型中是鸡肋?通过一段代码来测试 单线程执行从1亿递减到0 import datetime def run(n): while n > 0: n -= 1 if __name__ == '__main__': start_ti...
  • Python多进程多线程、协程学习小结
  • python多进程和多线程的配合使用

    千次阅读 2017-08-09 06:47:19
    由于python多线程中存在PIL锁,因此python多线程不能利用多核,那么,由于现在的计算机是多核的,就不能充分利用计算机的多核资源。...python多线程适合IO密集型的场景,多进程适合计算密集型场景。
  • python多线程和多进程总结

    千次阅读 2018-05-16 10:32:46
    python多线程: 多线程的理解:多进程和多线程都可以执行多个任务,线程是进程的一部分。线程的特点是线程之间可以共享内存变量,资源消耗少(不过再Unix环境中,多进程和多线程资源调度消耗差距不明显,Unix调度...
  • Python多进程和多线程详解

    万次阅读 多人点赞 2018-10-16 18:02:56
    1.进程的概念 一个CPU的时候运行,轮询调度实现并发执行 CPU运行机制: 计算机程序:存储在磁盘上的可执行二进制(或其他类型)文件。 只有把它们加载到内存中,并被操作系统调用它们才会拥有其自己的生命周期。...
  • python 彻底解读多线程多进程

    万次阅读 多人点赞 2019-03-26 14:20:34
    title: 多线程多进程 ...top: 0 date: 2019-03-03 16:16:41 tags: 多线程多进程 ...description: 对python多线程多进程进一步刨析。 真是这样的话,有些话,只有准确的时间准确的地点亲口说出来。现在时间错...
  • Python多进程多线程

    千次阅读 多人点赞 2021-02-06 19:17:47
    多进程与多线程进程与线程多进程多线程 进程与线程 想象在学校的一个机房,有固定数量的电脑,老师安排了一个爬虫任务让大家一起完成,每个学生使用一台电脑爬取部分数据,将数据放到一个公共数据库。共同资源就像...
  • Python 多线程 多进程 协程 yield

    千次阅读 2017-08-20 20:32:16
    python多线程和多进程的最大区别是稳定性和效率问题多进程互相之间不影响,一个崩溃了不影响其他进程,稳定性高 多线程因为都在同一进程里,一个线程崩溃了整个进程都完蛋多进程对系统资源开销大,多线程对系统资源...
  • 大话多进程多线程 “进程——资源分配的最小单位,线程——程序执行的最小单位” 进程: 是程序执行时的一个实例,即它是程序已经执行到课中程度的数据结构的汇集。从内核的观点看,进程的目的就是担当分配系统...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 106,866
精华内容 42,746
关键字:

python多进程和多线程

python 订阅