精华内容
下载资源
问答
  • python多进程原理及其实现

    万次阅读 多人点赞 2019-02-16 21:29:14
    1 进程的基本概念 ...进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。 进程的过程: 创建, 就绪, 运行 ,阻塞, ...

    1 进程的基本概念

    什么是进程?

    进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

    进程的过程: 创建, 就绪, 运行 ,阻塞, 消亡 .

    2 父进程和子进程

    ​ Linux 操作系统提供了一个 fork() 函数用来创建子进程,这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。

    ​ 在 Python 中同样提供了 fork() 函数,此函数位于 os 模块下。

    # -*- coding: utf-8 -*-  
    import os
    import time
    
    print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    
    pid = os.fork()
    if pid == 0:
        print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        time.sleep(5)
    else:
        print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        # pid表示回收的子进程的pid
        #pid, result = os.wait()  # 回收子进程资源  阻塞
        time.sleep(5)
        #print("父进程:回收的子进程pid=%d" % pid)
        #print("父进程:子进程退出时 result=%d" % result)
    
    # 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。
    # 父进程中拿到的返回值是创建的子进程的pid,大于0
    print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    
    # 在创建子进程前: pid=8122, ppid=8075
    # 父进程信息:    pid=8122, ppid=8075
    # 子进程信息:   pid=8123, ppid=8122
    # fork创建完后:  pid=8122, ppid=8075
    # fork创建完后:  pid=8123, ppid=8122
    
    # 取消13,14,15,16行注释后:
    # 在创建子进程前: pid=8133, ppid=8075
    # 父进程信息  : pid=8133, ppid=8075
    # 子进程信息  : pid=8134, ppid=8133
    # fork创建完后 : pid=8134, ppid=8133
    # 父进程:回收的子进程pid=8134
    # 父进程:子进程退出时 result=0
    # fork创建完后 : pid=8133, ppid=8075
    
    # getpid()得到本身进程id,getppid()得到父进程进程id,如果已经是父进程,得到系统进程id
    
    2.1 父子进程如何区分?

    ​ 子进程是父进程通过fork()产生出来的,pid = os.fork()

    ​ 通过返回值pid是否为0,判断是否为子进程,如果是0,则表示是子进程

    ​ 由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。

    2.2 子进程如何回收?

    python中采用os.wait()方法用来回收子进程占用的资源

    pid, result = os.wait() # 回收子进程资源 阻塞,等待子进程执行完成回收

    如果有子进程没有被回收的,但是父进程已经死掉了,这个子进程就是僵尸进程。

    3. Python进程模块

    ​ python的进程multiprocessing模块有多种创建进程的方式,每种创建方式和进程资源的回收都不太相同,下面分别针对Process,Pool及系统自带的fork三种进程分析。

    3.1 fork()
    import os
    pid = os.fork() # 创建一个子进程
    os.wait() # 等待子进程结束释放资源
    # pid为0的代表子进程。
    

    缺点:
    ​ 1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
    ​ 2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
    ​ 3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。
    优点:
    ​ 是系统自带的接近低层的创建方式,运行效率高。

    3.2 Process进程

    multiprocessing模块提供Process类实现新建进程

    # -*- coding: utf-8 -*-
    import os
    from multiprocessing  import Process
    import time
    
    def fun(name):
    	print("2 子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    	print("hello " + name)
    	
    
    def test():
    	print('ssss')
    
    
    if __name__ == "__main__":
    	print("1 主进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))  
    	ps = Process(target=fun, args=('jingsanpang', ))
    	print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    	print("3 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    	print(ps.is_alive())
    	ps.start()
    	print(ps.is_alive())
    	print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    	print("4 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    	ps.join()
    	print(ps.is_alive())
    	print("5 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    	ps.terminate()
    	print("6 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        
    # 1 主进程信息: pid=8143, ppid=8075
    # 111 ##### ps pid: None, ident:None
    # 3 进程信息: pid=8143, ppid=8075
    # False
    # True
    # 222 #### ps pid: 8144, ident:8144
    # 4 进程信息: pid=8143, ppid=8075
    # 2 子进程信息: pid=8144, ppid=8143
    # hello jingsanpang
    # False
    # 5 进程信息: pid=8143, ppid=8075
    # 6 进程信息: pid=8143, ppid=8075
    

    特点:
    ​ 1.注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
    2.主进程执行完毕后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程
    ​ 结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
    3.Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。

    4.当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
    5.windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。

    另外还可以通过继承Process对象来重写run方法创建进程

    3.3 进程池POOL (多个进程)
    # -*- coding: utf-8 -*-
    
    import multiprocessing
    import time
    
    def work(msg):
    	mult_proces_name = multiprocessing.current_process().name
        time.sleep(2)
    	print('process: ' + mult_proces_name + '-' + msg)
    	
    if __name__ == "__main__":
    	pool = multiprocessing.Pool(processes=5) # 创建5个进程
    	for i in range(10):
    		msg = "process %d" %(i)
    		pool.apply_async(work, (msg, ))
    	pool.close() # 关闭进程池,表示不能在往进程池中添加进程
    	pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
    	print("Sub-process all done.")
    
    # process: ForkPoolWorker-3-process 3
    # process: ForkPoolWorker-2-process 0
    # process: ForkPoolWorker-4-process 2
    # process: ForkPoolWorker-1-process 1
    # process: ForkPoolWorker-5-process 4  每5个进程一起
    # process: ForkPoolWorker-3-process 5
    # process: ForkPoolWorker-2-process 6
    # process: ForkPoolWorker-4-process 7
    # process: ForkPoolWorker-1-process 8
    # process: ForkPoolWorker-5-process 9
    # Sub-process all done.
    

    ​ 上述代码中的pool.apply_async()apply()函数的变体,apply_async()apply()的并行版本,apply()apply_async()的阻塞版本,使用apply()主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()既是Pool的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。 async 异步

    多个子进程并返回值

    apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)

    import multiprocessing
    import time
    
    def func(msg):
        time.sleep(2)
        return multiprocessing.current_process().name + '-' + msg
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4) # 创建4个进程
        results = []
        for i in range(10):
            msg = "process %d" %(i)
            results.append(pool.apply_async(func, (msg, )))
        pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
        pool.join() # 等待进程池中的所有进程执行完毕
        print ("Sub-process(es) done.")
        for res in results:
            print (res.get())
            
    # Sub-process(es) done.
    # ForkPoolWorker-2-process 0
    # ForkPoolWorker-4-process 1
    # ForkPoolWorker-3-process 2
    # ForkPoolWorker-1-process 3
    # ForkPoolWorker-3-process 4
    # ForkPoolWorker-2-process 5
    # ForkPoolWorker-1-process 6
    # ForkPoolWorker-4-process 7
    # ForkPoolWorker-2-process 8
    # ForkPoolWorker-3-process 9
    # ForkPoolWorker-1-process 10
    # ForkPoolWorker-4-process 11
    

    ​ 与之前的输出不同,这次的输出是有序的。

    ​ 如果电脑是八核,建立8个进程,在Ubuntu下输入top命令再按下大键盘的1,可以看到每个CPU的使用率是比较平均的

    4 进程间通信方式

    1. 管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
    2. 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
    3. 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
    4. 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。

    以上几种进程间通信方式中,消息队列是使用的比较频繁的方式。

    (1)管道pipe

    import multiprocessing
    
    def foo(sk):
       sk.send('hello father')
       print(sk.recv())
    
    if __name__ == '__main__':
       conn1,conn2=multiprocessing.Pipe()    #开辟两个口,都是能进能出,括号中如果False即单向通信
       p=multiprocessing.Process(target=foo,args=(conn1,))  #子进程使用sock口,调用foo函数
       p.start()
       print(conn2.recv())  #主进程使用conn口接收
       conn2.send('hi son') #主进程使用conn口发送
    

    (2)消息队列Queue

    Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。

    Queue的一些常用方法:

    • Queue.qsize():返回当前队列包含的消息数量;
    • Queue.empty():如果队列为空,返回True,反之False ;
    • Queue.full():如果队列满了,返回True,反之False;
    • Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
    • Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
    • Queue.put():将一个值添加进数列,可传参超时时长。
    • Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。

    案例:

    from multiprocessing import Process, Queue
    import time
    
    def write(q):
       for i in ['A', 'B', 'C', 'D', 'E']:
          print('Put %s to queue' % i)
          q.put(i)
          time.sleep(0.5)
    
    def read(q):
       while True:
          v = q.get(True)
          print('get %s from queue' % v)
    
    if __name__ == '__main__':
       q = Queue()
       pw = Process(target=write, args=(q,))
       pr = Process(target=read, args=(q,))
       print('write process = ', pw)
       print('read  process = ', pr)
       pw.start()
       pr.start()
       pw.join()
       pr.join()
       pr.terminate()
       pw.terminate()
    

    Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据**。**

    注:进程间通信应该尽量避免使用共享数据的方式

    5 多进程实现生产者消费者

    以下通过多进程实现生产者,消费者模式

    import multiprocessing
    from multiprocessing import Process
    from time import sleep
    import time
    
    class MultiProcessProducer(multiprocessing.Process):
       def __init__(self, num, queue):
          """Constructor"""
          multiprocessing.Process.__init__(self)
          self.num = num
          self.queue = queue
    
       def run(self):
          t1 = time.time()
          print('producer start ' + str(self.num))
          for i in range(1000):
             self.queue.put((i, self.num))
          # print 'producer put', i, self.num
          t2 = time.time()
    
          print('producer exit ' + str(self.num))
          use_time = str(t2 - t1)
          print('producer ' + str(self.num) + ', 
          use_time: '+ use_time)
    
    class MultiProcessConsumer(multiprocessing.Process):
       def __init__(self, num, queue):
          """Constructor"""
          multiprocessing.Process.__init__(self)
          self.num = num
          self.queue = queue
    
       def run(self):
          t1 = time.time()
          print('consumer start ' + str(self.num))
          while True:
             d = self.queue.get()
             if d != None:
                # print 'consumer get', d, self.num
                continue
             else:
                break
          t2 = time.time()
          print('consumer exit ' + str(self.num))
          print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1))
    
    
    def main():
       # create queue
       queue = multiprocessing.Queue()
    
       # create processes
       producer = []
       for i in range(5):
          producer.append(MultiProcessProducer(i, queue))
    
       consumer = []
       for i in range(5):
          consumer.append(MultiProcessConsumer(i, queue))
    
       # start processes
       for i in range(len(producer)):
          producer[i].start()
    
       for i in range(len(consumer)):
          consumer[i].start()
    
       # wait for processs to exit
       for i in range(len(producer)):
          producer[i].join()
    
       for i in range(len(consumer)):
          queue.put(None)
    
       for i in range(len(consumer)):
          consumer[i].join()
    
       print('all done finish')
    
    
    if __name__ == "__main__":
       main()
    

    6 总结

    ​ python中的多进程创建有以下两种方式:

    (1)fork子进程 ( linux )

    (2)采用 multiprocessing 这个库创建子进程

    ​ 需要注意的是队列中Queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue()

    ​ 另外, 进程池使用 multiprocessing.Pool实现,pool = multiprocessing.Pool(processes = 3),产生一个进程池,pool.apply_async实现非租塞模式,pool.apply实现阻塞模式。

    apply_async和 apply函数,前者是非阻塞的,后者是阻塞。可以看出运行时间相差的倍数正是进程池数量。

    ​ 同时可以通过result.append(pool.apply_async(func, (msg, )))获取非租塞式调用结果信息的。

    展开全文
  • Python多进程原理与实现

    万次阅读 多人点赞 2018-08-21 16:16:29
    1 进程的基本概念 什么是进程?...进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。 进程的生命周期:创建(New)、...

    1 进程的基本概念

    什么是进程?

    ​ 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

    进程的生命周期:创建(New)、就绪(Runnable)、运行(Running)、阻塞(Block)、销毁(Destroy)

    进程的状态(分类):(Actived)活动进程、可见进程(Visiable)、后台进程(Background)、服务进程(Service)、空进程

    2 父进程和子进程

    ​ Linux 操作系统提供了一个 fork() 函数用来创建子进程,这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。

    ​ 在 Python 中同样提供了 fork() 函数,此函数位于 os 模块下。

    # -*- coding: utf-8 -*-  
    __author__ = 'diesn'
    __date__ = '2018/5/31 下午5:17' 
    
    import os
    import time
    
    print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    
    pid = os.fork()
    if pid == 0:
        print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        time.sleep(5)
    else:
        print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        # pid表示回收的子进程的pid
        #pid, result = os.wait()  # 回收子进程资源  阻塞
        time.sleep(5)
        #print("父进程:回收的子进程pid=%d" % pid)
        #print("父进程:子进程退出时 result=%d" % result)
    
    # 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。
    # 父进程中拿到的返回值是创建的子进程的pid,大于0
    print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    2.1 父子进程如何区分?

    ​ 子进程是父进程通过fork()产生出来的,pid = os.fork()

    ​ 通过返回值pid是否为0,判断是否为子进程,如果是0,则表示是子进程

    ​ 由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。

    2.2 子进程如何回收?

    python中采用os.wait()方法用来回收子进程占用的资源

    pid, result = os.wait() # 回收子进程资源  阻塞,等待子进程执行完成回收

    如果有子进程没有被回收的,但是父进程已经死掉了,这个子进程就是僵尸进程。

    3 Python进程模块

    ​ python的进程multiprocessing模块有多种创建进程的方式,每种创建方式和进程资源的回收都不太相同,下面分别针对Process,Pool及系统自带的fork三种进程分析。

    3.1 fork()
    import os
    pid = os.fork() # 创建一个子进程
    os.wait() # 等待子进程结束释放资源
    pid为0的代表子进程。

    缺点:
    ​ 1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
    ​ 2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
    ​ 3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。
    优点:
    ​ 是系统自带的接近低层的创建方式,运行效率高。

    3.2 Process进程

    multiprocessing模块提供Process类实现新建进程

    # -*- coding: utf-8 -*-
    import os
    from multiprocessing  import Process
    import time
    
    def fun(name):
        print("2 子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        print("hello " + name)
    
    
    def test():
        print('ssss')
    
    
    if __name__ == "__main__":
        print("1 主进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        ps = Process(target=fun, args=('jingsanpang', ))
        print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
        print("3 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        print(ps.is_alive())  # 启动之前 is_alive为False(系统未创建)
        ps.start()
        print(ps.is_alive())  # 启动之后,is_alive为True(系统已创建)
    
        print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
        print("4 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        ps.join() # 等待子进程完成任务   类似于os.wait()
        print(ps.is_alive())
        print("5 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
        ps.terminate()  #终断进程
        print("6 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

    特点:
    ​ 1.注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
    2.主进程执行完后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
    3.Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。

    4.当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
    5.windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。

    另外还可以通过继承Process对象来重写run方法创建进程

    3.3 进程池POOL (多个进程)
    # -*- coding: utf-8 -*-
    __author__ = 'disen'
    __date__ = '2018/5/31 下午9:16'
    
    import multiprocessing
    import time
    
    def work(msg):
        mult_proces_name = multiprocessing.current_process().name
        print('process: ' + mult_proces_name + '-' + msg)
    
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=5) # 创建5个进程
        for i in range(20):
            msg = "process %d" %(i)
            pool.apply_async(work, (msg, ))
        pool.close() # 关闭进程池,表示不能在往进程池中添加进程
        pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
        print("Sub-process all done.")

    ​ 上述代码中的pool.apply_async()apply()函数的变体,apply_async()apply()的并行版本,apply()apply_async()的阻塞版本,使用apply()主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()既是Pool的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。

    多个子进程并返回值

    apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)

    import multiprocessing
    import time
    
    def func(msg):
        return multiprocessing.current_process().name + '-' + msg
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4) # 创建4个进程
        results = []
        for i in range(20):
            msg = "process %d" %(i)
            results.append(pool.apply_async(func, (msg, )))
        pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
        pool.join() # 等待进程池中的所有进程执行完毕
        print ("Sub-process(es) done.")
    
        for res in results:
            print (res.get())

    ​ 与之前的输出不同,这次的输出是有序的。

    ​ 如果电脑是八核,建立8个进程,在Ubuntu下输入top命令再按下大键盘的1,可以看到每个CPU的使用率是比较平均的

    4 进程间通信方式

    1. 管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
    2. 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
    3. 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
    4. 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。

    以上几种进程间通信方式中,消息队列是使用的比较频繁的方式。

    (1)管道pipe**

    import multiprocessing
    
    def foo(conn):
       conn.send('hello father')   #向管道pipe发消息
       print(conn.recv())
    
    if __name__ == '__main__':
       conn1,conn2=multiprocessing.Pipe(True)    #开辟两个口,都是能进能出,括号中如果False即单向通信
       p=multiprocessing.Process(target=foo,args=(conn1,))  #子进程使用sock口,调用foo函数
       p.start()
       print(conn2.recv())  #主进程使用conn口接收,从管道(Pipe)中读取消息
       conn2.send('hi son') #主进程使用conn口发送

    (2)消息队列Queue

    Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。

    Queue的一些常用方法:

    • Queue.qsize():返回当前队列包含的消息数量;
    • Queue.empty():如果队列为空,返回True,反之False ;
    • Queue.full():如果队列满了,返回True,反之False;
    • Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
    • Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
    • Queue.put():将一个值添加进数列,可传参超时时长。
    • Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。

    案例:

    from multiprocessing import Process, Queue
    import time
    
    
    def write(q):
       for i in ['A', 'B', 'C', 'D', 'E']:
          print('Put %s to queue' % i)
          q.put(i)
          time.sleep(0.5)
    
    
    def read(q):
       while True:
          v = q.get(True)
          print('get %s from queue' % v)
    
    
    if __name__ == '__main__':
       q = Queue()
       pw = Process(target=write, args=(q,))
       pr = Process(target=read, args=(q,))
       print('write process = ', pw)
       print('read  process = ', pr)
       pw.start()
       pr.start()
       pw.join()
       pr.join()
       pr.terminate()
       pw.terminate()
    

    Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据

    注:进程间通信应该尽量避免使用共享数据的方式

    5 多进程实现生产者消费者

    以下通过多进程实现生产者,消费者模式

    import multiprocessing
    from multiprocessing import Process
    from time import sleep
    import time
    
    
    class MultiProcessProducer(multiprocessing.Process):
       def __init__(self, num, queue):
          """Constructor"""
          multiprocessing.Process.__init__(self)
          self.num = num
          self.queue = queue
    
       def run(self):
          t1 = time.time()
          print('producer start ' + str(self.num))
          for i in range(1000):
             self.queue.put((i, self.num))
          # print 'producer put', i, self.num
          t2 = time.time()
    
          print('producer exit ' + str(self.num))
          use_time = str(t2 - t1)
          print('producer ' + str(self.num) + ', 
          use_time: '+ use_time)
    
    
    
    class MultiProcessConsumer(multiprocessing.Process):
       def __init__(self, num, queue):
          """Constructor"""
          multiprocessing.Process.__init__(self)
          self.num = num
          self.queue = queue
    
       def run(self):
          t1 = time.time()
          print('consumer start ' + str(self.num))
          while True:
             d = self.queue.get()
             if d != None:
                # print 'consumer get', d, self.num
                continue
             else:
                break
          t2 = time.time()
          print('consumer exit ' + str(self.num))
          print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1))
    
    
    def main():
       # create queue
       queue = multiprocessing.Queue()
    
       # create processes
       producer = []
       for i in range(5):
          producer.append(MultiProcessProducer(i, queue))
    
       consumer = []
       for i in range(5):
          consumer.append(MultiProcessConsumer(i, queue))
    
       # start processes
       for i in range(len(producer)):
          producer[i].start()
    
       for i in range(len(consumer)):
          consumer[i].start()
    
       # wait for processs to exit
       for i in range(len(producer)):
          producer[i].join()
    
       for i in range(len(consumer)):
          queue.put(None)
    
       for i in range(len(consumer)):
          consumer[i].join()
    
       print('all done finish')
    
    
    if __name__ == "__main__":
       main()

    6 总结

    ​ python中的多进程创建有以下两种方式:

    (1)fork子进程

    (2)采用 multiprocessing 这个库创建子进程

    ​ 需要注意的是队列中queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue()

    ​ 另外, 进程池使用 multiprocessing.Pool实现,pool = multiprocessing.Pool(processes = 3),产生一个进程池,pool.apply_async实现非租塞模式,pool.apply实现阻塞模式。

    apply_async和 apply函数,前者是非阻塞的,后者是阻塞。可以看出运行时间相差的倍数正是进程池数量。

    ​ 同时可以通过result.append(pool.apply_async(func, (msg, )))获取非租塞式调用结果信息的。

    展开全文
  • nginx多进程原理和特点

    千次阅读 2015-05-11 20:59:57
    目录 一、进程模型 1 二、进程控制 2 三、网络事件 2 四、Nginx架构 3 五、daemon守护线程 3 ...Nginx之所以为广大码农喜爱,除了其高性能外,还有...与Memcached的经典多线程模型相比,Nginx是经典的多进程模型。Ngi

    目录

    一、进程模型 1

    二、进程控制 2

    三、网络事件 2

    四、Nginx架构 3

    五、daemon守护线程 3

    六、惊群现象 4

    七、相对于线程,采用进程的优点 4

    八、多线程的问题 5

    九、异步非阻塞 5

     

    一、进程模型
    Nginx之所以为广大码农喜爱,除了其高性能外,还有其优雅的系统架构。与Memcached的经典多线程模型相比,Nginx是经典的多进程模型。Nginx启动后以daemon的方式在后台运行,后台进程包含一个master进程和多个worker进程,具体如下图:


    图1 Nginx多进程模型 

    master进程主要用来管理worker进程,具体包括如下4个主要功能:

    (1)接收来自外界的信号。

    (2)向各worker进程发送信号。

    (3)监控woker进程的运行状态。

    (4)当woker进程退出后(异常情况下),会自动重新启动新的woker进程。

    woker进程主要用来处理网络事件,各个woker进程之间是对等且相互独立的,它们同等竞争来自客户端的请求,一个请求只可能在一个woker进程中处理,woker进程个数一般设置为机器CPU核数。

    二、进程控制

    对Nginx进程的控制主要是通过master进程来做到的,主要有两种方式:

    (1) 手动发送信号

    从图1可以看出,master接收信号以管理众woker进程,那么,可以通过kill向master进程发送信号,比如kill -HUP pid用以通知Nginx从容重启。所谓从容重启就是不中断服务:master进程在接收到信号后,会先重新加载配置,然后再启动新进程开始接收新请求, 并向所有老进程发送信号告知不再接收新请求并在处理完所有未处理完的请求后自动退出。

    (2)自动发送信号

    可以通过带命令行参数启动新进程来发送信号给master进程,比如./nginx -s reload用以启动一个新的Nginx进程,而新进程在解析到reload参数后会向master进程发送信号(新进程会帮我们把手动发送信号中的动作 自动完成)。当然也可以这样./nginx -s stop来停止Nginx。

    三、网络事件

    Nginx采用异步非阻塞的方式来处理网络事件,类似于Libevent,具体过程如下图:


    图2 Nginx网络事件 

    master进程先建好需要listen的socket后,然后再fork出多个woker进程,这样每个work进程都可以去 accept这个socket。当一个client连接到来时,所有accept的work进程都会受到通知,但只有一个进程可以accept成功,其它 的则会accept失败。Nginx提供了一把共享锁accept_mutex来保证同一时刻只有一个work进程在accept连接,从而解决惊群问 题。当一个worker进程accept这个连接后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接,这样一个完成的 请求就结束了。

     

    四、Nginx架构

      Nginx全称是什么? Nginx ("engine x") 是一个高性能的 HTTP和反向代理服务器,也是一个 IMAP/POP3/SMTP代理服务器。

    五、daemon守护线程

      nginx在启动后,在unix系统中会以daemon的方式在后台运行,后台进程包含一个master进程和多个worker进程。

     

      当然nginx也是支持多线程的方式的,只是我们主流的方式还是多进程的方式,也是nginx的默认方式。

      master进程主要用来管理worker进程,包含:接收来自外界的信号,向各worker进程发送信号,监控worker进程的运行状态,当worker进程退出后(异常情况下),会自动重新启动新的worker进程。

      worker进程则是处理基本的网络事件。多个worker进程之间是对等的,他们同等竞争来自客户端的请求,各进程互相之间是独立的。一个请求,只可能在一个worker进程中处理,一个worker进程,不可能处理其它进程的请求。

      worker进程的个数是可以设置的,一般我们会设置与机器cpu核数一致。更多的worker数,只会导致进程来竞争cpu资源了,从而带来不必要的上下文切换。而且,nginx为了更好的利用多核特性,具有cpu绑定选项,我们可以将某一个进程绑定在某一个核上,这样就不会因为进程的切换带来cache的失效。

    六、惊群现象

      每个worker进程都是从master进程fork过来。在master进程里面,先建立好需要listen的socket之 后,然后再fork出多个worker进程,这样每个worker进程都可以去accept这个socket(当然不是同一个socket,只是每个进程 的这个socket会监控在同一个ip地址与端口,这个在网络协议里面是允许的)。一般来说,当一个连接进来后,所有在accept在这个socket上 面的进程,都会收到通知,而只有一个进程可以accept这个连接,其它的则accept失败。

    七、相对于线程,采用进程的优点

      进程之间不共享资源,不需要加锁,所以省掉了锁带来的开销。

      采用独立的进程,可以让互相之间不会影响,一个进程退出后,其它进程还在工作,服务不会中断,master进程则很快重新启动新的worker进程。

      编程上更加容易。

    八、多线程的问题

      而多线程在多并发情况下,线程的内存占用大,线程上下文切换造成CPU大量的开销。想想apache的常用工作方式(apache 也有异步非阻塞版本,但因其与自带某些模块冲突,所以不常用),每个请求会独占一个工作线程,当并发数上到几千时,就同时有几千的线程在处理请求了。这对 操作系统来说,是个不小的挑战,线程带来的内存占用非常大,线程的上下文切换带来的cpu开销很大,自然性能就上不去了,而这些开销完全是没有意义的。

    九、异步非阻塞

      异步的概念和同步相对的,也就是不是事件之间不是同时发生的。

      非阻塞的概念是和阻塞对应的,阻塞是事件按顺序执行,每一事件都要等待上一事件的完成,而非阻塞是如果事件没有准备好,这个事件可以直接返回,过一段时间再进行处理询问,这期间可以做其他事情。但是,多次询问也会带来额外的开销。

      总的来说,Nginx采用异步非阻塞的好处在于:

    · 不需要创建线程,每个请求只占用少量的内存

    · 没有上下文切换,事件处理非常轻量

      淘宝tengine团队说测试结果是“24G内存机器上,处理并发请求可达200万”。

     

    展开全文
  • 多进程与多线程的实现与原理

    千次阅读 2018-02-12 01:34:34
    多进程 关键字 开启进程的俩种方式 进程之间的内存空间是隔离的 进程中的join方法使用 进程对象的其它属性和方法 守护进程的使用 进程的互斥锁 进程实现队列 生产者和消费者模型 多线程 使用多线程实现tcp并发...

    多进程

    关键字

    #p1.terminate() #主动杀死子进程
    #p1.is_alive()  #判断子进程是否存活

    开启进程的俩种方式

    #方式1:直接使用默认的Process类
    #实验目的:查看进程起的时候是需要时间的,起的时间足够python把下面的代码运行完,
    from multiprocessing import Process
    import time
    
    def task(name):
        print('1、%s is running' %name)
        time.sleep(5)
        print('2、%s is done' %name)
    if __name__=='__main__':
        p=Process(target=task,args=('alex',)) #args后面使用的是一个元组的形式
        p.start()  #起进程是需要时间的,起的时间足够python把下面的代码运行完
        # time.sleep(1) #在start()后面sleep(1)秒,会发现先执行第一个print.
        print('3、主')
    
    #方法2:自定义一个Process类
    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess,self).__init__()
            self.name=name #自定义的功能
        def run(self):  #自定义的Process类,必须要自定义一个run方法。
            print('1、%s is running' %self.name)
            time.sleep(3)
            print('2、%s is done' %self.name)
    if __name__=='__main__':
        p=MyProcess('进程1')
         p.start()  #相等于p.run(),执行自定义进程的方法
    #     time.sleep(1)

    进程之间的内存空间是隔离的

    #实验目的:使用pid验证进程和进程之间数据是相互隔离的
    from multiprocessing import Process
    import time,os
    n=100
    def task():
        global n   #修改全局变量
        time.sleep(1)
        n=0
        print(n,os.getpid(),os.getppid())
    
    if __name__=='__main__':
        p=Process(target=task,)
        p.start()
        print(p.is_alive())
        p.join()  #等待p这个进程运行完成才运行下面的代码
        print(p.is_alive())  #判断p这个进程是否存活
        print(n,os.getpid(),os.getppid())
    # True
    # 0 7480 2156  #n的值,子进程的pid,和子进程的父进程的pid
    # False
    # 100 2156 8312 #n的值,主进程的pid,父进程的pid(pycham的pid)

    进程中的join方法使用

    #实验目的,使用join方法实现并发执行,并且查看到起进程是需要耗时间的,并且很长
    from  multiprocessing import Process
    import os,time
    
    def task(n):
        print('1、%s is running' %n,os.getpid(),os.getppid())
        time.sleep(n)
        print('2、%s is done' %n,os.getpid(),os.getppid())
    
    if __name__=='__main__':
        p1=Process(target=task,args=(1,))
        p2=Process(target=task,args=(2,))
        p3=Process(target=task,args=(3,))
        stat=time.time()
        p_l=[p1,p2,p3]
        for p in p_l:
            p.start()
        for p in p_l:
            p.join()
        stop=time.time()
        print('主进程,%s'%str(stop-stat),os.getpid(),os.getppid())
    # 1、1 is running 4680 6188
    # 1、2 is running 9988 6188
    # 1、3 is running 8564 6188
    # 2、1 is done 4680 6188
    # 2、2 is done 9988 6188
    # 2、3 is done 8564 6188
    # 主进程,3.3031890392303467 6188 8312  #3.3秒,由此可以看到起一个进程是需要耗时的,6188是主进程的pid,8312是pycharm的pid

    进程对象的其它属性和方法

    #实验目的,主动杀死进程,杀死进程也是给系统发起一个信号,需要比较长的时间才能把进程杀死
    from  multiprocessing import Process
    import time
    import os
    def task(n):
        print('1、%s is running' %n,os.getpid(),os.getppid())
        time.sleep(3)
        print('2、%s is done' %n,os.getpid(),os.getppid())
    
    if __name__=='__main__':
        stat=time.time()
        p1=Process(target=task,args=('lqx',))
        p1.start()
        time.sleep(1)
        print(p1.is_alive())
        p1.terminate()  #给系统发一个信号,杀死子进程,这个是需要时间的,所有下面的判断,子进程还是存活的,如果休息一秒,会发现子进程已经没了
        time.sleep(1)
        print(p1.is_alive())
        p1.join()
        stop=time.time()
        print('3、主进程',stop-stat,os.getpid(),os.getppid())

    守护进程的使用

    主进程的代码运行完毕以后守护进程就没有意义存在了,会立马死掉

    #守护进程:当子进程执行的任务在父进程代码运行完毕后就没有必要存在的必要了,那么该子进程需要设置为守护进程。
    #实验目的:开启守护进程,是不能在守护进程中再次开启子进程的。
    from multiprocessing import Process
    import time,os
    def bar():
        print('bar')
    def task(name):
        # p=Process(target=bar,)  #开启守护进程后不允许再次开启子进程
        # p.start()
        print('%s is runing' %name)
        time.sleep(2)
        print('%s is end' %name)
    if __name__=='__main__':
        p=Process(target=task,args=('lqx',))
        p.daemon=True  #守护进程开启必须在开启进程之前
        p.start()
        time.sleep(1)
        print('主进程')
    # lqx is runing  #在休息的1秒中的时候,守护进程启动,然后瞬间运行了第一条,然后sleep了,主进程运行完成了,程序结束
    # 主进程
    
    #主进程代码运行完毕,守护进程就会结束
    from multiprocessing import Process
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print('end')
    def bar():
        print(456)
        time.sleep(2)
        print('end456')
    
    if __name__=='__main__':
        p=Process(target=foo)
        p2=Process(target=bar)
    
        p.daemon=True
    
        p.start()
        p2.start()
        # time.sleep(0.5)
        print('main____')
    # main____
    # 456
    # end456
    
    #没有打印123的原因是没有等到p1这个进程启动,代码就已经运行完成了,导致123没有出来,如果电脑运行过快,申请内存空间快,在P2.start()的时候,p1已经启动了,有可能就打印出来123,只要main__出现,123就不会出现了,p1(守护进程)立马会死掉
    # 但是打印456,原因是:主进程需要等待非守护进程死掉后把pid给回收掉
    

    进程的互斥锁

    #实验目的:在修改数据文件的时候,如果并发执行,会导致数据文件修改紊乱的问题,因此使用互斥锁把并发改为串行,一个进程一个进程的去修改数据文件,保证数据的安全性
    from multiprocessing import Process,Lock
    import time,os,json,random
    def search():
        time.sleep(random.randint(1,2))
        dic=json.load(open('db.txt','r',encoding='utf-8'))
        print('%s 查看余票为%s' %(os.getpid(),dic['count']))
    def get():
        dic=json.load(open('db.txt','r',encoding='utf-8'))
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(random.randint(0,1))
            json.dump(dic,open('db.txt','w',encoding='utf-8'))
            print('%s 购票成功!' %os.getpid())
    def task(mutex):
        search()
        mutex.acquire()
        get()
        mutex.release()
    if __name__=='__main__':
        mutex=Lock()  #添加一个互斥锁,生成一个实例
        for i in range(10):
            p=Process(target=task,args=(mutex,))
            p.start()

    进程实现队列

    from multiprocessing import Queue
    
    q=Queue(3)  #设置队列最大长度
    q.put('first') #添加一个值到队列中
    q.put(2)
    q.put({'count':3})
    # q.put('fourth')   #如果添加值的个数超出队列长度,会发生队列阻塞。
    # q.put('fourth',block=False)   #q.put_nowait('fourth')   #发生阻塞的时候主动抛出一个异常queue.Full
    # q.put('fourth',block=True,timeout=3)   #发生阻塞的时候等待3秒主动抛出一个异常queue.Full
    
    print(q.get())  #从队列中取出一个值
    print(q.get())
    print(q.get())
    # print(q.get())  #如果队列为空,会发生队列阻塞
    # print(q.get(block=False))   #如果队列为空,会主动抛出一个异常 queue.Empty
    print(q.get(block=True,timeout=3))   #发生阻塞的时候等待3秒主动抛出一个异常queue.Empty

    生产者和消费者模型

    #实验目的:实现生产者生产东西,消费者去消费,生产多少消费多少
    #生产者->队列->消费者
    #生产者生产完成后,消费者去消费,会出现一种情况生产者p生产完成后就子进程就结束了,但是可能有多个,消费c在取值取到空的时候,会一直处于死循环中,一直卡在c.get()这一步
    #解决方法一:
    #在生产者生产完成后主进程q.put(None),有几个消费者就应该put几个空,然后当消费者取到空的时候,break掉循环
    from multiprocessing import Process,JoinableQueue
    import time,random
    def producer(name,food,q):
        for i in range(3):
            res='%s%s' %(food,i)
            time.sleep(random.randint(1,2))
            q.put(res)
            print('厨师%s生产%s'%(name,res))
    
    def consumer(name,q):
        while True:
            res=q.get()
            if res is None:break
            time.sleep(random.randint(1,2))
            print('吃货%s吃掉了%s'%(name,res))
    
    if __name__=='__main__':
        q=JoinableQueue()
    
        q1=Process(target=producer,args=('lqx','水',q))
        q2=Process(target=producer,args=('yft','食物',q))
    
        c1=Process(target=consumer,args=('dog',q))
        c2=Process(target=consumer,args=('horse',q))
        c3=Process(target=consumer,args=('pig',q))
    
        q_l=[q1,q2,c1,c2,c3]
        for qn in q_l:qn.start()
        q1.join()
        q2.join()
    
        q.put(None)
        q.put(None)
        q.put(None)
        print('主')
    
    #解决方法2:守护进程在生产者消费者模型中的应用
    from multiprocessing import Process,JoinableQueue
    import time
    import random
    
    def producer(name,food,q):
        for i in range(3):
            res='%s %s' %(name,food)
            time.sleep(random.randint(1,2))
            q.put(res)
            print('%s 生产出来 %s' %(name,food))
    def consumer(name,q):
        while True:
            res=q.get()
            if res is None:break
            time.sleep(random.randint(1,2))
            print('%s 吃掉 %s' %(name,res))
            q.task_done()  #消费者每取走一个值,给q.join()发送确认
    if __name__=='__main__':
        q=JoinableQueue()
    
        p1=Process(target=producer,args=('lqx','骨头',q))
        p2=Process(target=producer,args=('yft','肉',q))
    
        c1=Process(target=consumer,args=('dog',q))
        c2=Process(target=consumer,args=('pig',q))
        c3=Process(target=consumer,args=('horse',q))
    
        p1.start()
        p2.start()
    
        c1.daemon=True
        c2.daemon=True
        c3.daemon=True  #不添加消费者为守护进程,程序运行完成后,消费者c还是会循环取值,取到空值,卡在原地,把消费者设置为守护进程,等
        c1.start()
        c2.start()
        c3.start()
    
        p1.join()
        p2.join() #保障生产者把数据生产完
        q.join()  #主进程可以暂停,等待队列结束
        print('主')
    

    多线程

    使用多线程实现tcp并发

    #server端
    from threading import Thread,current_thread
    from socket import *
    
    def communicate(conn):
        print('线程2:%s'%current_thread().getName())  #查看线程的名字
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    def server():
        print('线程1:%s' %current_thread().getName())
        server=socket(AF_INET,SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind(('127.0.0.1',9090))
        server.listen(5)
        while True:
            conn,addr=server.accept()
            print(addr)
            t1=Thread(target=conmuicate,args=(conn,))   #启动一个线程,把建立的连接发送给这个线程去执行,实现并发
            t1.start()
        server.close()
    if __name__=='__main__':
        server()
    
    #client端
    from socket import *
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',9090))
    while True:
        cmd=input('>>>').strip()
        if not cmd: continue
        client.send(cmd.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('gbk'))
    client.close()

    开启线程的俩种方式

    #方法一:
    from threading import Thread
    import time
    import random
    
    def piao(name):
        print('%s is piaoing' %name)
        time.sleep(random.randint(1,2))
        print('%s is end' %name)
    
    if __name__=='__main__':
        t1=Thread(target=piao,args=('lqx',))
        t1.start()
        print('主')
    
    
    #方法二:使用class自定义线程名
    from threading import Thread
    import time
    import random
    class MyThread(Thread):
        def __init__(self,name):
            super(MyThread,self).__init__()   #super().__init__()
            self.name=name
        def run(self):
            print('%s is piaoing' %self.name)
            time.sleep(random.randint(1,2))
            print('%s is piaoing end ' %self.name)
    
    if __name__=='__main__':
        t1=MyThread('lqx')
        t1.start()
        print('主')

    进程和线程的俩种区别

    区别一:启动数据快
    from threading import Thread
    from multiprocessing import Process
    import time,random,os
    def piao():
        print('%s is piaoing' %os.getpid(),os.getppid())
        # time.sleep(random.randint(1,2))
    if __name__=='__main__':
        t1=Thread(target=piao)
        t2=Thread(target=piao)
        t3=Thread(target=piao)
        t4=Thread(target=piao)
        stat=time.time()
        # t1=Process(target=piao)
        # t2=Process(target=piao)
        # t3=Process(target=piao)
        # t4=Process(target=piao)
        l_t=[t1,t2,t3,t4]
        for t in l_t:t.start()
        print('主',os.getpid())
        stop=time.time()
        print('time %s' %(stop-stat))
    区别二:线程间资源共享,进程间资源独立
    from multiprocessing import Process
    from threading import Thread
    
    n=100
    def number():
        global n
        n=0
    
    if __name__=='__main__':
        # p=Process(target=number)
        # p.start()
        # p.join()
        t=Thread(target=number)
        t.start()
        t.join()
        print('主:n=%s'%n)
    #主:n=100
    主:n=0

    守护线程的使用

    #目的:开启守护线程后,主线程不会等待守护线程运行完,就会全部关闭
    from threading import Thread
    import time
    
    def sayhi(name):
        print('>>>')
        time.sleep(1)
        print('%s say hello ' %name)
    
    if __name__=='__main__':
        t=Thread(target=sayhi,args=('lqx',))
        t.daemon=True   # t.setDaemon(True)
        t.start()
        print('主线程')
    
    #主进程运行完成后,会等待非守护线程运行完成,在这个等待过程中守护进程是不会关闭的
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print('end 123')
    def bar():
        print(456)
        time.sleep(2)
        print('end 456')
    
    if __name__=='__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.setDaemon(True)
        t1.start()
        t2.start()
        print('主')

    线程的互斥锁

    #添加锁,会消耗代码的运行速度,但是保证了一个时间只能有一个线程去修改数据
    from threading import Thread,Lock
    import time
    n=100
    
    def task():
        global n
        # with mutex:   #添加锁的方法
        #     temp=n
        #     time.sleep(0.1)
        #     n=temp-1
        mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        mutex.release()
    
    if __name__=='__main__':
        start_time=time.time()
        mutex=Lock()
        t_l=[]
        for i in range(100):
            t=Thread(target=task)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        stop_time=time.time()
        print('主',stop_time-start_time,n)

    线程的GIL锁(解释器锁)

    #计算密集型:开多进程
    from multiprocessing import Process
    from threading import Thread
    import os,time
    def work():
        res=0
        for i in range(100000000):
            res*=i
    
    if __name__=='__main__':
        l=[]
        start=time.time()
        for i in range(4):
            # p=Process(target=work)   #使用进程:run time is 15.462884426116943
            p=Thread(target=work)  #使用线程:run time is 36.300076484680176
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop=time.time()
        print('run time is %s ' %(stop-start))
    #I/O密集型:多线程效率高
    from multiprocessing import Process
    from threading import Thread
    import threading
    import os,time
    def work():
        time.sleep(2)
    if __name__=='__main__':
        l=[]
        stat=time.time()
        for i in range(400):
            # p=Process(target=work)  #run time is 36.544090032577515
            p=Thread(target=work)  #run time is 2.0741186141967773
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop=time.time()
        print('run time is %s' %(stop-stat))

    paramiko模块实现ssh登录

    import paramiko
    
    #创建ssh对象
    ssh=paramiko.SSHClient()
    #允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    #连接服务器
    ssh.connect(hostname='119.28.17.250',port=22,username='root',password='xxxxxx')
    
    #执行命令:
    while True:
        cmd=input('>>>').strip()
        if not cmd :continue
        if cmd=='exit':break
        stdin,stdout,stderr=ssh.exec_command(cmd)
        #获取命令结果
        result=stdout.read() + stderr.read()
        print(result.decode('utf-8'))
    
    #关闭连接
    ssh.close()
    import paramiko
    
    transport=paramiko.Transport(('119.28.17.250',22))
    transport.connect(username='root',password='123,qwe.')
    
    sftp=paramiko.SFTPClient.from_transport(transport)
    
    sftp.put(r'D:\python20期课程\day9\02多线程\7GIL测试.py','/root/test.py')
    
    sftp.get('/root/softether-vpnserver-v4.24-9651-beta-2017.10.23-linux-x64-64bit.tar.gz','D:\softether-vpnserver-v4.24-9651-beta-2017.10.23-linux-x64-64bit.tar.gz')
    
    transport.close()

    死锁和递归锁RLock

    死锁:是指俩个或者俩个以上的进程或线程在执行过程中,因争夺资源而找出的一种互相等待的现象,若无外力作用,他们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

    from threading import Thread,RLock,Lock
    import time
    # mutexA=Lock()
    # mutexB=Lock()
    mutexA=mutexB=RLock()  #RLock锁可以解决死锁问题,只要锁的引用计数为0,大家就都可以抢了
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire() #1
            print('%s 拿到了A锁' %self.name)
            mutexB.acquire() #2
            print('%s 拿到了B锁' %self.name)
            mutexB.release() # 1
            mutexA.release() # 0
            #此时引用计数都是0了,第一个线程也会跟着其他的线程一起去抢才能抢到锁
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            time.sleep(0.1)
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
            mutexA.release()
            mutexB.release()
    
    if __name__=='__main__':
        for i in range(10):
            t=MyThread()
            t.start()

    信号量Semaphore

    Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1,调用release()时内置计数器+1,计数器不能小于0,当计数器为0时,acquire()将阻塞线程知道其他线程调用release()。
    ps:信号量与进程池是完全不同的俩个概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这4个进程,不会产生新的,而信号量是产生一对线程或者进程的

    from threading import Thread,Semaphore,current_thread
    import time,random
    #current_thread:
    #current_thread().getName() 显示线程的名字
    
    sm=Semaphore(5) #同时只有5个线程可以获得semaphore,即可以限制最大连接数为5
    
    def task():
        with sm: #添加锁的另一种方法
            print('%s is laing' %current_thread().getName())
            time.sleep(random.randint(1,3))
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=task,)
            t.start()

    Event事件

    event.is_Set() 返回event的状态值;
    event.wait() 如果event.isSet()==False将阻塞线程;
    event.set() 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度;
    event.clear() 恢复event的状态值为False

    #event事件就是:一个线程可以工作到某个时间点后,通知另一个线程开始运行
    from threading import Thread,Event,current_thread
    import time
    # event.is_Set():返回event的状态值;
    # event.wait():如果 event.isSet()==False将阻塞线程;
    # event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    # event.clear():恢复event的状态值为False
    
    event=Event()
    def check():
        print('checking MySQL...')
        # time.sleep(2)
        time.sleep(5)
        event.set()  #相等于把全局变量改为True
    
    def conn():
        count=1
        while not event.is_set():  #相等于这里不是True,就进入循环
            if count >3 :
                raise TimeoutError('超时')
            print('%s try to connect mysql time %s ' %(current_thread().getName(),count))
            event.wait(1)  #阻塞1秒
            count+=1
        print('%s connected mysql' %current_thread().getName())
    
    if __name__ == '__main__':
        t1=Thread(target=check)
        t2=Thread(target=conn)
        t3=Thread(target=conn)
        t4=Thread(target=conn)
    
        t1.start()
        t2.start()
        t3.start()
        t4.start()

    定时器

    指定一个线程多长时间后开始运行

    from threading import Timer
    
    def hello(name):
        print('hello,world %s' %name)
    t=Timer(3,hello,args=('egon',))
    t.start() 

    线程的queue

    import queue
    
    #队列:先进先出
    q=queue.Queue(3)
    
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4) #队列满以后,会阻塞,不会抛出异常
    # q.put_nowait(4)   # q.put(4,block=False)  #默认
    # q.put(4,block=True,timeout=3)
    print(q.get())
    print(q.get())
    print(q.get())
    
    #堆栈:后进先出
    q=queue.LifoQueue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    #优先级队列,值越低,优先级越大
    q=queue.PriorityQueue(3)
    q.put((10,'a'))
    q.put((-3,'b'))
    q.put((100,'c'))
    
    print(q.get()[1])
    print(q.get()[1])
    print(q.get())

    进程池ProcessPoolExecutor

    提交任务的俩种方式:
    1、同步调用:提交完任务后,就在原地等待,等待任务执行完毕,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
    2、异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行,程序是并发执行

    进程的执行状态:
    阻塞;同步调用,执行完任务后,在原地等待,也是阻塞状态
    非阻塞;

    1、介绍:
    concurrent.futures:模块提供高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor:进程池,提供异步调用
    2、基本方法:
    submit(fn,*args,**kwargs)  #异步提交任务
    
    
    #同步调用实例:
    #concurrent.futures:模块提供高度封装的异步调用接口
    #ThreadPoolExecutor:线程池,提供异步调用
    #ProcessPoolExecutor:进程池,提供异步调用
    #pool.shutdown(wait=True)  #shutdown意思是关闭池子,不能再次往池子中提交任务
    #pool.submit(fn,*args,**kwargs) #异步提交任务
    #result(timeout=None) #取得结果
    #add_done_callback(fn) #回调函数
    
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random,os
    
    def task(n):
        print('%s is running' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    def handle(res):
        # res=res.result() #另一种办法变成同步调用
        print('handle res %s' %res)
    
    if __name__ == '__main__':
        pool=ProcessPoolExecutor(2) #设置池子的大小
        for i in range(5):
            res=pool.submit(task,i).result()  #运行一个进程,等待运行完毕,使用result()拿到结果
            # res = pool.submit(task, i) #另一种办法变成同步调用
            handle(res)
        pool.shutdown(wait=True)
        print('主')
    
    #异步调用实例:
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random,os
    
    def task(n):
        print('%s is running' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    def handle(res):
        res=res.result()
        print('handle res %s' %res)
    
    if __name__ == '__main__':
        pool=ProcessPoolExecutor(2)
        for i in range(5):
            res = pool.submit(task, i)
            res.add_done_callback(handle)  #调用obj下面的方法,也就是回调函数
        pool.shutdown(wait=True)
        print('主')

    线程池ThreadPoolExecutor

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import requests
    import time
    
    def get(url):
        print('%s get %s' %(current_thread().getName(),url))
        response=requests.get(url)
        time.sleep(2)
        if response.status_code==200:
            return {'url':url,'content':response.text}
    
    def parse(res):
        res=res.result()
        print('parse:[%s] res:[%s]' %(res['url'],len(res['content'])))
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(2)
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
        ]
        for url in urls:
            pool.submit(get,url).add_done_callback(parse)
        pool.shutdown(wait=True)

    协程

    1、并发的本质是:切换+保存状态
    2、cpu正在运行一个任务,会在俩种情况下切走去执行其他的任务,一种情况是该任务发生阻塞,另一种情况是该任务计算的时间过长或者有一个更高优先级的程序替代了它
    3、协程:是单线程下的并发,又称为微线程,纤程。协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的

    yield本身就是一种在单线程下可以保存任务运行状态:
    1、yield可以保存状态,yield的状态保存于操作系统的保存线程状态很像,但是yield是代码级别控制,更轻量级
    2、send可以把一个函数的结果传到另一个函数,以此实现单线程内程序之间的切换
    
    #单纯地切换反而会降低运行效率
    #串行执行
    #基于yield并发执行:
    import time
    def consumer():
        '''任务1:接收数据,处理数据'''
        while True:
            print('consumer')
            x=yield
    
    def producer():
        '''任务2:生产数据'''
        g=consumer()
        next(g)
        for i in range(1000000):
            print('producer')
            g.send(i)
        return res
    
    start=time.time()
    #基于yield保存状态,实现俩个任务直接来回切换,即并发的效果
    #ps:如果每个任务中都加上打印,那么明显地看到俩个任务的打印是,你一次我一次的,即并发执行的
    res=producer()
    stop=time.time()
    print(stop-start)

    4、协程的本质就是单线程下,由用户自己控制一个任务遇到io阻塞了就切换到另外一个任务去执行,以此来提升效率。为了实现它,需要同时满足俩种条件:
    a、可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便于重新运行时,可以基于暂停的位置继续执行。
    b、作为a的补充:可以自动检测到io操作,在遇到io的情况下才发生切换。

    5、协程的优缺点:
    a、优点:协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更轻量级,单线程内就可以实现并发的效果,最大限度地利用cpu
    b、缺点:协程的本质是单线程下,无法利用多核,可以是一个程序开多个进程,每个进程内开多个线程,每个线程内开启协程;协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程;

    总结协程的特点:
    1、必须在只有一个单线程内实现并发
    2、修改共享数据不需加锁
    3、用户程序里自己保持多个控制流的上下文栈
    4、附加:一个协程遇到IO操作自动切换到其他协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
    

    greenley模块

    没有解决遇到IO问题,自动切换的问题

    from greenlet import greenlet
    import time
    
    def eat(name):
        print('%s eat 1' %name)
        # time.sleep(10) #并没有实现遇到io就切换的问题
        g2.switch('egon')
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        g1.switch()
        print('%s play 2' %name)
    g1=greenlet(eat)
    g2=greenlet(play)
    g1.switch('egon')

    gevent模块

    解决整个程序中的IO操作:是由gevent下面的模块monkey来实现的,
    monkey.patch_all() 识别整个程序中的全部IO操作

    from gevent import monkey
    monkey.patch_all()  #识别整个程序中的全部io的操作
    
    import gevent
    import time
    def eat(name):
        print('%s eat 1 ' %name)
        time.sleep(1)
        print('%s eat 2 ' %name)
    def play(name):
        print('%s play 1' %name)
        time.sleep(3)
        print('%s play 2' %name)
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,'alex')
    
    g1.join()
    g2.join()  #等同于 gevent.joinall([g1,g2])

    协程实现单线程实现并发实例

    #服务端:
    from gevent import monkey,spawn
    monkey.patch_all()
    from threading import Thread,current_thread
    from socket import *
    
    def communicate(conn):
        print('线程2:%s'%current_thread().getName())  #查看线程的名字
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port):
        print('线程1:%s' %current_thread().getName())
        server=socket(AF_INET,SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
    
        while True:
            conn,addr=server.accept()
            print(addr)
            spawn(communicate,conn)
        server.close()
    
    if __name__=='__main__':
        g=spawn(server,'127.0.0.1',9090)
        g.join()
    #客户端:
    from socket import *
    from threading import current_thread,Thread
    def client():
        client=socket(AF_INET,SOCK_STREAM)
        client.connect(('127.0.0.1',9090))
    
        while True:
            client.send(('%s say hello' %current_thread().getName()).encode('utf-8'))
            data=client.recv(1024)
            print(data.decode('utf-8'))
        client.close()
    
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=client)
            t.start()
    展开全文
  • 多进程共享动态链接库的原理

    千次阅读 2013-02-28 19:25:41
    当多个进程共享dll时,其实内存中只...所谓的多进程共享其实就在内存中保留dll代码的空间内“做文章”,如多个进程共享一个dll时,使用LoadLibrary和GetProcAddress得到的地址是一样的,也说明内存中保留一份dll,每个

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 407,400
精华内容 162,960
关键字:

多进程原理