• python线程学习笔记

    2019-03-15 21:59:04
    线程 常用方法介绍 为啥 要使用多线程 使用多线程应该注意的问题. thread中join 的用法 线程安全 线程间如何通信 , 锁机制比较复杂的内容 . ...python线程编程 为什么会有多线程呢? 多线程的优势是什么呢? 首先举...

    线程 常用方法介绍
    为啥 要使用多线程
    使用多线程应该注意的问题.

    thread中join 的用法
    线程安全
    线程间如何通信 , 锁机制比较复杂的内容 .
    Queue,线程 同步问题 Event, Condition 等…

    死锁问题
    线程池的使用, 为啥要有线程池呢?
    结合实战,看看项目中如何使用多线程?

    1 思考

    python 多线程编程 为什么会有多线程呢?

    多线程的优势是什么呢?

    首先举个例子,如果 你有两件事情要做,这两件事情相对相关性不高. 如果我选择 先做事情A, 在做事情B, 假设 A 需要5s , B 需要2s . 那么 如果要把两件事情做完,是不是要需要7s 时间呢?

    能不能这样呢? 我 A 和B 同时做,这样是不是我可以用 5s的时间把事情做完呢.

    1-1 入门示例
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 18:46
    @File    : preface.py
    @Author  : frank.chang@shoufuyou.com
    """
    
    import time
    import threading
    
    
    def job1():
        time.sleep(5)
        print('job1 finished')
    
    
    def job2():
        time.sleep(2)
        print('job2 finished')
    
    
    def multithread():
        start = time.time()
    
        # 创建 线程
        job1_thread = threading.Thread(target=job1, name='JOB1')
    
        # 创建线程
        job2_thread = threading.Thread(target=job2, name='JOB2')
    
        # 启动线程
        job1_thread.start()
        job2_thread.start()
    
        job2_thread.join()
        job1_thread.join()
    
        end = time.time()
        print(f"all time: {end-start}")
    
    
    def singlethread():
        start = time.time()
    
        job1()
        job2()
    
        end = time.time()
    
        print(f"all time: {end-start}")
    
        pass
    
    
    
    if __name__ == '__main__':
        pass
        print("main thread begin")
        singlethread()
        multithread()
        print("main  thread  done")
    
    
    

    结果如下:
    multithread result:

    job2 finished
    job1 finished
    all time: 5.00341010093689
    

    singlethread result :

    job1 finished
    job2 finished
    all time: 7.008417129516602
    

    从结果看出, 显然是多线程情况下, 即’同时’ 干两件事,速度快点. 如果 你有很多事情 是不是也可以 一起工作提高 效率呢? 所以 这就是多线程存在的意义.

    在这里插入图片描述

    image

    什么是多线程编程
    线程创建

    如何创建一个线程呢? 有两种方法.

    • 方法1
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 18:16
    @File    : testthread1.py
    @Author  : frank.chang@shoufuyou.com
    
    
    # 线程的创建
    """
    
    import threading
    
    import time
    import random
    
    
    def job(num):
        time.sleep(2)
    
        ret = num + 1
    
        print(f"ret:{ret}")
    
        return num + 1
    
    
    if __name__ == '__main__':
        print("main thread begin")
    
        # 创建一个线程, target 就是对应 要创建 的函数, args 对应函数 的参数
        thread = threading.Thread(target=job, args=(10,))
    
        # 启动线程
        thread.start()
    
        print("main  thread  done")
    
    • 方法2
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 18:16
    @File    : testthread1.py
    @Author  : frank.chang@shoufuyou.com
    
    
    # 线程的创建2
    """
    
    import threading
    import time
    import random
    
    
    class MyJob(threading.Thread):
    
        def __init__(self, name='myjob', num=0):
            super().__init__(name=name)
    
            self.num = num
    
        def run(self) -> None:
            time.sleep(2)
    
            ret = self.num + 1
            print(f"ret:{ret}")
    
    
    if __name__ == '__main__':
        print("main  thread begin")
    
        myjob = MyJob(num=10)
        myjob.start()
        print("main  thread  done")
    
        pass
    

    线程中常用方法

    thread.is_alive() # 判断 线程是否存活

    thread.ident # 这个是属性, 拿到活动线程的唯一标识, 是一个非零整数.如果线程没有启动则是None.

    thread.daemon # 属性, 设置该线程是否 为 daemon,之后我会给出 什么叫daemon 线程.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 19:42
    @File    : thread_method.py
    @Author  : frank.chang@shoufuyou.com
    """
    
    import time
    import threading
    
    
    def job1():
        time.sleep(5)
        print('job1 finished')
    
    
    if __name__ == '__main__':
        job1_thread = threading.Thread(target=job1, name='JOB1')
    
        print(f"isalive: {job1_thread.is_alive()}")
    
        print(f"thread name: {job1_thread.getName()}")
    
        """
         线程 属性 , 这个 可以用来唯一标识 一个线程,但是 如果线程退出后,可能 这个 值 会被重复使用!
         如果线程没有启动则返回None 
         返回唯一标识当前线程的非零整数 .
         同时存在的其他线程, 这可用于识别每线程资源。
         尽管在某些平台上线程标识可能看起来像 分配从1开始的连续数字,这种行为不应该可以依赖,这个数字应该被视为一个神奇的饼干。
         线程的标识可以在退出后重新用于另一个线程。
        """
    
        print(f"thread ident:{job1_thread.ident}")
    
        # 开启线程
        job1_thread.start()
    
        print(f"isalive: {job1_thread.is_alive()}")
    
        print(f"thread.ident:{job1_thread.ident}")
    

    result 如下:

    isalive: False
    thread name: JOB1
    thread ident:None
    isalive: True
    thread.ident:123145335967744
    job1 finished
    
    官方文档    https://docs.python.org/3/library/threading.html
    
    线程的daemon 属性
    • 还有daemon 属性
    一个布尔值,指示此线程是否为守护程序线程(True)或不是(False). 必须在调用start() 之前设置它,否则引发RuntimeError. 它的初始值继承自创建线程; 主线程不是守护程序线程,因此在主线程中创建的所有线程都默认为daemon = False
    
    
    这个属性 就是默认值 是False
    
    官方解释: 
    The entire Python program exits when no alive non-daemon threads are left.
    当没有剩下活着的非守护程序线程时,整个Python程序退出.
    

    看一个简单的例子理解一下:

    import time
    import threading
    
    
    def job1():
        for _ in range(5):
            print('job1 sleep 1 second.')
            time.sleep(1)
        print('job1 finished')
    
    
    if __name__ == '__main__':
        print("====main thread begin====")
    
        print('Thread name: {}'.format(threading.current_thread()))
    
        # 创建一个线程 ,默认情况
        job1_thread = threading.Thread(target=job1, name='JOB1',daemon=False)
        # job1_thread.daemon = True
    
        # 启动线程
        job1_thread.start()
    
        print('Main Thread :  Hello World')
        print("====main  thread  done====")
    

    看下这个图形
    在这里插入图片描述
    image

    结果如下:

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    job1 sleep 1 second.
    Main Thread :  Hello World
    ====main  thread  done====
    job1 sleep 1 second.
    job1 sleep 1 second.
    job1 sleep 1 second.
    job1 sleep 1 second.
    job1 finishe
    
    

    结果分析:

    可以看出 主线程结束之后, job1 线程是非daemon 线程. 所以当主线程退出的时候, job1线程 还是要 执行完成 才会退出. 
    
    如果把 daemon 这个属性 设置成 True, 则结果可能不是我想要的, 
    主线程 退出的时候, job1 也被迫退出了. 
    

    来看下另一个示例:

    def job1():
        for _ in range(5):
            print('job1 sleep 1 second.')
            time.sleep(1)
        print('job1 finished')
    
    
    if __name__ == '__main__':
        print("====main thread begin====")
    
        print('Thread name: {}'.format(threading.current_thread()))
    
        # 创建一个线程 设置daemon=True
        job1_thread = threading.Thread(target=job1, name='JOB1',daemon=True)
        # job1_thread.daemon = True
    
        # 启动线程
        job1_thread.start()
    
        print('Main Thread :  Hello World')
        print("====main  thread  done====")
    
    
    

    结果如下:

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    job1 sleep 1 second.
    Main Thread :  Hello World
    ====main  thread  done====
    

    可以 清楚的看到主线程退出了, job1 其实还没有完成任务,被迫退出了任务. 这个可能不是我们想要的结果.

    看下运行时的图形:
    在这里插入图片描述
    image

    设置 daemon 线程 有两种方式.可以在 线程创建 的时候, 也可以创建完成后 直接设置.

        job1_thread = threading.Thread(target=job1, name='JOB1',daemon=True)
    
        job1_thread = threading.Thread(target=job1, name='JOB1')
        job1_thread.daemon = True
    
    简单总结一下:  daemon 线程设置
    
    如果设置成daemon 为True ,这 主线程完成的时候, 子线程会跟着退出的. 
    daemon 设置成  False(默认值) , 主线程 退出的时候, 子线程必须完成任务后才会退出. 
    

    thread中join 的用法

    python3对多线程join的理解


    线程安全

    什么是线程安全?

    线程安全百科

    线程安全
    线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。

    线程不安全
    线程不安全就是不提供数据访问保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据.

    来看下这个代码

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/9 16:07
    @File    : python_gil.py
    @Author  : frank.chang@shoufuyou.com
    python  中的
    
    
    CPython 解释器的实现
    GIL  使得 同一时刻 只有一个线程 在一个cpu 上执行字节码,  无法将多个线程  映射到 多个CPU 上 .
    
    gil  会根据 字节码行数, 以及 时间片 , 释放 gil
    
    gil  遇到IO 操作 的时候 ,这个时候  锁会被释放掉. 让给其他线程.
    
    # def add(a):
    #     a = a + 1
    #
    #     return a
    #
    # print(dis.dis(add))
    
    """
    
    import threading
    import dis
    
    total = 0
    
    
    def add():
        # 把total 减100w 次 
        global total
    
        for i in range(100_0000):
            total += 1
    
    
    def desc():
        global total
        # 把total加100w 次 
        for i in range(100_0000):
            total -= 1
    
    
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    print(f"total:{total}")
    

    结果 是什么呢? 为什么会出现这样的结果?

    TODO 结果分析:

    主要是因为 +1 ,-1 并不是线程安全的操作. 如果 说 在 +1 或者-1 的过程中 另外的线程 拿到了CPU , 那么就会出现 数据计算不正确的情况.

    
    CPython 解释器的实现
    GIL  使得 同一时刻 只有一个线程 在一个cpu 上执行字节码,  无法将多个线程  映射到 多个CPU 上 .
    
    GIL  会根据 字节码行数, 以及 时间片 , 释放 gil
    
    GIL  遇到IO 操作 的时候 ,这个时候锁会被释放掉.让给其他线程.
    
    如何取thread 运算结果,thread 间 如何通信?

    有的时候 可能开启一个线程任务,希望可以拿到 线程的结果这个时候 应该怎么做呢 ?

    我的建议可以用 python中 queue.Queue 这个对象.因为这个首先是一个阻塞队列, 并且是线程安全的.即使在多线程环境下,也可以保证线程的安全.

    比如一个线程 需要把 list 中每一个数据 都要进行 翻倍的操作.这个时候就需要使用一个数据结构,把结果存放起来,可以用queue 存放结果.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 20:42
    @File    : thread communicate.py
    @Author  : frank.chang@shoufuyou.com
    """
    
    import time
    import threading
    
    from queue import Queue
    
    
    def job1(datas: list, q: Queue):
        """
        需要返回结果, 怎么处理呢?
        可以把 处理结果存入到 Queue 中 .
    
        对datas 中的 数据 简单 的*2 操作.
        :param datas: list
        :param q: Queue 对象
        :return:
        """
    
        time.sleep(2)
        for data in datas:
            tmp = data * 2
            # 把结果 存放到q中
            q.put(tmp)
    
    if __name__ == '__main__':
    
        q = Queue()
        datas = list(range(10))
        # 创建一个线程
        job1_thread = threading.Thread(target=job1, args=(datas, q), name='JOB1')
    
        # 启动线程
        job1_thread.start()
    
        # 等待线程结束
        job1_thread.join()
    
        while not q.empty():
            #  取出结果
            print(q.get())
    

    结果如下:

    0
    2
    4
    6
    8
    10
    12
    14
    16
    18
    

    lock 的使用

    为什么要使用锁呢?
    其实就是保证线程安全操作, 因为线程间变量 是共享的, 为了 是每个线程,操作一个变量的时候,不受到影响. 需要 同一时间 只能有一个 线程 执行相应的代码块.

    import  threading  
    help(type(threading.Lock()))
    
    

    举个例子 有两个线程, 他们 工作 的任务就是数数, job1 从 0 …9 , job2 从 100,…109
    但是 这两个线程 都不想被打扰, 就是我一旦开始工作, 不希望有人 打扰我数数, 怎么办呢?

    import time
    import threading
    import random
    
    
    def job1():
        """
        数数 的 任务 , 从  1,2...9
        :return:
        """
    
        for i in range(0, 10):
            time.sleep(random.randint(1, 10) * 0.1)
            print(f'job1 :{i}')
    
    
    def job2():
        """
         数数 的 任务 , 从  100,101,...  109
        :return:
        """
    
        for i in range(100, 110):
            time.sleep(random.randint(1, 10) * 0.1)
            print(f'job2 :{i}')
    
    
    if __name__ == '__main__':
        print("====main thread begin====")
    
        print('Thread name: {}'.format(threading.current_thread()))
    
        job1_thread = threading.Thread(target=job1, name='JOB1')
        job2_thread = threading.Thread(target=job2, name='JOB2')
    
        job1_thread.start()
        job2_thread.start()
    
        job1_thread.join()
        job2_thread.join()
    
        print('Main Thread :  Hello World')
        print("====main  thread  done====")
    
    

    结果如下:

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    job2 :100
    job2 :101
    job1 :0
    job2 :102
    job1 :1
    job1 :2
    job1 :3
    job1 :4
    job2 :103
    job1 :5
    job2 :104
    job1 :6
    job2 :105
    job1 :7
    job2 :106
    job2 :107
    job1 :8
    job2 :108
    job1 :9
    job2 :109
    Main Thread :  Hello World
    ====main  thread  done====
    

    从 执行结果看, job2 先开始工作, 之后 job1 插入了进来, 然后job2 又插入了进来. 这样如此反复, 导致这两个job 都不能好好的工作. 不能安心的把事情干完. 就被迫停下来,继续 等其他工作线程.

    有一个办法: 可以在 工作线程中加锁,来解决这个问题.

    看下 加锁的代码:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 19:42
    @File    : thread_method.py
    @Author  : frank.chang@shoufuyou.com
    
    
    在多线程程序中安全使用可变对象,你需要使用 threading 库中的 Lock 对象
    
    
    refer:
    
    https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p04_locking_critical_sections.html
    
    https://github.com/MorvanZhou/tutorials/blob/master/threadingTUT/thread6_lock.py
    
    https://juejin.im/post/5b17f4305188257d6b5cff6f
    
    
    """
    
    import time
    import threading
    import random
    
    
    def job1():
        """
        数数 的 任务 , 从  1,2...9
        :return:
        """
        global lock
        # 加锁
        with lock:
            for i in range(0, 10):
                time.sleep(random.randint(1, 10) * 0.1)
                print(f'job1 :{i}')
    
    
    def job2():
        """
         数数 的 任务 , 从  100,101,...  109
        :return:
        """
        global lock
        # 加锁
        with lock:
            for i in range(100, 110):
                time.sleep(random.randint(1, 10) * 0.1)
                print(f'job2 :{i}')
    
    
    if __name__ == '__main__':
        print("====main thread begin====")
        print('Thread name: {}'.format(threading.current_thread()))
    
        # 定义一个锁
        lock = threading.Lock()
        job1_thread = threading.Thread(target=job1, name='JOB1')
        job2_thread = threading.Thread(target=job2, name='JOB2')
    
        job1_thread.start()
        job2_thread.start()
    
        job1_thread.join()
        job2_thread.join()
    
        print('Main Thread :  Hello World')
        print("====main  thread  done====")
    

    结果如下:

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    job1 :0
    job1 :1
    job1 :2
    job1 :3
    job1 :4
    job1 :5
    job1 :6
    job1 :7
    job1 :8
    job1 :9
    job2 :100
    job2 :101
    job2 :102
    job2 :103
    job2 :104
    job2 :105
    job2 :106
    job2 :107
    job2 :108
    job2 :109
    Main Thread :  Hello World
    ====main  thread  done====
    
    

    这样可以看到, job1, job2 之间 就能安全工作了. 每个线程都不会打扰另外的线程,都安心的开始数数了.

    在举一个例子 :

    有一个线程函数 功能是:接收一个num, 把这个num, 加入到 一个list 里面.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/28 19:42
    @File    : thread_method.py
    @Author  : frank.chang@shoufuyou.com
    
    
    在多线程程序中安全使用可变对象,你需要使用 threading 库中的 Lock 对象
    
    
    refer:
    
    https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p04_locking_critical_sections.html
    
    https://github.com/MorvanZhou/tutorials/blob/master/threadingTUT/thread6_lock.py
    
    https://juejin.im/post/5b17f4305188257d6b5cff6f
    
    
    Python threading中lock的使用   https://blog.csdn.net/u012067766/article/details/79733801
    
    """
    
    import time
    import threading
    import random
    
    
    def job11(num=None):
        global lock
        with lock:
            # 模拟一些费时间的操作
            time.sleep(random.randint(1, 10) * 0.1)
            # 关键代码 加锁
            datas.append(num)
            print(datas)
    
    
    def job(num=None):
        # 模拟一些费时间的操作
        time.sleep(random.randint(1, 10) * 0.1)
        # 关键代码 
        datas.append(num)
        print(datas)
    
    
    if __name__ == '__main__':
        print("====main thread begin====")
        print('Thread name: {}'.format(threading.current_thread()))
    
        # 定义一个锁
        lock = threading.Lock()
    
        # 数据
        datas = []
    
        for i in range(10):
            t = threading.Thread(target=job, args=(i,))
            t.start()
    
        print("====main  thread  done====")
    
    

    结果如下:

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    ====main  thread  done====
    [9]
    [9, 4]
    [9, 4, 8]
    [9, 4, 8, 7]
    [9, 4, 8, 7, 3]
    [9, 4, 8, 7, 3, 1]
    [9, 4, 8, 7, 3, 1, 2]
    [9, 4, 8, 7, 3, 1, 2, 5]
    [9, 4, 8, 7, 3, 1, 2, 5, 0]
    [9, 4, 8, 7, 3, 1, 2, 5, 0, 6]
    

    可以看出 由于每个线程 拿到list 的时机不一样, 所以 list 打印的结果也不一样.

    # 换成 加锁的代码
     t = threading.Thread(target=job11, args=(i,))
    

    把上面的 target=job11 ,job11 定义了一个lock , 这样就可以锁住关键的代码, 同一时刻,只会有一个线程进入到代码段.

    ====main thread begin====
    Thread name: <_MainThread(MainThread, started 140735991698304)>
    ====main  thread  done====
    [0]
    [0, 1]
    [0, 1, 2]
    [0, 1, 2, 3]
    [0, 1, 2, 3, 4]
    [0, 1, 2, 3, 4, 5]
    [0, 1, 2, 3, 4, 5, 6]
    [0, 1, 2, 3, 4, 5, 6, 7]
    [0, 1, 2, 3, 4, 5, 6, 7, 8]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    加了一个锁 ,之后 发现代码 就能按照预想的方式 打印了.

    如何 解决死锁问题??

    12.5 防止死锁的加锁机制

    在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。举个例子:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。 解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的

    threading 中 event的使用

    https://docs.python.org/3/library/threading.html

    Event(事件):事件处理的机制:全局定义了一个内置标志Flag,
    如果Flag值为 False,那么当程序执行 event.wait()方法时就会阻塞,
    如果Flag值为True,那么event.wait() 方法时便不再阻塞。

    demon1 中 event 的基本使用
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/1 15:04
    @File    : thread_event.py
    @Author  : frank.chang@shoufuyou.com
    
    refer :
    python笔记12-python多线程之事件(Event)  https://www.cnblogs.com/yoyoketang/p/8341972.html
    
        event.is_set()
    
        event.wait()
    
        event.set()
    
        event.clear()
    
    Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位真,则其他线程等待直到信号接触。
    Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等用于实现线程间的通信。
    
    event = threading.Event() 创建一个event
    
    1 设置信号
    event.set()
    
    使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的状态。
    当使用event对象的set()方法后,isSet()方法返回真
    
    
    # isSet(): 获取内置标志状态,返回True或False
    
    
    2 清除信号
    event.clear()
    
    使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假
    
    3 等待
    event.wait()
    
    Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,
    则wait方法一直等待到其为真时才返回。也就是说必须set新号标志位真
    
    
    
    Event(事件):事件处理的机制:全局定义了一个内置标志Flag,
    
    如果Flag值为 False,那么当程序执行 event.wait()方法时就会阻塞,
    如果Flag值为True,那么event.wait() 方法时便不再阻塞。
    
    """
    
    import threading
    import time
    
    
    def eat_hotpot(name):
        """
        吃火锅的函数
        :param name:
        :return:
        """
        # 等待事件,进入等待阻塞状态
        print('%s 已经启动' % threading.currentThread().getName())
        print('小伙伴 %s 已经进入就餐状态!' % name)
        time.sleep(2)
        event.wait()
        # 收到事件后进入运行状态
        print('%s 收到通知了.' % threading.currentThread().getName())
        print('小伙伴 %s 开始吃咯!' % name)
    
    
    def test1():
    
        # 设置线程组
        threads = []
    
        # 创建新线程
        thread1 = threading.Thread(target=eat_hotpot, name='frank-thread', args=("Frank",))
        thread2 = threading.Thread(target=eat_hotpot, name='shawn-thread', args=("Shawn",))
        thread3 = threading.Thread(target=eat_hotpot, name='laoda-thread', args=("Laoda",))
    
        # 添加到线程组
        threads.append(thread1)
        threads.append(thread2)
        threads.append(thread3)
    
        # 开启线程
        for thread in threads:
            thread.start()
    
        time.sleep(0.1)
        # 发送事件通知
        print('主线程通知小伙伴开吃咯!')
        # 把 flag 设置成 True
        event.set()
        # print(f" event.is_set():{event.is_set()}")
    
    
    if __name__ == '__main__':
    
        # 创建一个事件
        event = threading.Event()
        test1()
    
    

    结果如下:

    frank-thread 已经启动
    小伙伴 Frank 已经进入就餐状态!
    shawn-thread 已经启动
    小伙伴 Shawn 已经进入就餐状态!
    laoda-thread 已经启动
    小伙伴 Laoda 已经进入就餐状态!
    主线程通知小伙伴开吃咯!
    frank-thread 收到通知了.
    小伙伴 Frank 开始吃咯!
    shawn-thread 收到通知了.
    小伙伴 Shawn 开始吃咯!
    laoda-thread 收到通知了.
    小伙伴 Laoda 开始吃咯!
    

    demo2

    假设 有两个线程 ,有一个 打印 1 3 5 , 有一个线程 打印 2 4 6
    他们如何协调工作 完成 按数数的顺序打印 每一个数字呢?

    如果同时启动 两个线程,会导致 每个线程 执行的机会 可能 完全不一样, 如何控制呢?

    import threading
    from threading import Event
    import  time,random
    
    def print_odd():
        for item in [1, 3, 5,7,9]:
            time.sleep(random.randint(1,10)*0.1)
            print(item)
    
    
    def print_even():
        for item in [2, 4, 6,8,10]:
            time.sleep(random.randint(1,10)*0.1)
    
            print(item)
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=print_odd)
        t2 = threading.Thread(target=print_even)
        t1.start()
        t2.start()
    
    

    来看下 如何实现 这个功能呢?
    用event 来实现 线程之间的同步

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/5 13:54
    @File    : thread_event2.py
    @Author  : frank.chang@shoufuyou.com
    
    
    python多线程(6)---事件 Event  http://www.zhangdongshengtech.com/article-detials/185
    
    """
    
    import threading
    from threading import Event
    
    
    def print_odd(e1, e2):
        """
        打印 奇数
        """
        for item in [1, 3, 5]:
            e1.wait()
    
            print(item)
            e1.clear()
            e2.set()
    
    
    def print_even(e1, e2):
        """
        打印 偶数
        """
        for item in [2, 4, 6]:
            e1.wait()
    
            print(item)
            e1.clear()
            e2.set()
    
    
    if __name__ == '__main__':
        e1, e2 = Event(), Event()
        t1 = threading.Thread(target=print_odd, args=(e1, e2))
        t2 = threading.Thread(target=print_even, args=(e2, e1))
        t1.start()
        t2.start()
        e1.set()
    
    打印结果为:  1 2 3  4 5 6  
    

    通过 event.wait() , event.set(),event.clear() 来控制 线程之间协调工作.


    theading中 Condition 的使用

    线程之间的同步,condition介绍. condition 可以认为是更加高级的锁.

    https://my.oschina.net/lionets/blog/194577

    https://docs.python.org/3/library/threading.html

    condition内部是含有锁的逻辑,不然也没法保证线程之间的同步。

    
    condition  介绍: 
    
    常用方法: 
    acquire([timeout])/release(): 调用关联的锁的相应方法。
     with cond:
        pass
    
    
    condition 实现了上下文协议, 可以使用 with 语句 
    
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
        Wait until notified or until a timeout occurs.
    
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);
            其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
    notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。
                使用前线程必须已获得锁定,否则将抛出异常。
    
    
    
    Notice:
    
    condition 原理 介绍:
    condition 实际上有两层锁, 一把锁 底层锁,会在 线程调用 wait 方法的时候释放.
    上面的锁(第二把锁) 会在在每次调用wait 方法的时候 ,分配一把锁,并且放入到cond 的等待队列中, 等其他线程 notify唤醒该线程.
    
    注意 :
     调用 with cond 之后, 才能调用  wait  ,notify 方法  ,这两个方法必须拿到锁之后 才能使用.
    
    

    举个吃火锅的例子,来说明吃火锅的如何使用线程完成同步的.

    假设有一个需求:
    Eat hotpot 开始吃火锅, 的时候一次 放入5盘 牛肉 ,等 10min, 食物才能 煮熟, 才能开始吃.

    吃完后,通知 继续加食物 ,5盘 羊肉 , 等10min, 吃完就结束了. (大家 都吃饱了.)

    这个用 生产者消费者来模拟
    要保证每次开始吃的 时候, 食物必须要是熟的.
    用condition 实现 线程同步.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/5 13:54
    @File    : thread_condition0.py
    @Author  : frank.chang@shoufuyou.com
    
    refer:
    https://www.cnblogs.com/yoyoketang/p/8337118.html
    
    
    Condition():
    
    acquire(): 线程锁
    release(): 释放锁
    wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。
                    wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
    notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。
                    notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
    notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
    
    
    
    """
    
    import threading
    import time
    
    
    class Producer(threading.Thread):
    
        def __init__(self, cond):
            super().__init__()
            self.cond = cond
    
        def run(self):
            global num
    
            # 锁定线程
            self.cond.acquire()
    
            print("开始添加食物: 牛肉")
            for i in range(5):
                num += 1
    
            # 来模拟10min
            print('\n食物 正在 煮熟中.... 请稍等.')
            time.sleep(2)
            print(f"\n现在 火锅里面牛肉个数:{num}")
            self.cond.notify()
            self.cond.wait()
    
            # 开始添加 羊肉
            print("添加食物:羊肉")
            for i in range(5):
                num += 1
            print('\n食物 正在 煮熟中.... 请稍等...')
    
            time.sleep(2)
    
            print(f"\n现在  火锅里面羊肉个数:{num}")
    
            self.cond.notify()
            self.cond.wait()
    
            # 释放锁
            self.cond.release()
    
    
    class Consumers(threading.Thread):
        def __init__(self, cond):
            super().__init__()
            self.cond = cond
    
        def run(self):
            self.cond.acquire()
            global num
    
            self.cond.wait()
            print("------食物已经熟了,开始吃啦------")
    
            for _ in range(5):
                num -= 1
                print("火锅里面剩余食物数量:%s" % str(num))
                time.sleep(1)
    
            print("\n锅底没食物了,赶紧加食物吧!")
            self.cond.notify()  # 唤醒其它线程,wait的线程
    
            self.cond.wait()
            print("------食物已经熟了,开始吃啦------")
            for _ in range(5):
                num -= 1
                print("火锅里面剩余食物数量:%s" % str(num))
                time.sleep(1)
    
            print('\n吃饱了,今天火锅真好吃!')
            self.cond.notify()
    
            self.cond.release()
    
    
    if __name__ == '__main__':
        condition = threading.Condition()
    
        # 每次放入锅的数量
        num = 0
        p = Producer(condition)
        c = Consumers(condition)
    
        c.start()
        p.start()
    
    
    

    结果如下:

    开始添加食物: 牛肉
    
    食物 正在 煮熟中.... 请稍等.
    
    现在 火锅里面牛肉个数:5
    ------食物已经熟了,开始吃啦------
    火锅里面剩余食物数量:4
    火锅里面剩余食物数量:3
    火锅里面剩余食物数量:2
    火锅里面剩余食物数量:1
    火锅里面剩余食物数量:0
    
    锅底没食物了,赶紧加食物吧!
    添加食物:羊肉
    
    食物 正在 煮熟中.... 请稍等...
    
    现在  火锅里面羊肉个数:5
    ------食物已经熟了,开始吃啦------
    火锅里面剩余食物数量:4
    火锅里面剩余食物数量:3
    火锅里面剩余食物数量:2
    火锅里面剩余食物数量:1
    火锅里面剩余食物数量:0
    
    吃饱了,今天火锅真好吃!
    
    demo1

    通过条件, 实现两个线程的同步.
    例子 是两个机器人的对话. 两个一起数数. 要保证
    小爱说一句, 天猫说一句. 来看下例子.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 09:52
    @File    : test_condition.py
    @Author  : frank.chang@shoufuyou.com
    
    
    测试 condition
    
    acquire([timeout])/release(): 调用关联的锁的相应方法。
     with cond:
        pass
    
    
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
    
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);
            其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
    notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。
                使用前线程必须已获得锁定,否则将抛出异常。
    
    
    """
    
    import threading
    from threading import Condition
    
    
    class XiaoAi(threading.Thread):
    
        def __init__(self, cond):
            super().__init__(name='小爱')
            self.cond = cond
    
        def run(self) -> None:
            with self.cond:
                self.cond.wait()
                print(f"{self.name}: 在呢")
                self.cond.notify()
    
                self.cond.wait()
                print(f"{self.name}: 好啊.")
                self.cond.notify()
    
                for num in range(2, 11, 2):
                    self.cond.wait()
                    print(f'{self.name}: {num} ')
                    self.cond.notify()
    
    
    class TianMao(threading.Thread):
    
        def __init__(self, cond):
            super().__init__(name='天猫精灵')
            self.cond = cond
    
        def run(self) -> None:
            with self.cond:
                print(f'{self.name}: 小爱同学')
                self.cond.notify()
                self.cond.wait()
    
                print(f'{self.name}: 我们来数数吧')
                self.cond.notify()
                self.cond.wait()
    
                for num in range(1, 10, 2):
                    print(f'{self.name}: {num}')
                    self.cond.notify()
                    self.cond.wait()
    
                # self.cond.notify_all()
    
    
    if __name__ == '__main__':
        cond = Condition()
    
        xiaoai = XiaoAi(cond)
        tianmao = TianMao(cond)
    
        # 启动 顺序 很重要
        # 调用 with cond 之后, 才能调用  wait  ,notify 方法  ,这两个方法 必须拿到锁之后 才能使用.
        #  condition  有两层锁,一把底层锁会在线程调用了 wait 方法的时候释放, 上面的锁会在每次调用wait方法时候,分配一把,并方式 到cond 队列,等待 notify 唤醒.
        xiaoai.start()
        tianmao.start()
    
    

    threading中 Semaphore,BoundedSemaphore 信号量
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 11:17
    @File    : test_Semaphore.py
    @Author  : frank.chang@shoufuyou.com
    
    Semaphore   信号量
    
    
    实现 是通过 threading.Condition() 来实现的, 来控制启动线程的数量.可以看看源码. 
    
    acquire()
    release()
    
    每当调用acquire()时,内置计数器-1
    每当调用release()时,内置计数器+1
    
    
    1 控制启动线程数量的类
    
    
    """
    import threading
    import time
    
    from threading import Semaphore, BoundedSemaphore
    
    
    class HtmlParser(threading.Thread):
    
        def __init__(self, url, sem, name='html_parser'):
            super().__init__(name=name)
            self.url = url
            self.sem = sem
    
        def run(self):
            time.sleep(2)
            print('got html success')
            
            # 注意在这里释放信号量,完成任务后释放.
            self.sem.release()
    
    
    class UrlProducer(threading.Thread):
    
        def __init__(self, sem, name='url_produce'):
            super().__init__(name=name)
            self.sem = sem
    
        def run(self):
            for i in range(20):
                self.sem.acquire()
                html_thread = HtmlParser(url=f"http://www.baidu.com/item={i}",
                                         sem=self.sem
                                         )
    
                html_thread.start()
    
    
    if __name__ == '__main__':
        sem = Semaphore(5)  # 最多有5个线程.
        url_producer = UrlProducer(sem)
        url_producer.start()
    
    

    结果如下:
    每次输出 5个 返回结果

    got html success
    got html success
    got html success
    got html success
    got html success
    ....
    ....
    ....
    
    

    python threading 线程隔离

    参考文档

    12.6 保存线程的状态信息

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/2/27 10:43
    @File    : test_local.py
    @Author  : frank.chang@shoufuyou.com
    """
    from threading import local, Thread, currentThread
    
    # 定义一个local实例
    local_data = local()
    
    # 在主线中,存入name这个变量
    local_data.name = 'local_data'
    
    
    class MyThread(Thread):
    
        def __init__(self, name) -> None:
            super().__init__(name=name)
    
        def run(self) -> None:
            print(f"before:  {currentThread()} ,  {local_data.__dict__}")
    
            local_data.name = self.getName()
    
            print(f"after:  {currentThread()} ,  {local_data.__dict__}")
    
    
    if __name__ == '__main__':
        print("开始前-主线程:", local_data.__dict__)
    
        t1 = MyThread(name='T1')
        t1.start()
    
        t2 = MyThread(name='T2')
        t2.start()
    
        t1.join()
        t2.join()
        print("结束后-主线程:", local_data.__dict__)
    
        pass
    
    

    结果如下:

    开始前-主线程: {'name': 'local_data'}
    before:  <MyThread(T1, started 123145487306752)> ,  {}
    after:  <MyThread(T1, started 123145487306752)> ,  {'name': 'T1'}
    before:  <MyThread(T2, started 123145487306752)> ,  {}
    after:  <MyThread(T2, started 123145487306752)> ,  {'name': 'T2'}
    结束后-主线程: {'name': 'local_data'}
    

    线程池的使用,为啥要有线程池呢?
    初步常用函数
    task.done()   # 判断是否 完成 
    task.result()
    task.cancel()
    
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 13:53
    @File    : test_futures.py
    @Author  : frank.chang@shoufuyou.com
    
    线程池  是什么?     
    
    为什么 需要线程池?
    
    1  控制线程的数量
    2  获取某一个线程的状态 以及返回值 
    3  当一个线程 完成的时候,我们主线程能够立即知道
    4  concurrent.futures 这个包 可以让 多线程,多进程编程 变得如此简单. 多线程 与多进程接口几乎一致. 
    
    
    """
    
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def get_html(seconds):
        time.sleep(seconds)
    
        print(f"get_html {seconds} success.")
    
        return 'success'
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    
    task1 = executor.submit(get_html, 2)
    task2 = executor.submit(get_html, 4)
    task3 = executor.submit(get_html, 6)
    
    
    # 取消task3 , 只要task3 没有开始运行,是可以直接取消的.
    task3.cancel()
    
    
    print(f"task2.result(): {task2.result()}")
    
    
    # print(f"task1.done():{task1.done()}")
    # time.sleep(3)
    #
    # print(f"task1.done():{task1.done()}")
    #
    # print(f"task2.result(): {task2.result()}")  # 阻塞的方法
    
    
    
    如何使用线程池?

    ThreadPoolExecutor 源码分析 https://www.jianshu.com/p/b9b3d66aa0be

    ThreadPoolExecutor
    class ThreadPoolExecutor(_base.Executor):
    
        # Used to assign unique thread names when thread_name_prefix is not supplied.
        _counter = itertools.count().__next__
    
        def __init__(self, max_workers=None, thread_name_prefix=''):
            """Initializes a new ThreadPoolExecutor instance.
    
            Args:
                max_workers: The maximum number of threads that can be used to
                    execute the given calls.
                thread_name_prefix: An optional name prefix to give our threads.
            """
            if max_workers is None:
                # Use this number because ThreadPoolExecutor is often
                # used to overlap I/O instead of CPU work.
                max_workers = (os.cpu_count() or 1) * 5
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
    
            self._max_workers = max_workers
            self._work_queue = queue.Queue()
            self._threads = set()
            self._shutdown = False
            self._shutdown_lock = threading.Lock()
            self._thread_name_prefix = (thread_name_prefix or
                                        ("ThreadPoolExecutor-%d" % self._counter()))
    
    

    有几点说明 :

    max_workers  最多的线程数 ,这个线程池里面,最多同时又多少线程在启动.
    self._work_queue  这个任务队列,提交的任务都会在这里.
    self._threads  启动线程的集合 
    self._shutdown  是否关闭 线程池的标志位 
    self._shutdown_lock  一把锁 
    self._thread_name_prefix  线程前缀名称 
    
    
    demo1

    executor.submit 这种方式,返回的结果不一定是顺序的.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 15:27
    @File    : test_futures_demo1.py
    @Author  : frank.chang@shoufuyou.com
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    import random
    
    
    def sleep(s):
        time.sleep(s * 0.1)
        return s
    
    
    if __name__ == '__main__':
    
        wait_for = []
        executor = ThreadPoolExecutor(4)
        l = list(range(10))
        random.shuffle(l)
        for seconds in l:
            future = executor.submit(sleep, seconds)
            wait_for.append(future)
            print('Scheduled for {}:{}'.format(seconds, future))
    
        results = []
    
        for f in as_completed(wait_for):
            res = f.result()
            msg = '{} result:{!r}'
            print(msg.format(f, res))
            results.append(res)
    
        print(l)
        print(results)
    
    

    结果如下

    Scheduled for 5:<Future at 0x109d76160 state=running>
    Scheduled for 3:<Future at 0x109d84e80 state=running>
    Scheduled for 0:<Future at 0x109d9a748 state=finished returned int>
    Scheduled for 9:<Future at 0x109d9a7f0 state=running>
    Scheduled for 1:<Future at 0x109e55780 state=pending>
    Scheduled for 4:<Future at 0x109e55e48 state=pending>
    Scheduled for 2:<Future at 0x109e5c908 state=pending>
    Scheduled for 6:<Future at 0x109e5c898 state=pending>
    Scheduled for 8:<Future at 0x109e5ca90 state=pending>
    Scheduled for 7:<Future at 0x109e5cfd0 state=pending>
    <Future at 0x109d9a748 state=finished returned int> result:0
    <Future at 0x109e55780 state=finished returned int> result:1
    <Future at 0x109d84e80 state=finished returned int> result:3
    <Future at 0x109d76160 state=finished returned int> result:5
    <Future at 0x109e5c908 state=finished returned int> result:2
    <Future at 0x109e55e48 state=finished returned int> result:4
    <Future at 0x109d9a7f0 state=finished returned int> result:9
    <Future at 0x109e5c898 state=finished returned int> result:6
    <Future at 0x109e5cfd0 state=finished returned int> result:7
    <Future at 0x109e5ca90 state=finished returned int> result:8
    [5, 3, 0, 9, 1, 4, 2, 6, 8, 7]
    [0, 1, 3, 5, 2, 4, 9, 6, 7, 8]
    
    

    看结果返回 是无序的,和任务提交的不一样的顺序的.

    demo2

    用map 我发现是有序的.

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 15:27
    @File    : test_futures_demo2.py
    @Author  : frank.chang@shoufuyou.com
    """
    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    
    def sleep(s):
        time.sleep(s * 0.1)
        return s
    
    
    if __name__ == '__main__':
    
        e = ThreadPoolExecutor(4)
        s = list(range(10))
    
        # 打乱顺序
        random.shuffle(s)
        print(s)
        start = time.time()
        for i in e.map(sleep, s):
            print(i, end='  ')
        print('\nelapsed:{} s'.format(time.time() - start))
        
        
    
    """
    看结果返回是 有序的, s 列表的顺序就是结果返回的顺序. 
    """
       
    [2, 5, 6, 7, 1, 3, 0, 9, 4, 8]
    2  5  6  7  1  3  0  9  4  8  
    elapsed:1.4122741222381592 s
    

    map 返回的结果一定是有序的, 再看一个例子

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @Time    : 2019/3/10 15:27
    @File    : test_futures_demo3.py
    @Author  : frank.chang@shoufuyou.com
    """
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    all_list = range(100)
    
    
    def fun(num):
        print(f'fun({num}) begin sleep 4s ')
        time.sleep(4)
        return num + 1
    
    
    with ThreadPoolExecutor() as executor:
        for result in executor.map(fun, all_list):                                               
            print(f"result:{result}")
    
    

    这两者的区别是什么呢?

    首先 executor.map 这中提交方式 非常像 python 中的 map 函数, 这样 就是 把一个函数 映射到一个序列中 ,
    
    def map(self, fn, *iterables, timeout=None, chunksize=1) :pass 
    
    
    好处:  1 返回 值,和任务提交顺序一致
           2 map 函数的返回值 ,是这个线程的运行的结果. 
    
    缺点:  1 提交任务的函数,是同一个函数,并且函数的参数 只能有一个.
    
    
    executor.submit 这种方式的提交, 相对比较灵活, 
    看下 submit 定义
    第一个参数 是函数名称, 第二个是 位置参数,第三个是关键字参数.
    def submit(self, fn, *args, **kwargs):pass
    
    
    这种好处是:  1 每次提交的函数 可以是不同的, 参数 可以根据实际需要进行传递
                 2  submit 函数 会返回一个 future 对象. 
                 3 可以通过 as_complete 这个方法 ,来取到任务完成的情况,也是比较方便.  需要调用f.result() 取值
                 
    缺点: 1  任务完成的返回,是无法确定顺序的. 只是根据情况, 只要完成就返回.
    
    总结

    多线程编程,或者说并发编程是一个比较复杂的话题, 建议还是使用一些已经使用的工具包,这样可以避免出现莫名奇怪的问题, 线程间数据交换可以考虑使用 Queue 这个队列 是线程安全的. 方便我们操作数据的安全性.
    说到python 多线程编程 肯定要说下 GIL ,其实 GIL 也没有那么糟糕,如何 你的应用是 I/O多一些, socket 编程, 网络请求, 其实多线程 是完全没有任何问题的.

    参考文档

    Python的多线程编程模块 threading 参考 https://my.oschina.net/lionets/blog/194577
    python多线程、锁、event事件机制的简单使用 https://segmentfault.com/a/1190000014619654

    Python并发编程之线程消息通信机制任务协调(四)https://juejin.im/post/5b1a9d7d518825137661ae82

    官方文档 https://docs.python.org/3/library/threading.html

    分享快乐,留住感动. 2019-03-15 21:47:35 --frank
    展开全文
  • python线程学习

    2020-03-30 17:25:24
    线程的概念 线程是CPU调度的基本单元,线程运行于进程当中,一个进程中可以运行很多个线程。同一进程中的所有线程共享内存空间 threading Thread # 表示一个线程的执行的对象 start() # 开始线程的执行 run() #...

    线程的概念

    线程是CPU调度的基本单元,线程运行于进程当中,一个进程中可以运行很多个线程。同一进程中的所有线程共享内存空间

    threading
    	Thread                   # 表示一个线程的执行的对象
    		start()              # 开始线程的执行
    		run()                # 定义线程的功能的函数(一般会被子类重写)
    		join(timeout=None)   # 允许主线程等待线程结束,程序挂起,直到线程结束;如果给了timeout,则最多等待timeout秒.
    		getName()            # 返回线程的名字
    		setName(name)        # 设置线程的名字
    		isAlive()            # 布尔标志,表示这个线程是否还在运行中
    		isDaemon()           # 返回线程的daemon标志
    		setDaemon(daemonic)  # 后台线程,把线程的daemon标志设置为daemonic(一定要在调用start()函数前调用)
    		# 默认主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)
    	Lock              # 锁原语对象
    	Rlock             # 可重入锁对象.使单线程可以在此获得已获得了的锁(递归锁定)
    	Condition         # 条件变量对象能让一个线程停下来,等待其他线程满足了某个条件.如状态改变或值的改变
    	Event             # 通用的条件变量.多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活
    	Semaphore         # 为等待锁的线程提供一个类似等候室的结构
    	BoundedSemaphore  # 与Semaphore类似,只是不允许超过初始值
    	Time              # 与Thread相似,只是他要等待一段时间后才开始运行
    	activeCount()     # 当前活动的线程对象的数量
    	currentThread()   # 返回当前线程对象
    	enumerate()       # 返回当前活动线程的列表
    	settrace(func)    # 为所有线程设置一个跟踪函数
    	setprofile(func)  # 为所有线程设置一个profile函数
    

    python中多线程

    import os
    import time
    from random import randint
    from threading import Thread
    
    
    def download(name):
        print("name:{} download task pid :{}, parent pid:P{}".format(name, os.getpid(), os.getppid()))
        download_time = randint(5, 10)
        time.sleep(download_time)
        print("download task finished cost_time:{}".format(download_time))
    
    
    if __name__ == '__main__':
        start = time.time()
        print("current python_thread pid is:%d" % os.getpid())
        thread_list = []
        for i in range(10):
            t = Thread(target=download, args=(i,))
            t.start()
            # t.run()
            thread_list.append(t)
    
        for j in thread_list:
            j.join()
    
        end = time.time()
        print("total cost_time:%.3f" % (end - start))
    

    自定义线程

    import time
    from random import randint
    from threading import Thread
    
    
    class MyThread(Thread):
        def __init__(self, name):
            super(MyThread, self).__init__()
            self._filename = name
    
        def run(self):
            print("MyThread start run %s" % self._filename)
            download_time = randint(5, 10)
            time.sleep(download_time)
            print("%s download finished time:%d" % (self._filename, download_time))
    
    
    if __name__ == '__main__':
        start = time.time()
        thread_list = []
        for i in range(10):
            t = MyThread(str(i))
            t.start()
            thread_list.append(t)
    
        for j in thread_list:
            j.join()
    
        end = time.time()
        print("total cost time %.3f" % (end - start))
    

    线程锁机制

    from time import sleep
    from threading import Thread, Lock
    
    
    class Account(object):
    
        def __init__(self):
            self._balance = 0
            self._lock = Lock()
    
        def deposit(self, money):
            self._lock.acquire()
            try:
                # 计算存款后的余额
                new_balance = self._balance + money
                # 模拟受理存款业务需要0.01秒的时间
                sleep(0.01)
                # 修改账户余额
                self._balance = new_balance
            finally:
                self._lock.release()
    
        @property
        def balance(self):
            return self._balance
    
    
    class AddMoneyThread(Thread):
    
        def __init__(self, account, money):
            super().__init__()
            self._account = account
            self._money = money
    
        def run(self):
            self._account.deposit(self._money)
    
    
    def main():
        account = Account()
        threads = []
        # 创建100个存款的线程向同一个账户中存钱
        for _ in range(100):
            t = AddMoneyThread(account, 1)
            t.start()
            threads.append(t)
        # 等所有存款的线程都执行完毕
        for t in threads:
            t.join()
        print('账户余额为: ¥%d元' % account.balance)
    
    
    if __name__ == '__main__':
        main()
    
    展开全文
  • Python线程学习

    2019-05-11 11:10:56
    前言 在爬虫学习的过程中,一旦爬取的数量过大,很容易带来效率问题,为了能够快速爬取我们想要的内容。为此我们可以使用多线程或者...关于 Python线程有这样一句名言——“Python下多线程是鸡肋,推荐使用多进程...

    前言

    在爬虫学习的过程中,一旦爬取的数量过大,很容易带来效率问题,为了能够快速爬取我们想要的内容。为此我们可以使用多线程或者多进程来处理。

    多线程和多进程是不一样的!一个是 threading 库,一个是 multiprocessing 库。而多线程 threading 在 Python 里面被称作鸡肋的存在!关于 Python 多线程有这样一句名言——“Python下多线程是鸡肋,推荐使用多进程!”

    为什么称 Python 多线程是鸡肋?原因如下:

    在大多数环境中,单核CPU情况下,本质上某一时刻只能有一个线程被执行,多核 CPU 则可以支持多个线程同时执行。但是在 Python 中,无论 CPU 有多少核,CPU在同一时间只能执行一个线程。这是由于 GIL 的存在导致的。

    GIL 的全称是 Global Interpreter Lock(全局解释器锁),是 Python 设计之初为了数据安全所做的决定。Python 中的某个线程想要执行,必须先拿到 GIL。可以把 GIL 看作是执行任务的“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。GIL 只在 CPython 解释器中才有,因为 CPython 调用的是 c 语言的原生线程,不能直接操作 CPU,只能利用 GIL 保证同一时间只能有一个线程拿到数据。在 PyPy 和 JPython 中没有 GIL。

    在 Python 多线程下,每个线程的执行方式:

    1. 拿到公共数据
    2. 申请GIL
    3. Python解释器调用操作系统原生线程
    4. cpu执行运算
    5. 当该线程执行一段时间消耗完,无论任务是否已经执行完毕,都会释放GIL
    6. 下一个被CPU调度的线程重复上面的过程

    任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

    那么是不是python的多线程就完全没用了呢?

    Python 针对不同类型的任务,多线程执行效率是不同的:

    • 对于 CPU 密集型任务(各种循环处理、计算等等),由于计算工作多,ticks 计数很快就会达到阈值,然后触发 GIL 的释放与再竞争(多个线程来回切换是需要消耗资源的),所以 Python 下的多线程对 CPU 密集型任务并不友好。
    • IO 密集型任务(文件处理、网络通信等涉及数据读写的操作),多线程能够有效提升效率(单线程下有 IO 操作会进行 IO 等待,造成不必要的时间浪费,而开启多线程能在线程 A 等待时,自动切换到线程 B,可以不浪费 CPU 的资源,从而能提升程序执行效率)。所以 Python 的多线程对 IO 密集型任务比较友好。

    实际中的建议

    Python 中想要充分利用多核 CPU,就用多进程。因为每个进程有各自独立的 GIL,互不干扰,这样就可以真正意义上的并行执行。在 Python 中,多进程的执行效率优于多线程(仅仅针对多核 CPU 而言)。同时建议在 IO 密集型任务中使用多线程,在计算密集型任务中使用多进程。

    Python 创建多线程

    1.直接利用函数创建多线程

    在 Python3 中,Python 提供了一个内置模块 threading.Thread,可以很方便地让我们创建多线程。
    threading.Thread()一般接收两个参数:

    • target(线程函数名):要放置线程让其后台执行的函数,由我们自已定义,注意不要加();
    • args(线程函数的参数):线程函数名所需的参数,以元组的形式传入。若不需要参数,可以不指定。
    import time
    from threading import Thread
    
    # 自定义线程函数。
    def main(name="Python"):
        for i in range(2):
            print("hello", name)
            time.sleep(2)
    
    # 创建线程01,不指定参数
    thread_01 = Thread(target=main)
    # 启动线程01
    thread_01.start()
    
    # 创建线程02,指定参数,注意逗号
    thread_02 = Thread(target=main, args=("MING",))
    # 启动线程02
    thread_02.start()
    

    输出结果:

    hello Python
    hello MING
    hello Python
    hello MING
    

    2. 使用 Threading 模块构建类对象

    使用 Threading 模块创建线程,直接从 threading.Thread 继承,然后重写 init 方法和 run 方法。

    import time
    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self, name="Python"):
            # 注意,super().__init__() 一定要写
            # 而且要写在最前面,否则会报错。
            super().__init__()
            self.name=name
    
        def run(self):
            for i in range(2):
                print("hello", self.name)
                time.sleep(2)
    
    if __name__ == '__main__':
        # 创建线程01,不指定参数
        thread_01 = MyThread()
        # 创建线程02,指定参数
        thread_02 = MyThread("MING")
    
        thread_01.start()
        thread_02.start()
    

    输出结果:

    hello Python
    hello MING
    hello MING
    hello Python
    

    上述是最基本的多线程创建方法,看起来很简单,但在实际的应用中,会复杂许多。

    接下来,关于我实际学习中的多线程练习,我会一一记录下来。

    多线程练习

    在讲述下面的线程同步前,我先用个实例描述一下线程未同步。

    import threading
    import time
    
    
    class myThread(threading.Thread):
        def __init__(self, threadID, name, counter):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.counter = counter
    
        def run(self):
            print('Starting '+self.name)
            print_time(self.name, self.counter, 3)
            print("Exiting " + self.name)
    
    
    def print_time(threadName, delay, count):
        while count:
            time.sleep(1)
            print('{0} processing {1}'.format(threadName, time.ctime(time.time())))
            count -= 1
    
    
    if __name__ == '__main__':
        threadList = ["Thread-1", "Thread-2"]
    
        threads = []
        threadID = 1
    
        for tName in threadList:
            thread = myThread(threadID, tName, threadID)
            thread.start()
            threads.append(thread)
            threadID += 1
    
        # for t in threads:
        #     t.join()
    
        print("Exiting Main Thread")
    

    输出结果:

    Starting Thread-1Starting Thread-2
    
    Exiting Main Thread
    Thread-1 processing Sat May 11 11:06:11 2019
    Thread-2 processing Sat May 11 11:06:12 2019
    Thread-1 processing Sat May 11 11:06:12 2019
    Thread-1 processing Sat May 11 11:06:13 2019
    Exiting Thread-1
    Thread-2 processing Sat May 11 11:06:14 2019
    Thread-2 processing Sat May 11 11:06:16 2019
    Exiting Thread-2
    

    由输出结果中可以看出,第二个线程在打印过程中没有回车打印,说明线程间没有同步。
    线程同步——Lock

    多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

    使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。

    import threading
    import queue
    import time
    
    exitFlag = 0
    
    class myThread(threading.Thread):
        def __init__(self, que):
            threading.Thread.__init__(self)
            self.que = que
    
        def run(self):
           print("Starting " + threading.currentThread().name)
            process_data(self.que)
            print("Ending " + threading.currentThread().name)
    
    def process_data(que):
        while not exitFlag:
            queueLock.acquire()
            if not workqueue.empty():
                data = que.get()
                queueLock.release()
                print('Current Thread Name %s, data: %s ' % (threading.currentThread().name, data))
            else:
                queueLock.release()
            time.sleep(0.1)
    
    
    if __name__ == '__main__':
        start_time = time.time()
        queueLock = threading.Lock()
        workqueue = queue.Queue()
    
        threads = []
    
        # 填充队列
        queueLock.acquire()
        for i in range(1,10):
            workqueue.put(i)
        queueLock.release()
    
        for i in range(4):
            thread = myThread(workqueue)
            thread.start()
            threads.append(thread)
    
        #等待队列清空
        while not workqueue.empty():
            pass
    
        #通知线程退出
        exitFlag = 1
    
        for t in threads:
            t.join()
    
        print("Exiting Main Thread")
        end_time = time.time()
        print('耗时{}s'.format((end_time - start_time)))
    

    输出结果:

    Starting Thread-1
    Current Thread Name Thread-1, data: 1, time: Sat May 11 10:47:25 2019 
    Starting Thread-2
    Current Thread Name Thread-2, data: 2, time: Sat May 11 10:47:25 2019 
    Starting Thread-3
    Current Thread Name Thread-3, data: 3, time: Sat May 11 10:47:25 2019 
    Starting Thread-4
    Current Thread Name Thread-4, data: 4, time: Sat May 11 10:47:25 2019 
    Current Thread Name Thread-1, data: 5, time: Sat May 11 10:47:25 2019 
    Current Thread Name Thread-2, data: 6, time: Sat May 11 10:47:25 2019 
    Current Thread Name Thread-3, data: 7, time: Sat May 11 10:47:25 2019 
    Current Thread Name Thread-4, data: 8, time: Sat May 11 10:47:25 2019 
    Current Thread Name Thread-2, data: 9, time: Sat May 11 10:47:25 2019 
    Ending Thread-4
    Ending Thread-1
    Ending Thread-2
    Ending Thread-3
    Exiting Main Thread
    耗时0.31415677070617676s
    

    线程同步——Queue

    Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括 FIFO(先入先出)队列 Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

    import threading
    import time
    import queue
    
    def get_num(workQueue):
    	print("Starting " + threading.currentThread().name)
        while True:
            if workQueue.empty():
                break
            num = workQueue.get_nowait()
            print('Current Thread Name %s, Url: %s ' % (threading.currentThread().name, num))
            time.sleep(0.3)
    
            # try:
            #     num = workQueue.get_nowait()
            #     if not num:
            #         break
            #     print('Current Thread Name %s, Url: %s ' % (threading.currentThread().name, num))
            # except:
            #     break
            # time.sleep(0.3)
        print("Ending " + threading.currentThread().name)
    
    
    if __name__ == '__main__':
        start_time = time.time()
        workQueue = queue.Queue()
    
        for i in range(1,10):
            workQueue.put(i)
    
        threads = []
        thread_num = 4  #线程数
        for i in range(thread_num):
            t = threading.Thread(target=get_num, args=(workQueue,))
            t.start()
            threads.append(t)
    
        for t in threads:
            t.join()
    
        end_time = time.time()
        print('耗时{}s'.format((end_time - start_time)))
    

    输出结果:

    Starting Thread-1
    Current Thread Name Thread-1, data: 1, time: Sat May 11 10:48:27 2019 
    Starting Thread-2
    Current Thread Name Thread-2, data: 2, time: Sat May 11 10:48:27 2019 
    Starting Thread-3
    Current Thread Name Thread-3, data: 3, time: Sat May 11 10:48:27 2019 
    Starting Thread-4
    Current Thread Name Thread-4, data: 4, time: Sat May 11 10:48:27 2019 
    Current Thread Name Thread-2, data: 5, time: Sat May 11 10:48:27 2019 
    Current Thread Name Thread-1, data: 6, time: Sat May 11 10:48:27 2019 
    Current Thread Name Thread-4, data: 7, time: Sat May 11 10:48:27 2019 
    Current Thread Name Thread-3, data: 8, time: Sat May 11 10:48:27 2019 
    Current Thread Name Thread-2, data: 9, time: Sat May 11 10:48:28 2019 
    Ending Thread-1
    Ending Thread-4
    Ending Thread-3
    Ending Thread-2
    耗时0.9028356075286865s
    

    因为考虑到爬虫方面的实际应用,所以在程序中使用了 Python 的 Queue 模块,使用队列来实现线程间的同步(关于线程同步在后面会讲到)。在 get_num() 方法中注释的内容是另一种判断队列为空,结束子线程任务的代码实现。

    在实际运行中,通过调节线程数可以看到,执行时间会随着线程数的增加而缩短。

    展开全文
  • 捋一捋Python线程概念 标签: Python 引言 从刚开始学习Python爬虫的时候,就一直惦记着多线程这个东西, 想想每次下载图片都是单线程,一个下完继续下一个,多呆啊! 没占满的带宽(10M带宽),1%的CPU...

    小猪的Python学习之旅 —— 6.捋一捋Python线程概念

    标签: Python


    引言

    从刚开始学习Python爬虫的时候,就一直惦记着多线程这个东西,
    想想每次下载图片都是单线程,一个下完继续下一个,多呆啊

    没占满的带宽(10M带宽),1%的CPU占用率(笔者的是i7 6700K),要不要
    那么浪费,所以,不搞点多线程,多进程,协程这样的东西提高下资源利用
    率,怎么也说不过去吧?然而关于线程这种话题,一般都是会让很多新手
    玩家望而却步,而且听说Python里还有什么全局解释器锁(GIL),搞得Py无法
    实现高效的多线程,一听就感觉很难:

    虚个卵子哦,跟着小猪把Python里和多线程相关的东西都撸一遍吧!
    本节主要是对一些概念进行了解~


    1.程序,进程,线程,多线程,多进程

    多线程与多进程的理解

    操作系统原理相关的书,基本都会提到一句很经典的话:
    进程是资源分配的最小单位,线程则是CPU调度的最小单位“。

    说到进程,如果你是windows的电脑的话,Ctrl+Alt+Del打开任务
    管理器,可以看到当前电脑上正在运行的很多个进程,网易云啊,
    QQ,微信啊,等等;这就是多进程,只是每个进程各司其职完成
    对应的功能而已,播放、聊天,互不干扰。这是吃瓜群众的看法,
    而对于我们开发来说,多进程的概念更倾向于:多个进程协同地去完成
    同一项工作,为什么要在应用里使用多线程,个人的看法如下:
    为了摆脱系统的一些限制和为自己的应用获取更多的资源,举个例子:
    在Android中为每个应用(进程)限制类最大内存,单个进程超过这个
    阀值是会OOM的,而使用多进程技术可以减少内存溢出的问题;
    再举个例子:Python在实现Python解析器(CPython)时引入GIL锁
    这种东西,使得任何时候仅有一个线程在执行,多线程的效率还
    可能比不上单线程,使用多线程可以规避这个限制。

    说完多进程,然后说下多线程,首先为何会引入线程呢?举个例子:
    你有一个文本程序,接收用户的输入,显示到屏幕上,并保存到硬盘里,
    由三个进程组成:输入接收进程A,显示内容进程B,写入硬盘进程C,
    而他们之间共同需要要拥有的东西——文本内容,因为进程A,B,C
    运行在不同的内存空间,这就涉及到进程通信问题了,而频繁的切换
    势必导致性能上的损失。有没有一种机制使得做这三个任务时共享资源呢?
    这个时候线程(轻量级的进程)就粉墨登场啦!感觉就像进程又开辟了
    一个小世界一样:系统 -> 进程 -> 线程,系统里有很多进程,进程里
    又有很多线程。(有点像斗破小说那种套路…)

    相信到这里你对多进程和多线程的概念就应一清二楚了,简单比较下
    两者的区别与使用场景吧:(摘自:浅谈多进程多线程的选择)

    对比维度 多进程 多线程
    数据共享、同步 数据共享复杂,需要用IPC;
    数据是分开的,同步简单
    共享进程数据,数据共享简单,
    但也是因为这个原因导致同步复杂
    内存、CPU 占用内存多,切换复杂,CPU利用率低 占用内存少,切换简单,CPU利用率高
    创建销毁、切换 创建销毁、切换复杂,速度慢 创建销毁、切换简单,速度很快
    编程、调试 编程简单,调试简单 编程复杂,调试复杂
    可靠性 进程间不会互相影响 一个线程挂掉将导致整个进程挂掉
    分布式 适应于多核、多机分布式;如果一台
    机器不够,扩展到多台机器比较简单
    适应于多核分布式

    2.线程的生命周期

    各个状态说明:

    • 1.New(新建),新创建的线程进过初始化,进入Runnable(就绪)状态;
    • 2.Runnable(就绪),等待线程调度,调度后进入Running(运行)状态;
    • 3.Running(运行),线程正常运行,期间可能会因为某些情况进入Blocked(堵塞)
      状态(同步锁;调用了sleep()和join()方法进入Sleeping状态;执行wait()
      方法进入Waiting状态,等待其他线程notify通知唤醒);
    • 4.Blocked(堵塞),线程暂停运行,解除堵塞后进入Runnable(就绪)状态
      重新等待调度;
    • 5.Dead(死亡):线程完成了它的任务正常结束或因异常导致终止;

    3.并行与并发

    并行是同时处理多个任务,而并发则是处理多个任务,而不一定要同时
    并行可以说是并发的子集。


    4.同步与异步

    同步:线程执行某个请求,如果该请求需要一段时间才能返回信息,
    那么这个线程会一直等待,直到收到返回信息才能继续执行下去;

    异步:线程执行完某个请求,不需要一直等,直接继续执行后续操作,
    当有消息返回时系统会通知线程进程处理,这样可以提高执行的效率;
    异步在网络请求的应用非常常见~


    5.线程同步安全问题

    当有两个或以上线程在同一时刻访问同一资源,可能会带来一些问题,
    比如:数据库表不允许插入重复数据,而线程1,2都得到了数据X,然后
    线程1,2同时查询了数据库,发现没有数据X,接着两线程都往数据库中
    插入了X,然后就GG啦,这就是线程的同步安全问题,而这里的数据库
    资源我们又称为:临界资源(共享资源)


    6.如何解决同步安全问题(同步锁)

    当多个线程访问临界资源的时候,有可能会出现线程安全问题;
    而基本所有并发模式在解决线程安全问题时都采用”系列化访问
    临界资源“的方式,就是同一时刻,只能有一个线程访问临界资源,
    也称”同步互斥访问“。通常的操作就是加锁(同步锁),当有线程访问
    临界资源时需要获得这个锁,其他线程无法访问,只能等待(堵塞),
    等这个线程使用完释放锁,供其他线程继续访问。


    7.与锁有关的特殊情况:死锁,饥饿与活锁

    有了同步锁不意味着就一了百了了,当多个进程/线程的操作涉及到了多个锁,
    就可能出现下述三种情况:

    • 死锁(DeadLock)

    两个或以上进程(线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,
    如果无外力作用,他们将继续这样僵持下去;简单点说:两个人互相持有对方想要的资源,
    然后每一方都不愿意放弃自己手上的资源,就一直那样僵持着。

    死锁发生的条件

    互斥条件(临界资源);
    请求和保持条件(请求资源但不释放自己暂用的资源);
    不剥夺条件(线程获得的资源只有线程使用完后自己释放,不能被其他线程剥夺);
    环路等待条件:在死锁发生时,必然存在一个”进程-资源环形链”,t1等t2,t2等t1;

    如何避免死锁

    破坏四个条件中的一个或多个条件,常见的预防方法有如下两种:
    有序资源分配法:资源按某种规则统一编号,申请时必须按照升序申请:
    1.属于同一类的资源要一次申请完;2.申请不同类资源按照一定的顺序申请。
    银行家算法:就是检查申请者对资源的最大需求量,如果当前各类资源都可以满足的
    申请者的请求,就满足申请者的请求,这样申请者就可很快完成其计算,然后释放它占用
    的资源,从而保证了系统中的所有进程都能完成,所以可避免死锁的发生。
    理论上能够非常有效的避免死锁,但从某种意义上说,缺乏使用价值,因为很少有进程
    能够知道所需资源的最大值,而且进程数目也不是固定的,往往是不断变化的,
    况且原本可用的资源也可能突然间变得不可用(比如打印机损坏)。

    • 饥饿(starvation)与饿死(starve to death)

    资源分配策略有可能是不公平的,即不能保证等待时间上界的存在,即使没有
    发生死锁, 某些进程可能因长时间的等待,对进程推进与相应带来明显影响,
    此时的进程就是 发生了进程饥饿(starvation),当饥饿达到一定程序即此时
    进程即使完成了任务也 没有实际意义时,此时称该进程被饿死(starve to death),
    典型的例子: 文件打印,采用短文件优先策略,如果短文件太多,长文件会一直
    推迟,那还打印个毛。

    • 活锁(LiveLock)

    特殊的饥饿,一系列进程轮询等待某个不可能为真的条件为真,此时进程不会
    进入blocked状态, 但会占用CPU资源,活锁还有几率能自己解开,而死锁则
    无法自己解开。(例子:都觉得对方优先级比自己高,相互谦让,导致无法
    使用某资源),简单避免死锁的方法:先来先服务策略。


    8.守护线程

    也叫后台线程,是一种为其他线程提供服务的线程,比如一个简单的例子:
    你有两个线程在协同的做一件事,如果有一个线程死掉,事情就无法继续
    下去,此时可以引入守护线程,轮询地去判断两个线程是否或者(调isAlive()),
    如果死掉就start开启线程,在Python中可以在线程初始化的时候调用
    setDaemon(True)把线程设置为守护线程,如果程序中只剩下守护线程
    的话会自动退出


    9.线程并发的经典问题:生产中与消费者问题

    说到线程并发,不得不说的一个经典问题就是:生产中与消费者问题:

    两个共享固定缓冲区大小的线程,生产者线程负责生产一定量的数据
    放入缓冲区, 而消费者线程则负责消耗缓冲区中的数据,关键问题是
    需要保证两点:

    • 1.缓冲区满的时候,生产者不再往缓冲区中填充数据
    • 2.缓存区空的时候,消费者不在消耗缓冲区中的数据

    听不懂也没什么,这个后面会写例子的~


    10.Python中的GIL锁

    概念

    全局解释器锁,用于同步线程的一种机制,使得任何时候仅有一个线程在执行
    GIL 并不是Python的特性,只是在实现Python解析器(CPython)时引入
    一个概念。换句话说,Python完全可以不依赖于GIL。

    Python解释器进程内的多线程是以协作多任务方式执行的,当一个线程遇到
    I/O操作时会释放GIL。而依赖CPU计算的线程则是执行代码量到一定的阀值,
    才会释放GIL。而在Python 3.2开始使用新的GIL,使用固定的超时时间来指示
    当前线程放弃全局锁,就是:当前线程持有这个锁,且其他线程请求这个锁时,
    当前线程就会再5毫秒后被强制释放掉该锁。

    多线程在处理CPU密集型操作因为各种循环处理计数等,会很快达到阀值,
    而多个线程来回切换是会消耗资源的,所以多线程的效率往往可能还比不上
    单线程!而在多核CPU上效率会更低,因为多核环境下,持有锁的CPU释放锁后,
    其他CPU上的线程都会进行竞争,但GIL可能马上又会被之前的CPU拿到拿到,
    导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度
    状态,从而造成线程颠簸(thrashing),导致效率更低。

    问题

    因为GIL锁的原因,对于CPU密集型操作,Python多线程就是鸡肋了?

    答:是的!尽管多线程开销小,但却无法利用多核优势!
    可以使用多进程来规避这个问题,Python提供了multiprocessing
    这个跨平台的模块来帮助我们实现多进程代码的编写。
    每个线程都有自己独立的GIL,因此不会出现进程间GIL
    锁抢夺的问题,但是也增加程序实现线程间数据通讯和同步
    是的成本,这个需要自行进行权衡。


    11.Python中对多线程与多进程的支持

    Python与线程,进程相关的官方文档: 17. Concurrent Execution
    https://docs.python.org/3/library/concurrency.html

    简单介绍下里面的一些模块,后面会一个个啃~

    • threading —— 提供线程相关的操作
    • multiprocessing —— 提供进程程相关的操作
    • concurrent.futures —— 异步并发模块,实现多线程和多进程的异步并发(3.2后引入)
    • subprocess —— 创建子进程,并提供链接到他们输入/输出/错误管道的方法,
      并获得他们的返回码,该模块旨在替换几个较旧的模块和功能:os.systemos.spawn*
    • sched —— 任务调度(延时处理机制)
    • queue —— 提供同步的、线程安全的队列类

    还有几个是兼容模块,比如Python 2.x上用threading和Python 3.x上用thread:

    • dummy_threading:提供和threading模块相同的接口,2.x使用threading兼容;
    • _thread:threading模块的基础模块,应该尽量使用 threading 模块替代;
    • dummy_thread:提供和thread模块相同的接口,3.x使用threading兼容;

    小结

    本节我们围绕着线程以及进程相关的概念进行了解析,尽管有些
    枯燥,但是如果坚持看完,相信你对于线程与进程的理解会更进
    一步,概念什么都是虚的,纸上得来终觉浅绝知此事要躬行
    下节开始我们来通过写代码的方式一一学习这些模块吧!


    参考文献


    来啊,Py交易啊

    想加群一起学习Py的可以加下,智障机器人小Pig,验证信息里包含:
    PythonpythonpyPy加群交易屁眼 中的一个关键词即可通过;

    验证通过后回复 加群 即可获得加群链接(不要把机器人玩坏了!!!)~~~
    欢迎各种像我一样的Py初学者,Py大神加入,一起愉快地交流学♂习,van♂转py。

    展开全文
  • 今天要跟大家一起来学习一下Python的多线程机制。有两个原因,其一是自己在学习中经常会使用到多线程,其二当然是自己对Python中的多线程并不是很了解。那么,今天和大家一起了解下~ Python线程机制 开发多线程...

    今天要跟大家一起来学习一下Python的多线程机制。有两个原因,其一是自己在学习中经常会使用到多线程,其二当然是自己对Python中的多线程并不是很了解。那么,今天和大家一起了解下~

    Python多线程机制

    开发多线程的应用系统,是在日常开发中经常会遇到的需求。同时,Python也为多线程系统的开发提供了很好的支持。
    大家应该都知道,Python多线程机制是在GIL(Global Interpreter Lock)全局解释锁的基础上建立的。

    那么Python为什么需要全局解释锁?

    为什么需要全局解释锁?

    我们知道,要支持多线程的话,一个基本的要求就是不同线程对共享资源访问的互斥,所以Python中引入了GIL,当然这是第一个原因。

    Python中的GIL是一个非常霸道的互斥实现,在一个线程拥有了解释器的访问权之后,其它的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。

    这样的说法也就意味着,无论如何,在同一时间,只能有一个线程能访问Python提供的API。因为单处理器的本质是不可能并行的,这里的同一时间确实对于单处理器是毫无意义的,但是对于多处理器,同一时间,确实可以有多个时间独立运行。然而正是由于GIL限制了这样的情形,使得多处理器最终退化为单处理器,性能大打折扣。那么,为什么还要使用GIL呢?这里就要提到第二个原因。

    当然,Python社区也早都认识到了这个问题,并且在不断探索,Greg Stein和Mark Hammond两位老兄曾经创建过一份去除GIL的branch,但是很不幸,这个分支在很多的基准测试中,尤其是在单线程的测试上,效率只有使用GIL的一半左右。

    使用GIL时,保护机制的粒度比较大,也就是我们似乎只需要将可能被多个线程共享的资源保护起来即可,对于不会被多个线程共享的资源,完全可以不用保护。但是,如果使用更细粒度的锁机制进行保护,那么,会导致大量的加锁和解锁功能,加锁和解锁对于操作系统来说,是一个比较重量级的动作,同时,没有GIL的保护,编写Python的扩展模块的难度也大大增加。

    所以,目前为止,GIL仍然是多线程机制的基石。

    对于Python而言,字节码解释器是Python的核心所在,所以Python通过GIL来互斥不同线程对解释器的使用。这里举个例子进行说明:

    假设,现在有三个线程A、B和C,它们都需要解释器来执行字节码,进行对应的计算,那么在这之前,它们必须获得GIL。那么现在假设线程A获得了GIL,其它线程只能等A释放GIL之后,才能获得。

    对!是这样没错,于是,有两个问题:

    1. 线程A何时释放GIL呢(如果A使用完解释器之后才释放GIL,那么,并行的计算退化为串行,多线程的意义何在?)

    2. 线程B和C谁将在A释放GIL之后获得GIL呢?

    所以毫无疑问的,Python拥有其自己的一套线程调度机制。

    关于线程调度

    和操作系统的进程调度一样,线程调度机制主要解决两个问题:

    1. 在何时挂起当前线程,选择处于等待状态的下一个线程?

    2. 在众多处于等待状态的线程中,应该选择激活哪个线程?

    对于何时进行线程调度的问题,是由Python自身决定的。我们可以联想操作系统进行进程切换的问题,当一个进程执行了一段时间之后,发生了时钟中断,于是操作系统响应时钟中断,并在这时开始进程的调度。

    与此类似,Python中通过软件模拟了这样的中断,来激活线程的调度。Python的字节码解释器是按照指令的顺序一条一条的顺序执行从而工作的,Python内部维护着这样一个数值,作为Python内部的时钟,假设这个值为N,那么Python将在执行了N条指令之后立刻启动线程调度机制。

    也就是说,当一个线程获得GIL后,Python内部的监测机制就开始启动,当这个线程执行了N条指令后,Python解释器将强制挂起当前线程,开始切换到下一个处于等待状态的线程。

    在Python中,可以这样获得这个数值(N):

    那么,下一个问题,Python会在众多等待的线程中选择哪一个呢?

    答案是,不知道。因为这个问题是交给了底层的操作系统来解决的,Python借用了底层操作系统所提供的线程调度机制来决定下一个获得GIL进入解释器的线程是谁。

    所以说,Python中的线程实际上就是操作系统所支持的原生线程。

    那么,接下来,我们一起揭开Python中GIL的真实面目。

    关于GIL

    应该知道,Python中多线程常用的两个模块:Thread和在其之上的threading。其中Thread是使用C实现的,而Threading是用python实现。

    我们可以通过Thread模块进行分析(以Python2.7.13为例)。

    创建线程
    首先从创建线程说起,在threadmodule.c中,thread_PyThread_start_new_thread()函数通过三个主要的动作完成一个线程的创建:

    //创建bootstate结构
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
    return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
    PyMem_DEL(boot);
    return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    // 初始化多线程环境
    PyEval_InitThreads(); 
    //创建线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == -1) {
    PyErr_SetString(ThreadError, "can't start new thread");
    Py_DECREF(func);
    Py_DECREF(args);
    Py_XDECREF(keyw);
    PyThreadState_Clear(boot->tstate);
    PyMem_DEL(boot);
    return NULL;
    }
    return PyInt_FromLong(ident);

    1. 创建并初始化bootstate结构boot,在boot中,将保存关于Python的一切信息(线程过程,线程过程参数等)。

    2. 初始化Python的多线程环境。

    3. 以boot为参数,创建操作系统的原生线程。

    从以上代码可以看出,Python在刚启动时,并不支持多线程,也就是说,Python中支持多线程的数据结构以及GIL都是没有创建的。当然这是因为大多数的Python程序都不需要Python的支持。

    在Python虚拟机启动时,多线程机制并没有被激活,它只支持单线程,一旦用户调用thread.start_new_thread,明确的告诉Python虚拟机需要创建新的线程,这时Python意识到用户需要多线程的支持,这个时候,Python虚拟机会自动建立多线程需要的数据结构、环境以及GIL。

    建立多线程环境

    建立多线程环境,主要就是创建GIL。那么GIL是如何实现的呢?
    打开"python/ceval.c":

    static PyThread_type_lock interpreter_lock = 0; /* This is the GIL */
    static PyThread_type_lock pending_lock = 0; /* for pending calls */
    static long main_thread = 0;

    int
    PyEval_ThreadsInitialized(void)
    {
    return interpreter_lock != 0;
    }

    void
    PyEval_InitThreads(void)
    {
    if (interpreter_lock)
    return;
    interpreter_lock = PyThread_allocate_lock();
    PyThread_acquire_lock(interpreter_lock, 1);
    main_thread = PyThread_get_thread_ident();
    }

    在这段代码中,iterpreter_lock就是GIL。

    无论创建多少个线程,Python建立多线程环境的动作只会执行一次。在创建GIL之前,Python会检查GIL是否已经被创建,如果是,则不再进行任何动作,否则,就会去创建这个GIL。

    在上述代码中,我们可以看到,创建GIL使用的是Pythread_allocate_lock完成的,下面看看该函数的内部实现:

    PyThread_type_lock
    PyThread_allocate_lock(void)
    {
    PNRMUTEX aLock;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
    PyThread_init_thread();

    aLock = AllocNonRecursiveMutex() ;

    dprintf(("%ld: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));

    return (PyThread_type_lock) aLock;
    }

    可以看到该函数返回了alock,alock是结构体PNRMUTEX,实际上就是我们需要创建的那个interperter_lock(GIL)。这么说来,GIL就是结构体PNRMUTEX呀,于是我们找来它的真身:

    typedef struct NRMUTEX {
    LONG owned ;
    DWORD thread_id ;
    HANDLE hevent ;
    } NRMUTEX, *PNRMUTEX ;

    这里又三个变量,owned、thread_id和hevent。这里的hevent是windows平台下的Event这个内核对象,也就是通过Event来实现线程之间的互斥。thread_id将记录任一时刻获得GIL的线程的id。

    那么owned是什么呢?

    GIL中的owned是指示GIL是否可用的变量,它的值被初始化为-1,Python会检查这个值是否为1,如果是,则意味着GIL可用,必须将其置为0,当owned为0后,表示该GIL已经被一个线程占用,不可再用;同时,当一个线程开始等待GIL时,其owned就会被增加1;当一个线程最终释放GIL时,一定会将GIL的owned减1,这样,当所有需要GIL的线程都最终释放了GIL之后,owned将再次变为-1,意味着GIL再次变为可用。

    关于Python中的多线程,今天我们就学到这里。

    展开全文
  • python 线程之间的通信

    2019-09-02 18:06:34
    创建方法: import threading cond=threading.Condition() ...wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。 使用前线程必须已获得锁定,否则将抛出异常。 notif...
  • Python学习或项目开发过程中,许多小伙伴反应说Python线程是鸡肋,效率不升反降。难道多线程不好吗?在我们的常识中,多线程通过并发模式充分利用硬件资源,大大提升了程序的运行效率,怎么在 Python 中反而成了...
  • 前面写过一篇关于python线程的实现的文章, 但是效果不是最佳的,写法也不是很好。通过网上学习,也了解到了semaphore这个东西。 百度给的解释:Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个...
  • 大家都知道python的多线程不是真正的多线程,但是对于io类型的任务,多线程还是能发挥作用的。那么多个线程之间是如何进行变量共享的呢,很多时候我们可以借助queue模块,方便。今天就做一个学习。 二、threading...
  • python线程读取列表

    2019-11-04 11:57:39
    本文代码实现了python线程读取列表,包括python线程初始化、开始和释放线程锁、分配多线程列表数等内容,可做参考。
  • python-线程加锁

    2019-06-06 14:05:18
    刚开始学习python,之前c++ c# 中,线程加锁尤为重要,当前看到一个博文,学习下。 python 提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是...
  • Python线程入门指南

    2017-02-25 21:48:54
    一直懒得写Python相关的文章,恰好有天需要简单的给童鞋们讲点课,仓促之余就诞生了此文.今天本来准备全面的聊聊有关高性能并发这个话题来着,但是周末马上要来了啊.所以我就取了其中的一点来介绍,关于其他的方面,有...
  • Python是一门非常适合处理数据和自动化完成重复性工作的编程语言,我们在用数据训练机器学习模型之前,通常都需要对数据进行预处理,而Python就非常适合完成这项工作,比如需要重新调整几十万张图像的尺寸,用Python...
  • 线程即同时执行多个应用程序,这样可以减少时间消耗,提高程序性能,所以下面就和大家分享Python中多线程的实现。主要包括以下几个方面: 什么是Python中的多任务处理? 什么是线程? 何时在Python中使用多线程? ...
  • python 实现多线程编程

    2017-08-27 19:03:11
    虽然没有涉及到企业级的项目,但还是体会到了有几个知识点是非常重要的,包括:面向对象的思想、如何架构一个项目、设计模式来具体解决问题、应用机器学习和深度学习的方法,当然也包括我这篇文章的内容——多线程和...
  • 您观看课程学习后 免费入群领取【超全Python资料包+17本学习电子书】 ... 2019年是Python彻底崛起的...本课程通过循序渐进的讲解,让学生能够利用Python线程+生产者和消费者模式来构造一个表情包下 爬虫程序。
  •  当初在刚学习python线程时,上网搜索资料几乎都是一片倒的反应python没有真正意义上的多线程python线程就是鸡肋。当时不明所以,只是了解到python带有GIL解释器锁的概念,同一时刻只能有一个线程在运行,...
  • 最近在学习python线程和网络socket编程,以一个小的多线程socket程序作为练习,展示python线程及网络socket编程的主要使用方法。1、python线程 python线程实现方式主要有三种:创建一个 Thread 的实例,传...
  • 比较好的文章 http://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter1/index.html #
  • Python3多线程

    2017-10-11 16:22:24
    学习Python线程:Python3 线程中常用的两个模块为: _thread threading(推荐使用) thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用”thread” 模块。为了兼容性,Python3 ...
1 2 3 4 5 ... 20
收藏数 71,076
精华内容 28,430