精华内容
下载资源
问答
  • 多线程消费一个队列问题

    千次阅读 2019-08-02 11:16:38
    问题描述 ...但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。 贴代码 往队列里push数据 void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque) { KafkaC...

    问题描述

    最近公司有个转发服务,业务逻辑是从kafka消费到大量的数据,然后放入一个队列中。之后用一个线程池,去消费这个队列。

    但是发现这四个线程消费队列的地方又严重的延迟。特此想解决此问题。

    贴代码

    • 往队列里push数据
    void KafkaConsumer::msgConsume(RdKafka::Message* message, void* opaque)
    {
    	KafkaConsumer::Data cData;
    	int errcode = message->err();
    
    	if (errcode == RdKafka::ERR__TIMED_OUT)
    	{
    		return;
    	}
    	else if (errcode == RdKafka::ERR_NO_ERROR)  //消费数据,放入队列
    	{
    		Data *pData=new Data;
    		pData->buffer.writeBlock(static_cast<const char*>(message->payload()),static_cast<int>(message->len())); // payload 装载,载荷;这里就是里面的内容
    		//pData->topic = message->topic()->name();  
    		pData->topic = message->topic_name();   // 注意这里
    		pData->ipartition = message->partition();
    
    		_cMutex.lock();
    		_cDataQue.push(pData); // 放入队列
    		_cMutex.unlock();
    	}
    	else if (RdKafka::ERR__PARTITION_EOF)
    	{
    		if (_exit_eof) _run = false;
    	}
    	else
    	{
    		LOG(INFO) << "kafkaConsumer--error: Consumer failed:" << message->errstr();
    	}
    }
    • 取队列数据,处理篇
    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1)
    	{
    		if (tag == CDATA)
    		{
    			if(_cDataQue.size() == 0) {
    				usleep(2000);
    				continue;
    			}
    			_cMutex.lock();
    			while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
    			{
    				Data *pData = _cDataQue.pop(); // 队列中取出
    				HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
    				SAFE_DELETE(pData);
    			}
    			_cMutex.unlock();
    		} else {
    			break;
    		}
    	}
    }

    代码错误分析

        _cMutex.lock();
                while(_cDataQue.size()>0) // 处理一次就都得处理完?!!
                {
                    Data *pData = _cDataQue.pop(); // 队列中取出
                    HandleMsg(pData);     // 取数据和处理数据放一起?都在锁里?!!
                    SAFE_DELETE(pData);
                }
                _cMutex.unlock();

     线程在数据队列_cDataQue中的数据时,先上锁,然后不断的循环取出队列中的数据并处理。(取出数据 和处理数据在一起)

    处理完每条数据之后delete.

    当锁定时的整个队列中的数据处理完毕之后,解锁。

    定义几个变量:

    N : 锁时队列的长度

    T1: pop 一条数据的时间

    T2:HandleMsg 函数执行的时间

    T3:push 一条数据的时间

    此活动中的动作:

    1. kafka消费到数据,锁队列,写队列,解锁队列。

    2.数据解析线程,锁队列,读数据,解锁队列,处理数据。

    此时的处理方式,几乎没有发挥多线程的优势,每次都是把锁时的队列的全部内容处理完。其他三个线程和生产数据的线程干等

    t = N * (T1+T2) 的时间。 若此时是程序刚启动。kafka瞬间消费到很多数据成万条的数据。 那么t 将是一个很大的时间。且kafka消费到的数据还不能及时的存放如队列中。于是就造成了延迟。

    隐患就是:

    1.根本没发挥多线程的优势和能力

    2.若数据量大,取数据和处理数据放一起,导致锁态占用的时间很长,影响其他线程(往queue里放数据的线程)干活

    3.其他线程竞争不到,干等,浪费CPU时间。一个线程死干活,处理不完,数据堆积。延迟。

    改进方法

    1. 将取数据的地方放在锁的里面,处理数据的地方放在锁的外面。

    2.每次取固定数量的nCount 个数据,放在一个容器里。然后出锁后慢慢处理。

    同时,每次取固定数量的来处理,锁占用的时间是固定的,t = nCount * T1 .也就是说,其他3个处理线程和1个往queue里塞数据的线程。最多只等 3 * t 的时间就能拿到 queue的控制权,并对其进行操作。

    而数据处理的时间 T2 与queue的操作(加锁,读取,塞入)没有关系。

    不过要控制nCount的值,太小。锁的次数很频繁; 太大,t 的时间会变大。

    这样多线程就用其来了。队列应用也灵活了。处理能力大大提升。

    void KafkaConsumer::run(void* param)
    {
    	int tag;
    	memcpy(&tag,&param,sizeof(int));
    	while (1){
    		if(_cDataQue.size() == 0) {
    			usleep(2000);
    			continue;
    		}
    		std::vector<Data*> vDatas;
    		_cMutex.lock();
    		while(_cDataQue.size()>0) {//上锁的时间尽量短,为其他线程争取到和写入线程腾出时间
    			Data *pData = _cDataQue.pop(); // 队列中取出
    			vDatas.push_back(pData);
    			if(vDatas.size() > 10){ //这里能限制这个长度 ,最多弄10条。处理快,节省时间。
    				break;
    			}
    		}
    		_cMutex.unlock();
    		// 将处理移除在锁之外,慢慢处理这些数据,处理完释放
    		for(vector<Data*>::iterator iter = vDatas.begin(); iter != vDatas.end(); ++iter){
    			Data *pData = *iter;
    			HandleMsg(pData);
    			SAFE_DELETE(pData);
    		}	
    	}
    }

     

    用生活实例来解释描述:

    1.角色 : 大厨 (生产者) , 取餐台/口(queue),包子(数据),顾客(消费处理线程)

    2.动作:生产数据(push进queue),取出数据(pop出queue),占住取餐台(Lock),放开取餐台(UNLock),吃包子(HandleMsg)

     

    方案一

    大厨们生产包子,锁住取餐口,放下包子。然后顾客1 占住取餐口,假如这里有10个包子,他就取一个吃了,再去一个吃了,直到10个取完吃完才离开取餐口。此时,大厨没法往里放包子,其他三个顾客都干等着。

    方案二

    大厨们生产包子,占住取餐口,放下包子。顾客1,占住取餐口,取了10个包子,去一边吃去。顾客2 ,马上来也取10个,然后一遍吃去。同理顾客3,4 也一样。当然这里只是理想情况,顾客1去完之后,也可能大厨又占住取餐口,放了1w个包子。

    关键是,每次取餐口被占用的时间,之后顾客们取包子的时间。非常短。而且每个顾客取完之后就去一边吃包子。同时大家可能都在吃包子,实现了多线程处理。


    哈哈。就酱紫。


     

    展开全文
  • java多线程队列实例

    万次阅读 2016-06-02 13:54:39
    java多线程队列实例

    第一步:创建一个无边界自动回收的线程池,在此用 JDK提供的ExecutorService类

    此线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。


    package com.thread.test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadPool {
    	private static ExecutorService threadPool = null;
    	public static ExecutorService getThreadPool(){
    		if(threadPool==null){
    			threadPool = Executors.newCachedThreadPool();
    		}
    		return 	threadPool;
    	}
    
    }

    第二步:使用单例模式创建一个无界队列,并提供入队的方法

    无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    package com.thread.test;
    
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TaskQueue {
    	private static  LinkedBlockingQueue queues = null;
    	
    	public static LinkedBlockingQueue getTaskQueue(){
    		if(queues==null){
    			queues =  new LinkedBlockingQueue();
    			System.out.println("初始化 队列");
    		}
    		return queues;
    	}
    	
    	public static void add(Object obj){
    		if(queues==null)
    			queues =  getTaskQueue();
    		queues.offer(obj);
    		System.out.println("-------------------------------");
    		System.out.println("入队:"+obj);
    	}
    }
    

    第三步:提供一个入队的线程,实际使用中的生产者

    package com.thread.test;
    
    public class Produce implements Runnable {
    	private static volatile int i=0;
    	private static volatile boolean isRunning=true;
    
    	public void run() {
    		while(isRunning){
    			TaskQueue.add(Integer.valueOf(i+""));
    			Produce.i++;
    			try {
    				Thread.sleep(1*1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		
    	}
    
    }
    

    第四步:提供一个出队的线程,实际使用中的消费者

    package com.thread.test;
    
    public class Consumer implements Runnable {
    	private static Consumer consumer;
    	
    	public static volatile boolean isRunning=true;
    	public void run() {
    		while(Thread.currentThread().isInterrupted()==false && isRunning)  
            {  
    			try {
    				System.out.println("出队"+TaskQueue.getTaskQueue().take());
    				Thread.sleep(1*1000);  
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
            }
    		
    	}
    	public static Consumer getInstance(){
    		if(consumer==null){
    			consumer = new Consumer();
    			System.out.println("初始化消费线程");
    		}
    		return consumer;
    	}
    
    }
    

    第五步:启动生产消费策略

    package com.thread.test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Test {
    	
    	public static void main(String[] args) {
    		ExecutorService threadPool = ThreadPool.getThreadPool();
    		Produce consumer2 = new Produce();
    		threadPool.execute(consumer2);
    		Consumer consumer=Consumer.getInstance();
    		threadPool.execute(consumer);
    	}
    
    }
    



    展开全文
  • 多线程任务队列的一个超级简单的生产者-消费者实现。 您可以使用它,因为: 寻找一个轻量级且易于集成的多线程任务队列。 你没有 c++11 支持。 你想要跨平台。 得到它 git clone --recurse-submodules ...
  • RabbitMQ设置多线程处理队列消息

    千次阅读 2019-05-17 19:08:32
    @RabbitListener注解指定消费方法,默认...可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 1、RabbitmqConfig.java中添加容器工厂配置: @Bean("customContainerFac...

    @RabbitListener注解指定消费方法,默认是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源

    可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

    1、RabbitmqConfig.java中添加容器工厂配置:

     @Bean("customContainerFactory")
     public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConcurrentConsumers(10);  //设置线程数
            factory.setMaxConcurrentConsumers(10); //最大线程数
            configurer.configure(factory, connectionFactory);
            return factory;
        }
    

    2、@RabbitListener注解指定容器工厂

    @RabbitListener(queues = {"监听队列名"},containerFactory = "customContainerFactory")
    

    再次测试当队列有多个任务时消费端的并发处理能力,可以在执行方法中打印日志,查看对应执行的请求是否为多线程请求!

    展开全文
  • package mutliThread; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class test { ...private static Queue queue = new ConcurrentLinkedQueue();...private static final

    package mutliThread;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class test {
    private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
    private static final int threadNum = 3;


    public static void main(String[] args) throws Exception {
      insertThread it = new insertThread();
      it.start();
      ConcumerThread pt = new ConcumerThread();
      for (int i = 0; i < threadNum; i++) {
       pt = new ConcumerThread();
       pt.start();
      }
    }

    public static  boolean insert(String str) throws Exception {
      System.out.println("insert="+str+" , queue.size="+queue.size());
      queue.offer(str);
     
      if (queue.size() > 20000) {
       // do some thing
      }
      return true;
    }
      public synchronized static String get(){
       if(queue.isEmpty()){
        return null;
       }
      return queue.poll();
      }
    }

    class ConcumerThread extends Thread {
    public ConcumerThread() {

    }

    public void run() {
      String str = "";
      while (true) {
       str = test.get();
       if(str == null){
        System.out.println("任务池为空!");
        try {
         Thread.sleep(5*1000L);
        } catch (InterruptedException e) {
         e.printStackTrace();
        }
       }else{
        System.out.println(Thread.currentThread().getName()+"="+str);
       }
       
      }

    }
    }
    class insertThread extends Thread {
    public insertThread() {

    }

    public void run() {
      int i = 0;
      while (true) {
        try {
         test.insert("insert" + (i++));
        } catch (Exception e) {
         e.printStackTrace();
        }
      }

    }
    }

    展开全文
  • JAVA多线程队列

    千次阅读 2018-07-07 10:14:20
    JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。 在
  • 几种主要的阻塞队列ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先...
  • Queue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。 首先我们组要明白为什么要使用队列队列的性质, 多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题 ...
  • Java多线程总结之线程安全队列Queue

    万次阅读 2017-09-19 10:21:54
    在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • 线程消息队列

    2018-09-16 10:32:38
    所谓线程消息队列,就是一个普通的循环队列加上“生产者-单(消费者的存/取操作”。流水线方式中的线程是单消费者,线程池方式中的线程多消费者。
  • c++ 多线程 消息队列 同步

    千次阅读 2019-12-24 11:14:11
    一个线程专门用于与客户端进行通信,一个线程用于将客户端发过来的消息放入自己定义的一个队列中。一个线程用于将队列中的消息取出然后进行运算。比如线程1是专门负责与客户端进行通信...)这个涉及到多线程的同步问...
  • 多线程操作日志队列

    2014-08-01 09:24:28
    多线程 队列 线程执行器 调度 生产者与消费者进行 消息队列入队出列
  • // 单线程和多线程并发队列测试(同步和异步) // // 基础原理: // 1)队列: // 1.1 dispatch_queue_t 是一个队列,一个FIFO的消费队列 // 1.2 消费者:队列配置的运行线程 // 1.3 被消费对象: 添加到队列中的运行任务...
  • python 多线程处理队列

    2018-11-06 11:40:17
    转载自: ... from threading import Thread import time import random from queue import Queue from collections import deque #创建队列,设置队列最大数限制为3个 queue ...
  • Java多线程队列Queue

    2014-08-23 10:30:50
    在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • 一,Java中三种实现生产者消费者 1,使用wait()/notify()的方式 2,使用J.U.C下Condition的await()/signal()的方式实现 3,使用阻塞队列实现 注:这篇博文主要将使用阻塞队列实现,至于前面的两种可以看看我的...
  • C# 多线程+队列处理大批量数据,进而缩短处理时间
  • 本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902欢迎关注微博:...多线程同步
  • Java多线程阻塞队列和并发集合

    万次阅读 2012-02-10 19:23:02
    Java多线程 阻塞队列和并发集合  本章主要探讨在多线程程序中与集合相关的内容。在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成程序崩溃。Java为多线程专门提供了特有的线程安全的集合类,通过...
  • Python多线程队列结合demo

    千次阅读 2019-07-08 15:47:59
    大家都知道python的多线程不是真正的多线程,但是对于io类型的任务,多线程还是能发挥作用的。那么多个线程之间是如何进行变量共享的呢,很多时候我们可以借助queue模块,方便。今天就做一个学习。 二、threading...
  • Java多线程 阻塞队列和并发集合

    千次阅读 2012-01-12 13:52:16
    本章主要探讨在多线程程序中与集合相关的内容。在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成...java阻塞队列应用于生产者消费者模式、消息传递、并行任务执行和相关并发设计的大多数常见使用上下文。
  • 这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许消费者,每一个消费者都在自己的线程中运行。 我们使用数组来跟踪线程。 Thread[] _workers; 通过跟踪线程可以让我们在所有的线程都结束后再结束...
  • 单线程和多线程耗时对比:(请自行对比,深刻理解) 单线程运行: """ ...@desc: 多线程消费一个队列的例子 """ import threading import time import queue # 下面来通过多线程来处理Queu...
  • IOS多线程队列的使用

    万次阅读 2013-12-11 17:56:18
    最近搞一款塔防游戏,提到塔防,自然就想到了A星寻路。的确,它是一种高效的寻路...实在没辙了,我想到了队列线程。之前都没接触过这个东东,还好在网上找到很详细的线程介绍。当然,我只是用到了其中的一点点。分享给
  • 多线程(八)线程队列

    千次阅读 2017-03-31 09:13:50
    本文参考以下文章整理而成,希望大家多多指教!共同学习!原文地址如下:Java 并发工具包...Java线程(篇外篇):阻塞队列BlockingQueue引言  在上一篇多线程(七)线程池详解中提到了线程队列,相信大家现在已清楚明白线
  • 多线程——12队列模式

    千次阅读 2020-02-22 11:36:35
    实现一个队列来存储任务, 然后启动一个生产者向队列添加任务, 启动服务者消费队列里面的任务 主要角色 任务是什么: 这里客户端的请求所以使用request 任务的队列:requestQueue在这里面get如果对列为空让get的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 106,018
精华内容 42,407
关键字:

多线程消费队列