精华内容
下载资源
问答
  • 高并发:线程、线程锁与线程池(精华)

    千次阅读 多人点赞 2019-04-18 19:44:24
    单线程——多线程的开启——线程锁——线程同步工具——手写连接池——连接池工具类。 一、线程 1.线程的概念 2.线程与进程的关系 3.定义: 区别:如上!!! 4.wait()和sleep() 5.线程的状态及其他...

    前文:

              单线程——多线程的开启——线程锁——线程同步工具——手写连接池——连接池工具类。

    一、线程

    1.线程的概念

    2.线程与进程的关系

    3.定义:

    区别:如上!!!

    4.wait()和sleep()

    5.线程的状态及其他API

     

    二、线程锁

    线程并发同步时,引入了锁机制。

    1. 普通锁机制:synchronized  修饰代码块与volatile  修饰成员变量

    2.Lock

    !!共同点:都是从外面创建锁类、再把锁传到线程里对变量对象赋值。

    (1)重入锁

    (2)读写分离锁

    区别:

     

    二、线程同步工具类

    !!共同点:都是从外面创建工具类、再把工具类的参数传到线程里面执行。

    1.CountDowmLatch闭锁:等待所有线程执行完

    2.CyclicBarrier栅栏:等待所有线程达到后开启

    3.Exchanger交换机:交流数据

    4.信号量

    (1)概念

    (2)应用场景一

    (3)应用场景二

     

    三、线程池

    1.为什么使用线程池

    2.线程池的核心队列

    阻塞式队列:只用于线程对象,主要用于引出线程池

    3.手动创建线程池

    4.Executors工具创建线程

    核心线程:0(临时线程)、1(队列)、N(队列)

    定时线程:

     

     

    四、彩蛋图

     

    展开全文
  • Python中线程同步与线程锁

    千次阅读 2019-06-09 21:12:02
    文章目录Python中线程同步与线程锁线程同步threading.Event对象threading.Timer定时器,延迟执行threading.Lock锁可重入锁RLockCondition条件锁,等待通知therading.Semaphore信号量threading.BoundedSemaphore有界...

    线程同步与线程锁

    线程同步

    概念
    * 线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完 成对数据的操作。

    1.threading.Event对象

    • Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化 来进行操作
    名称含义
    event.set()标记设置为True
    event.clear()标记设置为False
    event.is_set()标记是否为True
    event.wait(timeout=None)设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False
    • 老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了10个杯子
    import threading
    import time
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker(event:threading.Event,count = 10):
        logging.info("我是worker工作线程")
        cups = []
        while True:
            logging.info("制作了一个 cup")
            time.sleep(0.2)
            cups.append(1)
            if len(cups)>=count:
                event.set()
                break
        logging.info("制作完成:{}".format(cups))
    
    def boss(event:threading.Event):
        logging.info("我是boss")
        event.wait()
        logging.info("Good Job")
    
    event = threading.Event()
    b = threading.Thread(target=boss,args=(event,))
    w = threading.Thread(target=worker,args=(event,))
    b.start()
    w.start()
    

    threading2_001

    • 使用同一个Event对象的标记flag。

    • 谁wait就是等到flag变为True,或等到超时返回False。不限制等待的个数。

    • wait的使用

    from threading import Thread,Event
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker(event:Event,interval:int):
        while not event.wait(interval):
            logging.info("没有等到。。")
    
    e = Event()
    Thread(target=worker,args=(e,1)).start()
    
    e.wait(5)
    e.set()
    
    print("======end========")
    

    threading2_002

    2.threading.Timer定时器,延迟执行

    方法含义
    Timer.cancel()取消定时器,(定时器为执行函数时可以取消,在函数执行中无法取消)
    Time.start()启动定时器
    • threading.Timer继承自Thread,这个类用来定义延迟多久后执行一个函数。
    • class threading.Timer(interval, function, args=None, kwargs=None)
      1. interval #多少时间后执行function函数
      2. function #需要执行的函数
    • start方法执行之后,Timer对象会处于等待状态,等待了interval秒之后,开始执行function函数的。
    • Timer是线程Thread的子类,Timer实例内部提供了一个finished属性,该属性是Event对象。
    • cancel方法,本质上 是在worker函数执行前对finished属性set方法操作,从而跳过了worker函数执行,达到了取消的效果。
    from threading import Timer
    import logging
    import time
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    def worker():
        logging.info("in worker")
        time.sleep(5)
        logging.info("end in worker")
    
    t = Timer(2,worker)
    t.setName("timer1") #设置线程名称
    # t.cancel() #取消定时器后,定时器不在执行
    t.start()
    # t.cancel() #取消定时器后,定时器不在执行
    time.sleep(4) #等待4秒后,定时器已经开始执行
    t.cancel() #当定时器执行后,无法取消
    
    print("======end========")
    

    threading2_003

    3.threading.Lock锁

    锁(Lock):一旦线程获得锁,其他试图获取锁的线程将被阻塞等待。
    锁:凡是存在共享支援争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。

    名称含义
    Lock.acquire(blocking=True,timeout=-1)获取锁,获取成功返回True,否则返回False
    当获取不到锁时,默认进入阻塞状态,直到获取到锁,后才继续。阻塞可以设置超时时间。非阻塞时,timeout禁止设置。如果超时依旧未获取到锁,返回False。
    Lock.rease()释放锁,可以从任何线程调用释放。
    已上锁的锁,会被设置为unlocked。如果未上锁调用,会抛出RuntimeError异常。
    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def worker(lock):
        print("worker start",threading.get_ident(),threading.current_thread().name)
        lock.acquire()
        print("worker over",threading.get_ident(),threading.current_thread().name)
    
    lock = threading.Lock()
    lock.acquire()
    print(" -"*30)
    for i in range(5):
        threading.Thread(target=worker,args=(lock,),name="w{}".format(i)).start()
    
    print("- "* 30)
    while True:
        time.sleep(0.1)
        cmd = input(">>").strip()
        if cmd == "r":
            lock.release()
        elif cmd == "q":
            break
        else:
            print(threading.enumerate())
    

    threading2_004
    上例可以看出不管在哪一个线程中,只要对一个已经上锁的锁阻塞请求,该线程就会阻塞。

    • 加锁,解锁
      一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁是无 法释放,但是当前线程可能因为这个异常被终止了,这也产生了死锁

    • 加锁、解锁常用语句:

      1. 使用try…finally语句保证锁的释放
      2. with上下文管理,锁对象支持上下文管理
    • 计数器类,可加,可减。

    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    class Counter:
        def __init__(self):
            self._val = 0
            self.lock = threading.Lock()
    
        @property
        def value(self):
            with self.lock:
                return self._val
    
        def inc(self):
            with self.lock:
                self._val += 1
    
        def dec(self):
            with self.lock:
                self._val -= 1
    
    def run(c:Counter,count=100):
        for _ in range(count):
            for i in range(-50,50):
                if i <0:
                    c.dec()
                else:
                    c.inc()
    
    c = Counter()
    c1 = 10 #线程数
    c2 = 1000
    for i in range(c1):
        threading.Thread(target=run,args=(c,c2)).start()
    
    for k in threading.enumerate():
        if k.name != "MainThread":
            k.join()
    
    print(c.value)
    
    • 锁的应用场景
      锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

    • 使用锁的注意事项:

      1. 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
        • 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道 上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须 加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
      2. 加锁时间越短越好,不需要就立即释放锁
      3. 一定要避免死锁
    • 非阻塞锁的使用

    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def worker(lock:threading.Lock):
        while True:
            if lock.acquire(False):
                print("do something.")
                time.sleep(1)
                lock.release()
                break
            else:
                print("try again")
                time.sleep(1)
    
    lock = threading.Lock()
    for i in range(5):
        threading.Thread(target=worker,name="w{}".format(i),args=(lock,)).start()
    

    threading2_005

    4.可重入锁RLock

    • 可重入锁,是线程相关的锁
    • 线程A获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release
    import threading
    import sys
    import time
    
    def print(*args):
        sys.stdout.write(" ".join(map(str,args))+"\n")
    
    def fib(num,rlock:threading.RLock):
        with rlock:
            if num<3:
                return 1
            return fib(num-1,rlock)+fib(num-2,rlock)
    
    def work(num,rlock):
        print(fib(num,rlock))
    
    rlock = threading.RLock()
    for i in range(1,10):
        threading.Thread(target=work,args=(i,rlock)).start()
    

    可重入锁
    * 与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞多次获取锁
    * 当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁
    * 锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次

    5.Condition条件锁,等待通知

    构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认是RLock。

    名称含义
    Condition.acquire(self,*args)获取锁
    Condition.wait(self,timeout=None)等待通知,timeout设置超时时间
    Condition.notify(self,n=1)唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
    Condition.notify_all(self)唤醒所有等待的线程
    • 每个线程都可以通过Condition获取已把属于自己的锁,在锁中可以等待其他进程的同级锁的通知。当获取到同级锁的通知后,会停止等待。

    • 当使用Condition(lock=Lock())初始化锁时,锁只能一级等待,不能出现多级等待。

    • 简单示例:

    import threading
    import time
    
    def work(cond:threading.Condition):
        with cond:
            print("开始等待")
            cond.wait()
            print("等到了")
    
    cond = threading.Condition()
    # cond = threading.Condition(threading.Lock())
    threading.Thread(target=work,args=(cond,)).start()
    threading.Thread(target=work,args=(cond,)).start()
    
    with cond:
        with cond:
            time.sleep(1)
            print("开始释放二级等待")
            print(cond.notifyAll())
        time.sleep(2)
        print("开始释放一级等待")
        cond.notifyAll()
    

    threading2_006

    • 广播模式示例:
    from threading import Thread,Condition,Lock
    import time
    import logging
    import random
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    class Dispachter:
        def __init__(self):
            self.data = None
            self.cond = Condition(lock=Lock())
    
        #生成者
        def produce(self,total):
            for _ in range(total):
                data = random.randint(1,100)
                with self.cond:
                    logging.info("生产了一个数据:{}".format(data))
                    self.data = data
                    self.cond.notify(1)
                time.sleep(1) #模拟生成数据需要耗时1秒
    
        #消费者
        def consume(self):
            while True:
                with self.cond:
                    self.cond.wait() #等待
                    data = self.data
                    logging.info("消费了一个数据 {}".format(data))
                    self.data = None
    
    d = Dispachter()
    p = Thread(target=d.produce,name="producer",args=(10,))
    
    # 增加消费者
    for i in range(5):
        c = Thread(target=d.consume,name="consumer{}".format(i))
        c.start()
    
    p.start()
    

    threading2_007

    上面例子中演示了生产者生产一个数据,就通知一个消费者消费。

    • Condition总结
      Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。采用了通知机制,非常有效率。
    • 使用方式
      使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用 with上下文。
      消费者wait,等待通知。
      生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

    6.therading.Semaphore信号量

    和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求 的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

    名称含义
    Semaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    Semaphore.acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。如果_value的值为0会变成阻塞状态。获取成功返回True
    Semaphore.release(self)释放信号量,计数器加1。即_value的值加1
    Semaphore._value信号量,当前信号量
    • 注意
      1. 计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。
      2. 信号量没有做超界限制
    from threading import Semaphore
    
    s =Semaphore(3)
    print(s._value)
    s.release() #会增加信号量
    print(s._value) #可以看出没有做信号量上线控制
    print("----------------")
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire())
    print(s._value)
    print(s.acquire()) #当信号量为0再次acquire会被阻塞
    print("~~~~~~阻塞了吗?")
    print(s._value)
    

    threading2_008

    • 跨线程使用演示
    from threading import Thread,Semaphore
    import time
    import logging
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    #定义获取信号量
    def worker(s:Semaphore):
        while s.acquire():
            logging.info("被执行了一次,获取一个信号量 _value={}".format(s._value))
    
    #释放信号量
    def cunn(s:Semaphore):
        while True:
            logging.info("释放一个信号量")
            s.release()
            time.sleep(1)
    
    s = Semaphore(3)
    #创建3个线程获取信号量
    for i in range(3):
        Thread(target=worker,args=(s,),name="w{}".format(i)).start()
    
    #开启一个线程释放信号量
    Thread(target=cunn,args=(s,)).start()
    

    threading2_009

    7.threading.BoundedSemaphore有界信号量

    • 有界信号量,不允许使用release超出初始值的范围,否则,抛出ValueError异常
    名称含义
    BoundedSemaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    BoundedSemaphore.acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。如果_value的值为0会变成阻塞状态。获取成功返回True
    BoundedSemaphore.release(self)释放信号量,计数器加1。即_value的值加1,超过初始化值会抛出异常ValueError。
    BoundedSemaphore._value信号量,当前信号量
    from threading import BoundedSemaphore
    
    bs = BoundedSemaphore(3)
    print(bs._value)
    bs.acquire()
    bs.acquire()
    bs.acquire()
    print(bs._value)
    bs.release()
    bs.release()
    bs.release()
    print(bs._value)
    bs.release()
    

    threading2_010

    • 应用举例
      连接池
      因为资源有限,且开启一个连接成本高,所以,使用连接池。
      一个简单的连接池
      连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。
    from threading import Thread,BoundedSemaphore
    import time
    import logging
    import random
    
    logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)s %(message)s",level=logging.INFO)
    
    #链接类
    class Conn:
        def __init__(self,name):
            self.name = name
    
    class Pool:
        def __init__(self,count:int):
            self.count = count
            #池中放着链接备用
            self.pool = [self._connect("conn-{}".format(i)) for i in range(count)]
            self.bsemaphore = BoundedSemaphore(count)
    
        #创建连接方法,返回一个连接对象
        def _connect(self,conn_name):
            return Conn(conn_name)
    
        #获取一个链接
        def get_conn(self):
            self.bsemaphore.acquire()
            self.pool.pop()
            logging.info("从连接池拿走了一个连接~~~~~~~")
    
        #归还一个连接
        def return_conn(self,conn:Conn):
            logging.info("归还了一个连接----------")
            self.pool.append(conn)
            self.bsemaphore.release()
    
    def worker(pool:Pool):
        conn = pool.get_conn()
        logging.info(conn)
        #模拟使用了一段时间
        time.sleep(random.randint(1,5))
        pool.return_conn(conn)
    
    pool = Pool(3)
    for i in range(6):
        Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()
    

    threading2_011
    上例中,使用信号量解决资源有限的问题。
    如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完 归还资源后信号量加1,等待线程就可以被唤醒拿走资源。
    注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。

    • 问题分析
    1. 边界问题
      • 假设一种极端情况,计数器还差1就归还满了,有三个线程A、B、C都执行了第一句,都没有来得及release,这时 候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。 因此信号量,可以保证,一定不能多归还。
    2. 正常使用分析
      • 正常使用信号量,都会先获取信号量,然后用完归还。
      • 创建很多线程,都去获取信号量,没有获得信号量的线程都阻塞。能归还的线程都是前面获取到信号量的线程,其 他没有获得线程都阻塞着。非阻塞的线程append后才release,这时候等待的线程被唤醒,才能pop,也就是没有 获取信号量就不能pop,这是安全的。
      • 经过上面的分析,信号量比计算列表长度好,线程安全。
    • 信号量和锁
      1. 信号量,可以多个线程访问共享资源,但这个共享资源数量有限。
      2. 锁,可以看做特殊的信号量,即信号量计数器初值为1。只允许同一个时间一个线程独占资源。

    总结

    threading模块中的类

    常用方法含义
    Eventset(self)将标记设置为True
    clear(self)将标记设置为False
    is_set()判断当前标记是否为True,是True返回True,否则返回False
    相当于为返回当前标记
    wait(self,timeout=None)如果当前标记为True,立即返回True,如果当前标记为False,会产生一个阻塞,直到标记为True时返回True。timeout等待超时时间,默认为None表示无限等待。未等到超时了返回False
    Time定时器,延迟执行Timer(interval, function, args=None, kwargs=None) 实例化构造方法
    1. interval #多少时间后执行function函数
    2. function #需要执行的函数
    cancel(self)取消定时器
    (定时器为执行函数时可以取消,在函数执行中无法取消)
    start()启动定时器
    Lock锁acquire(self,blocking=True,timeout=-1)获取锁,获取成功返回True,否则返回False
    当获取不到锁时,默认进入阻塞状态,直到获取到锁,后才继续。
    阻塞可以设置超时时间。
    非阻塞时,timeout禁止设置。如果超时依旧未获取到锁,返回False。
    rease(self)释放锁,可以从任何线程调用释放。
    已上锁的锁,会被设置为unlocked。如果未上锁调用,会抛出RuntimeError异常。
    RLock可重入锁和Lock类似但是:
    线程A获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release
    ConditionCondition(self,lock=None)构造方法,lock默认值为None表示使用RLock()锁,也可以自己传入为Lock()
    acquire(self,*args)获取锁
    rease(self)释放锁
    wait(self,timeout=None)等待通知,timeout设置超时时间。
    注意:必须获取锁后才能等待通知,notify或notify_all可以发通知
    notify(self,n=1)唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
    notify_all(self)唤醒所有等待的线程
    Semaphore信号量Semaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。
    如果_value的值为0会变成阻塞状态。获取成功返回True
    release(self)释放信号量,计数器加1。即_value的值加1
    `_value`属性信号量,当前信号量
    BoundedSemaphore有界信号量BoundedSemaphore(value=1)构造方法。value为初始信号量。value小于0,抛出ValueError异常
    acquire(self,blocking=True,timeout=None)获取信号量,技术器减1,即_value的值减少1。
    如果_value的值为0会变成阻塞状态。获取成功返回True
    release(self)释放信号量,计数器加1。即_value的值加1,超过初始化值会抛出异常ValueError。
    _value信号量,当前信号量
    展开全文
  • 线程锁

    千次阅读 2015-07-03 18:45:29
    1、Monitor的使用Monitor.Enter(obj); //对某个对象进入独占模式 ...可以让一个线程先执行,其他线程随后执行。//方式1、 //A线程 lock(obj){ //执行操作 Monitor.Pulse(obj); } //B线程 lock(obj){ Monito

    C#线程锁的常用方法

    1、Monitor的使用

    Monitor.Enter(obj);
    //对某个对象进入独占模式
    Monitor.Exit(obj);
    //对某个对象释放独占模式

    Monitor的顺序执行。
    可以让一个线程先执行,其他线程随后执行。

    //方式1、
    //A线程
    lock(obj){
        //执行操作
        Monitor.Pulse(obj);
    }
    //B线程
    lock(obj){
        Monitor.Wait(obj);
        //执行操作
    }

    上述方法的问题:如果B先执行,那么会释放锁,然后确保A先执行。如果A先执行。执行完成以后,那么B执行到Monitor.Wait的时候,就会跳出去,等到,形成死锁。

    //方式2
    private static bool isRun = false;
    private static Object obj = new Object();
    
    public void MethodA(){
        lock(obj){
            //执行操作
            isRun = true;
            Monitor.Pulse(obj);
        }
    }
    
    public void MethodB(){
        lock(obj){
            if(isRunning){
                Monitor.Wait(obj);
            }
            //执行操作
        }
    }

    2、lock(){ }的使用

    private static Object obj  = new Obj();
    lock(obj){
        //执行操作
    }

    3、自旋锁SpinLock

    private SpinLock _spinlock = new SpinLock();
    public void DoWork(){
        _spinlock.Enter(ref lockTaken);
        _spinlock.Exit(lockTaken);
    }

    自旋锁比Monitor在多核cpu的电脑上有优势,通过空转,减少CPU的切换

    4、原子锁 Interlocked

    Interlocked.Increment(SharedResource.IntValue);
    Interlocked.Decrement(SharedResource.IntValue);
    //类似的方法
    //Add
    //CompareExchange
    //Read
    //Exchange
    展开全文
  • python多线程,线程锁

    万次阅读 2019-01-19 23:39:10
    python使用多线程, 不一定运行速度快,这里引入GIL(global interpreter lock) python解释器中任意时刻都只有一个线程在执行; GIL执行过程: 1). 设置一个GIL; 2). 切换线程去准备执行任务(Runnale就绪状态)...

    python使用多线程, 不一定运行速度快,这里引入GIL(global interpreter lock)
    python解释器中任意时刻都只有一个线程在执行;

    • GIL执行过程:
      • 1). 设置一个GIL;
      • 2). 切换线程去准备执行任务(Runnale就绪状态);
      • 3). 运行;
      • 4). 可能出现的状态:
        - 线程任务执行结束;
        - time.sleep()
        - 需要获取其他的信息才能继续执行(eg: 读取文件, 需要从网络下载html网页)
      • 5). 将线程设置为睡眠状态;
      • 5). 解GIL的锁;

    多线程的应用场景: I/O密集型(input, output) — 爬虫
    不建议使用多线程的场景: 计算密集型(cpu一直占用)

    1. 队列与线程

    1). 理论上多线程执行任务是不能获取返回结果的, 因此需要一个容器来存储产生的数据;
    2). 容器该如何选择? list(栈, 队列), tuple(元组是不可变的, 不可使用),
    set(集合默认会去重, 所以不选择), dict
    选择队列类型存储(FIFO===first input first output)
    例1:

    
    import threading
    from mytimeit import timeit #自己写的,附在下面
    from queue import Queue
    
    
    def job(li, queue):
        queue.put(sum(li))   # 将任务的执行结果存储到队列中;
    @timeit
    def use_thread():
        # 实例化一个队列, 用来存储每个线程执行的结果
        q = Queue()
        # q.get()  -- 出队
        # q.put(value)  -- 入队
    
        lis = [range(5), range(2,10), range(1000, 20000), range(3000, 10000)]
        # create 5 threads
        threads = []
        for li in lis:
            t = threading.Thread(target=job, args=(li, q))
            t.start()
            threads.append(t)
        [thread.join() for thread in threads]
        # 从队列中拿出所有线程执行的结果;
        results  = [q.get() for li in lis]
        print(results)
    
    if __name__ == "__main__":
        use_thread()
    # 结果:
    # [10, 44, 199490500, 45496500]
    # use_thread函数运行时间:0.00187874
    

    mytimeit.py

    import time
    
    def timeit(f):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            res = f(*args, **kwargs)
            end_time = time.time()
            print("%s函数运行时间:%.8f" % (f.__name__, end_time - start_time))
            return res
    
        return wrapper
    

    例2:生产者-消费者模型,用继承实现
    什么是生产者-消费者模型?
    某个模块专门负责生产+数据, 可以认为是工厂;
    另外一个模块负责对生产的数据进行处理的, 可以认为是消费者.
    在生产者和消费者之间加个缓冲区(队列queue实现), 可以认为是商店.

    生产者 -----》缓冲区 -----》 消费者
    优点:
    1). 解耦:生产者和消费者的依赖关系减少;
    2). 支持并发;是两个独立的个体, 可并发执行;

    """
    # 需求1: 给定200个ip地址,  可能开放端口为80,  443,  7001,  7002,  8000,  8080,  
    9000(flask),  9001
             以http://ip:port形式访问页面以判断是否正常访问.
    
             1). 构建所有的url地址;===存储到一个数据结构中
             2). 依次判断url址是否可以成功访问
    
    
    实现多线程:
            1). 实例化对象threading.Thread;
            2). 自定义类, 继承threading.Thread, 重写run方法(存储任务程序);
    
    """
    def create_data():
        """创建测试数据,  文件中生成200个IP"""
        with open('doc/ips.txt', 'w') as f:
            for i in range(200):
                f.write('172.25.254.%s\n' % (i + 1))
            print("测试数据创建完成!")
    
    import time
    import threading
    from queue import Queue
    from urllib.request import urlopen
    
    class Producer(threading.Thread):
        def __init__(self, queue):
            super(Producer, self).__init__()
            self.q = queue
    
    
        def run(self):
            """生产测试需要的url地址http://ip:port"""
            ports = [80, 443, 7001, 7002, 8000, 8080, 9000, 9001]
            with open('doc/ips.txt') as f:
                for line in f:
                    ip = line.strip()
                    for port in ports:
                        url = "http://%s:%s" %(ip, port)
                        time.sleep(1)
                        self.q.put(url)
                        print("生产者生产url:%s" %(url))
    
    class Consumer(threading.Thread):
    
        def __init__(self, queue):
            super(Consumer, self).__init__()
            self.q = queue
    
    
        def run(self):
            
          
           url = self.q.get()
           try:
               urlObj = urlopen(url)     
           except Exception as e:
               print("%s不可访问" %(url))
           else:
               pageContentSize = len(urlObj.read().decode('utf-8'))
               print("%s可以访问, 页面大小为%s" %(url, pageContentSize))
    
    def main():
        q = Queue()
        p = Producer(q)
        p.start()
    
        for i in range(400):
            c = Consumer(q)
            c.start()
    
    if __name__ == '__main__':
    #    create_data()
         main()
    

    在这里插入图片描述

    2. 线程同步之线程锁

    1. 为什么需要线程锁?
      多个线程对同一个数据进行修改时, 可能会出现不可预料的情况.
      例如实现银行转账功能,money += 1 这句其实有三个步骤 money; money+1; money=money+1;假如这三步骤还没完成money-=1的线程就开始执行了,后果可想而知,money的值肯定时乱的
    2. 如何实现线程锁?
      1. 实例化一个锁对象;
      lock = threading.Lock()
      2. 操作变量之前进行加锁
      lock.acquire()
      3. 操作变量之后进行解锁
      lock.release()
    import threading
    
    
    #  银行存钱和取钱
    def add(lock):
        global money  # 生命money为全局变量
        for i in range(1000000):
            # 2. 操作变量之前进行加锁
            lock.acquire()
            money += 1  # money;  money+1; money=money+1;
            # 3. 操作变量之后进行解锁
            lock.release()
    
    
    def reduce(lock):
        global money
        for i in range(1000000):
            # 2. 操作变量之前进行加锁
            lock.acquire()
            money -= 1
            # 3. 操作变量之后进行解锁
            lock.release()
    
    
    if __name__ == '__main__':
        money = 0
        # 1. 实例化一个锁对象;
        lock = threading.Lock()
    
        t1 = threading.Thread(target=add, args=(lock,))
        t2 = threading.Thread(target=reduce, args=(lock,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
        print("当前金额:", money)
    

    例3 多线程实现简单的下载器

    """
    当你创建用户界面并想保持界面的可用性时,线程就特别有用。
    没有线程,用户界面将变得迟钝,当你下载一个大文件或者执
    行一个庞大的数据库查询命令时用户界面会长时间无响应。为
    了防止这样情况发生,你可以使用多线程来处理运行时间长的
    进程并且在完成后返回界面进行交互。
    """
    
    import threading
    from urllib.request import urlopen
    
    DOWNLOAD_DIR = 'doc'
    class DownloadThread(threading.Thread):
        def __init__(self, url):
            super(DownloadThread, self).__init__()
            self.url = url
        def run(self):
            try:
                urlObj = urlopen(self.url, timeout=3)
            except Exception as e:
                print("download %s error\n" % (self.url), e)
                imgContent = None
            else:
                # http://imgsrc.baidu.com/forum/w%3D580/sign=16d420cb8b01a18bf0eb1247ae2e0761/22a4462309f790520522e1d900f3d7ca7bcbd51c.jpg
                filename = self.url.split("/")[-1]
                # 'wb' === 写的是二进制文件(图片, 视频, 动图, .pdf)
                # 'ab'
                with open("%s/%s" % (DOWNLOAD_DIR, filename), 'ab') as f:
                    # 如果文件特别大的时候, 建议分块下载;每次只读取固定大小, 防止占用内存过大.
                    while True:
                        imgContentChunk = urlObj.read(1024 * 3)
                        if not imgContentChunk:
                            break
                        f.write(imgContentChunk)
                        # 可以添加下载的程度(百分率);
    
                    print("%s下载成功" % (filename))
    # 这些url是我测试用的,是另一个电脑上的一些书的url,速度比较块,可以换成视频的地址进行测试,从网上下载应该会比较慢
    url1  = "ftp://172.25.254.250/pub/book/python/01_MIT.Introduction.to.Computation.and.Programming.Using.Python%20revised%20and%20expanded%20edition.pdf"
    url2 = 'ftp://172.25.254.250/pub/book/python/02_interview_exercise.pdf'
    url3 = "ftp://172.25.254.250/pub/book/python/02_python-data-structure-cn.pdf"
    
    urls = [url1, url2, url3]
    
    
    for url in urls:
        thread = DownloadThread(url)
        thread.start()
    

    3. 线程池

    只能放指定个线程
    用法

    # python3.2版本之后才有的;
    from concurrent.futures import  ThreadPoolExecutor
    
    def job(num):
       # 需要执行的任务
       print("这是一个%s任务" %(num))
       return  "执行结果:%s" %(num)
    if __name__ == '__main__':
       #  1. 实例化线城池对象,线城池里面包含5个线程执行任务;
       pool = ThreadPoolExecutor(max_workers=5)
       futures = []
       for i in range(1000):
           # 往线程池里面扔需要执行的任务, 返回的是一个对象(_base.Future()),
           f1 = pool.submit(job, i)
           futures.append(f1)
    
       # 判断第一个任务是否执行结束;
    print(futures[0].done(),'------------')
    
       # 获取任务的执行结果;
       print(futures[0].result())
    

    在这里插入图片描述

    map的用法

    上述代码使用map可做以下修改

    if __name__ == '__main__':
       #  1. 实例化线城池对象,线城池里面包含5个线程执行任务;
       pool = ThreadPoolExecutor(max_workers=5)
    
       li=[i for i in range(1000)]
    
       pool.map(job,li)
    

    实现生产者消费者

    """
    文件名: {03_生产者_消费者模型_类的继承实现}.py
    日期: {2019}-{01}-{19}  {10}-{}
    作者: lvah
    联系: xc_guofan@qq.com
    代码描述:
    
    # 需求1: 给定200个ip地址,  可能开放端口为80,  443,  7001,  7002,  8000,  8080,
    9000(flask),  9001
             以http://ip:port形式访问页面以判断是否正常访问.
    
             1). 构建所有的url地址;===存储到一个数据结构中
             2). 依次判断url址是否可以成功访问
    
    
    实现多线程:
            1). 实例化对象threading.Thread;
            2). 自定义类, 继承threading.Thread, 重写run方法(存储任务程序);
    
    
    
    
    
    
    # 什么是生产者-消费者模型?
    某个模块专门负责身缠数据, 可以认为是工厂;
    另外一个模块负责对生产的数据进行处理的, 可以认为是消费者.
    在生产者和消费者之间加个缓冲区(队列queue实现), 可以认为是商店.
    
    生产者   -----》缓冲区   -----》 消费者
    
    
    # 优点:
        1). 解耦:生产者和消费者的依赖关系减少;
        2). 支持并发;是两个独立的个体, 可并发执行;
    
    """
    
    
    def create_data():
        """创建测试数据,  文件中生成200个IP"""
        with open('doc/ips.txt', 'w') as f:
            for i in range(200):
                f.write('172.25.254.%s\n' % (i + 1))
            print("测试数据创建完成!")
    
    
    import time
    import threading
    from queue import Queue
    from urllib.request import urlopen
    from concurrent.futures import ThreadPoolExecutor
    
    
    def producer(url):
        """生产测试需要的url地址http://ip:port"""
        print("生产者生产url:%s" % (url))
        return url
    
    
    def consumer(future):
        print(future.result(),'----------------')
    
        # 获取consumer的返回值;
        url = future.result()
    
        try:
            urlObj = urlopen(url)
        except Exception as e:
            print("%s不可访问" % (url))
        else:
            pageContentSize = len(urlObj.read().decode('utf-8'))
            print("%s可以访问, 页面大小为%s" % (url, pageContentSize))
    
    
    def main():
        pool = ThreadPoolExecutor(max_workers=5)
        ports = [80, 443, 7001, 7002, 8000, 8080, 9000, 9001]
    
        with open("doc/ips.txt") as f:
            for line in f:
                ip = line.strip()
                for port in ports:
                    url = "http://%s:%s" % (ip, port)
                    # producer函数的返回值会回调给consumer函数;
                    res = pool.submit(producer, url).add_done_callback(consumer)
    # create_data()
    main()
    
    展开全文
  • 多线程中线程锁的使用

    千次阅读 2018-05-07 13:15:40
    线程锁的5个要素:CRITICAL_SECTION g_cs; //定义线程锁InitializeCriticalSection(&amp;g_cs); //初始化DeleteCriticalSection(&amp;g_cs); //删除EnterCriticalSection(&amp;g_c...
  • Linux 线程锁详解

    千次阅读 2018-11-08 00:50:54
    Linux 线程锁详解
  • 快速理解线程锁

    万次阅读 2018-05-30 17:28:13
    线程锁 线程锁真的好麻烦啊!!! 找了几篇博客发现写的都不一样 相关联内容太多不容易理解 所以现在需要理清 什么是线程锁 应用场景 怎么用 优缺点 1. 什么是线程锁机制 多线程可以同时运行多个任务 ...
  • 在编写GUI界面中,通常用会有一些按钮,点击后触发事件,比如去下载一个文件或者做一些操作,这些操作会耗时,如果不能及时结束,主线程将会阻塞,这样界面就会出现未响应的状态,因此必须使用多线程来解决这个问题...
  • 线程锁的升级原理是什么?

    万次阅读 多人点赞 2019-05-20 11:04:06
    线程锁的升级原理是什么? 锁的级别从低到高: 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁 锁分级别原因: 没有优化以前,sychronized是重量级锁(悲观锁),使用 wait 和 notify、notifyAll 来切换...
  • Java提供了多种多线程锁机制的实现方式,常见的有: synchronized ReentrantLock Semaphore AtomicInteger等 每种机制都有优缺点与各自的适用场景,必须熟练掌握他们的特点才能在Java多线程应用开发时得心应手。...
  • android 线程锁Lock

    千次阅读 2017-09-20 08:41:17
    今天,简单讲讲android的线程锁  Lock的使用。 这个其实和SynchronizedClass 是一样的。我记得我的一篇博客写过这个内容。再次记录一下。 一、同步机制关键字synchronized 对于java来说,最常用的同步...
  • 线程锁、进程锁以及分布式锁 1.线程锁 2.进程锁 3.分布式锁 【技术分享篇】线程锁,进程锁以及分布式锁丨锁的实现及原理分析丨高效的使用 更多精彩内容包括:C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,MongoDB...
  • 多线程面试题和答案:线程锁+线程池+线程同步1、并发编程三要素?2、多线程的价值?3、创建线程的有哪些方式?区别是什么?4、创建线程的三种方式的对比?4、线程的生命周期及五种基本状态及转换条件1、Java线程具有...
  • 线程锁的基本概念与简单封装

    千次阅读 2016-08-27 20:39:53
    线程锁的基本概念与简单封装 1.在多线程环境中,当我们需要保持线程同步时,通常通过锁来实现。 互斥锁的使用过程中,主要有 第一:线程锁的初始化,pthread_mutex_init 第二:线程锁的释放,pthread_mutex_...
  • 线程锁:当多个线程几乎同时修改一个共享数据的时候,需要进行同步控制,线程同步能够保证多个线程安全的访问竞争资源(全局内容),最简单的同步机制就是使用互斥锁。 某个线程要更改共享数据时,先将其锁定,此时...
  • Linux进程锁和线程锁的本质区别

    千次阅读 2019-01-22 20:59:56
    转载: https://blog.csdn.net/daiyudong2020/article/details/51707823 转载: https://blog.csdn.net/junwua/article/details/80576433
  • 线程锁/进程锁/文件锁

    千次阅读 2017-12-19 11:06:38
    线程锁/进程锁/文件锁  1.线程锁是锁线程的,锁住禁用,如果4线程的CPU锁一个线程剩余三个(如果可以锁的话),就像四车道封锁一条车道还剩3个车道可以跑车;  2.进程锁是锁进程的,进程就是正在运行的程序,锁住...
  • c语言 多线程的简单实现 线程锁

    千次阅读 2018-08-22 14:47:38
    void *thread1(void* a) //线程函数 { pthread_mutex_lock(&mut); //加锁,用于对共享变量操作 lua_State *L = luaL_newstate(); luaL_openlibs(L); if(luaL_loadfile(L, "*****.lua") || lua_pcall(L, 0,0,0)){...
  • C#多线程锁

    千次阅读 2017-04-18 19:33:43
    c#多线程操作队列
  • 首先整理多线程同步的知识点,开头肯定是要先探讨探讨多线程同步的问题。那么嘛叫线程安全问题呢? 答: 我们知道Jvm虚拟机的设计中线程的执行是抢占式的,线程的执行时间是由底层系统决定的。所以就会有多个线程...
  • 线程锁+线程池+线程同步等

    千次阅读 2019-04-28 16:20:39
    1、并发编程三要素? 1)原子性:原子性指的是一个或者多个操作,要么全部执行并且在执行的...synchronized或者Lock:保证同一个时刻只有一个线程获取执行代码,释放之前把最新的值刷新到主内存,实现可见性。...
  • c# Thread 线程(三) lock线程锁

    千次阅读 2018-06-02 10:16:08
    在多线程编程中,可能会有多个线程并发的(或同时)执行一段代码,但是某些情况下需要在同一时刻只能有一个线程执行,避免某些对象的调用冲突或内存使用冲突,这就需要用到(lock)。lock 关键字可以用来确保代码...
  • vc++ 线程和线程锁 (一)

    千次阅读 2016-12-14 18:55:12
    无线程代码 火车票第100张重复两次 利用互斥对象实现同步 互斥对象mutex 包含一个使用数量,一个线程id 一个计数器.其中id用于标识系统中哪个对象拥有当前对象,计数器指明线程拥有该计数器的次数 ...
  • C# 关于线程锁lock的使用方法

    万次阅读 多人点赞 2016-09-02 10:07:05
    在某些情况下,我们希望A中的代码块(B)同步的执行,即同一时刻只有一个线程执行代码块B,这就需要用到(lock)。lock 关键字可以用来确保代码块完成运行,而不会被其他线程中断。它可以把一段代码定义为互斥段...
  • 在一个线程加锁,另一个线程解锁

    千次阅读 2017-08-03 15:03:30
    一般来讲,一个线程加锁,另一个线程解锁,是很容易死锁的。 产生死锁的四个必要条件: (1) 互斥条件:一个资源每次只能被一个进程使用。 (2) 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持...
  • PYTHON 线程锁释放的时间

    千次阅读 2018-05-27 09:55:48
    为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用。Lock(指令)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。可以...
  • java 多线程 降级

    千次阅读 2018-04-19 21:44:22
    如果当前线程拥有写,然后将其释放,最后再获取读,这种并不能称之为降级,降级指的是把持住(当前拥有的)写,再获取到读,随后释放(先前有用的)写的过程。下面给出一个降级的示例,当数据变动时,...
  • 【java并发】线程锁技术的使用

    万次阅读 多人点赞 2016-06-04 07:52:09
    Lock好比传统线程模型中的synchronized技术,但是比sychronized方式更加面向对象,与生活中的类似,本身也应该是个对象。两个线程执行的代码片段如果要实现同步互斥的效果,它们必须用同一个Lock对象。是上...
  • 文章目录前言一、pa ? 一.synchronized的缺陷 二.Lock接口的特性 三.Lock接口基本的方法: 四、Lock接口的实现类:ReentrantLock 五、ReadWriteLock 六、Condition接口 ...八、公平与非公平 九、可重入

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 677,869
精华内容 271,147
关键字:

线程锁