精华内容
下载资源
问答
  • Python 多线程同步队列模型 我面临的问题是有个非常慢的处理逻辑(比如分词、句法),有大量的语料,想用多线程来处理。 这一个过程可以抽象成一个叫“同步队列”的模型。 具体来讲,有一个生产者(Dispatcher)一...

    Python 多线程同步队列模型


    我面临的问题是有个非常慢的处理逻辑(比如分词、句法),有大量的语料,想用多线程来处理。

    这一个过程可以抽象成一个叫“同步队列”的模型。 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理。 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致。

    消费者是典型的threading,它需要看见生成者的队列,从而从队列中拿一些数据。

    对于生产者,python中有一个叫Queue的module,实现了FIFO的同步队列。 但它只能保证输入与交付消费者的顺序的有序,但不能保障生产者在输出时有序,所以需要一个buffer来保存输出顺序。 程序的模型大概是这样的。有一个master(),用来分发任务。有N个多线程的slave用来处理任务。

    具体程序如下:

    #!/usr/bin/env python
    # real    3m0.263s
    # user    0m0.016s
    # sys     0m0.012s
    
    from time import sleep
    from random import random
    from Queue import Queue
    from threading import Thread, Lock
    
    class Segmentor(Thread):
        def __init__(self, dispatcher):
            Thread.__init__(self)
            self.d = dispatcher
    
        def run(self):
            while True:
                idx, item = self.d.get()
                # segment section
                sleep(random() * 5)
                # output section
                d.output( idx, item )
                self.d.task_done()
    
    class Dispatcher(Queue):
        def __init__(self):
            Queue.__init__(self)
            self.idx = 0
            self.box = {}
            self.lock = Lock()
    
        def output(self, idx, item):
            self.lock.acquire()
            if idx > self.idx:
                self.box[idx] = item
            elif idx == self.idx:
                self._output(item)
                self.idx += 1
                while self.idx in self.box:
                    item = self.box[self.idx]
                    self._output(item)
                    self.idx += 1
    
            self.lock.release()
    
        def _output(self, item):
            print item
    
    if __name__=="__main__":
        d = Dispatcher()
        for i in xrange(4):
            t = Segmentor(d)
            t.daemon = True
            t.start()
    
        num 
    展开全文
  • 为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。 正常请求的多线程,如果是消费之和生产者,通过列表实现...

    Queue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。

    首先我们组要明白为什么要使用队列,队列的性质,

    多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题
    为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。
    正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。

    FIFO

    Queue.Queue(maxsize=0)
    FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

    LIFO

    Queue.LifoQueue(maxsize=0)
    LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上.

    priority

    构造一个优先队列。maxsize用法同上。

    基本方法:

    Queue.Queue(maxsize=0) #FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,
    Queue.LifoQueue(maxsize=0) #LIFO, 如果maxsize小于1就表示队列长度无限
    Queue.qsize() #返回队列的大小
    Queue.empty() #如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
    Queue.full() # 如果队列满了,返回True,反之False,给生产者提醒
    Queue.get([block[, timeout]]) 读队列,timeout等待时间
    Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间
    Queue.queue.clear() 清空队列
    task_done() #意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
    join() #阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

    队列需求一(爬虫的请求地址)

    Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。
    对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:

    import threading
    from queue import Queue
    
    class ThreadCrawl(threading.Thread):
        def __init__(self, threadName, idQueue):
            # 继承父类的方法
            super(ThreadCrawl, self).__init__()
            self.threadName = threadName          # 线程名字
        def run(self):
            print('启动' + self.threadName)
            while not self.idQueue.empty():
                try:
                    id = self.idQueue.get(False)  # False 如果队列为空,抛出异常
                    time.sleep(1)
                    print("~"*300)
                    self.get_con(id)
    
                except Exception as e:
                    print('队列为空。。。。。', e)
                    pass
                print('#'*300)
    
        def get_con(self):  #自己封装的请求自定义
            pass
    def get_id(m, n):
        conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')
        cur = conn.cursor()
        sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)
        cur.execute(sql1)
        data = cur.fetchall()
        conn.commit()
        return data
    def main():
        n = 60
        while True:
            m = 20
            # m是固定值,一次去20条, n是第几条开始
            print('开始采集n的值为', n)
            if n == 200000:
                break
    
            # id的队列
            idQueue = Queue(20)
            if idQueue.empty():
                data = get_id(m, n)
                for i in data:
                    idQueue.put(i[0])
    
            # 采集线程的数量
            crawlList = []
            for id in range(1, 2):
                name = '采集线程{}'.format(id)
                crawlList.append(name)
    
            # 存储采集线程的列表集合
            threadcrawl = []
            for threadName in crawlList:
                thread = ThreadCrawl(threadName, idQueue)
                thread.start()
                threadcrawl.append(thread)
    
            for thread in threadcrawl:
                thread.join()
            n = n + m
        print("主线程退出..............")
    if __name__ == '__main__':
        main()
    

    以上代码是作者从数据库中取数据,间隔性取,然后拼装到url,进行请求。

    队列需求二(生产者、消费者模型)

    import threading
    import time
    from queue import Queue
    
    def put_id():
        i = 0
        while True:
            i = i + 1
            print("添加数据", i, id_queue.qsize())
            time.sleep(0.1)
            id_queue.put(i)
    
    def get_id(m):
        while True:
            i = id_queue.get()
            print("线程", m, '取值', i)
    
    
    if __name__ == "__main__":
    
    
        id_queue = Queue(20)
    
        Th1 = threading.Thread(target=put_id, )
    
        Th2 = threading.Thread(target=get_id, args=(2, ))
        Th3 = threading.Thread(target=get_id, args=(3, ))
        Th5 = threading.Thread(target=get_id, args=(4, ))
        Th4 = threading.Thread(target=get_id, args=(5, ))
    
        Th2.start()
        Th1.start()
    
        Th3.start()
        Th4.start()
        Th5.start()
    
    展开全文
  • 首先先了解多线程队列,生产消费模式的大致步骤。 1.主线程生成目标链接。 2.主线程开启子线程访问队列并爬取数据保存。 3.待队列目标为空时关闭线程。 示例代码 主要字段: city={ '河北省':['石家庄','保定市','...

    在爬取大量数据时,由于有成千上万的数据,单线程爬虫显然不能满足我们的需求,这时候多线程爬虫就来了,本篇文章使用Threading和Queue简单介绍。

    首先先了解多线程队列,生产消费模式的大致步骤。
    1.主线程生成目标链接。
    2.主线程开启子线程访问队列并爬取数据保存。
    3.待队列目标为空时关闭线程。

    示例代码

    主要字段:

    city={
        '河北省':['石家庄','保定市','秦皇岛','唐山市','邯郸市','邢台市','沧州市','承德市','廊坊市','衡水市','张家口'],
       '山西省':['太原市','大同市','阳泉市','长治市','临汾市','晋中市','运城市','晋城市','忻州市','朔州市','吕梁市'],
        '内蒙古':['呼和浩特','呼伦贝尔','包头市','赤峰市','乌海市','通辽市','鄂尔多斯','乌兰察布','巴彦淖尔'],
       '辽宁省':['盘锦市','鞍山市','抚顺市','本溪市','铁岭市','锦州市','丹东市','辽阳市','葫芦岛','阜新市','朝阳市','营口市'],
       '吉林省':['吉林市','通化市','白城市','四平市','辽源市','松原市','白山市'],
       '黑龙江省':['伊春市','牡丹江','大庆市','鸡西市','鹤岗市','绥化市','双鸭山','七台河','佳木斯','黑河市','齐齐哈尔市'],
       '江苏省':['无锡市','常州市','扬州市','徐州市','苏州市','连云港','盐城市','淮安市','宿迁市','镇江市','南通市','泰州市'],
       '浙江省':['绍兴市','温州市','湖州市','嘉兴市','台州市','金华市','舟山市','衢州市','丽水市'],
       '安徽省':['合肥市','芜湖市','亳州市','马鞍山','池州市','淮南市','淮北市','蚌埠市','巢湖市','安庆市','宿州市','宣城市','滁州市','黄山市','六安市','阜阳市','铜陵市'],
       '福建省':['福州市','泉州市','漳州市','南平市','三明市','龙岩市','莆田市','宁德市'],
       '江西省':['南昌市','赣州市','景德镇','九江市','萍乡市','新余市','抚州市','宜春市','上饶市','鹰潭市','吉安市'],
       '山东省':['潍坊市','淄博市','威海市','枣庄市','泰安市','临沂市','东营市','济宁市','烟台市','菏泽市','日照市','德州市','聊城市','滨州市','莱芜市'],
       '河南省':['郑州市','洛阳市','焦作市','商丘市','信阳市','新乡市','安阳市','开封市','漯河市','南阳市','鹤壁市','平顶山','濮阳市','许昌市','周口市','三门峡','驻马店'],
       '湖北省':['荆门市','咸宁市','襄樊市','荆州市','黄石市','宜昌市','随州市','鄂州市','孝感市','黄冈市','十堰市'],
       '湖南省':['长沙市','郴州市','娄底市','衡阳市','株洲市','湘潭市','岳阳市','常德市','邵阳市','益阳市','永州市','张家界','怀化市'],
       '广东省':['江门市','佛山市','汕头市','湛江市','韶关市','中山市','珠海市','茂名市','肇庆市','阳江市','惠州市','潮州市','揭阳市','清远市','河源市','东莞市','汕尾市','云浮市'],
       '广西省':['南宁市','贺州市','柳州市','桂林市','梧州市','北海市','玉林市','钦州市','百色市','防城港','贵港市','河池市','崇左市','来宾市'],
       '海南省':['海口市','三亚市'],
       '四川省':['乐山市','雅安市','广安市','南充市','自贡市','泸州市','内江市','宜宾市','广元市','达州市','资阳市','绵阳市','眉山市','巴中市','攀枝花','遂宁市','德阳市'],
       '贵州省':['贵阳市','安顺市','遵义市','六盘水'],
       '云南省':['昆明市','玉溪市','大理市','曲靖市','昭通市','保山市','丽江市','临沧市'],
       '西藏':['拉萨市','阿里'],
       '陕西省':['咸阳市','榆林市','宝鸡市','铜川市','渭南市','汉中市','安康市','商洛市','延安市'],
       '甘肃省':['兰州市','白银市','武威市','金昌市','平凉市','张掖市','嘉峪关','酒泉市','庆阳市','定西市','陇南市','天水市'],
       '青海省':['西宁市'],
       '银川省':['银川市','固原市','青铜峡市','石嘴山市','中卫市']
      }
    years=['2011','2012','2013','2014','2015','2016','2017','2018','2019','2020']
    month=['01','02','03','04','05','06','07','08','09','10','11','12']
    month_less=['01','02','03','04','05','06','07','08','09','10']
    title=['日期','最高气温','最低气温','天气','风向']
    

    导入所需要的包:

    from lxml import etree
    import csv
    import time
    import bs4
    import random
    import requests
    from xpinyin import Pinyin
    import os
    import threading
    from queue import Queue
    

    创建队列

    q=Queue()
    

    因为爬取的是全国省市2011年至今的天气数据,所以本段代码创建了省会路径并想队列传递目标链接。

    for province in city.keys():
        path='./Thread_Test/{}'.format(province)
        if os.path.exists(path):
            pass
        else:
            os.mkdir(path)
        for nano_city in city[province]:
            path='./Thread_Test/{}/{}'.format(province,nano_city)
            if os.path.exists(path):
                pass
            else:
                os.mkdir(path)
            p=Pinyin()
            if '市' in nano_city:
                str_city=nano_city.split('市')[0]
                str_city=p.get_pinyin(u'{}'.format(str_city),'')
                print(str_city)
            else:
                str_city=p.get_pinyin(u'{}'.format(nano_city),'')
            for y in years:
                path='./Thread_Test/{}/{}/{}'.format(province,nano_city,y)
                if os.path.exists(path):
                    pass
                else:
                    os.mkdir(path)
                if y=='2020':
                    for m in month_less:
                        url='https://lishi.tianqi.com/{}/{}.html'.format(str_city,y+m)
                        info=[province,nano_city,y,m]
                        q.put([url,info])
                else:
                    for m in month:
                        url='https://lishi.tianqi.com/{}/{}.html'.format(str_city,y+m)
                        info=[province,nano_city,y,m]
                        q.put([url,info])
    print(q.qsize())
    

    队列中共有31388条链接

    创建线程任务方法:

    def working():
        while True: #需要使用while 否则线程执行完一次操作就关闭了
            url = q.get() #默认队列为空时,线程暂停
            doing(url)
            q.task_done()#告诉队列本次取操作已经完毕
    
    def doing(url):  #线程在获取到链接后的行为
        rsp=requests.get(url=url[0],headers=headers)
        html_4=bs4.BeautifulSoup(rsp.text,'html.parser')
        ul=html_4.find('ul',class_='thrui')
        f=open('./Thread_Test/{}/{}/{}/{}.csv'.format(url[1][0],url[1][1],url[1][2],url[1][3]),'w',encoding='utf-8',newline='')
        csv_writer=csv.writer(f)
        csv_writer.writerow(title)
        for i in ul.find_all('li'):
            lis=[]
            a=i.find_all('div')
            for j in a:
                if len(j.text.split())==0:
                    pass
                else:
                    lis.append(j.text.split()[0])
            print(url[1][0],url[1][1],url[1][2],url[1][3],'剩余',q.qsize())
            csv_writer.writerow(lis)
        f.close()
    

    创建子线程

    threads = []
    for i in range(10): #开启十个子线程
        t = threading.Thread(target=working) #线程的目标任务为working方法
        threads.append(t)
    

    开启子线程

    for item in threads:
        item.setDaemon(True)
        item.start()
    q.join()  #在队列为空时才进行后面的语句,需要配合task_done()使用
    

    基本的多线程爬虫就完成了。

    还有一个更优的方法
    生成一个新的队列来储存主线程创建的十个子线程获取到网页数据,子线程并不直接写入,而是交给主线程,主线程再生成新的子线程来写入队列里的数据,这样就减少了十个子线程等待写入文件的时间,专注于爬取数据。

    由于数据量太大,测试仅选择了一个省的数据进行爬取。约1300条数据在这里插入图片描述
    此图为线程直接写入所需时间

    在这里插入图片描述
    此图为使用新线程专门处理写入数据所需时间

    在这里插入图片描述
    次图为单线程爬虫所需时间

    可以看到使用新线程专门处理写入数据时,速度比边爬边写快了30秒=百分之二十,而单线程花费了恐怖的1405秒,为多线程的十倍之多,可见多线程在爬取大量数据时是非常有用的。
    当然,这么快的速度肯定会遭到反扒处理,封ip之类的,所以要事先准备一个ip池,每个线程在爬取一段时间后就更换一个ip,线程不建议开太多,避免给目标网站服务器造成太大压力,做一个绅士爬虫!

    这段代码还可以再继续优化:
    1.爬虫的类库提取
    2.线程的类库提取
    3.存数据库的类库提取
    4.main()函数优化

    初上手多线程,对于锁什么的还不是很懂,若有错误的地方欢迎指明。

    展开全文
  • 之前使用Python都是现学现用,用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧多线程任务队列在实际项目中非常有用,关键的地方要实现队列多线程同步问题,也即保证队列多线程安全...

       最近学习spark,我主要使用pyspark api进行编程。

    之前使用Python都是现学现用,用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧

    多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全

    例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任务

    同时可以有多个生产者往队列发送消息,实现异步消息处理

    先复习下互斥量条件变量的概念:

    互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁以后,任何其他试图再次对互斥锁加锁的线程将会阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为运行状态的线程可以对互斥锁加锁,其他线程将会看到互斥锁依然被锁住,只能回去再次等待它重新变为可用。

    条件变量(cond)是在多线程程序中用来实现”等待–》唤醒”逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住

    条件变量总是与互斥锁一起使用的

    Python的threading中定义了两种锁:threading.Lock和threading.RLock

    两者的不同在于后者是可重入锁,也就是说在一个线程内重复LOCK同一个锁不会发生死锁,这与POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可递归锁的概念是相同的, 互斥锁的API有三个函数,分别执行分配锁,上锁,解锁操作。

    python的threading中的条件变量默认绑定了一个RLock,也可以在初始化条件变量的时候传进去一个自己定义的锁.

    最后贴出我自己实现的简单线程安全任务队列

    测试代码



    转载于:https://juejin.im/post/5c7787def265da2ddb29785e

    展开全文
  • python实现线程安全队列

    千次阅读 2016-10-20 19:32:06
     多线程任务队列在实际项目中非常有用,关键的地方要实现队列多线程同步问题,也即保证队列多线程安全  例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任
  • import threading import pika # 处理消息 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # channel.basic_ack(delivery_tag=method.delivery_tag) # 取出之后即删除 ...
  • python多线程队列安全

    2014-10-18 12:15:16
    这是一个经典的“生产者消费者”例子:python queue模块有三种队列:1、python queue模块的FIFO队列先进先出。2、LIFO类似于堆。即先进后出。3、还有一种是优先级队列级别越低越先出来。针对这三种队列分别有三个构造...
  • Python中,队列线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外。Queue是线程安全的,自带锁,使用的时候,不用对队列加锁操作。 1. 将一个值...
  • Queue在python3中重命名为queue,在python2到python3转换中可以自动转换队列可应用在多个生产者多个消费者的模型中,并且在多线程中可用于线程之间数据信息的安全交换通信,防止冲突。 在队列中已经实现多线程的锁...
  • 在上上篇文章讲多线程的时候附上了一则爬取腾讯招聘的多线程爬虫,也用到了队列,刚刚我做了写改动,就再写一篇文章把代码贴出来吧: class Tencent(Thread): #生产者线程,专门发送请求,然后将请求结果放入队列 ...
  • Python多线程实现消费者实例

    千次阅读 2019-04-16 21:06:08
    结合Python多线程,和队列queue实现一个简单的队列消费者实例。用例批量检测ip是否存在。 from __future__ import print_function import subprocess import threading from queue import Queue from queue import ...
  • # 创建多线程 for threadID in range(1, THREAD_NUM + 1): thread = BookThread(threadID, book_queue, func) thread.start() thread_list.append(thread) # 填充队列 queueLock.acquire() for book in ...
  • Python多线程

    2019-11-28 23:08:07
    本课程主要讲解使用Python中的_thread和threading模块实现多线程编程,以及队列Queue的使用。课程中使用队列实现了生产者和消费者的程序编写。 重点:_thread和threading、Queue
  • 该模式通过平衡生产线程消费线程的工作能力来提高程序的整体处理数据 的速度。 案例: 两个厨师对四个顾客 厨师做包子和顾客吃包子问题。 当生产的慢,消费的快的时候,get()会发生阻塞,等待 当生产的忙,...
  • python多线程

    2017-06-20 15:04:00
    python多线程模块包括:thread、threading、Queue 实现模块 thread:多线程的底层支持模块,一般不建议使用; threading:对thread进行了封装,将一些线程的操作对象化。  Queue:提供了一个适用于多线程编程的...
  • 写一个生产者消费者的测试Demo来演示: print("Test Thread") from threading import Thread from threading import Lock import time from queue import Queue # == False时退出所有线程 is_running = True # ...
  • 前面介绍了互斥锁和条件变量解决线程间的同步问题,并使用条件变量同步机制解决了生产者...Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先
  • 在爬取大量数据时,由于有成千上万的数据,单线程爬虫显然不能满足我们的需求,这时候多线程爬虫就来了,本篇文章使用Threading和Queue简单介绍。...首先先了解多线程队列,生产消费模式的大致步骤。 1.主线程
  • # -*- coding:utf-8 -*- ...“生产者-消费者”案例 queue.Queue put(item,block=True,timeout=None) get(block=True,timeout=None) full() empty() qsize() """ import time from threading import Th
  • Python多线程通信

    2020-06-16 15:49:37
    Python多线程通信为什么需要通信通信方法共享变量共享变量介绍实现消息队列(Queue)实现通信消息队列介绍实现 为什么需要通信 在生产者和消费者的模型中,如果有一个生产者和多个消费者,消费者之间就需要线程节的...

空空如也

空空如也

1 2 3 4 5 ... 18
收藏数 342
精华内容 136
关键字:

python多线程消费队列

python 订阅