精华内容
下载资源
问答
  • python线程安全队列

    千次阅读 2018-11-22 13:40:44
    一直困扰我的问题,在博客园和CSDN中都查阅过资料,答案千奇百怪,查阅官网跟几位大牛交流后做了一个小总结: ...但python中的queue队列属于线程安全,他底部封装了锁。 所谓线程安全,就是一个线程在同一时...

    一直困扰我的问题,在博客园和CSDN中都查阅过资料,答案千奇百怪,查阅官网跟几位大牛交流后做了一个小总结:

    python中的list,dict,set,tuple都不是线程全队列。因为例如list在append()或者remove()的时候,资源是共享的,然后python没有在这些队列里面封装锁。但python中的queue队列属于线程安全,他底部封装了锁。

    所谓线程安全,就是一个线程在同一时间只能被调用一次。

    在线程安全中,引入了锁的概念。所谓锁就是当你需要独占某一资源时,你可以锁住这个资源,使得这个资源不会被别的线程占用,当不占用的时候,释放锁就好了。threading模块中定义了Lock类,可以方便的处理锁定,简单代码如下:

    #创建锁
    mutex = threading.Lock()
    #锁定
    mutex.acquire([blocking])
    #释放
    mutex.release()

    其中,锁定方法acquire可以有一个blocking参数。

    如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True) 
    如果设定blocking为False,则当前线程不会堵塞 。

    上锁解锁过程 
    当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。

    每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。

    线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

    总结 
    锁的好处:

    确保了某段关键代码只能由一个线程从头到尾完整地执行 
    锁的坏处:

    阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了 
    由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁。

    展开全文
  • python线程安全队列用法

    千次阅读 2017-09-19 18:09:22
    1.今天来利用queue来实现一个线程安全队列。2.场景:有时我们需要将一个大任务划分成很多小任务,每个小任务执行完得到结果需要放入一个安全队列里面,一个收集结果的线程就实时从队列中取结果将子任务结果聚合,...

    1.今天来利用queue来实现一个线程安全队列。

    2.场景:有时我们需要将一个大任务划分成很多小任务,每个小任务执行完得到结果需要放入一个安全队列里面,一个收集结果的线程就实时从队列中取结果将子任务结果聚合,形成大任务计算的结果。

    3.代码实现

    import queue
    import threading
    import time
    
    ## 封装的安全队列类(十分通用)
    class SafeQueue(threading.Thread):
        # 退出队列的信号
        SIG_QUIT = 'sig_quit'
        def __init__(self,recv_calback):
            threading.Thread.__init__(self)
            ## 构造线程安全队列
            self.Q = queue.Queue()
            self.recv_calback = recv_calback
            self.start()
    
        #放入队列
        def put(self,datas):
            threadName = threading.currentThread().name
            self.Q.put(datas)
    
        #关闭队列
        def close(self):
            self.put(SafeQueue.SIG_QUIT)
    
        ##主循环,处理队列接收
        def run(self):
            while True:
               try:
                   datas = self.Q.get()
                   if datas == SafeQueue.SIG_QUIT: #收到退出队列信号
                       break
                    #回调客户端
                   self.recv_calback(datas)
               except: # 抛出打断异常
                   break
    
    ##队列回调函数
    def queue_callback(datas):
        print("接收到数据:",datas)
        ## 将子任务结果加入 全局集合
        try:
            array_mutex.acquire()#锁定
            datas_array.append(datas)
            if len(datas_array) == 4:
                safeQueue.close()
                print("=======大任务计算结束===========  result:", datas_array)
        finally:
            array_mutex.release()#释放
    
    ## 子任务计算函数
    def calclulate():
        threadName = threading.currentThread().name
        print(threadName , ' 正在计算')
        time.sleep(2)
        print(threadName, ' 计算完成,加入队列')
        #将结果放入队列
        safeQueue.put(threadName+"' result")
    
    ####  ----------  main start   ----------
    
    #创建锁
    array_mutex = threading.Lock()
    ## 存储 子任务计算结果的 集合
    datas_array = []
    
    ##构造安全队列
    safeQueue = SafeQueue(queue_callback)
    
    ##开启4 个子任务,开始计算
    for i in range(1,5):
        threading.Thread(target=calclulate).start()
    

    这个SafeQueue模型在项目开发中非常常见,这里有什么封装的不足的谢谢指出。今天队列就到这里,谢谢大家。

    老生常谈:深圳有爱好音乐的会打鼓(吉他,键盘,贝斯等)的程序员和其它职业可以一起交流加入我们乐队一起嗨。我的QQ:657455400

    展开全文
  • 用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全例如:可以开多个消费者线程,每...

       最近学习spark,我主要使用pyspark api进行编程。

    之前使用Python都是现学现用,用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧

    多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全

    例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任务

    同时可以有多个生产者往队列发送消息,实现异步消息处理

    先复习下互斥量条件变量的概念:

    互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁以后,任何其他试图再次对互斥锁加锁的线程将会阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为运行状态的线程可以对互斥锁加锁,其他线程将会看到互斥锁依然被锁住,只能回去再次等待它重新变为可用。

    条件变量(cond)是在多线程程序中用来实现”等待–》唤醒”逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住

    条件变量总是与互斥锁一起使用的

    Python的threading中定义了两种锁:threading.Lock和threading.RLock

    两者的不同在于后者是可重入锁,也就是说在一个线程内重复LOCK同一个锁不会发生死锁,这与POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可递归锁的概念是相同的, 互斥锁的API有三个函数,分别执行分配锁,上锁,解锁操作。

    python的threading中的条件变量默认绑定了一个RLock,也可以在初始化条件变量的时候传进去一个自己定义的锁.

    最后贴出我自己实现的简单线程安全任务队列

    测试代码



    转载于:https://juejin.im/post/5c7787def265da2ddb29785e

    展开全文
  • python实现线程安全队列

    千次阅读 2016-10-20 19:32:06
     多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全  例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任

         最近学习Spark,我主要使用pyspark api进行编程

       之前使用Python都是现学现用,用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧

       多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全

       例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任务

                  同时可以有多个生产者往队列发送消息,实现异步消息处理

       先复习下互斥量条件变量的概念:

       互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁以后,任何其他试图再次对互斥锁加锁的线程将会阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为运行状态的线程可以对互斥锁加锁,其他线程将会看到互斥锁依然被锁住,只能回去再次等待它重新变为可用。

       条件变量(cond)是在多线程程序中用来实现"等待--》唤醒"逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住

        条件变量总是与互斥锁一起使用的

        python的threading中定义了两种锁:threading.Lock和threading.RLock

       两者的不同在于后者是可重入锁,也就是说在一个线程内重复LOCK同一个锁不会发生死锁,这与POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可递归锁的概念是相同的, 互斥锁的API有三个函数,分别执行分配锁,上锁,解锁操作。

    threading.Lock()        分配一个互斥锁
    acquire([blocking=1])   上锁(阻塞或者非阻塞,非阻塞时相当于try_lock,通过返回False表示已经被其它线程锁住。)
    release()               解锁
        python的threading中的条件变量默认绑定了一个RLock,也可以在初始化条件变量的时候传进去一个自己定义的锁.

        

    threading.Condition([lock])  分配一个条件变量
    acquire(*args)               条件变量上锁
    release()                    条件变量解锁
    wait([timeout])              等待唤醒,timeout表示超时
    notify(n=1)                  唤醒最大n个等待的线程
    notifyAll()、notify_all()    唤醒所有等待的线程

       最后贴出我自己实现的简单线程安全任务队列

    #encoding='utf-8'
    #ConcurrentQueue.py
    import Queue
    import threading
    
    class ConcurrentQueue:
    
        def __init__(self, capacity = -1):
            self.__capacity = capacity          #初始化队列大小
            self.__mutex = threading.Lock()     #初始化互斥量
            self.__cond  = threading.Condition(self.__mutex)    #初始化条件变量
            self.__queue = Queue.Queue()        #初始化队列
    
        def get(self):
    
            if  self.__cond.acquire():          #获取互斥锁和条件变量,python中threading条件变量默认包含互斥量,因此只需要获取条件变量即可
                while self.__queue.empty():
                    self.__cond.wait()          #条件变量等待
                 
                elem = self.__queue.get()
    
                self.__cond.notify()
                self.__cond.release()
    
            return elem
    
        def put(self,elem):
    
            if self.__cond.acquire():
                while self.__queue.qsize() >= self.__capacity:
                    self.__cond.wait()
                self.__queue.put(elem)
    
                self.__cond.notify()
                self.__cond.release()
    
        def clear(self):
    
            if self.__cond.acquire():
    
                self.__queue.queue.clear()
    
                self.__cond.release()
                self.__cond.notifyAll()
    
        def empty(self):
    
            is_empty = False;
            if self.__mutex.acquire():            #只需要获取互斥量
                is_empty = slef.__queue.empty()
                self.__mutex.release()
    
            return is_empty
    
        def size(self):
            size = 0
            if self.__mutex.acquire():
                size = self.__queue.qsize()
                self.__mutex.release()
    
            return size
    
        def resize(self,capacity = -1):
            self.__capacity = capacity

     测试代码

    class CQTest2():
        
        def __init__(self):
            self.queue = ConcurrentQueue(10)
    
        def consumer(self):
            while True:
                task = self.queue.get()
    
                print 'get ', task, ' from queue'    #print是非线程安全的,因此如果又多个消费者会打印混乱,可以使用logging输出日志到文件查看
                
    
        def producer(self):
            while True:
                for i in range(10):
                    self.queue.put(i)
                    #print 'put ', i , ' into queue'    #
    
        def run(self):
            t1 = threading.Thread(target = self.consumer)
            t2 = threading.Thread(target = self.producer)
    
            t1.start()
            t2.start()
    
            #t1.join()
            #t2.join()
    
    if __name__ == '__main__':
        cq_test = CQTest()
        cq_test.init_log()
        cq_test.run()

            
    展开全文
  • python线程优先级队列( Queue)

    千次阅读 2016-11-24 23:00:44
    Python的Queue模块中提供了同步的、线程安全队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列...
  • Python的Queue模块中提供了同步的、线程安全队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列 LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 文章目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑 一、线程池组成  一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中...
  • 前言:就菜鸟教程里面所说,Python 的 Queue 模块中提供了同步的、线程安全队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。 这些队列都实现了锁原语,能够在...
  • python线程队列安全

    2014-10-18 12:15:16
    这是一个经典的“生产者消费者”例子:python queue模块有三种队列:1、python queue模块的FIFO队列先进先出。2、LIFO类似于堆。即先进后出。3、还有一种是优先级队列级别越低越先出来。针对这三种队列分别有三个构造...
  • python:线程队列 Queue

    2019-06-28 16:39:08
    python中的queue队列属于线程安全,他底部封装了锁。 所谓线程安全,就是一个线程在同一时间只能被调用一次。 线程队列它能省去手动加锁、解锁的步骤。 Queue模块中的常用方法: Queue.qs...
  • 队列是线程间最常用的交换数据的形式,Queue模块实现了线程安全队列,尤其适合多线程编程。Queue模块实现了三种类型队列: ❑Queue Queue:一个先进先出(FIFO)的队列,最先加入队列的元素最先取出; ❑LifoQueue...
  • python-线程队列

    2018-06-14 01:23:00
    列表是线程安全的,所以可以说队列是针对多线程出现的。 数据结构是用来存储数据用的,只不过各种数据结构的存储方式不同。比如:集合、队列、映射 列表:按索引值存储 字典:按key值的哈希表存储 list.pop() ...
  • 如果必须在多个线程之间安全地交换信息时,队列线程编程中尤其有用。 重要: q.put() :往队列里面放值,当参数block=Ture的时候,timeout参数将会有作用,当队列已经满了的时候,在往里面放值时,block为True程序...
  • 进程与线程 在引入了线程的操作系统中,通常一个进程都有若干个线程,至少包含一个线程。 根本区别:进程是操作系统资源分配的基本单位,而线程是处理器任务调度和执行的基本单位 资源开销:每个进程都有独立的代码...
  • 一、线程间的通信 Condition版的生产者与消费者模式 函数 描述 acquire() 上锁 release() 解锁 wait() 将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify()和noti_all()函数唤醒。被唤醒...
  • Queue线程安全队列

    2019-11-22 20:48:49
    Python中的queue模块中提供了同步的、线程安全队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列,LifoQueue这些队列都实现了锁的原语(可以理解为原子操作,即要么不做,要么都做完)能够在线程中直接...
  • Queue线程安全队列Python多线程编程中,虽然threading模块为我们提供了Lock类和Condition类借助锁机制来处理线程并发执行,但在实际开发中使用加锁和释放锁仍是一个经常性的且较为繁琐的过程。因此,Python中又为...
  • Python-线程-队列-阻塞队列解耦合

    千次阅读 2018-12-24 13:49:17
    # encoding=utf-8 import threading import time try: # python2中 from Queue import Queue except ModuleNotFoundError: # python3中 ...Python的Queue模块中提供了同步的、线程安全队列类, ...
  • Queue 线程安全队列 python内置queue 模块,用于线程安全管理,它提供了同步的、线程安全的队列类,包括FIFO, LIFO队列。这些队列都实现了锁原语,能在多线程中直接使用。可以使用队列来实现线程间的同步。 相关...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 500
精华内容 200
关键字:

python线程安全队列

python 订阅