精华内容
下载资源
问答
  • 为了让进程间传递更的信息量,我们需要其他的进程间通信方式。这些进程间通信方式可以分为两种:管道(PIPE)机制。在Linux文本流中,我们提到可以使用管道将一个进程的输出和另一个进程的输入连接起来,从而利用文件...
    e361fd078bd75896608836db0de2c8fc.png

    信号可以看作一种粗糙的进程间通信(IPC, interprocess communication)的方式,用以向进程封闭的内存空间传递信息。为了让进程间传递更多的信息量,我们需要其他的进程间通信方式。这些进程间通信方式可以分为两种:

    • 管道(PIPE)机制。在Linux文本流中,我们提到可以使用管道将一个进程的输出和另一个进程的输入连接起来,从而利用文件操作API来管理进程间通信。在shell中,我们经常利用管道将多个进程连接在一起,从而让各个进程协作,实现复杂的功能。
    • 传统IPC (interprocess communication)。我们主要是指消息队列(message queue),信号量(semaphore),共享内存(shared memory)。这些IPC的特点是允许多进程之间共享资源,这与多线程共享heap和global data相类似。由于多进程任务具有并发性 (每个进程包含一个进程,多个进程的话就有多个线程),所以在共享资源的时候也必须解决同步的问题 (参考Linux多线程与同步)。

    管道与FIFO文件

    一个原始的IPC方式是所有的进程通过一个文件交流。比如我在纸(文件)上写下我的名字和年纪。另一个人读这张纸,会知道我的名字和年纪。他也可以在同一张纸上写下他的信息,而当我读这张纸的话,同样也可以知道别人的信息。但是,由于硬盘读写比较慢,所以这个方式效率很低。那么,我们是否可以将这张纸放入内存中以提高读写速度呢?

    在Linux文本流中,我们已经讲解了如何在shell中使用管道连接多个进程。同样,许多编程语言中,也有一些命令用以实现类似的机制,比如在Python子进程中使用Popen和PIPE,在C语言中也有popen库函数来实现管道 (shell中的管道就是根据此编写的)。管道是由内核管理的一个缓冲区(buffer),相当于我们放入内存中的一个纸条。管道的一端连接一个进程的输出。这个进程会向管道中放入信息。管道的另一端连接一个进程的输入,这个进程取出被放入管道的信息。一个缓冲区不需要很大,它被设计成为环形的数据结构,以便管道可以被循环利用。当管道中没有信息的话,从管道中读取的进程会等待,直到另一端的进程放入信息。当管道被放满信息的时候,尝试放入信息的进程会等待,直到另一端的进程取出信息。当两个进程都终结的时候,管道也自动消失。

    217de58ae3dbe9979d03aa1450b290d9.png

    从原理上,管道利用fork机制建立(参考Linux进程基础和Linux从程序到进程),从而让两个进程可以连接到同一个PIPE上。最开始的时候,上面的两个箭头都连接在同一个进程Process 1上(连接在Process 1上的两个箭头)。当fork复制进程的时候,会将这两个连接也复制到新的进程(Process 2)。随后,每个进程关闭自己不需要的一个连接 (两个黑色的箭头被关闭; Process 1关闭从PIPE来的输入连接,Process 2关闭输出到PIPE的连接),这样,剩下的红色连接就构成了如上图的PIPE。

    c9faab9f623a37842d4a6e121ec4bceb.png

    由于基于fork机制,所以管道只能用于父进程和子进程之间,或者拥有相同祖先的两个子进程之间 (有亲缘关系的进程之间)。为了解决这一问题,Linux提供了FIFO方式连接进程。FIFO又叫做命名管道(named PIPE)。

    FIFO (First in, First out)为一种特殊的文件类型,它在文件系统中有对应的路径。当一个进程以读(r)的方式打开该文件,而另一个进程以写(w)的方式打开该文件,那么内核就会在这两个进程之间建立管道,所以FIFO实际上也由内核管理,不与硬盘打交道。之所以叫FIFO,是因为管道本质上是一个先进先出的队列数据结构,最早放入的数据被最先读出来(好像是传送带,一头放货,一头取货),从而保证信息交流的顺序。FIFO只是借用了文件系统(file system, 参考Linux文件管理背景知识)来为管道命名。写模式的进程向FIFO文件中写入,而读模式的进程从FIFO文件中读出。当删除FIFO文件时,管道连接也随之消失。FIFO的好处在于我们可以通过文件的路径来识别管道,从而让没有亲缘关系的进程之间建立连接。

    传统IPC

    这几种传统IPC实际上有很悠久的历史,所以其实现方式也并不完善 (比如说我们需要某个进程负责删除建立的IPC)。一个共同的特征是它们并不使用文件操作的API。对于任何一种IPC来说,你都可以建立多个连接,并使用键值(key)作为识别的方式。我们可以在一个进程中中通过键值来使用的想要那一个连接 (比如多个消息队列,而我们选择使用其中的一个)。键值可以通过某种IPC方式在进程间传递(比如说我们上面说的PIPE,FIFO或者写入文件),也可以在编程的时候内置于程序中。

    在几个进程共享键值的情况下,这些传统IPC非常类似于多线程共享资源的方式(参看Linux多线程与同步):

    • semaphore与mutex类似,用于处理同步问题。我们说mutex像是一个只能容纳一个人的洗手间,那么semaphore就像是一个能容纳N个人的洗手间。其实从意义上来说,semaphore就是一个计数锁(我觉得将semaphore翻译成为信号量非常容易让人混淆semaphore与signal),它允许被N个进程获得。当有更多的进程尝试获得semaphore的时候,就必须等待有前面的进程释放锁。当N等于1的时候,semaphore与mutex实现的功能就完全相同。许多编程语言也使用semaphore处理多线程同步的问题。一个semaphore会一直存在在内核中,直到某个进程删除它。
    • 共享内存与多线程共享global data和heap类似。一个进程可以将自己内存空间中的一部分拿出来,允许其它进程读写。当使用共享内存的时候,我们要注意同步的问题。我们可以使用semaphore同步,也可以在共享内存中建立mutex或其它的线程同步变量来同步。由于共享内存允许多个进程直接对同一个内存区域直接操作,所以它是效率最高的IPC方式。

    消息队列(message queue)与PIPE相类似。它也是建立一个队列,先放入队列的消息被最先取出。不同的是,消息队列允许多个进程放入消息,也允许多个进程取出消息。每个消息可以带有一个整数识别符(message_type)。你可以通过识别符对消息分类 (极端的情况是将每个消息设置一个不同的识别符)。某个进程从队列中取出消息的时候,可以按照先进先出的顺序取出,也可以只取出符合某个识别符的消息(有多个这样的消息时,同样按照先进先出的顺序取出)。消息队列与PIPE的另一个不同在于它并不使用文件API。最后,一个队列不会自动消失,它会一直存在于内核中,直到某个进程删除该队列。

    多进程协作可以帮助我们充分利用多核和网络时代带来的优势。多进程可以有效解决计算瓶颈的问题。互联网通信实际上也是一个进程间通信的问题,只不过这多个进程分布于不同的电脑上。网络连接是通过socket实现的。由于socket内容庞大,所以我们不在这里深入。一个小小的注解是,socket也可以用于计算机内部进程间的通信。

    总结

    PIPE, FIFO

    semaphore, message queue, shared memory; key

    作者:Vamei 出处:http://www.cnblogs.com/vamei

    展开全文
  • 目录并发处理:多进程和多线程多线程的变量机制基本用法进程复制进程池进程间通讯进程池的基本用法进程池中的进程复制在进程池中利用子进程的返回值PipeQueue前置多进程和多线程的比较多进程的机制和代码实现多线程...

    本文中的内容来自我的笔记。撰写过程中参考了胡俊峰老师《Python程序设计与数据科学导论》课程的内容。

    目录并发处理:多进程和多线程多线程的变量机制

    基本用法

    进程复制

    进程池

    进程间通讯

    进程池的基本用法

    进程池中的进程复制

    在进程池中利用子进程的返回值

    Pipe

    Queue

    前置

    多进程和多线程的比较

    多进程的机制和代码实现

    多线程

    并发处理:协程基础使用

    wait_for()

    实现生产者-消费者协程

    用简单的生成器实现协程

    用回调函数(callback)将普通函数变为协程

    用async/await实现协程

    并发处理:多进程和多线程

    前置

    概念:并发:一段时间内同时推进多个任务,但不一定要在一个时刻同时进行多个任务。

    并行:一段时间内同时推进多个任务,且在一个时刻要同时进行多个任务。

    并行是并发的子集;单核CPU交替执行多个任务是并发但不是并行;多核CPU同时执行多个任务既是并发也是并行。

    何时需要并发?需要同时处理多个任务

    经常需要等待资源

    多个子过程互相协作

    电脑执行任务的机制:操作系统内核 负责任务(i.e. 进程/线程)的挂起与唤醒,和资源的分配(比如一个程序能访问哪些内存地址)

    进程是资源分配的最小单元,不同进程之间不共享资源(比如可访问的内存区域);进程可再分为线程,线程之间共享大部分资源。正是因为 是否共享资源 上的区别,线程间的切换(即挂起和唤醒)比进程间的切换更快。

    线程是调度执行的最小单元。这意味着操作系统内核会负责将多个线程并发执行。

    多进程和多线程的比较

    多进程:将任务拆分为多个进程来进行由内核决定是并行还是仅仅并发。

    进程间不共享内存优点:一个进程挂了不影响别的

    缺点:切换进程耗时大、进程间通信不便

    多线程:将任务拆分为一个进程内的多个线程来进行由内核决定是并行还是仅仅并发。

    在CPython解释器中有全局解释器锁,导致多线程只能并发而不能并行(多进程可以并行)。

    进程间共享内存锁机制:一个线程使用一个全局变量时,先等待其(被其他线程)解锁,再将其上锁,再使用,用后再解锁。

    数据科学中可以为了提高效率而不使用锁机制,但同时要容忍由此带来的差错。

    如果不使用锁的话:100个a+=1的线程执行完成后(初始a=0),a可能<100 。

    优点:切换耗时低、通信方便

    缺点:在并行时对全局变量要使用锁机制

    多进程的机制和代码实现

    以下介绍的函数中,几乎每一个有阻塞可能的,都会有一个可选的timeout参数。这件事将不再重提。

    基本用法from multiprocessing import Process

    import os

    import time

    def task(duration, base_time):

    pid = os.getpid()

    print(f'son process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')

    time.sleep(duration)

    print(f'son process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

    if __name__ == '__main__':

    pid = os.getpid()

    base_time = time.perf_counter()

    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')

    p1 = Process(target=task, args=(1,base_time)) # a process that executes task(1,base_time); currently not running

    p2 = Process(target=task, args=(2,base_time)) # a process that executes task(2,base_time); currently not running

    p1.start()

    p2.start()

    print(p1.is_alive(), p2.is_alive()) # whether they are running

    print('main process can proceed while son processes are running')

    p1.join() # wait until p1 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes

    p2.join() # wait until p2 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes

    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')main process id 3316 starts at 0.000001s

    True True

    main process can proceed while son processes are running

    son process id 15640 starts at 0.002056s with parameter 1

    son process id 10716 starts at 0.003030s with parameter 2

    son process id 15640 ends at 1.002352s

    son process id 10716 ends at 2.017861s

    main process id 3316 ends at 2.114324s

    如果没有p1.join()和p2.join(),主进程会在很短的时间内完成执行,而此时子进程仍在运行。输出结果如下:main process id 11564 starts at 0.000001s

    True True

    main process can proceed while son processes are running

    main process id 11564 ends at 0.011759s

    son process id 13500 starts at 0.004392s with parameter 1

    son process id 11624 starts at 0.003182s with parameter 2

    son process id 13500 ends at 1.009420s

    son process id 11624 ends at 2.021817s

    为何0.004秒的事件在0.003秒的事件之前被输出?因为print语句的耗时未被计算在内

    因为perf_counter()在Windows下有bug,它给出的时间在不同进程之间不完全同步。

    需要注意,一个子进程结束运行后仍然处于存活状态;只有被join()之后才会正式死亡(即被从系统中除名)。

    关于if __name__ == '__main__'::在Python中,可以通过import来获取其他文件中的代码;在文件B的开头(其他位置同理)import文件A,相当于把A在B的开头复制一份。

    如果在复制A的内容时,我们希望A中的一部分代码在执行时被忽略(比如说测试语句),就可以给A中的这些代码加上if __name__ == '__main__':对于从别处import来的代码,系统变量__name__在这段代码中会等于来源文件的名字(或模块名,这你不用在意);对于存在于本文件中的代码,__name__会等于__main__。

    由于某些原因,在Windows下,如果一个文件的代码中使用了多进程,则这个文件中会隐式地import自己(一次或多次);将所有零级缩进的代码放在if __name__ == '__main__':中,可以避免产生重复执行的问题(注意到如果不这样做的话,import来的副本中还会再次import自身,导致无限递归import并报错)。暂时可以认为,采取这一措施后就可完全消除“隐式import自身”所产生的效应。

    进程复制from multiprocessing import Process

    import os

    pid = os.getpid()

    def task():

    global pid

    print(pid)

    pid = 1

    print(pid)

    if __name__ == '__main__':

    p = Process(target=task)

    p.start()

    p.join()

    print(pid)

    在Windows下的输出:4836

    1

    2944

    在Linux下的输出:511

    1

    511

    前两个数都是由子进程输出,第三个数由父进程输出。注意到pid在子进程中被赋为1后,在父进程中并不是1。这说明,子进程的target函数中对运行环境的修改,不影响父进程的运行环境。事实上,反之也是成立的(父不影响子)。也就是说,一旦子进程的运行环境完成创建之后,父进程的运行环境与子进程的运行环境之间就完全独立。由于这个独立性,子进程的运算结果也无法直接反馈给父进程。稍后会介绍两种解决方式:1. 进程间通信 2. 利用 进程池apply方法的返回值。

    注意到一三行的输出在Windows下不同,而在Linux下相同。这说明,子进程中全局变量pid的取值,在Linux下是直接复制父进程中pid的取值而得到的,在Windows下是通过重新运行pid = os.getpid()而得到的。更一般地,有以下这两个事实:这就是之前提到的“隐式import自身”的机制。

    在Windows中,Process(target)创建出的子进程是一张白纸(即运行环境空空如也);当调用start()的时候,它会先通过import语句来将父进程的整个代码文件完整执行一遍(从而创建出一个新的运行环境),然后再开始运行target函数。所以,if __name__ == '__main__':包起来的代码,就只会被父进程执行;而未被包起来的零级缩进代码,则也会被每个子进程(在自己的运行环境里)各自执行一遍。

    在Linux中,Process(target)创建出的子进程,会全盘复制父进程的运行环境,而不会自己重新创建。复制出来的子进程运行环境,与父进程的运行环境完全独立。

    Linux下的进程复制方式称为fork,Windows下的进程复制方式称为spawn。关于这些,详见 https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn 。from multiprocessing import Process

    import os

    def task():

    pass

    if __name__ == '__main__':

    p = Process(target=task)

    print('son process created')

    p.start()

    print('son process starts')

    p.join()

    print('son process ends')

    print('gu?')

    在Windows下的输出son process created

    son process starts

    gu?

    son process ends

    gu?

    由此可见,Windows下子进程(在初始化时)在执行父进程的代码文件时,父进程中son_process.start()以后的内容(比如print('gu?'))也会被执行。

    进程池

    如果我们有很多的任务要同时进行,为每个任务各开一个进程既低效(创建和销毁进程开销大、无法全部并行、内核难以调度)又可能不被内核允许。

    解决方案:使用进程池,池中放着几个进程(一般不会比CPU的核数多很多),有新任务时找一个空闲进程分配给它,没有空闲进程则等待。缺点是没有空闲进程时需要等待,因此不能算是完全的并发。

    进程池的基本用法from multiprocessing import Pool

    import os, time

    def task(duration, base_time, task_name):

    pid = os.getpid()

    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')

    time.sleep(duration)

    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')

    if __name__ == '__main__':

    pid = os.getpid()

    base_time = time.perf_counter()

    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')

    pool = Pool(3) # a pool containing 3 subprocesses

    print('start assigning tasks')

    for i in range(4):

    pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running

    # if all son processes are busy, wait until one is free and then start

    pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running.

    print('all tasks assigned; wait for son processes to finish')

    pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called

    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')

    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

    输出:(Win和Linux下输出相似)main process id 5236 starts at 0.000002s

    start assigning tasks

    all tasks assigned; wait for son processes to finish

    son process id 8724 starts working on TaskNo.1 at 0.030557s with parameter 1

    son process id 14584 starts working on TaskNo.2 at 0.037581s with parameter 1

    son process id 10028 starts working on TaskNo.3 at 0.041210s with parameter 1

    son process id 14584 ends working on TaskNo.2 at 1.042662s

    son process id 8724 ends working on TaskNo.1 at 1.040211s

    son process id 14584 starts working on TaskNo.4 at 1.044109s with parameter 1

    son process id 10028 ends working on TaskNo.3 at 1.054017s

    son process id 14584 ends working on TaskNo.4 at 2.055515s

    all tasks finished at 2.214534s

    main process id 5236 ends at 2.214884s

    当使用apply_async(“异步调用”)添加任务时,主进程在子进程执行任务期间会继续运行;如果用apply(“同步调用”)添加任务,则主进程会暂停(“阻塞”)直到该任务完成。一般使用apply_async而不是apply。

    进程池中的进程复制from multiprocessing import Pool

    import os, time

    all_tasks_on_this_son_process = []

    def task(duration, base_time, task_name):

    global all_tasks_on_this_son_process

    pid = os.getpid()

    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)

    time.sleep(duration)

    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')

    all_tasks_on_this_son_process += [task_name]

    if __name__ == '__main__':

    pid = os.getpid()

    base_time = time.perf_counter()

    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')

    pool = Pool(3)

    print('start assigning tasks')

    for i in range(4):

    pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))

    pool.close()

    print('all tasks assigned; wait for son processes to finish')

    pool.join()

    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')

    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

    print('gu?')

    Windows下输出:main process id 6116 starts at 0.000001s

    start assigning tasks

    all tasks assigned; wait for son processes to finish

    gu?

    gu?

    gu?

    son process id 16028 starts working on TaskNo.1 at 0.037577s with parameter 1, this process already executed []

    son process id 11696 starts working on TaskNo.2 at 0.041393s with parameter 1, this process already executed []

    son process id 5400 starts working on TaskNo.3 at 0.038409s with parameter 1, this process already executed []

    son process id 11696 ends working on TaskNo.2 at 1.041521s

    son process id 16028 ends working on TaskNo.1 at 1.038722s

    son process id 11696 starts working on TaskNo.4 at 1.042543s with parameter 1, this process already executed ['TaskNo.2']

    son process id 5400 ends working on TaskNo.3 at 1.052573s

    son process id 11696 ends working on TaskNo.4 at 2.053483s

    all tasks finished at 2.167447s

    main process id 6116 ends at 2.167904s

    gu?

    在Windows下,池中的每个线程会在(且仅在)它分配到的的第一个任务将要开始执行时,运行一遍父进程的代码以构建运行环境。一个进程在前一个任务中对运行环境的改变,会原样体现在下一个任务的运行环境里。(即接受新任务的时候会直接继续使用上一个任务遗留下的运行环境)

    Linux下输出:main process id 691 starts at 0.000001s

    all tasks assigned; wait for son processes to finish

    son process id 692 starts working on TaskNo.1 at 0.104757s with parameter 1, this process already executed []

    son process id 693 starts working on TaskNo.2 at 0.104879s with parameter 1, this process already executed []

    son process id 694 starts working on TaskNo.3 at 0.105440s with parameter 1, this process already executed []

    son process id 692 ends working on TaskNo.1 at 1.106427s

    son process id 693 ends working on TaskNo.2 at 1.106426s

    son process id 694 ends working on TaskNo.3 at 1.107157s

    son process id 692 starts working on TaskNo.4 at 1.107560s with parameter 1, this process already executed ['TaskNo.1']

    son process id 692 ends working on TaskNo.4 at 2.110033s

    all tasks finished at 2.117158s

    main process id 691 ends at 2.117452s

    gu?

    在Linux下,池中的每个线程会在(且仅在)它的第一个任务将要开始执行时,从父进程将运行环境完整复制一遍。一个进程在前一个任务中对运行环境的改变,会原样体现在下一个任务的运行环境里。(即接受新任务的时候会直接继续使用上一个任务遗留下的运行环境)from multiprocessing import Pool

    import os, time

    all_tasks_on_this_son_process = []

    def init(init_name):

    global all_tasks_on_this_son_process

    all_tasks_on_this_son_process += [init_name]

    def task(duration, base_time, task_name):

    global all_tasks_on_this_son_process

    pid = os.getpid()

    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)

    time.sleep(duration)

    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')

    all_tasks_on_this_son_process += [task_name]

    if __name__ == '__main__':

    pid = os.getpid()

    base_time = time.perf_counter()

    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')

    pool = Pool(3, initializer=init, initargs=('init',)) # look here

    print('start assigning tasks')

    for i in range(4):

    pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))

    pool.close()

    print('all tasks assigned; wait for son processes to finish')

    pool.join()

    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')

    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

    输出(Win下与Linux下相似):main process id 18416 starts at 0.000004s

    start assigning tasks

    all tasks assigned; wait for son processes to finish

    son process id 10052 starts working on TaskNo.1 at 0.053483s with parameter 1, this process already executed ['init']

    son process id 17548 starts working on TaskNo.2 at 0.040412s with parameter 1, this process already executed ['init']

    son process id 10124 starts working on TaskNo.3 at 0.049992s with parameter 1, this process already executed ['init']

    son process id 10124 ends working on TaskNo.3 at 1.054387s

    son process id 17548 ends working on TaskNo.2 at 1.044956s

    son process id 10052 ends working on TaskNo.1 at 1.062396s

    son process id 10124 starts working on TaskNo.4 at 1.055888s with parameter 1, this process already executed ['init', 'TaskNo.3']

    son process id 10124 ends working on TaskNo.4 at 2.060094s

    all tasks finished at 2.443017s

    main process id 18416 ends at 2.444705s

    在进程池中利用子进程的返回值from multiprocessing import Pool

    import time

    def task(duration, base_time, task_name):

    time.sleep(duration)

    return f'{task_name} finished at {"%.6f" % (time.perf_counter()-base_time)}s'

    if __name__ == '__main__':

    base_time = time.perf_counter()

    pool = Pool(2)

    return_values = []

    return_values.append(pool.apply(task, args=(1,base_time,'TaskNo.1_sync')))

    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))

    return_values.append(pool.apply_async(task, args=(2,base_time,'TaskNo.2_async')))

    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))

    pool.close()

    pool.join()

    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')

    assert return_values[1].ready() == True

    return_values[1] = return_values[1].get() # from ApplyResult to true return value

    print('results:', return_values)at time 1.2109459, r_v is ['TaskNo.1_sync finished at 1.027223s']

    at time 1.2124976, r_v is ['TaskNo.1_sync finished at 1.027223s', ]

    all tasks finished at 3.258190s

    results: ['TaskNo.1_sync finished at 1.027223s', 'TaskNo.2_async finished at 3.041053s']

    这里在pool.join()之后调用result.get(),所以可以立刻得到 子进程所执行的函数的返回值;如果在对应的子进程尚未return时就调用result.get(),则主进程会阻塞直到子进程返回,然后获取子进程所执行的函数的返回值。result.ready()返回一个bool,表示对应的子进程是否已经return。

    此外,result.wait()会阻塞直到子进程返回,但不会获取返回值。

    一个ApplyResult实例可以多次调用get(),即可以多次获取返回值。

    详见 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult 。

    进程间通讯

    可以认为,任何一个被跨进程传送的对象,在传送过程中都会被深拷贝。

    Pipefrom multiprocessing import Process, Pipe

    import time

    def send_through_pipe(conn, pipe_name, sender_name, content, base_time):

    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    conn.send(content)

    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))

    def receive_from_pipe(conn, pipe_name, receiver_name, base_time):

    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    content = conn.recv()

    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))

    return content

    def task(conn, pipe_name, process_name, base_time):

    receive_from_pipe(conn, pipe_name, process_name, base_time)

    time.sleep(1)

    send_through_pipe(conn, pipe_name, process_name, 142857, base_time)

    if __name__ == '__main__':

    base_time = time.perf_counter()

    conn_A, conn_B = Pipe() # two endpoints of the pipe

    p1 = Process(target=task, args=(conn_B,'pipe','son',base_time))

    p1.start()

    time.sleep(1)

    send_through_pipe(conn_A, 'pipe', 'main', ['hello','hello','hi'], base_time) # any object can be sent

    receive_from_pipe(conn_A, 'pipe', 'main', base_time)

    p1.join()son tries to receive content from pipe at 0.036439

    main tries to send ['hello', 'hello', 'hi'] through pipe at 1.035570

    main successfully finishes sending at 1.037174

    main tries to receive content from pipe at 1.037318

    son successfully receives ['hello', 'hello', 'hi'] at 1.037794

    son tries to send 142857 through pipe at 2.039058

    son successfully finishes sending at 2.040158

    main successfully receives 142857 at 2.040441

    另外,还可以用conn.poll()(返回Bool类型)来获知conn中是否有对面发来的未读信息。from multiprocessing import Process, Pipe

    import time

    def send_through_pipe(conn, pipe_name, sender_name, content, base_time):

    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    conn.send(content)

    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))

    def receive_from_pipe(conn, pipe_name, receiver_name, base_time):

    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    content = conn.recv()

    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))

    return content

    def task1(conn, pipe_name, process_name, base_time):

    receive_from_pipe(conn, pipe_name, process_name, base_time)

    time.sleep(1)

    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)

    def task2(conn, pipe_name, process_name, base_time):

    time.sleep(1)

    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)

    time.sleep(2)

    receive_from_pipe(conn, pipe_name, process_name, base_time)

    if __name__ == '__main__':

    base_time = time.perf_counter()

    conn_A, conn_B = Pipe()

    p1 = Process(target=task1, args=(conn_A,'pipe','son1',base_time))

    p2 = Process(target=task2, args=(conn_B,'pipe','son2',base_time))

    p1.start()

    p2.start()

    p1.join()

    p2.join()son1 tries to receive content from pipe at 0.033372

    son2 tries to send greetings from son2 through pipe at 1.058998

    son2 successfully finishes sending at 1.060660

    son1 successfully receives greetings from son2 at 1.061171

    son1 tries to send greetings from son1 through pipe at 2.062389

    son1 successfully finishes sending at 2.063290

    son2 tries to receive content from pipe at 3.061378

    son2 successfully receives greetings from son1 at 3.061843

    由此可见:Pipe可以暂存数据,而且其暂存的数据符合FIFO规则。但是,Pipe用于暂存数据的区域大小比较有限(具体大小随OS而定),如果这个区域满了,send()就会被阻塞,直到对面用recv()腾出位置为止。

    Pipe的两个端点可以分配给任意两个进程。不建议把同一个端点分配给多个进程,这可能会带来风险;如果确实需要的话,请使用Queue。

    Queue

    本质上是一个能够跨进程运行的队列。

    Queue的操作的时间开销约为Pipe中对应操作的两倍。from multiprocessing import Process, Queue

    import time

    def put_into_queue(q, queue_name, putter_name, content, base_time):

    print(putter_name, 'tries to put', content, 'into', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    q.put(content)

    print(putter_name, 'successfully finishes putting at', '%.6f'%(time.perf_counter()-base_time))

    def get_from_queue(q, queue_name, getter_name, base_time):

    print(getter_name, 'tries to receive content from', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))

    content = q.get()

    print(getter_name, 'successfully gets', content, 'at', '%.6f'%(time.perf_counter()-base_time))

    return content

    def task1(q, delay, queue_name, process_name, base_time):

    time.sleep(delay)

    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)

    time.sleep(5)

    get_from_queue(q, queue_name, process_name, base_time)

    def task2(q, delay, queue_name, process_name, base_time):

    time.sleep(delay)

    get_from_queue(q, queue_name, process_name, base_time)

    time.sleep(5)

    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)

    if __name__ == '__main__':

    base_time = time.perf_counter()

    q = Queue()

    put_and_get_1 = Process(target=task1, args=(q,0,'queue','putAndGet_No.1',base_time))

    get_and_put_1 = Process(target=task2, args=(q,1,'queue','getAndPut_No.1',base_time))

    get_and_put_2 = Process(target=task2, args=(q,2,'queue','getAndPut_No.2',base_time))

    put_and_get_1.start()

    get_and_put_1.start()

    get_and_put_2.start()

    put_and_get_1.join()

    get_and_put_1.join()

    get_and_put_2.join()putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883

    putAndGet_No.1 successfully finishes putting at 0.079291

    getAndPut_No.1 tries to receive content from queue at 1.104196

    getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489

    getAndPut_No.2 tries to receive content from queue at 2.126434

    putAndGet_No.1 tries to receive content from queue at 5.081044

    getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381

    getAndPut_No.1 successfully finishes putting at 6.107820

    getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565

    getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579

    getAndPut_No.2 successfully finishes putting at 11.112493

    putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546

    另外,如果Queue的大小实在过大以至于达到了某个上限,则put()操作也会被阻塞。不过应该很难把大小弄到那么大。

    多线程

    基本语法和多进程很相似,但机制上有重要的不同。由于全局解释器锁的存在,Python多线程并不实用,这里仅作简单介绍。

    从下图中可以看到,多线程的基本代码和多进程完全一致。下图中的代码在CPython解释器中会运行大约3s。

    另外,多线程中其实不需要这个if __name__ == '__main__':的判断。

    1d43db4d938c726763863fa89c9f46fd.png

    多线程的变量机制import threading

    lock_n = threading.Lock()

    n = 0

    def inc_n(m):

    global n

    lock_n.acquire(blocking=True)

    n += m

    lock_n.release()

    threads = [threading.Thread(target=inc_n, args=(i,)) for i in range(1,11)]

    [t.start() for t in threads]

    [t.join() for t in threads]

    print(n)55由上可见,不同的线程之间共享运行环境(比如上面的变量n)。

    lock.acquire(blocking=True) 会一直阻塞直到锁空出来为止;一旦空出来就会把它锁上。

    并发处理:协程

    不同的过程在同一个线程内交替执行。每个协程在运行时独占资源,一段运行结束后自阻塞,等待着被外部(如main函数)控制它的代码唤醒。

    相比多线程的优点:轻量级(在解释器层面实现,不需要内核来做切换)、数量不限。

    和多线程一样,不同协程之间共用运行环境。

    用简单的生成器实现协程def sum(init):

    s = init

    while True:

    delta = yield s # output s, and then input delta

    s += delta

    g = sum(0)

    print(next(g)) # run entil receiving the first output

    print(g.send(1)) # send the first input, and then get the second output

    print(g.send(2)) # send the second input, and then get the third output0

    1

    3

    上例中只是演示了生成器的自阻塞,以及生成器与其调用者之间的交互。

    更进一步,还可以定义多个生成器执行不同的过程,并在main函数里进行对它们的调度(比如实现一个任务队列),从而实现协程。

    用回调函数(callback)将普通函数变为协程def calc(n,callback):

    r = 0

    for i in range(n):

    r += i

    callback()

    def pause():

    print('pause')

    yield # just pause, do not return anything

    g = calc(10,pause)

    用async/await实现协程

    相比生成器实现的优点:可以在等待IO/等待网络通信等情况下时阻塞当前协程执行其他协程(而且不会中断等待IO/通信)以节省时间(而只用生成器则无法做到);使用更灵活、方便。多线程其实也有前一个优点。所以CPython下的多线程也并不是毫无用处,但它的用处是协程用处的子集。

    一个注意事项:若想通过协程加速IO,必须使用python中专门的异步IO库才行。

    基础使用import time

    start = time.perf_counter()

    def sayhi(delay):

    time.sleep(delay)

    print(f'hi! at {time.perf_counter() - start}')

    def main():

    sayhi(1)

    sayhi(2)

    main()hi! at 1.0040732999914326

    hi! at 3.015253899997333import time

    import asyncio

    start = time.perf_counter()

    async def sayhi(delay):

    await asyncio.sleep(delay)

    print(f'hi! at {time.perf_counter() - start}')

    async def main():

    sayhi1 = asyncio.create_task(sayhi(1))

    sayhi2 = asyncio.create_task(sayhi(2))

    await sayhi1

    await sayhi2

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythonhi! at 1.0037910000100965

    hi! at 2.0026504999987083

    上面的程序中:async 声明当前函数是一个协程。这一声明使得函数体内可以使用create_task和await,也使得该函数本身可以被create_task和await。一旦一个函数 f 被声明为协程,f()做的事就不再是运行 f,而只是创建一个协程对象并返回之(这个对象并不会自动被运行)。需要使用 asyncio 中的相关工具来运行这个对象。run(main()) 表示开始执行协程 main() 。要求 main() 必须是“主协程”,即它是整个程序中所有协程的入口(类似主函数)。一个程序中 run(main()) 只应被调用一次,且在 main() 之外不应有任何协程被调用。run(main()) 是阻塞的。协程的并发特性只有在main()内部才会显现,从外部来看这就是一个普普通通的黑箱调用。

    run()的作用是启动 运行协程所需的环境(并在main()完成后将其关闭)。但在IPython中,一开始运行就已经自动帮你启动好了,所以可以直接用await(而且也不必把所有协程都放在一个主协程中,而可以散布在程序各处)。create_task(sayhi(1)) 表示为协程sayhi(1)在某个“任务池”中创建一个任务,并且开始执行该任务。返回值是这个任务的handle,或者说“遥控器”。任务池中的任务会并发地执行。任务在何时可以中断并切换到别的任务,这一点由await指定。

    await sayhi1 有两重含义:如果这里await的不是sayhi1而是,比如说,一个接受http请求的操作,那么在解释器切换协程后不会影响对这个请求的等待。这就是asyncio的强大之处。

    这一点在await asyncio.sleep(delay)就有体现。asyncio.sleep()就具有“切换协程不影响等待”的特性。告诉解释器,现在当前协程(该语句所在的协程)开始阻塞,你可以切换协程了。阻塞当前协程(该语句所在的协程,这里是main())的执行,直到任务sayhi1完成。(类似Process.join())

    关于await的几件事:await的可以不是已创建的任务而是一个协程对象(比如await sayhi(1)),此时不会将其加入任务池,而会直接开始执行(当然,也可能刚开始执行就被切换到别的协程,因为用了await),并一直阻塞直到完成。这会导致sayhi(1)无法作为一个任务、与其他任务平等地参与并发,但是它仍然可以随着父协程(这里是main())的中断和恢复而间接地参与并发。

    能够被await的不只有协程对象和任务handle,还有任何awaitable object,即任何实现了__await__方法(从而告诉了解释器如何在自己刚开始执行时就阻塞并切换协程,且不影响内部可能在进行的等待和其他操作)的对象。

    await 的对象只可能在刚开始执行时立刻阻塞并切换协程。执行过程中其他可以阻塞的位置,是由这个对象内部使用的其他await语句指定的,而不是调用这个对象的那条await语句。import time

    import asyncio

    start = time.perf_counter()

    async def sayhi(delay):

    await asyncio.sleep(delay)

    print(f'hi! at {time.perf_counter() - start}')

    async def main():

    await sayhi(1)

    await sayhi(2)

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythonhi! at 1.0072715999995125

    hi! at 3.0168006000021705

    wait_for()

    把await A(A为任意awaitable)改成await asyncio.wait_for(A,timeout),就可以给await操作加上timeout秒的时限,一旦await了这么多秒还没有结束,就会中断A的执行并抛出asyncio.TimeoutError。

    不用关心wait_for具体做了什么,你只需要记住await asyncio.wait_for(A,timeout)这个句子就行。可以认为这个句子和await A在(除了timeout以外的)其他方面上没有区别。下面是例子。import time

    import asyncio

    async def eternity():

    await asyncio.sleep(3600)

    print('yay!')

    async def main():

    try:

    await asyncio.wait_for(eternity(), timeout=1.0)

    except asyncio.TimeoutError:

    print('timeout!')

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythontimeout!import time

    import asyncio

    start = time.perf_counter()

    async def sayhi(delay):

    await asyncio.sleep(delay)

    print(f'hi! at {time.perf_counter() - start}')

    async def main():

    sayhi1 = asyncio.create_task(sayhi(1))

    sayhi2 = asyncio.create_task(sayhi(2))

    await asyncio.wait_for(sayhi1,1.05)

    await asyncio.wait_for(sayhi2,1.05)

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythonhi! at 1.0181081000046106

    hi! at 2.0045300999918254import time

    import asyncio

    start = time.perf_counter()

    async def sayhi(delay):

    await asyncio.sleep(delay)

    print(f'hi! at {time.perf_counter() - start}')

    async def main():

    sayhi1 = asyncio.create_task(sayhi(1))

    sayhi2 = asyncio.create_task(sayhi(2))

    await asyncio.wait_for(sayhi1,0.95)

    await asyncio.wait_for(sayhi2,1.05)

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPython---------------------------------------------------------------------------

    TimeoutError                              Traceback (most recent call last)

     in 

    15

    16 # asyncio.run(main()) # use outside IPython

    ---> 17 await main() # use inside IPython

     in main()

    11     sayhi1 = asyncio.create_task(sayhi(1))

    12     sayhi2 = asyncio.create_task(sayhi(2))

    ---> 13     await asyncio.wait_for(sayhi1,0.95)

    14     await asyncio.wait_for(sayhi2,1.05)

    15

    ~\anaconda3\lib\asyncio\tasks.py in wait_for(fut, timeout, loop)

    488             # See https://bugs.python.org/issue32751

    489             await _cancel_and_wait(fut, loop=loop)

    --> 490             raise exceptions.TimeoutError()

    491     finally:

    492         timeout_handle.cancel()

    TimeoutError:

    hi! at 2.0194762000028277

    另外,注意到即使协程sayhi1抛出了异常,父协程main()仍然能够继续执行sayhi2。可见不同协程间是有一定的独立性的。

    实现生产者-消费者协程

    为此需要使用 asyncio.Queue 。它相比普通的队列的区别是,其put/get操作会在无法执行时阻塞(这一点和multiprocessing.Queue很像),而且这些操作都是协程(注意到,这使得你调用它们时只会返回协程对象而不会实际执行),可以await。import time

    import asyncio

    start = time.perf_counter()

    async def producer(q):

    for i in range(5):

    await asyncio.sleep(1) # producing takes 1 sec

    await q.put(i) # will wait if q is full

    print(f'put {i} at {time.perf_counter() - start}')

    await q.join() # will wait until all objects produced are **taken out** and **consumed**.

    async def consumer(q):

    for i in range(5):

    item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command.

    print(f'get {item} at {time.perf_counter() - start}')

    await asyncio.sleep(1) # consuming takes 1 sec

    q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough!

    print(f'consumed {item} at {time.perf_counter() - start}')

    async def main():

    q = asyncio.Queue()

    P = asyncio.create_task(producer(q))

    C = asyncio.create_task(consumer(q))

    await P

    await C

    print(f'done at {time.perf_counter() - start}')

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythonput 0 at 1.0108397000003606

    get 0 at 1.0112231999955839

    put 1 at 2.017216499996721

    consumed 0 at 2.0176210000063293

    get 1 at 2.0177472999930615

    put 2 at 3.0279211000015493

    consumed 1 at 3.0283254999958444

    get 2 at 3.028457599997637

    put 3 at 4.039952199993422

    consumed 2 at 4.041183299996192

    get 3 at 4.041302300000098

    put 4 at 5.0465819999953965

    consumed 3 at 5.04690839999239

    get 4 at 5.047016099997563

    consumed 4 at 6.047789799995371

    done at 6.048323099996196import time

    import asyncio

    start = time.perf_counter()

    async def sleep_and_put(q):

    await asyncio.sleep(1)

    await q.put(1)

    async def main():

    q = asyncio.Queue()

    C = asyncio.create_task(q.get())

    P = asyncio.create_task(sleep_and_put(q))

    await C

    await P

    print(f'finished at {time.perf_counter() - start}')

    # asyncio.run(main()) # use outside IPython

    await main() # use inside IPythonfinished at 1.01112650000141

    由上例可见,Queue.get()(其实Queue.put()等其他方法也一样)是一个协程,因此也可以给它创建任务以进行并发。

    展开全文
  • 嵌入式linux是嵌入式开发必不可少的一份子,在科技高速发展的今天,嵌入式已然已经成为了最热门的技术之一了。对于想要学习好嵌入式的学员来说,现在学习好linux是很有必要的,因为这个是嵌入式的核心。那么学习...

    嵌入式linux是嵌入式开发必不可少的一份子,在科技高速发展的今天,嵌入式已然已经成为了最热门的技术之一了。对于想要学习好嵌入式的学员来说,现在学习好linux是很有必要的,因为这个是嵌入式的核心。那么学习嵌入式linux编程开发应该要必备什么条件呢?

    105d277ba11f46016abbaa4f4e2e431e.png

      linux嵌入式编程开发学习时,你要从这几个方面来下手:

      首先就是熟知的C语言了,C语言是必须要学的,不管学习什么,都是要会的,对于C语言是嵌入式领域最重要也是最主要的编程语言,通过大量编程实例重点理解C语言的基础编程以及高级编程知识。包括:基本数据类型、数组、指针、结构体、链表、文件操作、队列、栈等。

      再来就是Linux基础了,在这里你需要了解linux的操作概念,安装方法,详细了解Linux下的目录结构、基本命令、编辑器VI,编译器GCC,调试器GDB和Make项目管理工具,ShellMakefile脚本编写等知识,嵌入式开发环境的搭建,这是比较重要的,这是学习linux编程开发的基础。

      接下来就是重点了Linux系统编程,重点学习标准I/O库,Linux多任务编程中的多进程和多线程,以及进程间通信(pipe、FIFO、消息队列、共享内存、signal、信号量等),同步与互斥对共享资源访问控制等重要知识,主要提升对Linux应用开发的理解和代码调试的能力。

      要会编程那也要会数据结构与算法,数据结构及算法在嵌入式底层驱动、通信协议、及各种引擎开发中会得到大量应用,对其掌握的好坏直接影响程序的效率、简洁及健壮性。此阶段的学习要重点理解数据结构与算法的基础内容,包括顺序表、链表、队列、栈、树、图、哈希表、各种查找排序算法等应用及其C语言实现过程,不要小看数据结构,这些都是基础。

      Linux网络编程,计算机网络在嵌入式Linux系统应用开发过程中使用非常广泛,通过Linux网络发展、TCP/IP协议、socket编程、TCP网络编程、UDP网络编程、Web编程开发等方面入手,全面了解Linux网络应用程序开发。重点学习网络编程相关API,熟练掌握TCP协议服务器的编程方法和并发服务器的实现,了解HTTP协议及其实现方法,熟悉UDP广播、多播的原理及编程方法,掌握混合C/S架构网络通信系统的设计,熟悉HTML,Javascript等Web编程技术及实现方法。

      然后就是C++、QT,C++是Linux应用开发主要语言之一,本阶段重点掌握面向对象编程的基本思想以及C++的重要内容。图形界面编程是嵌入式开发中非常重要的一个环节。由于QT具有跨平台、面向对象、丰富API、支持2D/3D渲染、支持XML、多国语等强大功能,在嵌入式领域的GUI开发中得到了广范的应用,在本阶段通过基于QT图形库的学习使学员可以熟练编写GUI程序,并移植QT应用程序到Cortex-A8平台。包括IDE使用、QT部件及布局管理器、信息与槽机制的应用、鼠标、键盘及绘图事件处理及文件处理的应用。这些都是你要牢牢掌握的。

      最后就是linux嵌入式编程开发的驱动开发了,驱动程序设计是嵌入式Linux开发工作中重要的一部分,也是比较困难的一部分。本阶段的学习要熟悉Linux的内核机制、驱动程序与用户级应用程序的接口,掌握系统对设备的并发操作。熟悉所开发硬件的工作原理,具备ARM硬件接口的基础知识,熟悉ARMCortex-A8处理器s5pv210各资源、掌握Linux设备驱动原理框架,熟悉工程中常见Linux高级字符设备、块设备、网络设备、USB设备等驱动开发,掌握这些你就可以胜任工作了。

    展开全文
  • 作者:Vamei出处:http://www.cnblogs.com/vamei我们在Linux信号...为了让进程间传递更的信息量,我们需要其他的进程间通信方式。这些进程间通信方式可以分为两种:管道机制:可以使用管道将一个进程的输出和另一...

    作者:Vamei

    出处:http://www.cnblogs.com/vamei

    我们在Linux信号基础中已经说明,信号可以看作一种粗糙的进程间通信(IPC, interprocess communication)的方式,用以向进程封闭的内存空间传递信息。为了让进程间传递更多的信息量,我们需要其他的进程间通信方式。这些进程间通信方式可以分为两种:

    管道机制可以使用管道将一个进程的输出和另一个进程的输入连接起来,从而利用文件操作API来管理进程间通信。在shell中,我们经常利用管道将多个进程连接在一起,从而让各个进程协作,实现复杂的功能。

    传统IPC主要是指消息队列(message queue),信号量(semaphore),共享内存(shared memory)。这些IPC的特点是允许多进程之间共享资源,这与多线程共享heap和global data相类似。由于多进程任务具有并发性 (每个进程包含一个进程,多个进程的话就有多个线程),所以在共享资源的时候也必须解决同步的问题 (参考Linux多线程与同步)。 

    管道与FIFO文件一个原始的IPC方式是所有的进程通过一个文件交流。比如我在纸(文件)上写下我的名字和年纪。另一个人读这张纸,会知道我的名字和年纪。他也可以在同一张纸上写下他的信息,而当我读这张纸的话,同样也可以知道别人的信息。但是,由于硬盘读写比较慢,所以这个方式效率很低。那么,我们是否可以将这张纸放入内存中以提高读写速度呢?

    在Linux文本流中,我们已经讲解了如何在shell中使用管道连接多个进程。同样,许多编程语言中,也有一些命令用以实现类似的机制,比如在Python子进程中使用Popen和PIPE,在C语言中也有popen库函数来实现管道 (shell中的管道就是根据此编写的)。

    管道是由内核管理的一个缓冲区(buffer),相当于我们放入内存中的一个纸条。管道的一端连接一个进程的输出。这个进程会向管道中放入信息。管道的另一端连接一个进程的输入,这个进程取出被放入管道的信息。一个缓冲区不需要很大,它被设计成为环形的数据结构,以便管道可以被循环利用。

    当管道中没有信息的话,从管道中读取的进程会等待,直到另一端的进程放入信息。当管道被放满信息的时候,尝试放入信息的进程会等待,直到另一端的进程取出信息。当两个进程都终结的时候,管道也自动消失。

    af35c3033595438a903a941fdd7d478b.png

    从原理上,管道利用fork机制建立(参考Linux进程基础和Linux从程序到进程),从而让两个进程可以连接到同一个PIPE上。最开始的时候,上面的两个箭头都连接在同一个进程Process 1上(连接在Process 1上的两个箭头)。当fork复制进程的时候,会将这两个连接也复制到新的进程(Process 2)。随后,每个进程关闭自己不需要的一个连接 (两个黑色的箭头被关闭; Process 1关闭从PIPE来的输入连接,Process 2关闭输出到PIPE的连接),这样,剩下的红色连接就构成了如上图的PIPE。

    d0b210897d26508d29ddf69e743e4979.png

    由于基于fork机制,所以管道只能用于父进程和子进程之间,或者拥有相同祖先的两个子进程之间 (有亲缘关系的进程之间)。为了解决这一问题,Linux提供了FIFO方式连接进程。FIFO又叫做命名管道(named PIPE)。

    FIFO (First in, First out)为一种特殊的文件类型,它在文件系统中有对应的路径。当一个进程以读(r)的方式打开该文件,而另一个进程以写(w)的方式打开该文件,那么内核就会在这两个进程之间建立管道,所以FIFO实际上也由内核管理,不与硬盘打交道。之所以叫FIFO,是因为管道本质上是一个先进先出队列数据结构,最早放入的数据被最先读出来(好像是传送带,一头放货,一头取货),从而保证信息交流的顺序。FIFO只是借用了文件系统(file system, 参考Linux文件管理背景知识)来为管道命名。写模式的进程向FIFO文件中写入,而读模式的进程从FIFO文件中读出。当删除FIFO文件时,管道连接也随之消失。FIFO的好处在于我们可以通过文件的路径来识别管道,从而让没有亲缘关系的进程之间建立连接。传统IPC这几种传统IPC实际上有很悠久的历史,所以其实现方式也并不完善 (比如说我们需要某个进程负责删除建立的IPC)。一个共同的特征是它们并不使用文件操作的API。对于任何一种IPC来说,你都可以建立多个连接,并使用键值(key)作为识别的方式。我们可以在一个进程中中通过键值来使用的想要那一个连接 (比如多个消息队列,而我们选择使用其中的一个)。键值可以通过某种IPC方式在进程间传递(比如说我们上面说的PIPE,FIFO或者写入文件),也可以在编程的时候内置于程序中。在几个进程共享键值的情况下,这些传统IPC非常类似于多线程共享资源的方式(参看Linux多线程与同步):

    #semaphoremutex类似,用于处理同步问题。我们说mutex像是一个只能容纳一个人的洗手间,那么semaphore就像是一个能容纳N个人的洗手间其实从意义上来说,semaphore就是一个计数锁,它允许被N个进程获得。当有更多的进程尝试获得semaphore的时候,就必须等待有前面的进程释放锁。当N等于1的时候,semaphore与mutex实现的功能就完全相同,一个semaphore会一直存在在内核中,直到某个进程删除它。

    #共享内存与多线程共享global data和heap类似。一个进程可以将自己内存空间中的一部分拿出来,允许其它进程读写。当使用共享内存的时候,我们要注意同步的问题。我们可以使用semaphore同步,也可以在共享内存中建立mutex或其它的线程同步变量来同步。由于共享内存允许多个进程直接对同一个内存区域直接操作,所以它是效率最高的IPC方式。

    消息队列(message queue)与PIPE相类似。它也是建立一个队列,先放入队列的消息被最先取出。不同的是,消息队列允许多个进程放入消息,也允许多个进程取出消息。每个消息可以带有一个整数识别符(message_type)。你可以通过识别符对消息分类 (极端的情况是将每个消息设置一个不同的识别符)。

    某个进程从队列中取出消息的时候,可以按照先进先出的顺序取出,也可以只取出符合某个识别符的消息(有多个这样的消息时,同样按照先进先出的顺序取出)。消息队列与PIPE的另一个不同在于它并不使用文件API。最后,一个队列不会自动消失,它会一直存在于内核中,直到某个进程删除该队列。

    多进程协作可以帮助我们充分利用多核和网络时代带来的优势。多进程可以有效解决计算瓶颈的问题。互联网通信实际上也是一个进程间通信的问题,只不过这多个进程分布于不同的电脑上。网络连接是通过socket实现的。由于socket内容庞大,所以我们不在这里深入。一个小小的注解是,socket也可以用于计算机内部进程间的通信。

    6d8bb85c8ca21017f90009b1d686f82c.png

    展开全文
  • 前言对于一个大型的程序,划分子模块(比如 DDD,...你有以下几种选择:1)单线程2)共进程多线程部署3)共 docker 跨进程部署4)跨 vm(虚拟机)部署对于跨 vm 的场景,进程间主要靠 socket 通信(创建套接字)。而对于同...
  • Linux Socket + pthread + pipe 实现socket通信和多线程数据共享 大道至简,基础的东西不能忘,对于网路时代,开发应该都跑不了socket,作为一个基本功,也好久没写写,简单了一个复习下。主要目标复习下socket的...
  • linux多线程通信(一)

    千次阅读 2018-09-21 20:11:57
    linux下进行多线程编程,肯定会涉及到线程通信问题,本文主要分析pipe,即管道在多线之间通信实现。 #include&amp;amp;amp;amp;lt;unistd.h&amp;amp;amp;amp;gt; int pipe(int filedes[2]); 返回值:成功...
  • 多线程 同步的方法 1. 临界区 2. 互斥量(注意mutex只能用于线程的互斥,不能用于进程) 3. 信号量 4. 事件 多进程 同步方法 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的...
  • 线程之间通信的方式有哪些? 7 种进程间的通信方式: (1) 管道(pipe):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有血缘关系的进程间使用。进程的血缘关系通常指父子进程关系。 (2)有名管道...
  • tcp协议是全双工的,这就可以看成是两条单工信道,单工信道是管道PIPE,在对端调用close后,对端将发送一个FIN标志给本端,在本端进行read返回0的话,就表示对端关闭了对端负责的一条管道,但是本端这一条管道还可以...
  • Linux多线程编程—进程间通信一.匿名管道二.有名管道:亲属 非亲属三.信号四. 共享内存五.消息队列六.信号量 一.匿名管道 (1) int pipe(int pipefd[2]); 功能:创建管道 将读端的文件描述符返回到pipefd[0] 将写端...
  • 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信;  2. 信号(Signal):信号...
  • linux_进程线程通信

    2016-01-12 14:35:51
    一、进程间的通信方式 # 管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘...# 信号量(semophore ) : 信号量是一个计数器,可以用来控制个进程对共享资源的访问。它常作为一种
  • 首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。  不管是文件,还是套接字,还是管道,我们都可以把他们看作流。  之后我们来讨论I/O的操作,通过read...
  • 管道:pipe,fd0,fd1,read/write,一边关闭读,一边关闭写,半双工 消息队列:ftok,msgget/msgctl 共享内存:ftok,shmget/shmat/shmdt,最快 套接字:socket DBUS:消息总线,重量级 2. 睡眠 Sleep
  • linux学习-pipe

    2015-09-08 16:33:30
    下面管道的例子是多进程之间的用法, 如果在多线程中使用pipe, 在任何一个线程中关闭pipe[0] 或者 pipe[1] 都会导致另外一个线程无法正常使用pipe, 不像多进程中进程1中关闭pipe[0] , 进程2中还可以用pipe[0] 。
  • 我写了一个服务器程序, 在Windows下在cygwin环境编译后执行, 然后用C#写了多线程客户端进行压力测试. 程序一直运行正常. 但当在Linux下测试时, 总是莫名退出. 最后跟踪到是write调用导致退出. 用gdb执行程序, 退出时...
  • linux下进程间通信的几种主要手段简介:  1. 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘...
  • 1.创建任务并行/************************************************************************* > File Name: pipe6.c > Author: zhuan > Mail: 2065286676@qq.com > Created Time: 2016年10月18日 星期二 15...
  • 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信; 信号(Signal):信号是比较...
  • 对于多线程,其共享同一进程下进程的虚拟地址空间。包括运行时堆区域,进程的.text代码段,.rodata只读...早期的Linux对进程间通信支持的良好,以至于在很长一段时间内,Linux不支持多线程。 管道是一种特殊的文件,.
  • # 无名管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系或者兄弟进程。 # 有名管道 (FIFO) : 有名管道也是半双工的通信...
  • 探测c段存活主机多线程脚本 import subprocess as p # p等同于subprocess import threading def ping(ip,se): pings=p.Popen('ping -n 2 %s' % ip ,shell=True,stdin=p.PIPE,stdout=p.PIPE,stderr=p.PIPE,encoding=...
  • linux进程间通信方式 管道(pipe),流管道(s_pipe)和有名管道(FIFO) 信号(signal) 消息队列 共享内存 信号量 ...b,读写锁允许线程同时读共享数据,而对写操作是互斥的。 c,条件变量可以以原...
  • [使用 异步多线程TCP Socket 实现进程间通信(VC 6.0 , BCB6.0调试通过)进程间通信有很多种方式,比如说 Pipe,共享内存,DDE,Socket等,关于进程通信方面的知识我现在肉鸡上面linux越来越多,都默认安装了python,有时候...
  • linux进程通信的方式 管道( pipe): 管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。 命名管道(namedpipe): 命名管道也是半双工的...
  • PIPE和FIFO用来实现进程间相互发送非常短小的、频率很高的消息;这两种方式通常适用于两个进程间的通信。...在多进程、多线程、多模块所构成的今天最常见的分布式系统开发中,socket是第一选择。消息队列...

空空如也

空空如也

1 2 3 4 5 6
收藏数 108
精华内容 43
关键字:

linuxpipe多线程

linux 订阅