精华内容
下载资源
问答
  • java多线程消息队列的实现

    千次阅读 2015-06-16 10:47:31
    1、定义一个队列缓存池: private static ListQueue> queueCache = new LinkedListQueue>(); 2、定义队列缓冲池最大消息数,如果达到该值,...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中

    1、定义一个队列缓存池:

    private static List<Queue> queueCache = new LinkedList<Queue>();

    2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

    private Integer offerMaxQueue = 2000;

    3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

    new Thread(){
            public void run(){
              while(true){
                String ip = null;
                try {
                  synchronized (queueCache) {
                    Integer size = queueCache.size();
                    if(size==0){
    //队列缓存池没有消息,等待。。。。									queueCache.wait();
                    }
                    Queue queue = queueCache.remove(0);
    
                    if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
                      queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
                      continue;
                    }else{
                ;//这里是处理该消息的操作。
                    }
                    size = queueCache.size();
                    if(size<offerMaxQueue&&size>=0){									queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
                    }
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }finally{
                  try {//检出该消息队列的锁
                    unIpLock(queueStr);
                  } catch (Execption e) {//捕获异常,不能让线程挂掉
                    e.printStackTrace();
                  }	
                                                }
                }
          }.start();

    4、检入队列

    synchronized (queueCache) {
    while(true){
    Integer size = queueCache.size();
    if(size>=offerMaxQueue){
                try {
                  queueCache.wait();
    continue;//继续执行等待中的检入任务。
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
     }//IF
    
    if(size<=offerMaxQueue&&size>0){
      queueCache.notifyAll();
    }
    break;//检入完毕
    }//while
    }

    5、锁方法实现

    /**
       * 锁
       * @param ip
       * @return
       * @throws 
       */
      public Boolean isLock(String queueStr) {
        return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
      }
      //解锁
      public void unIpLock(String queueStr) {
        if(ip!=null){
          this.redisManager.del(queueStr+"_lock");
    //			lock.unlock();
        }
      }
    展开全文
  • java多线程队列实例

    万次阅读 2016-06-02 13:54:39
    java多线程队列实例

    第一步:创建一个无边界自动回收的线程池,在此用 JDK提供的ExecutorService类

    此线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。


    package com.thread.test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadPool {
    	private static ExecutorService threadPool = null;
    	public static ExecutorService getThreadPool(){
    		if(threadPool==null){
    			threadPool = Executors.newCachedThreadPool();
    		}
    		return 	threadPool;
    	}
    
    }

    第二步:使用单例模式创建一个无界队列,并提供入队的方法

    无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    package com.thread.test;
    
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TaskQueue {
    	private static  LinkedBlockingQueue queues = null;
    	
    	public static LinkedBlockingQueue getTaskQueue(){
    		if(queues==null){
    			queues =  new LinkedBlockingQueue();
    			System.out.println("初始化 队列");
    		}
    		return queues;
    	}
    	
    	public static void add(Object obj){
    		if(queues==null)
    			queues =  getTaskQueue();
    		queues.offer(obj);
    		System.out.println("-------------------------------");
    		System.out.println("入队:"+obj);
    	}
    }
    

    第三步:提供一个入队的线程,实际使用中的生产者

    package com.thread.test;
    
    public class Produce implements Runnable {
    	private static volatile int i=0;
    	private static volatile boolean isRunning=true;
    
    	public void run() {
    		while(isRunning){
    			TaskQueue.add(Integer.valueOf(i+""));
    			Produce.i++;
    			try {
    				Thread.sleep(1*1000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    		
    	}
    
    }
    

    第四步:提供一个出队的线程,实际使用中的消费者

    package com.thread.test;
    
    public class Consumer implements Runnable {
    	private static Consumer consumer;
    	
    	public static volatile boolean isRunning=true;
    	public void run() {
    		while(Thread.currentThread().isInterrupted()==false && isRunning)  
            {  
    			try {
    				System.out.println("出队"+TaskQueue.getTaskQueue().take());
    				Thread.sleep(1*1000);  
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
            }
    		
    	}
    	public static Consumer getInstance(){
    		if(consumer==null){
    			consumer = new Consumer();
    			System.out.println("初始化消费线程");
    		}
    		return consumer;
    	}
    
    }
    

    第五步:启动生产消费策略

    package com.thread.test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Test {
    	
    	public static void main(String[] args) {
    		ExecutorService threadPool = ThreadPool.getThreadPool();
    		Produce consumer2 = new Produce();
    		threadPool.execute(consumer2);
    		Consumer consumer=Consumer.getInstance();
    		threadPool.execute(consumer);
    	}
    
    }
    



    展开全文
  • JAVA多线程队列

    千次阅读 2018-07-07 10:14:20
    JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。 在
    原文地址为:JAVA多线程与队列

             JAVA 已经给我们提供了比较好的队列实现Queue,继承于Collection。 本次我使用的是BlockingQueue,继承于Queue。    

             在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

             首先利用BlockingQueue封装了一个队列类。队列里存放Map对象,这个依项目需求而定,供参考。

             

    import java.util.AbstractMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    /**
    * 单例的缓存map
    */

    public class CachePool<Key, Value> extends AbstractMap<Key, Value>{

    // 私有化缓存对象实例
    private static CachePool cachePool = new CachePool();
    private int maxCount = 1000;
    private BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>();
    /**
    * private Constructor.
    * @return
    */
    private CachePool() {
    }
    /**
    * 开放一个公有方法,判断是否已经存在实例,有返回,没有新建一个在返回
    * @return
    */
    public static CachePool getInstance(){
    return cachePool;
    }

    /**
    * The Entry for this Map.
    * @author AnCan
    *
    */
    private class Entry implements Map.Entry<Key, Value>{
    private Key key;
    private Value value;

    public Entry(Key key, Value value){
    this.key = key;
    this.value = value;
    }

    @Override
    public String toString() {
    return key + "=" + value;
    }

    public Key getKey() {
    return key;
    }

    public Value getValue() {
    return value;
    }

    public Value setValue(Value value) {
    return this.value = value;
    }
    }



    /**
    * Constructor.
    * @param size the size of the pooled map;
    */
    public CachePool(int size) {
    maxCount = size;
    }

    @Override
    public Value put(Key key, Value value) {
    while(queue.size() >= maxCount){
    queue.remove();
    }
    queue.add(new Entry(key, value));
    return value;
    }

    @Override
    public Value get(Object key){
    for(Iterator<Entry> iter = queue.iterator();iter.hasNext();){
    Entry type = iter.next();
    if(type.key.equals(key)){
    queue.remove(type);
    queue.add(type);
    return type.value;
    }
    }
    return null;
    }

    @Override
    public Set<Map.Entry<Key, Value>> entrySet() {
    Set<Map.Entry<Key, Value>> set = new HashSet<Map.Entry<Key, Value>>();
    set.addAll(queue);
    return set;
    }

    @Override
    public void clear() {
    queue.clear();
    }

    @Override
    public Set<Key> keySet() {
    Set<Key> set = new HashSet<Key>();
    for(Entry e : queue){
    set.add(e.getKey());
    }
    return set;
    }

    @Override
    public Value remove(Object obj) {
    for(Entry e : queue){
    if(e.getKey().equals(obj)){
    queue.remove(e);
    return e.getValue();
    }
    }
    return null;
    }

    @Override
    public int size() {
    return queue.size();
    }
    }

                其中根据项目的需求重写了一些方法。

                先看下消费者类,使用多线程来处理队列中的内容:

          

    import java.util.Date;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServlet;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;


    /**
    * 操作业务类,通过参数中的方法参数进行具体的操作
    */
    public class TicketTradeOper extends HttpServlet
    {
    /**
    * 缓存对象 map
    */
    public static CachePool<String, Object> mapPool = CachePool.getInstance();

    private static final int NTHREADS=5;
    // 使用线程池来避免 为每个请求创建一个线程。
    private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);

    //业务操作
    IETicketTradeOper ticketTradeOper;

    @Override
    public void init() throws ServletException
    {
    Timer timer = new Timer();
    timer.schedule(new TimerTask(){
    @Override
    public void run() {
    startThread();
    }
    }, new Date(), 5000);//间隔5秒执行一次定时器任务
    super.init();
    }


    public void startThread(){
    threadPool.execute(new Runnable(){
    public void run() {
    executeCodeOper();
    }
    });
    }

    public void executeCodeOper()
    {
    String key = "";
    Map param = null;
    synchronized (mapPool)
    {
    System.out.println(Thread.currentThread().getName() + "进来了。。。。");
    System.out.println("现在队列中共有----"+mapPool.size()+"---条数据");

    Iterator it = mapPool.keySet().iterator();
    //缓存不为空时,取出一个值
    while (it.hasNext())
    {
    key = (String) it.next();
    param = (Map) mapPool.get(key);
    }
    if (null != param)
    {
    //为防止重复,将其移除
    mapPool.remove(key);
    }
    }

    if (null != param)
    {
    boolean result =ticketTradeOperator(param);
    System.out.println("此条数据处理========"+result);
    if(!result){
    //若处理失败,重新放回队列
    mapPool.put(key, param);
    };
    }
    }


    public boolean ticketTradeOperator(Map<String, String> params)
    {
    //具体的处理工作
    return resultCode;
    }

    public IETicketTradeOper getTicketTradeOper()
    {
    return ticketTradeOper;
    }
    public void setTicketTradeOper(IETicketTradeOper ticketTradeOper)
    {
    this.ticketTradeOper = ticketTradeOper;
    }

    }
                生产者,根据业务需求将接收到的数据放到队列里:

         TicketTradeOper.mapPool.put(newParams.get("order_id"), newParams);


    以上便是整个队列生产消费的过程,有问题的欢迎交流。

    关于队列类Queue的介绍。下篇博客进行。。



             

      

    转载请注明本文地址:JAVA多线程与队列
    展开全文
  • java多线程消息队列的实现

    千次阅读 2016-01-28 15:37:57
    1、定义一个队列缓存池: private static ListQueue> queueCache = new LinkedListQueue>(); 2、定义队列缓冲池最大消息数,如果达到该值,...3、定义检出线程,如果队列缓冲池没有消息,那么检出线程线程等待中 new

    1、定义一个队列缓存池:

    private static List<Queue> queueCache = new LinkedList<Queue>();

    2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

    private Integer offerMaxQueue = 2000;

    3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

    new Thread(){
            public void run(){
              while(true){
                String ip = null;
                try {
                  synchronized (queueCache) {
                    Integer size = queueCache.size();
                    if(size==0){
    //队列缓存池没有消息,等待。。。。									queueCache.wait();
                    }
                    Queue queue = queueCache.remove(0);
    
                    if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理
                      queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理,
                      continue;
                    }else{
                ;//这里是处理该消息的操作。
                    }
                    size = queueCache.size();
                    if(size<offerMaxQueue&&size>=0){									queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。
                    }
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }finally{
                  try {//检出该消息队列的锁
                    unIpLock(queueStr);
                  } catch (Execption e) {//捕获异常,不能让线程挂掉
                    e.printStackTrace();
                  }	
                                                }
                }
          }.start();

    4、检入队列

    synchronized (queueCache) {
    while(true){
    Integer size = queueCache.size();
    if(size>=offerMaxQueue){
                try {
                  queueCache.wait();
    continue;//继续执行等待中的检入任务。
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
     }//IF
    
    if(size<=offerMaxQueue&&size>0){
      queueCache.notifyAll();
    }
    break;//检入完毕
    }//while
    }

    5、锁方法实现

    /**
       * 锁
       * @param ip
       * @return
       * @throws 
       */
      public Boolean isLock(String queueStr) {
        return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1;
      }
      //解锁
      public void unIpLock(String queueStr) {
        if(ip!=null){
          this.redisManager.del(queueStr+"_lock");
    //			lock.unlock();
        }
      }

    展开全文
  • java多线程任务队列模型

    千次阅读 2017-09-10 21:12:35
    此篇文章将从任务队列的设计;任务调度的方式(串行和并行)。代码很简单,主要是设计的思想。任务队列final class PendingPostQueue { // 含有头、尾指针的链表结构实现队列 private PendingPost head; private ...
  • import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util
  • package mutliThread;...import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class test { private static Queue queue = new ConcurrentLinkedQueue(); private static final
  • Java多线程阻塞队列和并发集合

    万次阅读 2012-02-10 19:23:02
    Java多线程 阻塞队列和并发集合  本章主要探讨在多线程程序中与集合相关的内容。在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成程序崩溃。Java为多线程专门提供了特有的线程安全的集合类,通过...
  • Java多线程 阻塞队列和并发集合

    千次阅读 2012-01-12 13:52:16
    本章主要探讨在多线程程序中与集合相关的内容。在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成...java阻塞队列应用于生产者消费者模式、消息传递、并行任务执行和相关并发设计的大多数常见使用上下文。
  • 本人不太擅长多线程,但最近因为工作需要,使用到了,所以来求助了,需求这样的:要固定创建线程池跑线程,同时访问数据进来了后加入队列,线程去读取队列后处理程序,这样使访问数据和线程耦合性降低,彼此减少影响...
  • Java多线程总结之线程安全队列Queue

    万次阅读 2017-09-19 10:21:54
    Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • java 多线程+队列 ping 操作

    千次阅读 2014-05-10 11:03:11
    之前在做ping操作的 时候是想模拟cmd ping的 多线程ping,这样会导致 cpu,内存 暴涨,之后采用的是java中的 InetAddress 类,InetAddress类具有一个缓存,用于存储成功及不成功的主机名解析,  采用方法 ...
  • Java 多线程:彻底搞懂线程池

    万次阅读 多人点赞 2019-07-09 19:27:00
    熟悉Java多线程编程的同学都知道,当我们线程创建过多时,容易引发内存溢出,因此我们就有必要使用线程池的技术了。 目录 1 线程池的优势 2 线程池的使用 3 线程池的工作原理 4 线程池的参数 4.1 任务队列...
  • 万字图解Java多线程

    万次阅读 多人点赞 2020-09-06 14:45:07
    java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,...
  • Java多线程——4 阻塞队列

    千次阅读 2013-08-13 23:38:41
    阻塞队列Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是:一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。...
  • Java多线程 之 生产者、消费者(十三) Java多线程 之 lock与condition的使用(十四) 详细阐述了多个任务之间的协同合作,需要使用wait、notify、notifyAll或者lock、condition、await、signal、signalAll方法...
  • Java 多线程 系列文章目录: Java 多线程(一)线程间的互斥和同步通信 Java 多线程(二)同步线程分组问题 Java 多线程(三)线程池入门 Callable 和 Future Java 多线程(四)ThreadPoolExecutor 线程池各参数...
  • RabbitMQ设置多线程处理队列消息

    千次阅读 2019-05-17 19:08:32
    @RabbitListener注解指定消费方法,默认...可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 1、RabbitmqConfig.java中添加容器工厂配置: @Bean("customContainerFac...
  • Java线程安全队列Queue

    千次阅读 2017-06-05 17:06:12
    Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是...
  • Java多线程-阻塞队列BlockingQueue

    千次阅读 2011-09-06 16:00:51
    前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各
  • java 多线程 线程池 工作队列

    千次阅读 2011-08-10 16:12:45
    1.线程中一些基本术语和概念 1.1线程的几个状态 初始化状态 就绪状态 运行状态 ...1.2 Daemon线程 ...Daemon线程区别一般线程之处是:主程序一旦结束,Daemon线程就会结束。...为了协调个并发运行的线程使用共享

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 281,037
精华内容 112,414
关键字:

java多线程消息队列

java 订阅