线程同步 订阅
线程同步:即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态,实现线程同步的方法有很多,临界区对象就是其中一种。 展开全文
线程同步:即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态,实现线程同步的方法有很多,临界区对象就是其中一种。
信息
外文名
thread synchronization
类    型
理论
定    义
协同步调,按预定的先后次序进行
中文名
线程同步
应    用
物理
线程同步简介
在一般情况下,创建一个线程是不能提高程序的执行效率的,所以要创建多个线程。但是多个线程同时运行的时候可能调用线程函数,在多个线程同时对同一个内存地址进行写入,由于CPU时间调度上的问题,写入数据会被多次的覆盖,所以就要使线程同步。同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。“同”字从字面上容易理解为一起动作其实不是,“同”字应是指协同、协助、互相配合。如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回,同时其它线程也不能调用这个方法。按照这个定义,其实绝大多数函数都是同步调用(例如sin, isdigit等)。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。例如Window API函数SendMessage。该函数发送一个消息给某个窗口,在对方处理完消息之前,这个函数不返回。当对方处理完毕以后,该函数才把消息处理函数所返回的LRESULT值返回给调用者。在多线程编程里面,一些敏感数据不允许被多个线程同时访问,此时就使用同步访问技术,保证数据在任何时刻,最多有一个线程访问,以保证数据的完整性。
收起全文
精华内容
下载资源
问答
  • Python多线程—线程同步

    千次阅读 2019-03-25 23:05:17
    线程同步的真实意思和字面意思恰好相反。 线程同步的真实意思,其实是“排队”:几个线程之间要排队,一个一个对共享资源进行操作,而不是同时进行操作。 Python threading模块提供了Lock/RLock、Condition、queue...

    当多个线程同时读写同一份共享资源的时候,可能会引起冲突。 这时候,我们需要引入线程“同步”机制,即各位线程之间要有个先来后到,不能一窝蜂挤上去抢作一团。 线程同步的真实意思和字面意思恰好相反。 线程同步的真实意思,其实是“排队”:几个线程之间要排队,一个一个对共享资源进行操作,而不是同时进行操作。
    Python threading模块提供了Lock/RLock、Condition、queue、Event等对象来实现线程同步。
    1. Lock/RLock对象
    Lock是比较低级的同步原语,当被锁定以后不属于特定的线程。一个所有两种状态:locked和unlocked。如果锁处于unlocked状态,acquire()方法将其修改为locked并立即返回;如果锁已处于locked状态,则阻塞当前线程并等待其他线程释放锁,然后将其修改为locked并立即返回。release()方法用来将锁的状态由locked修改为unlocked并立即返回,如果锁已经处于unlocked状态,调用该方法将抛出异常。
    可重入锁RLock对象也是一种常用的线程同步原语,可以被同一个线程acquire()多次。当处于locked状态时,某线程拥有该锁;当处于unlocked状态时,该锁不属于任何线程。
    RLock对象的acquire() / release()调用对可以嵌套,仅当最后一个或者最外层release()执行结束后,锁才被设置为unlocked。

    Lock对象成员如下:

    方法描述
    acquire()获得锁。该方法等待锁被解锁,将其设置为locked并返回True。
    release()释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发RuntimeError。
    locked()如果锁被锁定,返回True。

    例1:使用RLock/Lock实现线程同步:

    import threading
    import time
    
    
    # 自定义线程类
    class MyThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
    
        # 重写线程代码
        def run(self):
            global x
            # 获得锁
            lock.acquire()
            for i in range(3):
                x = x + i
            time.sleep(2)
            print(x)
            # 释放锁
            lock.release()
    
    
    # 创建锁
    lock = threading.RLock()
    # lock = threading.Lock()
    t1 = []
    for i in range(10):
        # 创建线程
        t = MyThread()
        t1.append(t)
    x = 0
    for i in t1:
        # 启动线程
        i.start()
    

    2. Condition对象
    使用Condition对象可以在某些事件触发后才处理数据,可以用于不同线程之间的通信或通知,以实现更高级别的同步。Condition对象除了具有acquire() / release()方法之外,还有wait()、notify()和notify_all()等方法。

    方法描述
    acquire()获取底层锁。此方法等待底层锁被解锁,将其设置为locked并返回True。
    notify(n=1)在此条件下最多唤醒n个等待的任务(默认为1)。如果没有任务在等待,则该方法是no-op。必须在调用此方法之前获取锁,并在调用后不久释放锁。如果使用未锁定的锁调用,则会引发RuntimeError错误。
    locked()如果获得了底层锁,则返回True。
    notify_all()唤醒所有在此条件下等待的任务。此方法的作用类似于notify(),但会唤醒所有等待的任务。必须在调用此方法之前获取锁,并在调用后不久释放锁。如果使用未锁定的锁调用,则会引发RuntimeError错误。
    release()释放底层锁。当在未锁定的锁上调用时,将引发RuntimeError。
    wait()等待通知。如果调用此方法时调用任务没有获得锁,则会引发RuntimeError。这个方法释放底层锁,然后阻塞它,直到它被notify()或notify_all()调用唤醒。一旦被唤醒,条件将重新获得锁,该方法将返回True。
    wait_for(predicate)等待predicate变为true。predicate必须是可调用的,其结果将被解释为布尔值。最后一个值是返回值。

    例2:使用Condition对象实现线程同步:

    import threading
    
    
    # 生产者类
    class Producer(threading.Thread):
        def __init__(self, thread_name):
            threading.Thread.__init__(self, name=thread_name)
    
        # 重写线程代码
        def run(self):
            global x
            # 获得锁
            con.acquire()
            if x == 20:
                # 等待通知
                con.wait()
            else:
                print("\nProducer: ", end=' ')
            for i in range(20):
                print(x, end=' ')
                x = x + 1
            print(x)
            con.notify()
            # 释放锁
            con.release()
    
    
    # 消费者类
    class Consumer(threading.Thread):
        def __init__(self, thread_name):
            threading.Thread.__init__(self, name=thread_name)
    
        # 重写线程代码
        def run(self):
            global x
            # 获得锁
            con.acquire()
            if x == 0:
                # 等待通知
                con.wait()
            else:
                print("\nConsumer: ", end=' ')
            for i in range(20):
                print(x, end=' ')
                x = x-1
            print(x)
            con.notify()
            # 释放锁
            con.release()
    
    
    # 创建锁
    con = threading.Condition()
    x = 0
    p = Producer('Producer')
    c = Consumer('Consumer')
    p.start()
    c.start()
    p.join()
    c.join()
    print('After Producer and Consumer all done:', x)
    

    3. queue对象
    queue模块实现多生产者、多消费者队列。当信息必须在多个线程之间安全地交换时,它在线程编程中特别有用。此模块中的Queue类实现所有必需的锁定语义。queue模块提供了Queue、LifoQueue或PriorityQueue对象。
    队列对象(Queue、LifoQueue或PriorityQueue)提供以下描述的公共方法:

    方法描述
    qsize()返回队列的大致大小。注意,qsize() > 0不保证后续get()不会阻塞,qsize() < maxsize也不保证put()不会阻塞。
    empty()如果队列为空,返回True,否则返回False。如果empty()返回True,则不能保证对put()的后续调用不会阻塞。类似地,如果empty()返回False,则不能保证对get()的后续调用不会阻塞。
    full()如果队列已满,返回True,否则返回False。如果full()返回True,则不能保证对get()的后续调用不会阻塞。类似地,如果full()返回False,则不能保证对put()的后续调用不会阻塞。
    put(item, block=True, timeout=None)将项放入队列。如果可选的参数block=True, timeout=None(缺省值),则在空闲插槽可用之前,如果有必要,将阻塞。如果timeout是一个正数,那么它将阻塞最多的超时秒,如果在这段时间内没有可用的空闲插槽,则引发完整的异常。否则(block为false),如果一个空闲插槽立即可用,则将一个项放到队列中,否则引发完全异常(在这种情况下忽略超时)。
    put_nowait(item)相当于put(item, False)。
    get(block=True, timeout=None)从队列中删除并返回一个项。如果可选的block=true, timeout=None(缺省值),则在项目可用之前,如果有必要,将阻塞。如果timeout是一个正数,那么它将阻塞最多的超时秒,如果在这段时间内没有可用的项,则引发空异常。否则(block为false),返回一个立即可用的项,否则引发空异常(在这种情况下忽略超时)。
    get_nowait()等价于get(False)。提供了两种方法来支持跟踪已加入队列的任务是否已被守护进程使用者线程完全处理。
    task_done()指示已完成先前排队的任务。由队列使用者线程使用。对于用于获取任务的每个get(),后续对task_done()的调用告诉队列任务上的处理已经完成。如果join()当前处于阻塞状态,那么在处理完所有项之后,它将继续运行(这意味着对于已经放入队列()的每个项,都收到了task_done()调用)。如果调用次数超过放置在队列中的项的次数,则引发ValueError。
    join()阻塞,直到获取和处理队列中的所有项。每当向队列添加项时,未完成任务的数量就会增加。每当使用者线程调用task_done()来指示检索了该项并完成了对该项的所有工作时,计数就会下降。当未完成任务的计数降为零时,join()解块。

    例3:使用Queue对象实现线程同步:

    def worker():
        while True:
            item = q.get()
            if item is None:
                break
            do_work(item)
            q.task_done()
    
    q = queue.Queue()
    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    
    for item in source():
        q.put(item)
    
    # block until all tasks are done
    q.join()
    
    # stop workers
    for i in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()
    
    

    4. Event对象
    Event对象是一种简单的线程同步通信技术,一个线程设置Event对象,另一个线程等待Event对象。

    方法描述
    wait()等待事件被设置。如果事件被设置,立即返回True。否则阻塞,直到另一个任务调用set()。
    set()设置事件。所有等待事件设置的任务将立即被唤醒。
    clear()清除(取消)事件。等待on wait()的任务现在将阻塞,直到再次调用set()方法。
    is_set()如果设置了事件,则返回True。

    例4:使用Event对象实现线程同步:

    import threading
    
    
    # 自定义线程类
    class MyThread(threading.Thread):
        def __init__(self, thread_name):
            threading.Thread.__init__(self, name=thread_name)
    
        # 重写线程代码
        def run(self):
            global my_event
            if my_event.isSet():
                my_event.clear()
                # 等待通知
                my_event.wait()
                print(self.getName())
            else:
                print(self.getName())
                my_event.set()
    
    
    # 创建锁
    my_event = threading.Event()
    my_event.set()
    t1 = []
    
    for i in range(10):
        t = MyThread(str(i))
        t1.append(t)
    
    for t in t1:
        t.start()
    
    展开全文
  • python的多线程及线程同步方式

    千次阅读 2019-07-17 09:37:23
    1.线程执行 join与setDaemon 1.子线程在主线程运行结束后,会继续执行完,如果给子线程设置为守护线程(setDaemon=True),主线程运行结束子线程即结束; 2 .如果join()线程,那么主线程会等待子线程执行完再执行...

    1.线程执行

    join与setDaemon

    • 1.子线程在主线程运行结束后,会继续执行完,如果给子线程设置为守护线程(setDaemon=True),主线程运行结束子线程即结束;

    • 2 .如果join()线程,那么主线程会等待子线程执行完再执行。

    import threading
    import time
    
    
    def get_thread_a():
        print("get thread A started")
        time.sleep(3)
        print("get thread A end")
    
    
    def get_thread_b():
        print("get thread B started")
        time.sleep(5)
        print("get thread B end")
    
    
    if  __name__ == "__main__":
        thread_a = threading.Thread(target=get_thread_a)
        thread_b = threading.Thread(target=get_thread_b)
        start_time = time.time()
        thread_b.setDaemon(True)
        thread_a.start()
        thread_b.start()
        thread_a.join()   
    
        end_time = time.time()
        print("execution time: {}".format(end_time - start_time))
    

    thread_a是join,首先子线程thread_a执行,thread_b是守护线程,当主线程执行完后,thread_b不会再执行

    执行结果如下:

    get thread A started
    get thread B started
    get thread A end
    execution time: 3.003199815750122
    

    2.线程同步

    多线程间共享全局变量,多个线程对该变量执行不同的操作时,该变量最终的结果可能是不确定的(每次线程执行后的结果不同),如:对count变量执行加减操作 ,count的值是不确定的,要想count的值是一个确定的需对线程执行的代码段加锁。

    3.线程同步的方式

    3.1 锁机制

    在这里插入图片描述
    python对线程加锁主要有Lock和Rlock模块

    Lock:

    from threading import Lock
     
    lock = Lock()
    lock.acquire()
    lock.release()
    

    Lock有acquire()和release()方法,这两个方法必须是成对出现的,acquire()后面必须release()后才能再acquire(),否则会造成死锁

    Rlock:

    鉴于Lock可能会造成死锁的情况,RLock(可重入锁)对Lock进行了改进,RLock可以在同一个线程里面连续调用多次acquire(),但必须再执行相同次数的release()

    from threading import RLock
    
    lock = RLock()
    lock.acquire()
    lock.acquire()
    lock.release()
    lock.release()
    

    当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。

    直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

    3.2 Semaphore(信号量)

    信号量也提供acquire方法和release方法,每当调用acquire方法的时候,如果内部计数器大于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,直到有线程调用了release方法将内部计数器更新到大于1位置。

    Semaphore(信号量)是计算机科学史上最古老的同步指令之一。Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。

    基于这个特点,Semaphore经常用来同步一些有“访客上限”的对象,比如连接池。

    BoundedSemaphore 与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

    构造方法:
    Semaphore(value=1): value是计数器的初始值。

    import time
    import threading
    
    
    def get_thread_a(semaphore,i):
        time.sleep(1)
        print("get thread : {}".format(i))
        semaphore.release()
    
    
    def get_thread_b(semaphore):
        for i in range(10):
            semaphore.acquire()
            thread_a = threading.Thread(target=get_thread_a, args=(semaphore,i))
            thread_a.start()
    
    
    if __name__ == "__main__":
        semaphore = threading.Semaphore(2)
        thread_b = threading.Thread(target=get_thread_b, args=(semaphore,))
        thread_b.start()
    

    3.3 条件判断

    所谓条件变量,即这种机制是在满足了特定的条件后,线程才可以访问相关的数据。
      
    它使用Condition类来完成,由于它也可以像锁机制那样用,所以它也有acquire方法和release方法,而且它还有wait,notify,notifyAll方法。
    在这里插入图片描述

    """
    一个简单的生产消费者模型,通过条件变量的控制产品数量的增减,调用一次生产者产品就是+1,调用一次消费者产品就会-1.
    """
    
    """
    使用 Condition 类来完成,由于它也可以像锁机制那样用,所以它也有 acquire 方法和 release 方法,而且它还有
    wait, notify, notifyAll 方法。
    """
    
    import threading
    import queue,time,random
    
    class Goods:#产品类
        def __init__(self):
            self.count = 0
        def add(self,num = 1):
            self.count += num
        def sub(self):
            if self.count>=0:
                self.count -= 1
        def empty(self):
            return self.count <= 0
    
    class Producer(threading.Thread):#生产者类
        def __init__(self,condition,goods,sleeptime = 1):#sleeptime=1
            threading.Thread.__init__(self)
            self.cond = condition
            self.goods = goods
            self.sleeptime = sleeptime
        def run(self):
            cond = self.cond
            goods = self.goods
            while True:
                cond.acquire()#锁住资源
                goods.add()
                print("产品数量:",goods.count,"生产者线程")
                cond.notifyAll()#唤醒所有等待的线程--》其实就是唤醒消费者进程
                cond.release()#解锁资源
                time.sleep(self.sleeptime)
    
    class Consumer(threading.Thread):#消费者类
        def __init__(self,condition,goods,sleeptime = 2):#sleeptime=2
            threading.Thread.__init__(self)
            self.cond = condition
            self.goods = goods
            self.sleeptime = sleeptime
        def run(self):
            cond = self.cond
            goods = self.goods
            while True:
                time.sleep(self.sleeptime)
                cond.acquire()#锁住资源
                while goods.empty():#如无产品则让线程等待
                    cond.wait()
                goods.sub()
                print("产品数量:",goods.count,"消费者线程")
                cond.release()#解锁资源
    
    g = Goods()
    c = threading.Condition()
    
    pro = Producer(c,g)
    pro.start()
    
    con = Consumer(c,g)
    con.start()
    

    Condition内部有一把锁,默认是RLock,在调用wait()和notify()之前必须先调用acquire()获取这个锁,才能继续执行;当wait()和notify()执行完后,需调用release()释放这个锁,在执行with condition时,会先执行acquire(),with结束时,执行了release();所以condition有两层锁,最底层锁在调用wait()时会释放,同时会加一把锁到等待队列,等待notify()唤醒释放锁

    wait() :允许等待某个条件变量的通知,notify()可唤醒

    notify(): 唤醒等待队列wait()

    # encoding: UTF-8
    import threading
    import time
     
    # 商品
    product = None
    # 条件变量
    con = threading.Condition()
     
    # 生产者方法
    def produce():
        global product
        
        if con.acquire():
            while True:
                if product is None:
                    print 'produce...'
                    product = 'anything'
                    
                    # 通知消费者,商品已经生产
                    con.notify()
                
                # 等待通知
                con.wait()
                time.sleep(2)
     
    # 消费者方法
    def consume():
        global product
        
        if con.acquire():
            while True:
                if product is not None:
                    print 'consume...'
                    product = None
                    
                    # 通知生产者,商品已经没了
                    con.notify()
                
                # 等待通知
                con.wait()
                time.sleep(2)
     
    t1 = threading.Thread(target=produce)
    t2 = threading.Thread(target=consume)
    t2.start()
    t1.start()
    

    3.4 同步队列

    put方法和task_done方法,queue有一个未完成任务数量num,put依次num+1,task依次num-1.任务都完成时任务结束。
      在这里插入图片描述

    import threading
    import queue
    import time
    import random
    
    '''
    1.创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。
    2.将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。
    3.每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。
    4.在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。
    5.对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。
    '''
    
    class jdThread(threading.Thread):
        def __init__(self,index,queue):
            threading.Thread.__init__(self)
            self.index = index
            self.queue = queue
    
        def run(self):
            while True:
                time.sleep(1)
                item = self.queue.get()
                if item is None:
                    break
                print("序号:",self.index,"任务",item,"完成")
                self.queue.task_done()#task_done方法使得未完成的任务数量-1
    
    q = queue.Queue(0)
    '''
    初始化函数接受一个数字来作为该队列的容量,如果传递的是
    一个小于等于0的数,那么默认会认为该队列的容量是无限的.
    '''
    for i in range(2):
        jdThread(i,q).start()#两个线程同时完成任务
    
    for i in range(10):
        q.put(i)#put方法使得未完成的任务数量+1
    

    3.5 Event对象

    Event对象是一种简单的线程同步通信技术,一个线程设置Event对象,另一个线程等待Event对象。
    在这里插入图片描述

    import threading
    
    
    # 自定义线程类
    class MyThread(threading.Thread):
        def __init__(self, thread_name):
            threading.Thread.__init__(self, name=thread_name)
    
        # 重写线程代码
        def run(self):
            global my_event
            if my_event.isSet():
                my_event.clear()
                # 等待通知
                my_event.wait()
                print(self.getName())
            else:
                print(self.getName())
                my_event.set()
    
    
    # 创建锁
    my_event = threading.Event()
    my_event.set()
    t1 = []
    
    for i in range(10):
        t = MyThread(str(i))
        t1.append(t)
    
    for t in t1:
        t.start()
    
    
    展开全文
  • linux中实现线程同步的6种方法

    万次阅读 多人点赞 2020-10-22 16:37:21
    linux线程同步的方法 下面是一个线程不安全的例子: #include<stdio.h> #include<pthread.h> int ticket_num=10000000; void *sell_ticket(void *arg) { while(ticket_num>0) { ticket_num--; }...

    linux线程同步的方法

    下面是一个线程不安全的例子:

    #include<stdio.h>
    #include<pthread.h>
    
    int ticket_num=10000000;
    
    void *sell_ticket(void *arg) {
        while(ticket_num>0) {
    	ticket_num--;
        }
    }
    
    int main() {
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &sell_ticket, NULL);
        pthread_create(&t2, NULL, &sell_ticket, NULL);
        pthread_create(&t3, NULL, &sell_ticket, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
        printf("ticket_num=%d\n", ticket_num);
        return 0;
    }
    

    运行结果如下:

    # gcc no_lock_demo.c -o no_lock_demo.out -pthread
    # ./no_lock_demo.out 
    ticket_num=-2
    

    最后运行的结果不是固定的,有可能是0、-1,如果有这个ticket_num变量代表是库存的话,那么就会出现库存为负数的情况,所以需要引入线程同步来保证线程安全。

    Linux下提供了多种方式来处理线程同步,最常用的是互斥锁、自旋锁、信号量。

    互斥锁

    互斥锁本质就是一个特殊的全局变量,拥有lock和unlock两种状态,unlock的互斥锁可以由某个线程获得,当互斥锁由某个线程持有后,这个互斥锁会锁上变成lock状态,此后只有该线程有权力打开该锁,其他想要获得该互斥锁的线程都会阻塞,直到互斥锁被解锁。

    互斥锁的类型:

    • 普通锁(PTHREAD_MUTEX_NORMAL):互斥锁默认类型。当一个线程对一个普通锁加锁以后,其余请求该锁的线程将形成一个 等待队列,并在该锁解锁后按照优先级获得它,这种锁类型保证了资源分配的公平性。一个 线程如果对一个已经加锁的普通锁再次加锁,将引发死锁;对一个已经被其他线程加锁的普 通锁解锁,或者对一个已经解锁的普通锁再次解锁,将导致不可预期的后果。

    • 检错锁(PTHREAD_MUTEX_ERRORCHECK):一个线程如果对一个已经加锁的检错锁再次加锁,则加锁操作返回EDEADLK;对一个已 经被其他线程加锁的检错锁解锁或者对一个已经解锁的检错锁再次解锁,则解锁操作返回 EPERM。

    • 嵌套锁(PTHREAD_MUTEX_RECURSIVE):该锁允许一个线程在释放锁之前多次对它加锁而不发生死锁;其他线程要获得这个锁,则当前锁的拥有者必须执行多次解锁操作;对一个已经被其他线程加锁的嵌套锁解锁,或者对一个已经解锁的嵌套锁再次解锁,则解锁操作返回EPERM。

    • 默认锁(PTHREAD_MUTEX_ DEFAULT):一个线程如果对一个已经加锁的默认锁再次加锁,或者虽一个已经被其他线程加锁的默 认锁解锁,或者对一个解锁的默认锁解锁,将导致不可预期的后果;这种锁实现的时候可能 被映射成上述三种锁之一。

    相关方法:

    
    // 静态方式创建互斥锁
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
    
    // 动态方式创建互斥锁,其中参数mutexattr用于指定互斥锁的类型,具体类型见上面四种,如果为NULL,就是普通锁。
    int pthread_mutex_init (pthread_mutex_t* mutex,const pthread_mutexattr_t* mutexattr);
    
    int pthread_mutex_lock(pthread_mutex_t *mutex); // 加锁,阻塞
    int pthread_mutex_trylock(pthread_mutex_t *mutex); // 尝试加锁,非阻塞
    int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁
    

    例子:

    #include<stdio.h>
    #include<pthread.h>
    
    int ticket_num=10000000;
    
    pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
    
    void *sell_ticket(void *arg) {
        while(ticket_num>0) {
    	pthread_mutex_lock(&mutex);
    	if(ticket_num>0) {
    	    ticket_num--;
    	}
    	pthread_mutex_unlock(&mutex);
        }
    }
    
    int main() {
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &sell_ticket, NULL);
        pthread_create(&t2, NULL, &sell_ticket, NULL);
        pthread_create(&t3, NULL, &sell_ticket, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
        printf("ticket_num=%d\n", ticket_num);
        return 0;
    }
    

    自旋锁

    自旋锁顾名思义就是一个死循环,不停的轮询,当一个线程未获得自旋锁时,不会像互斥锁一样进入阻塞休眠状态,而是不停的轮询获取锁,如果自旋锁能够很快被释放,那么性能就会很高,如果自旋锁长时间不能够被释放,甚至里面还有大量的IO阻塞,就会导致其他获取锁的线程一直空轮询,导致CPU使用率达到100%,特别CPU时间。

    相关方法:

    int pthread_spin_init(pthread_spinlock_t *lock, int pshared); // 创建自旋锁
    
    int pthread_spin_lock(pthread_spinlock_t *lock); // 加锁,阻塞
    int pthread_spin_trylock(pthread_spinlock_t *lock); // 尝试加锁,非阻塞
    int pthread_spin_unlock(pthread_spinlock_t *lock); // 解锁
    

    例子:

    #include<stdio.h>
    #include<pthread.h>
    
    int ticket_num=10000000;
    
    pthread_spinlock_t spinlock;
    
    void *sell_ticket(void *arg) {
        while(ticket_num>0) {
    	pthread_spin_lock(&spinlock);
    	if(ticket_num>0) {
    	    ticket_num--;
    	}
    	pthread_spin_unlock(&spinlock);
        }
    }
    
    int main() {
        pthread_spin_init(&spinlock, 0);
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &sell_ticket, NULL);
        pthread_create(&t2, NULL, &sell_ticket, NULL);
        pthread_create(&t3, NULL, &sell_ticket, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
        printf("ticket_num=%d\n", ticket_num);
        return 0;
    }
    

    信号量

    信号量是一个计数器,用于控制访问有限共享资源的线程数。

    相关方法:

    // 创建信号量
    // pshared:一般取0,表示调用进程的信号量。非0表示该信号量可以共享内存的方式,为多个进程所共享(Linux暂不支持)。
    // value:信号量的初始值,可以并发访问的线程数。
    int sem_init (sem_t* sem, int pshared, unsigned int value);
    
    int sem_wait (sem_t* sem); // 信号量减1,信号量为0时就会阻塞
    
    int sem_trywait (sem_t* sem); // 信号量减1,信号量为0时返回-1,不阻塞
    
    int sem_timedwait (sem_t* sem, const struct timespec* abs_timeout); // 信号量减1,信号量为0时阻塞,直到abs_timeout超时返回-1
    
    int sem_post (sem_t* sem); // 信号量加1
    

    例子:

    #include<stdio.h>
    #include<pthread.h>
    #include <semaphore.h>
    
    int ticket_num=10000000;
    
    sem_t sem;
    
    void *sell_ticket(void *arg) {
        while(ticket_num>0) {
    	sem_wait(&sem);
    	if(ticket_num>0) {
    	    ticket_num--;
    	}
    	sem_post(&sem);
        }
    }
    
    int main() {
        sem_init(&sem, 0, 1); // value=1表示最多1个线程同时访问共享资源,与互斥量等价
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &sell_ticket, NULL);
        pthread_create(&t2, NULL, &sell_ticket, NULL);
        pthread_create(&t3, NULL, &sell_ticket, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
        printf("ticket_num=%d\n", ticket_num);
        return 0;
    }
    

    条件变量

    条件变量可以让调用线程在满足特定条件的情况下运行,不满足条件时阻塞等待被唤醒,必须与互斥锁搭配使用。

    条件变量常用于生产者与消费者模型。

    相关方法:

    pthread_cond_t cond=PTHREAD_COND_INITIALIZER; // 创建条件变量,一个互斥锁可以对应多个条件变量
    
    int pthread_cond_wait (pthread_cond_t* cond,pthread_mutex_t* mutex); // 阻塞等待条件满足,同时释放互斥锁mutex
    
    int pthread_cond_timedwait (pthread_cond_t* cond,
        pthread_mutex_t* mutex,
        const struct timespec* abstime); // 带超时的阻塞等待条件满足,同时释放互斥锁mutex
    
    // 从条件变量cond中唤出一个线程,令其重新获得原先的互斥锁
    // 被唤出的线程此刻将从pthread_cond_wait函数中返回,但如果该线程无法获得原先的锁,则会继续阻塞在加锁上。
    int pthread_cond_signal (pthread_cond_t* cond);
    
    // 从条件变量cond中唤出所有线程
    int pthread_cond_broadcast (pthread_cond_t* cond);
    

    例子:

    #include<stdio.h>
    #include<pthread.h>
    
    int max_buffer=10;
    int count=0;
    
    pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t notempty=PTHREAD_COND_INITIALIZER;
    pthread_cond_t notfull=PTHREAD_COND_INITIALIZER;
    
    void *produce(void *args) {
        while(1) {
            pthread_mutex_lock(&mutex);
            while(count == max_buffer) {
                printf("buffer is full, wait...\n");
                pthread_cond_wait(&notfull, &mutex);
            }
            printf("produce ...\n");
            count++;
            sleep(1);
            pthread_cond_signal(&notempty);
            pthread_mutex_unlock(&mutex);
        }
    
    }
    
    void *consumer(void *args) {
        while(1) {
            pthread_mutex_lock(&mutex);
            while(count == 0) {
                printf("buffer is empty, wait...\n");
                pthread_cond_wait(&notempty, &mutex);
            }
            printf("consumer ...\n");
            count--;
            sleep(1);
            pthread_cond_signal(&notfull);
            pthread_mutex_unlock(&mutex);
        }
    
    }
    
    int main() {
        pthread_t t1,t2,t3,t4;
        pthread_create(&t1, NULL, &produce, NULL);
        pthread_create(&t2, NULL, &produce, NULL);
    
        pthread_create(&t3, NULL, &consumer, NULL);
        pthread_create(&t4, NULL, &consumer, NULL);
    
        pthread_join(t1, NULL);
        return 0;
    }
    

    读写锁

    读写锁可以有三种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。读写锁也叫做共享-独占锁,当读写锁以读模式锁住时,它是以共享模式锁住的,当它以写模式锁住时,它是以独占模式锁住的,读读共享,读写互斥。

    相关方法:

    // 创建读写锁
    pthread_rwlock_t rwlock=PTHREAD_RWLOCK_INITIALIZER;
    
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 加读锁,阻塞
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 加写锁,阻塞
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 释放读锁或者写锁
    
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); // 尝试加读锁,非阻塞
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); // 尝试加写锁,非阻塞
    

    例子:

    #include <stdio.h>
    #include <pthread.h>
    
    pthread_rwlock_t rwlock=PTHREAD_RWLOCK_INITIALIZER;
    
    void *read(void *arg) {
        while(1) {
            pthread_rwlock_rdlock(&rwlock);
            rintf("read message.\n");
            sleep(1);
            pthread_rwlock_unlock(&rwlock);
            sleep(1);
        }
    }
    void *write(void *arg) {
        while(1) {
            pthread_rwlock_wrlock(&rwlock);
            printf("write message.\n");
            sleep(1);
            pthread_rwlock_unlock(&rwlock);
            sleep(1);
        }
    }
    
    int main(int argc,char *argv[]) {
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &read, NULL);
        pthread_create(&t2, NULL, &read, NULL);
    
        pthread_create(&t3, NULL, &write, NULL);
    
        pthread_join(t1, NULL);
        return 0;
    }
    

    屏障

    屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后所有线程都从该点继续执行。pthread_join函数就是一种屏障,允许一个线程等待,直到另一个线程退出。但屏障对象的概念更广,允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出,当所有的线程达到屏障后可以接着工作。

    相关方法:

    // 创建屏障
    int pthread_barrier_init(pthread_barrier_t *barrier,const pthread_barrrierattr_t *attr,unsigned int count)
    
    // 阻塞等待,直到所有线程都到达
    int pthread_barrier_wait(pthread_barrier_t *barrier)
    

    例子:

    #include <stdio.h>
    #include <pthread.h>
    
    pthread_barrier_t barrier;
    
    void *go(void *arg){
        sleep (rand () % 10);
        printf("%lu is arrived.\n", pthread_self());
        pthread_barrier_wait(&barrier);
        printf("%lu go shopping...\n", pthread_self());
    }
    
    int main() {
        pthread_barrier_init(&barrier, NULL, 3);
    
        pthread_t t1,t2,t3;
        pthread_create(&t1, NULL, &go, NULL);
        pthread_create(&t2, NULL, &go, NULL);
        pthread_create(&t3, NULL, &go, NULL);
    
        pthread_join(t1, NULL);
        return 0;
    }
    
    展开全文
  • Python中线程同步与线程锁

    千次阅读 2019-06-09 21:12:02
    文章目录Python中线程同步与线程锁线程同步threading.Event对象threading.Timer定时器,延迟执行threading.Lock锁可重入锁RLockCondition条件锁,等待通知therading.Semaphore信号量threading.BoundedSemaphore有界...

    线程同步与线程锁

    线程同步

    概念
    * 线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完 成对数据的操作。

    1.threading.Event对象

    • Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化 来进行操作
    名称含义
    event.set()标记设置为True
    event.clear()标记设置为False
    event.is_set()标记是否为True
    event.wait(timeout=None)设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False
    • 老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了10个杯子
    import threading
    import time
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker(event:threading.Event,count = 10):
        logging.info("我是worker工作线程")
        cups = []
        while True:
            logging.info("制作了一个 cup")
            time.sleep(0.2)
            cups.append(1)
            if len(cups)>=count:
                event.set()
                break
        logging.info("制作完成:{}".format(cups))
    
    def boss(event:threading.Event):
        logging.info("我是boss")
        event.wait()
        logging.info("Good Job")
    
    event = threading.Event()
    b = threading.Thread(target=boss,args=(event,))
    w = threading.Thread(target=worker,args=(event,))
    b.start()
    w.start()
    

    threading2_001

    • 使用同一个Event对象的标记flag。

    • 谁wait就是等到flag变为True,或等到超时返回False。不限制等待的个数。

    • wait的使用

    from threading import Thread,Event
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker(event:Event,interval:int):
        while not event.wait(interval):
            logging.info("没有等到。。")
    
    e = Event()
    Thread(target=worker,args=(e,1)).start()
    
    e.wait(5)
    e.set()
    
    print("======end========")
    

    threading2_002

    2.threading.Timer定时器,延迟执行

    方法含义
    Timer.cancel()取消定时器,(定时器为执行函数时可以取消,在函数执行中无法取消)
    Time.start()启动定时器
    • threading.Timer继承自Thread,这个类用来定义延迟多久后执行一个函数。
    • class threading.Timer(interval, function, args=None, kwargs=None)
      1. interval #多少时间后执行function函数
      2. function #需要执行的函数
    • start方法执行之后,Timer对象会处于等待状态,等待了interval秒之后,开始执行function函数的。
    • Timer是线程Thread的子类,Timer实例内部提供了一个finished属性,该属性是Event对象。
    • cancel方法,本质上 是在worker函数执行前对finished属性set方法操作,从而跳过了worker函数执行,达到了取消的效果。
    from threading import Timer
    import logging
    import time
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker():
        logging.info("in worker")
        time.sleep(5)
        logging.info("end in worker")
    
    t = Timer(2,worker)
    t.setName("timer1") #设置线程名称
    # t.cancel() #取消定时器后,定时器不在执行
    t.start()
    # t.cancel() #取消定时器后,定时器不在执行
    time.sleep(4) #等待4秒后,定时器已经开始执行
    t.cancel() #当定时器执行后,无法取消
    
    print("======end========")
    

    threading2_003

    3.threading.Lock锁

    锁(Lock):一旦线程获得锁,其他试图获取锁的线程将被阻塞等待。
    锁:凡是存在共享支援争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。

    名称含义
    Lock.acquire(blocking=True,timeout=-1)获取锁,获取成功返回True,否则返回False
    当获取不到锁时,默认进入阻塞状态,直到获取到锁,后才继续。阻塞可以设置超时时间。非阻塞时,timeout禁止设置。如果超时依旧未获取到锁,返回False。
    Lock.rease()释放锁,可以从任何线程调用释放。
    已上锁的锁,会被设置为unlocked。如果未上锁调用,会抛出RuntimeError异常。
    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def worker(lock):
        print("worker start",threading.get_ident(),threading.current_thread().name)
        lock.acquire()
        print("worker over",threading.get_ident(),threading.current_thread().name)
    
    lock = threading.Lock()
    lock.acquire()
    print(" -"*30)
    for i in range(5):
        threading.Thread(target=worker,args=(lock,),name="w{}".format(i)).start()
    
    print("- "* 30)
    while True:
        time.sleep(0.1)
        cmd = input(">>").strip()
        if cmd == "r":
            lock.release()
        elif cmd == "q":
            break
        else:
            print(threading.enumerate())
    

    threading2_004
    上例可以看出不管在哪一个线程中,只要对一个已经上锁的锁阻塞请求,该线程就会阻塞。

    • 加锁,解锁
      一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁是无 法释放,但是当前线程可能因为这个异常被终止了,这也产生了死锁

    • 加锁、解锁常用语句:

      1. 使用try…finally语句保证锁的释放
      2. with上下文管理,锁对象支持上下文管理
    • 计数器类,可加,可减。

    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    class Counter:
        def __init__(self):
            self._val = 0
            self.lock = threading.Lock()
    
        @property
        def value(self):
            with self.lock:
                return self._val
    
        def inc(self):
            with self.lock:
                self._val += 1
    
        def dec(self):
            with self.lock:
                self._val -= 1
    
    def run(c:Counter,count=100):
        for _ in range(count):
            for i in range(-50,50):
                if i <0:
                    c.dec()
                else:
                    c.inc()
    
    c = Counter()
    c1 = 10 #线程数
    c2 = 1000
    for i in range(c1):
        threading.Thread(target=run,args=(c,c2)).start()
    
    for k in threading.enumerate():
        if k.name != "MainThread":
            k.join()
    
    print(c.value)
    
    • 锁的应用场景
      锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

    • 使用锁的注意事项:

      1. 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
        • 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道 上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须 加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
      2. 加锁时间越短越好,不需要就立即释放锁
      3. 一定要避免死锁
    • 非阻塞锁的使用

    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def worker(lock:threading.Lock):
        while True:
            if lock.acquire(False):
                print("do something.")
                time.sleep(1)
                lock.release()
                break
            else:
                print("try again")
                time.sleep(1)
    
    lock = threading.Lock()
    for i in range(5):
        threading.Thread(target=worker,name="w{}".format(i),args=(lock,)).start()
    

    threading2_005

    4.可重入锁RLock

    • 可重入锁,是线程相关的锁
    • 线程A获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release
    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def fib(num,rlock:threading.RLock):
        with rlock:
            if num<3:
                return 1
            return fib(num-1,rlock)+fib(num-2,rlock)
    
    def work(num,rlock):
        print(fib(num,rlock))
    
    rlock = threading.RLock()
    for i in range(1,10):
        threading.Thread(target=work,args=(i,rlock)).start()
    

    可重入锁
    * 与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞多次获取锁
    * 当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁
    * 锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次

    5.Condition条件锁,等待通知

    构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认是RLock。

    名称含义
    Condition.acquire(self,*args)获取锁
    Condition.wait(self,timeout=None)等待通知,timeout设置超时时间
    Condition.notify(self,n=1)唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
    Condition.notify_all(self)唤醒所有等待的线程
    • 每个线程都可以通过Condition获取已把属于自己的锁,在锁中可以等待其他进程的同级锁的通知。当获取到同级锁的通知后,会停止等待。

    • 当使用Condition(lock=Lock())初始化锁时,锁只能一级等待,不能出现多级等待。

    • 简单示例:

    import threading
    import time
    
    def work(cond:threading.Condition):
        with cond:
            print("开始等待")
            cond.wait()
            print("等到了")
    
    cond = threading.Condition()
    # cond = threading.Condition(threading.Lock())
    threading.Thread(target=work,args=(cond,)).start()
    threading.Thread(target=work,args=(cond,)).start()
    
    with cond:
        with cond:
            time.sleep(1)
            print("开始释放二级等待")
            print(cond.notifyAll())
        time.sleep(2)
        print("开始释放一级等待")
        cond.notifyAll()
    

    threading2_006

    • 广播模式示例:
    from threading import Thread,Condition,Lock
    import time
    import logging
    import random
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    class Dispachter:
        def __init__(self):
            self.data = None
            self.cond = Condition(lock=Lock())
    
        #生成者
        def produce(self,total):
            for _ in range(total):
                data = random.randint(1,100)
                with self.cond:
                    logging.info("生产了一个数据:{}".format(data))
                    self.data = data
                    self.cond.notify(1)
                time.sleep(1) #模拟生成数据需要耗时1秒
    
        #消费者
        def consume(self):
            while True:
                with self.cond:
                    self.cond.wait() #等待
                    data = self.data
                    logging.info("消费了一个数据 {}".format(data))
                    self.data = None
    
    d = Dispachter()
    p = Thread(target=d.produce,name="producer",args=(10,))
    
    # 增加消费者
    for i in range(5):
        c = Thread(target=d.consume,name="consumer{}".format(i))
        c.start()
    
    p.start()
    

    threading2_007

    上面例子中演示了生产者生产一个数据,就通知一个消费者消费。

    • Condition总结
      Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。采用了通知机制,非常有效率。
    • 使用方式
      使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用 with上下文。
      消费者wait,等待通知。
      生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

    6.therading.Semaphore信号量

    和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求 的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

    名称含义
    Semaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    Semaphore.acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。如果_value的值为0会变成阻塞状态。获取成功返回True
    Semaphore.release(self)释放信号量,计数器加1。即_value的值加1
    Semaphore._value信号量,当前信号量
    • 注意
      1. 计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。
      2. 信号量没有做超界限制
    from threading import Semaphore
    
    s =Semaphore(3)
    print(s._value)
    s.release() #会增加信号量
    print(s._value) #可以看出没有做信号量上线控制
    print("----------------")
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire()) #当信号量为0再次acquire会被阻塞
    print("~~~~~~阻塞了吗?")
    print(s._value)
    

    threading2_008

    • 跨线程使用演示
    from threading import Thread,Semaphore
    import time
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    #定义获取信号量
    def worker(s:Semaphore):
        while s.acquire():
            logging.info("被执行了一次,获取一个信号量 _value={}".format(s._value))
    
    #释放信号量
    def cunn(s:Semaphore):
        while True:
            logging.info("释放一个信号量")
            s.release()
            time.sleep(1)
    
    s = Semaphore(3)
    #创建3个线程获取信号量
    for i in range(3):
        Thread(target=worker,args=(s,),name="w{}".format(i)).start()
    
    #开启一个线程释放信号量
    Thread(target=cunn,args=(s,)).start()
    

    threading2_009

    7.threading.BoundedSemaphore有界信号量

    • 有界信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常
    名称含义
    BoundedSemaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    BoundedSemaphore.acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。如果_value的值为0会变成阻塞状态。获取成功返回True
    BoundedSemaphore.release(self)释放信号量,计数器加1。即_value的值加1,超过初始化值会抛出异常ValueError。
    BoundedSemaphore._value信号量,当前信号量
    from threading import BoundedSemaphore
    
    bs = BoundedSemaphore(3)
    print(bs._value)
    bs.acquire()
    bs.acquire()
    bs.acquire()
    print(bs._value)
    bs.release()
    bs.release()
    bs.release()
    print(bs._value)
    bs.release()
    

    threading2_010

    • 应用举例
      连接池
      因为资源有限,且开启一个连接成本高,所以,使用连接池。
      一个简单的连接池
      连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。
    from threading import Thread,BoundedSemaphore
    import time
    import logging
    import random
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    #链接类
    class Conn:
        def __init__(self,name):
            self.name = name
    
    class Pool:
        def __init__(self,count:int):
            self.count = count
            #池中放着链接备用
            self.pool = [self._connect("conn-{}".format(i)) for i in range(count)]
            self.bsemaphore = BoundedSemaphore(count)
    
        #创建连接方法,返回一个连接对象
        def _connect(self,conn_name):
            return Conn(conn_name)
    
        #获取一个链接
        def get_conn(self):
            self.bsemaphore.acquire()
            self.pool.pop()
            logging.info("从连接池拿走了一个连接~~~~~~~")
    
        #归还一个连接
        def return_conn(self,conn:Conn):
            logging.info("归还了一个连接----------")
            self.pool.append(conn)
            self.bsemaphore.release()
    
    def worker(pool:Pool):
        conn = pool.get_conn()
        logging.info(conn)
        #模拟使用了一段时间
        time.sleep(random.randint(1,5))
        pool.return_conn(conn)
    
    pool = Pool(3)
    for i in range(6):
        Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()
    

    threading2_011
    上例中,使用信号量解决资源有限的问题。
    如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完 归还资源后信号量加1,等待线程就可以被唤醒拿走资源。
    注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。

    • 问题分析
    1. 边界问题
      • 假设一种极端情况,计数器还差1就归还满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时 候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。 因此信号量,可以保证,一定不能多归还。
    2. 正常使用分析
      • 正常使用信号量,都会先获取信号量,然后用完归还。
      • 创建很多线程,都去获取信号量,没有获得信号量的线程都阻塞。能归还的线程都是前面获取到信号量的线程,其 他没有获得线程都阻塞着。非阻塞的线程append后才release,这时候等待的线程被唤醒,才能pop,也就是没有 获取信号量就不能pop,这是安全的。
      • 经过上面的分析,信号量比计算列表长度好,线程安全。
    • 信号量和锁
      1. 信号量,可以多个线程访问共享资源,但这个共享资源数量有限。
      2. 锁,可以看做特殊的信号量,即信号量计数器初值为1。只允许同一个时间一个线程独占资源。

    总结

    threading模块中的类

    常用方法含义
    Eventset(self)将标记设置为True
    clear(self)将标记设置为False
    is_set()判断当前标记是否为True,是True返回True,否则返回False
    相当于为返回当前标记
    wait(self,timeout=None)如果当前标记为True,立即返回True,如果当前标记为False,会产生一个阻塞,直到标记为True时返回True。timeout等待超时时间,默认为None表示无限等待。未等到超时了返回False
    Time定时器,延迟执行Timer(interval, function, args=None, kwargs=None) 实例化构造方法
    1. interval #多少时间后执行function函数
    2. function #需要执行的函数
    cancel(self)取消定时器
    (定时器为执行函数时可以取消,在函数执行中无法取消)
    start()启动定时器
    Lock锁acquire(self,blocking=True,timeout=-1)获取锁,获取成功返回True,否则返回False
    当获取不到锁时,默认进入阻塞状态,直到获取到锁,后才继续。
    阻塞可以设置超时时间。
    非阻塞时,timeout禁止设置。如果超时依旧未获取到锁,返回False。
    rease(self)释放锁,可以从任何线程调用释放。
    已上锁的锁,会被设置为unlocked。如果未上锁调用,会抛出RuntimeError异常。
    RLock可重入锁和Lock类似但是:
    线程A获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release
    ConditionCondition(self,lock=None)构造方法,lock默认值为None表示使用RLock()锁,也可以自己传入为Lock()
    acquire(self,*args)获取锁
    rease(self)释放锁
    wait(self,timeout=None)等待通知,timeout设置超时时间。
    注意:必须获取锁后才能等待通知,notify或notify_all可以发通知
    notify(self,n=1)唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
    notify_all(self)唤醒所有等待的线程
    Semaphore信号量Semaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。
    如果_value的值为0会变成阻塞状态。获取成功返回True
    release(self)释放信号量,计数器加1。即_value的值加1
    `_value`属性信号量,当前信号量
    BoundedSemaphore有界信号量BoundedSemaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。
    如果_value的值为0会变成阻塞状态。获取成功返回True
    release(self)释放信号量,计数器加1。即_value的值加1,超过初始化值会抛出异常ValueError。
    _value信号量,当前信号量
    展开全文
  • 线程同步机制

    千次阅读 2019-02-14 17:52:46
    从广义上说,Java平台提供的线程同步机制包括锁、volatile关键字、final关键字、static关键字和一些相关的API,如Object.wait( )/.notify( )等   1、锁的概述和概念: a 线程安全问题的产生: 多个线程并发访问...
  • C++线程同步的四种方式(Windows)

    万次阅读 多人点赞 2017-07-03 23:20:56
    为什么要进行线程同步? 在程序中使用多线程时,一般很少有多个线程能在其生命期内进行完全独立的操作。更多的情况是一些线程进行某些处理操作,而其他的线程必须对其处理结果进行了解。正常情况下对这种处理结果的...
  • C++多线程并发(二)---线程同步之互斥锁

    万次阅读 多人点赞 2019-03-20 00:08:29
    一、何为线程同步 在前一篇文章《C++多线程并发编程(一)—线程管理》中解释多线程并发时说到两个比较重要的概念: 多线程并发:在同一时间段内交替处理多个操作,线程切换时间片是很短的(一般为毫秒级),一个...
  • 线程同步(7种同步方法)

    万次阅读 2018-09-20 18:27:10
    java允许多线程并发控制,当多线程同时操作一个可共享的资源变量时,将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程调用,从而保证了该变量的唯一性和准确性 ...
  • java线程同步的实现方式

    万次阅读 2019-03-08 01:47:21
    当多个线程同时操作一个可共享的资源时会出现线程安全问题,将会导致数据不一致,因此使用同步锁来防止该操作执行完之前不许被其他线程执行,从而保证了该变量的唯一性和准确性。下面总结一些java线程实现同步方式,...
  • Java面试--线程同步方法

    千次阅读 2018-08-12 11:37:17
    面试题:线程同步有几种方法(百度面试题) 面试题:线程安全解释一下(大疆面试题) 为什么要线程同步? 当使用多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的...
  • C 线程同步的四种方式(Windows)

    千次阅读 2020-02-21 18:08:31
    一、为什么要进行线程同步? 在程序中使用多线程时,一般很少有多个线程能在其生命期内进行完全独立的操作。更多的情况是一些线程进行某些处理操作,而其他的线程必须对其处理结果进行了解。正常情况下对这种处理...
  • 实现线程同步的几种方式总结

    万次阅读 多人点赞 2018-07-25 21:31:26
    在多线程线程的执行顺序是依靠哪个线程先获得到CUP的执行权谁就先执行,虽然说可以通过线程的优先权进行设置,但是他只是获取CUP执行权的概率高点,但是也不一定必须先执行。在这种情况下如何保证线程按照一定的...
  • Android-线程常用方法-线程同步

    千次阅读 2019-02-16 15:45:52
    线程常用方法: 1.start():线程调用该方法将启动线程从新建状态进入就绪,一旦轮到享用CPU资源时,就开始自己的生命周期 2.run():Thread类的run()方法与Runnable接口的run()方法的功能和作用相同,都用来定义线程对象...
  • C++多线程并发(三)---线程同步之条件变量

    千次阅读 多人点赞 2019-05-03 12:43:12
    在前一篇文章《C++多线程并发编程(二)—线程同步之互斥锁》中解释了线程同步的原理和实现,使用互斥锁解决数据竞争访问问题,算是线程同步的加锁原语,用于排他性的访问共享数据。我们在使用mutex时,一般都会期望...
  • 关于C语言中线程同步的方式

    千次阅读 2020-07-31 10:48:49
    C语言中线程同步的方式线程同步互斥锁读写锁条件变量信号量 线程同步 在多线程环境中,线程之间由于竞争共享资源(临界资源)容易引起数据不一致的问题。一般采用互斥锁(互斥信号量)解决,保证只有一个线程进入...
  • 第1关:并发编程的三个概念 第2关:使用synchronized关键字同步线程 第3关:使用线程锁(Lock)实现线程同步,使用sleep()函数解决了第三关线程随机导致的需要评测多次问题 第3关:使用线程锁(Lock)实现线程同步
  • 线程概念2 线程管理3 线程模型4 多线程的关系二、线程同步1 同步的原因和目的2 线程同步必知概念3 线程同步方式: 前言 我们都知道,进程是运转中的程序,是为了在CPU上实现多道编程而发明的一个概念。但是进程在一...
  • C#实现多线程同步并发操作,在线源码,供你下载学习
  • C#线程(二)线程同步

    千次阅读 2018-08-05 08:26:57
    当多个线程同时对一个资源进行操作的时候,便会引发问题,这个时候就需要线程同步,比较典型的就是多线程执行加减操作。 解决方式: 尽可能的重新设计代码逻辑,避免使用线程同步 若必须使用线程同步,就...
  • windows系统多线程同步机制原理总结

    千次阅读 2018-12-24 21:24:33
    windows系统多线程同步机制原理总结 同步问题是开发过程中遇到的重要问题之一。同步是要保证在并发执行的环境中各个控制流可以有序地执行,包括对于资源的共享或互斥访问,以及代码功能的逻辑顺序。 为了保证多线程...
  • java多线程同步5种方法

    万次阅读 多人点赞 2018-05-28 22:11:07
    二、为什么要线程同步因为当我们有多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的状态出现混乱,从而导致程序异常。举个例子,如果一个银行账户同时被两个线程操作...
  • 线程同步的四种方式

    千次阅读 2019-10-15 14:05:26
    一、为什么要进行线程同步? 多个线程同时访问同一个全局变量,如果都是读取操作,则不会出现问题。如果一个线程负责改变此变量的值,而其他线程负责同时读取变量内容,则不能保证读取到的数据是经过写线程修改后的...
  • C#多线程——线程同步

    万次阅读 2018-08-25 13:11:53
    一、为什么要线程同步? 多个线程同时使用共享对象会造成很多问题,同步这些线程使得对共享对象的操作能够以正确的顺序执行是非常重要的。 二、实现线程同步的方法: • 使用Mutex类 • 使用SemaphoreSlim类 • ...
  • 什么是线程同步

    千次阅读 2019-02-22 11:18:59
    线程有自己的私有数据,比如栈和寄存器,同时与其它线程共享相同的虚拟内存和全局变量等资源,当多个线程同时读写同一份共享资源的时候,会引起冲突,这时候就需要引入线程同步机制使各个线程排队一个一个的对共享...
  • 线程同步

    千次阅读 2012-11-23 10:23:42
    什么是线程同步? 当使用多个线程来访问同一个数据时,非常容易出现线程安全问题(比如多个线程都在操作同一数据导致数据不一致),所以我们用同步机制来解决这些问题。 实现同步机制有两个方法: 1。同步代码块:...
  • 多线程:解释线程同步的必要性

    千次阅读 2020-08-25 17:12:44
    为什么需要线程同步? 当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。 这个时候,有个单线程模型下不...
  • 背景问题:在特定的应用场景下,多线程不进行同步会造成什么问题?通过多线程模拟多窗口售票为例:#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #...
  • 线程同步与异步

    千次阅读 2018-08-29 11:50:32
    线程同步与异步 线程 同步 (synchronized) 异步 (asynchronized) 特点 A线程要请求某个资源,但是此资源正在被B线程使用中,因为同步机制存在,A线程请求不到,怎么办,A线程只能等待下去 A线程要请求...
  • 前言--前言是为了帮助大家能够更好的理解线程通信和线程同步,了解Java内存模型的抽象。 前言部分引用文章地址:...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 877,487
精华内容 350,994
关键字:

线程同步