精华内容
下载资源
问答
  • python多线程队列使用

    2021-02-10 05:32:51
    队列 -> 线程2 -> url_web####!/usr/bin/envpython#!coding=utf-8#!by=xiaoheimportQueue,threading,time,randomfrommoniItemsimportmonimporturllib2fromflaskimportFlask,request,jsonqueu...

    #线程1 -> 队列 -> 线程2 -> url_web

    #

    #

    ##!/usr/bin/env python

    #! coding=utf-8

    #!by=xiaohe

    import Queue,threading,time,random

    from moniItems import mon

    import urllib2

    from flask import Flask,request,json

    queueLock = threading.Lock()

    class t1(threading.Thread):

    def __init__(self,t_name,queue):

    threading.Thread.__init__(self,name=t_name)

    self.data = queue

    def run(self):

    #for i in range(5):

    while True:

    queueLock.acquire()

    if  self.data.empty():

    print time.ctime(),self.getName()

    self.data.put(mon().runAllGet())

    time.sleep(1)

    queueLock.release()

    else:

    queueLock.release()

    #print time.ctime(),self.getName(),"end"

    class t2(threading.Thread):

    def __init__(self,t_name,queue):

    threading.Thread.__init__(self,name=t_name)

    self.data=queue

    def run(self):

    #for i in range(5):

    while True:

    queueLock.acquire()

    if not self.data.empty():

    vai = self.data.get()

    #print  self.getName() , vai

    a=urllib2.Request("http://reboot:8088", json.dumps(vai), {‘Content-Type‘: ‘application/json‘})

    urllib2.urlopen(a)

    queueLock.release()

    else:

    queueLock.release()

    def mai():

    queue = Queue.Queue(5)

    tt1 = t1(‘shou‘,queue)

    tt2 = t2(‘fa‘,queue)

    tt1.start()

    tt2.start()

    tt1.join()

    tt2.join()

    if __name__==‘__main__‘:

    mai()

    原文:http://zhangxiaohe.blog.51cto.com/7821029/1571305

    展开全文
  • #Python queue队列,实现并发,在网站多线程推荐最后也一个例子,比这货简单,但是不够规范#encoding: utf-8__author__ = 'yeayee.com' #由本站增加注释,可随意Fork、Copyfrom queue import Queue #Queue在3.x中...

    #Python queue队列,实现并发,在网站多线程推荐最后也一个例子,比这货简单,但是不够规范

    #encoding: utf-8

    __author__ = 'yeayee.com' #由本站增加注释,可随意Fork、Copy

    from queue import Queue #Queue在3.x中改成了queue

    importrandomimportthreadingimporttimeclassProducer(threading.Thread):"""Producer thread 制作线程"""

    def __init__(self, t_name, queue): #传入线程名、实例化队列

    threading.Thread.__init__(self, name=t_name) #t_name即是threadName

    self.data =queue"""run方法 和start方法:

    它们都是从Thread继承而来的,run()方法将在线程开启后执行,

    可以把相关的逻辑写到run方法中(通常把run方法称为活动[Activity]);

    start()方法用于启动线程。"""

    defrun(self):for i in range(5): #生成0-4五条队列

    print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)) #当前时间t生成编号d并加入队列

    self.data.put(i) #写入队列编号

    time.sleep(random.randrange(10) / 5) #随机休息一会

    print("%s: %s producing finished!" % (time.ctime(), self.getName)) #编号d队列完成制作

    classConsumer(threading.Thread):"""Consumer thread 消费线程,感觉来源于COOKBOOK"""

    def __init__(self, t_name, queue):

    threading.Thread.__init__(self, name=t_name)

    self.data=queuedefrun(self):for i in range(5):

    val=self.data.get()print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val)) #编号d队列已经被消费

    time.sleep(random.randrange(10))print("%s: %s consuming finished!" % (time.ctime(), self.getName())) #编号d队列完成消费

    defmain():"""Main thread 主线程"""queue= Queue() #队列实例化

    producer = Producer('Pro.', queue) #调用对象,并传如参数线程名、实例化队列

    consumer = Consumer('Con.', queue) #同上,在制造的同时进行消费

    producer.start() #开始制造

    consumer.start() #开始消费

    """join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

    join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。"""producer.join()

    consumer.join()print('All threads terminate!')if __name__ == '__main__':

    main()"""运行结果:

    Thu Feb 4 11:05:48 2016: Pro. is producing 0 to the queue!

    Thu Feb 4 11:05:48 2016: Pro. is producing 1 to the queue!

    Thu Feb 4 11:05:48 2016: Con. is consuming. 0 in the queue is consumed!

    Thu Feb 4 11:05:49 2016: Pro. is producing 2 to the queue!

    Thu Feb 4 11:05:50 2016: Pro. is producing 3 to the queue!

    Thu Feb 4 11:05:51 2016: Pro. is producing 4 to the queue!

    Thu Feb 4 11:05:52 2016: Con. is consuming. 1 in the queue is consumed!

    Thu Feb 4 11:05:53 2016: > producing finished!

    Thu Feb 4 11:06:00 2016: Con. is consuming. 2 in the queue is consumed!

    Thu Feb 4 11:06:06 2016: Con. is consuming. 3 in the queue is consumed!

    Thu Feb 4 11:06:06 2016: Con. is consuming. 4 in the queue is consumed!

    Thu Feb 4 11:06:12 2016: Con. consuming finished!

    All threads terminate!"""

    展开全文
  • python3,开一个线程,间隔1秒把一个递增的数字写入队列,再开一个线程,从队列中取出数字并打印到终端复制代码 代码如下:#! /usr/bin/env python3import timeimport threadingimport queue# 一个线程,间隔一定的...

    python3,开一个线程,间隔1秒把一个递增的数字写入队列,再开一个线程,从队列中取出数字并打印到终端

    复制代码 代码如下:

    #! /usr/bin/env python3

    import time

    import threading

    import queue

    # 一个线程,间隔一定的时间,把一个递增的数字写入队列

    # 生产者

    class Producer(threading.Thread):

    def __init__(self, work_queue):

    super().__init__() # 必须调用

    self.work_queue = work_queue

    def run(self):

    num = 1

    while True:

    self.work_queue.put(num)

    num = num+1

    time.sleep(1) # 暂停1秒

    # 一个线程,从队列取出数字,并显示到终端

    class Printer(threading.Thread):

    def __init__(self, work_queue):

    super().__init__() # 必须调用

    self.work_queue = work_queue

    def run(self):

    while True:

    num = self.work_queue.get() # 当队列为空时,会阻塞,直到有数据

    print(num)

    def main():

    work_queue = queue.Queue()

    producer = Producer(work_queue)

    producer.daemon = True # 当主线程退出时子线程也退出

    producer.start()

    printer = Printer(work_queue)

    printer.daemon = True # 当主线程退出时子线程也退出

    printer.start()

    work_queue.join() # 主线程会停在这里,直到所有数字被get(),并且task_done(),因为没有调用task_done(),所在这里会一直阻塞,直到用户按^C

    if __name__ == '__main__':

    main()

    queue是线程安全的,从多个线程访问时无需加锁。

    如果在work_queue.get()之后调用work_queue.task_done(),那么在队列空时work_queue.join()会返回。

    这里work_queue.put()是间隔一定时间才往队列放东西,如果调用work_queue.task_done(),在数字1被get()后,队列空时,join()就返回,程序就结束了。

    也就是程序只打印了1然后就退出了。

    所以在这种使用情景下,不能调用task_done(),程序会一直循环下去。

    https://docs.python.org/3/library/queue.html

    展开全文
  • private static List queueCache = new LinkedList();2、定义队列缓冲池最大消息数,如果达到该值,...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中new Thread(){public void run(){while(...

    private static List queueCache = new LinkedList();

    2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

    private Integer offerMaxQueue = 2000;

    3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

    new Thread(){

    public void run(){

    while(true){

    String ip = null;

    try {

    synchronized (queueCache) {

    Integer size = queueCache.size();

    if(size==0){

    //队列缓存池没有消息,等待。。。。queueCache.wait();

    }

    Queue queue = queueCache.remove(0);

    if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理

    queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,

    continue;

    }else{

    ;//这里是处理该消息的操作。

    }

    size = queueCache.size();

    if(size=0){queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。

    }

    }

    } catch (Exception e) {

    e.printStackTrace();

    }finally{

    try {//检出该消息队列的锁

    unIpLock(queueStr);

    } catch (Execption e) {//捕获异常,不能让线程挂掉

    e.printStackTrace();

    }

    }

    }

    }.start();

    4、检入队列

    synchronized (queueCache) {

    while(true){

    Integer size = queueCache.size();

    if(size>=offerMaxQueue){

    try {

    queueCache.wait();

    continue;//继续执行等待中的检入任务。

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }//IF

    if(size<=offerMaxQueue&&size>0){

    queueCache.notifyAll();

    }

    break;//检入完毕

    }//while

    }

    5、锁方法实现

    /**

    * 锁

    * @param ip

    * @return

    * @throws

    */

    public Boolean isLock(String queueStr) {

    return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;

    }

    //解锁

    public void unIpLock(String queueStr) {

    if(ip!=null){

    this.redisManager.del(queueStr+"_lock");

    //lock.unlock();

    }

    }

    分享到:

    18e900b8666ce6f233d25ec02f95ee59.png

    72dd548719f0ace4d5f9bca64e1d7715.png

    2014-10-12 23:38

    浏览 4127

    评论

    展开全文
  • 因此,我使用多线程来加快进程。但问题是,例如,我使用50作为线程数,在处理了50行之后,脚本什么也不做。它不会终止,也不会显示任何其他内容。在这是我的参考代码:#!/usr/bin/env pythonfrom __future__ import ...
  • 先用python实现, 把每一个要处理的参数存放到队列Queue中, 然后创建线程队列中取出class ThreadExecuteJob(threading.Thread):"""Threaded Url Grab"""def __init__(self, queue):threading.Thread.__init__(self)...
  • 线程需要共享数据或者资源的时候,可能会使得线程使用变得复杂。线程模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易...
  • python多线程中消息队列怎么实现发布时间:2020-11-19 09:23:31来源:亿速云阅读:67作者:小新这篇文章主要介绍了python多线程中消息队列怎么实现,具有一定借鉴价值,需要的朋友可以参考下。希望大家阅读完这篇...
  • 2.compareTo()方法的使用可以见我之前的文章,而getDelay方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者线程会wait等待到...
  • Java多线程生产、消费消息队列示例 package com.zf.util; import lombok.extern.slf4j.Slf4j; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * 多线程消费消息队列 * * @...
  • 可以在一个进程中由线程调用Linux函数msgget(),msgsnd()和msgrcv()吗?不同线程中的这些函数将尝试访问(r / w)一个进程的消息队列.所有种族条件都应该由系统来处理吗?如果没有,是否有任何好的方法来支持线程并...
  •   因为多线程会消耗当前服务器资源,根本不算加快系统的性能,如果线程挂掉,那线程中的信息仍然会丢失,消息队列可部署在其他中间件上,并且其中的信息可以额外保存。   在一个方法中,前后两段步骤没有联系,...
  • 本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记1、定义一个队列缓存池://static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被...
  • 消息队列是消息的链表,存放在内核中并有消息队列标示符标示。msgget用于创建一个新队列或打开一个现存的队列。msgsnd将新消息加入到消息队列中;每个消息包括一个long型的type;和消息缓存;msgrcv用于从队列中取出...
  • 并发的实现有多种方式,本文仅给出多线程+队列的一个简单示例。 import queue import threading # 创建一个队列对象,并初始化值 q = queue.Queue(maxsize=1000) for i in range(100): q.put(i) # 定义实际操作...
  • 我们先来看看线程池的四种拒绝策略: 注: 当线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略: ThreadPoolExecutor.AbortPolicy:...
  • 本文是基于单生产者单消费者线程的实现。struct {char buf[65536];unsigned short writer_index;unsigned short reader_index;}reader_index只由读线程改变,writer_index只由写线程改变。读线程读取reader_index到...
  • python多线程多线程threading库常用方法thread类继承thread类全局变量的问题不加线程锁添加线程锁Lock(线程同步)queue线程安全队列 多线程 单线程(按序列并发执行) #mermaid-svg-1mBkHgaMG5Tm8N85 .label{font-...
  • 1、DelayQueue是×××阻塞队列2、队列中的元素必须实现Delayed接口,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。3、剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。...
  • 前言 为什么需要无锁队列呢?我们知道,多核心优化是现在游戏开发的一个...因此,实现无锁队列等无锁数据结构,可以看作是迈入多线程编程的基石。 推荐视频:https://www.bilibili.com/video/BV1354y1b7nz/ 问题描述
  • 我们在使用多线程时候,有没有听过“消息队列”这个名词呢?先从字面上跟大家说下,队列就是尽然有序的排队,消息,肯定是跟传递有关,这样大家组合起来就应该知道什么是消息队列了吧,那就是把收取的内容进行井然...
  • 不着急知道结果,尽量使用消息队列,保证服务器的压力减小,因为多线程对cpu的消耗大: 用线程的话,会占用主服务器资源, 消息队列的话, 可以放到其他机器上运行, 让主服务器尽量多的服务其他请求。 需要解耦的...
  • 文章目录生产者消费者队列用途划分:容量划分:实现有界队列无界队列 生产者消费者队列 它是实现线程间协作,交互一种重要手段。...生产者线程产生不同类型的数据,通过队列分发给不同消费者线程 任务队列 队列
  • Unity 已可使用 Thread、Task 等处理多线程任务,但缺少成熟的多线程任务队列工具,所以在此实现一个,代码已上传 Git 项目 GRUnityTools,可直接下载源码或通过 UPM 使用实现目标串行与并发队列队列是首要实现目标...
  • QT线程之间队列操作

    2021-08-26 21:18:38
    实现使用一个线程入队,另一个线程出队。。 这样的操作运用范围很广,比如在播放器时候,一个线程解码,每一个picture入队,另一个线程出队显示。。在网络接收数据时候可以一个线程接收输入入队,另一个线程出队使用...
  • 多线程消费同一队列消费一条消息往往比产生一条消息慢很多,为了防止消息积压,一般需要开启多个工作线程同时消费消息。在 RabbitMQ 中,我们可以创建多个 Consumer 消费同一队列。示意图如下:gordon.study....
  • Python3,开一个线程,间隔1秒把一个递增的数字写入队列,再开一个线程,从队列中取出数字并打印到终端#! /usr/bin/env python3import timeimport threadingimport queue# 一个线程,间隔一定的时间,把一个递增的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 566,625
精华内容 226,650
关键字:

多线程队列的使用