精华内容
下载资源
问答
  • Java可视化实现生产者消费问题

    千次阅读 2019-09-28 22:14:06
    引言:生产者消费问题是一个十分经典的多线程问题。为了更加形象地描述这个问题,采用可视化的形式展示此过程。

    引言:生产者消费者问题是一个十分经典的多线程问题。为了更加形象地描述这个问题,采用可视化的形式展示此过程。

    在这里插入图片描述
    在这里插入图片描述

    1、问题重述

         生产者消费者问题也称有限缓冲问题。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

        当我们在思考如何解决这个问题的时候一定要联系实际。实际生活中(不考虑直接点对点模式),工厂生产的商品一般会通过经销商销售,消费者从经销商中购买商品。经销商手中的存货是有限的,同时他需要的商品也是有限的。当他不需要商品的时候,生产商不能再生产更多商品;当他没有存货的时候,消费者无法从他手中购买。这里,经销商就相当于一个缓冲区。

        这种生产者-消费者模式有什么好处呢?使用这种模式,可以让消费者、生产者互相独立,生产者不需要依赖消费者的消费速度,它只关心缓冲区的状况。

    2、前期准备

    实现方法有四种:
    • wait()和notify()方法
    • await()和signal()方法
    • 阻塞队列方法
    • 管道方法
    • 此教程采用第一种方法。

      2.1 synchronized

          多线程并发存在着线程安全问题,主要在于存在共享数据以及多个线程共同操作共享数据。使用synchronized可以保证线程互斥的访问代码。其原理在于它可以保证方法或者代码块在同一时刻只有一个可以进入到临界区,还可以保证共享变量的内存可见性

          synchronized一般称"同步锁",在修饰代码块的时候需要传入一个对象作为"锁"的对象。线程同步就是利用锁机制先给共享资源上锁,只有拿到锁的线程才可以访问共享资源,其他线程进入等待状态

      2.2 wait()方法

          wait方法()是Object类的方法,其作用是使当前执行代码的线程进入等待。在调用wait()方法之前,线程必须获得该对象的锁,即只能在同步方法或同步块中调用wait()方法。wait()方法执行之后,当前线程释放锁。从wait()返回之前,线程会与其他线程产生资源竞争。当调用wait()时,如果没有锁将会抛出异常

      2.3 notify()方法与notifyAll()方法

          notify()方法用来通知那些可能等待该对象的对象锁的其他线程,若有多个线程,则随机选择wait状态的线程对其notify,使它获得该对象的对象锁。但是,线程不会马上释放锁,而是等到执行notify()方法的线程将程序执行完后即退出synchronized语句块后,该线程才会释放锁。注意,操作不当可能会出现过早通知

          notifyAll()使所有线程退出wait状态

          总之,简单来讲:wait()使线程停止运行,notify()使线程继续运行

      3、实现消费者-生产者模型

      3.1 界面设计

      创建Frame类和UI类,Frame创建窗体,UI添加控件以及绑定事件。该部分只介绍布局,绑定事件后面再细讲。用进度条代表生产/消费进度。界面如下图所示:

      在这里插入图片描述

      class Frame extends JFrame
      {
      	public int width=1500;
      	public int height=1500;
      	UI ui=new UI();
      	public Container container;
      	public Frame()
      	{
      		setTitle("生产者消费者问题");
      		setSize(width,height);
      		setLocation(300,0);		
      		container=getContentPane();
      		container.add(ui);
      	}
      }
      
      class UI extends JPanel
      {
      	public static JProgressBar producerBar;//表示生产者生产进度
      	public static JLabel producerJLabel;//表示生产者生产个数
      	public static JLabel bufferJLabel;//缓冲区
      	public static JLabel amountJLabel;//缓冲区个数
      	public static JProgressBar consumerBar;//表示消费者消费速度
      	public static JLabel consumerJLabel;//表示消费者消费个数
      	JButton bt1;//生产者开始生产
      	JButton bt2;//生产者停止生产
      	JButton bt3;//消费者开始消费
      	JButton bt4;//消费者停止消费
      	public UI()
      	{
      		setLayout(null);
      		setSize(1400,1400);
      		producerBar=new JProgressBar();
      		consumerBar=new JProgressBar();
      		bt1=new JButton("开始生产");
      		bt2=new JButton("停止生产");
      		bt3=new JButton("开始消费");
      		bt4=new JButton("停止消费");
      		producerJLabel=new JLabel("生产者");
      		consumerJLabel=new JLabel("消费者");
      		bufferJLabel=new JLabel("缓冲区 (最大容量25)");
      		amountJLabel=new JLabel("商品数量:0");
      		producerBar.setBackground(Color.WHITE);
      		producerBar.setForeground(Color.BLACK);
      		consumerBar.setBackground(Color.WHITE);
      		producerBar.setForeground(Color.BLACK);
      		bt1.setBounds(50,170,70,60);
      		bt2.setBounds(150,170,70,60);
      		bt3.setBounds(950, 700, 70, 60);
      		bt4.setBounds(1050,700,70,60);
      		bufferJLabel.setBounds(550,350,200,200);
      		amountJLabel.setBounds(550,300,200,400);
      		producerJLabel.setBounds(50,70,50,50);
      		producerBar.setBounds(50, 120, 480, 40);
      		consumerJLabel.setBounds(950,600,50,50);
      		consumerBar.setBounds(950,650,480,40);
      		add(producerBar);
      		add(producerJLabel);
      		add(consumerBar);
      		add(consumerJLabel);
      		add(bt1);
      		add(bt2);
      		add(bt3);
      		add(bt4);
      		add(bufferJLabel);
      		add(amountJLabel);
      	}
      }
      

      3.2 模型设计

      根据问题描述即可知道设计思路,创建三个类分别是:Buffer类(缓冲区)、Producer类(生产者)、Consumer类(消费者)。在Buffer类里面我们要模仿生产者生产商品以及消费者消费商品,这里就产生了共享资源——商品。因此,我们需要用到synchronized锁保证线程安全,采用wait()/notify()方法。注意到一点,在消费者-生产者模式下,生产者在缓冲区未满的情况下只管不断生产,消费者在缓冲区未空的情况下只管不断消费。

      3.2.2 Buffer类

      在Buffer类,我们需要实现的是模拟商品运到仓库和商品从仓库出去以及相应的UI。UI方面,需要实时显示缓冲区商品数量。
      class Buffer
      {
      	private static final int max=25;//缓冲区最大容量
      	private LinkedList<Object>list=new LinkedList<Object>();//表示商品实体
      	JLabel amount;//缓冲区当前商品数量		
      	public Buffer(JLabel amount)
      	{
      		this.amount=amount;
      	}
      	public synchronized void produce()//
      	{
      		while(list.size()==max)//当缓冲区达到最大容量时,如果不释放资源,则生产进程一直处于阻塞状态
      		{
      			amount.setText("缓冲区已满,生产阻塞");
      			try {
      				wait();//生产阻塞
      			} catch (Exception e) {
      				// TODO: handle exception
      				e.printStackTrace();
      			}
      		}
      		//没有达到缓冲区最大容量
      		list.add(new Object());//生产商品		
      		amount.setText("商品数量: "+list.size());
      		notifyAll();//当生产一个商品之后,可以唤醒其他线程
      	}
      	public synchronized void consume()
      	{
      		while(list.size()==0)//当缓冲区为空时
      		{
      			amount.setText("缓冲区已空,消费阻塞");
      			try {
      				wait();
      			} catch (Exception e) {
      				// TODO: handle exception
      				e.printStackTrace();
      			}
      		}
      		list.remove();//消费商品
      		amount.setText("商品数量: "+list.size());
      		notifyAll();
      	}
      }
      

      3.2.3 Producer类

      用进度条模拟消费者不断生产的过程
      class Producer implements Runnable
      {
      	Buffer buffer;
      	JProgressBar produceBar;//生产进度条
      	int i;
      	public Producer(Buffer buffer,JProgressBar produceBar) {
      		// TODO Auto-generated constructor stub
      		this.buffer=buffer;
      		this.produceBar=produceBar;
      	}
      	@Override
      	public void run()
      	{
      		while(true)
      		{
      			try {
      				if(i<=25)
      				{
      					i++;
      					produceBar.setValue(i*4);
      					try {
      						Thread.sleep(200);
      					} catch (Exception e) {
      						// TODO: handle exception
      						e.printStackTrace();
      					}
      				}
      				buffer.produce();
      			} catch (Exception e) {
      				// TODO: handle exception
      				e.printStackTrace();
      			}
      		}
      	}
      }
      

      3.2.4 Consumer类

      用进度条模拟消费者不断消费的过程
      class Consumer implements Runnable
      {
      	Buffer buffer;	
      	JProgressBar consumeBar;//消费进度条
      	int i;
      	public Consumer(Buffer buffer,JProgressBar consumeBar) {
      		// TODO Auto-generated constructor stub
      		this.buffer=buffer;
      		this.consumeBar=consumeBar;
      	}
      	@Override
      	public void run()
      	{
      		while(true)
      		{
      			try {				
      				if(i<=25)
      				{
      					i++;
      					consumeBar.setValue(i*4);
      					try {
      						Thread.sleep(200);
      					} catch (Exception e) {
      						// TODO: handle exception
      						e.printStackTrace();
      					}
      				}
      				buffer.consume();
      			} catch (Exception e) {
      				// TODO: handle exception
      				e.printStackTrace();
      			}
      		}
      	}
      }
      

      3.2.5 事件绑定

      主要是通过按钮来控制生产/消费的开始与暂停。在UI类的构造函数里面实例化对象,并为按钮绑定监听器。同时申请两个线程,分别控制生产行为和消费行为。
      	Buffer buffer;//缓冲区
      	Producer producer;//生产者
      	Consumer consumer;//消费者
      	Thread thread1;//线程1用于控制生产行为的开始与暂停
      	Thread thread2;//线程2用于控制消费行为的开始与暂停
      	buffer=new Buffer(amountJLabel);
      	public UI()
      	{
      		bt1.addActionListener(new ActionListener() {//开始生产		
      			@Override
      			public void actionPerformed(ActionEvent e) {
      				// TODO Auto-generated method stub
      				if(thread1!=null)
      				{
      					try {
      						thread1.stop();
      					} catch (Exception e2) {
      						// TODO: handle exception
      					}
      				}
      				thread1=new Thread(new Producer(buffer,producerBar));
      				thread1.start();
      			}
      		});
      		bt2.addActionListener(new ActionListener() {		
      			@Override
      			public void actionPerformed(ActionEvent e) {
      				// TODO Auto-generated method stub
      				if(thread1!=null)
      				{
      					try {
      						thread1.stop();
      					} catch (Exception e2) {
      						// TODO: handle exception
      						e2.printStackTrace();
      					}
      				}
      			}
      		});
      		bt3.addActionListener(new ActionListener() {			
      			@Override
      			public void actionPerformed(ActionEvent e) {
      				// TODO Auto-generated method stub
      				if(thread2!=null)
      				{
      					try {
      						thread2.stop();
      					} catch (Exception e2) {
      						// TODO: handle exception
      						e2.printStackTrace();
      					}
      				}
      				thread2=new Thread(new Consumer(buffer,consumerBar));
      				thread2.start();
      			}
      		});
      		bt4.addActionListener(new ActionListener() {			
      			@Override
      			public void actionPerformed(ActionEvent e) {
      				// TODO Auto-generated method stub
      				if(thread2!=null)
      				{
      					try {
      						thread2.stop();
      					} catch (Exception e2) {
      						// TODO: handle exception
      						e2.printStackTrace();
      					}
      				}
      			}
      		});
      	}
      
    展开全文
  • 2.系统复杂性增加:要多考虑很多方面问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。 如何保证消息队列是高可用的? 使用集群的...

    转载:https://blog.csdn.net/varyall/article/details/79111745

    https://www.jianshu.com/p/5ade5bf0dcd9

    2018年01月20日 00:03:31 varyall 阅读数:16722

    五一期间去韩国游玩,顺便去了朋友公司扯淡去了。 所谓的扯淡,就是过去听技术分享,有python, golang, devops,docker一些话题。总的来说,技术方面跟国内还是有一些差距的。 

     

    正题开始,因为业务的各方面的强需求,我们使用了rabbitmq作为消息队列,利用rabbitmq的ack机制来确认消息的可靠性。 但是rabbitmq本身是没有绝对的消息顺序机制的,单个queue在多消费者下不能保证其先后顺序。  另外ack的机制会触发消息重复消费的,需要我们在设计上避免该问题。

     

    该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.  http://xiaorui.cc/?p=4493

     

    关于消息的重复执行

    首先我们可以确认的是,触发消息重复执行的条件会是很苛刻的! 也就说 在大多数场景下不会触发该条件!!! 一般出在任务超时,或者没有及时返回状态,引起任务重新入队列,重新消费!  在rabbtimq里连接的断开也会触发消息重新入队列。  

    消费任务类型最好要支持幂等性,这样的好处是 任务执行多少次都没关系,顶多消耗一些性能! 如果不支持幂等,比如发送信息? 那么需要构建一个map来记录任务的执行情况! 不仅仅是成功和失败,还要有心跳!!!  这个map在消费端实现就可以了!!!    这里会出现一个问题,有两个消费者 c1, c2 ,一个任务有可能被c1消费,如果再来一次,被c2执行? 那么如何得知任务的情况? 任务派发!  任务做成hash,固定消费者!

    坚决不要想方设法在mq扩展这个future。

    一句话,要不保证消息幂等性,要不就用map记录任务状态.

     

     

     

    关于消息的绝对顺序执行

     

    我们遇到的大多数场景都不需要消息的有序的,如果对于消息顺序敏感,那么我们这里给出的方法是 消息体通过hash分派到队列里,每个队列对应一个消费者,多分拆队列。

    为什么要这么设计?  同一组的任务会被分配到同一个队列里,每个队列只能有一个worker来消费,这样避免了同一个队列多个消费者消费时,乱序的可能! t1, t2 两个任务, t1 虽然被c1先pop了,但是有可能c2先把 t2 任务给完成了。

    一句话,主动去分配队列,单个消费者。

     

     

    基于Rabbitmq的多任务处理框架

    这里提一下,我们用rabbitmq实现了一个颇为复杂的架构,节省了太多的mq连接及消耗。通过python pika gevent实现的,因py有gil,所以使用多进程来跑多核,进程之间不使用共享变量,而是用队列来传递ack信号。  

    这里实现的场景是用pika消费rabbitmq,然后把获取到的任务提丢到队列,另一个进程去消费该任务,然后触发ack。 

     

     

     

    END.

     

     

     

    http://xiaorui.cc/2017/05/04/%E8%A7%A3%E5%86%B3rabbitmq%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E7%9A%84%E9%A1%BA%E5%BA%8F%E5%8F%8A%E9%87%8D%E5%A4%8D%E6%B6%88%E8%B4%B9%E9%97%AE%E9%A2%98/

     

     

     

    想想为什么要使用MQ?

    1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

    2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

    3.削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

    使用了消息队列会有什么缺点?

    1.系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低

    2.系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

    如何保证消息队列是高可用的?

    使用集群的方式维持MQ的可高用性。

    如何保证消息不被重复消费?

    保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

    1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

    2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

    3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

    如何解决丢数据的问题?

    1.生产者丢数据

    生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

    transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

    然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

    2.消息队列丢数据

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

    ①、将queue的持久化标识durable设置为true,则代表是一个持久的队列

    ②、发送消息的时候将deliveryMode=2

    这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

    3.消费者丢数据

    启用手动确认模式可以解决这个问题

    ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

    ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

    如何保证消息的顺序性?

    针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息一定是顺序消息的。我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。例如B消息的业务应该保证在A消息后业务后执行,那么我们保证A消息先进queueA,B消息后进queueB就可以了。

    加Java架构师进阶交流群获取Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的直播免费学习权限 都是大牛带飞 让你少走很多的弯路的 群..号是:855801563 对了 小白勿进 最好是有开发经验

    注:加群要求

    1、具有工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以加。

    2、在公司待久了,过得很安逸,但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加。

    3、如果没有工作经验,但基础非常扎实,对java工作机制,常用设计思想,常用java开发框架掌握熟练的,可以加。

    4、觉得自己很牛B,一般需求都能搞定。但是所学的知识点没有系统化,很难在技术领域继续突破的可以加。

    5.阿里Java高级大牛直播讲解知识点,分享知识,多年工作经验的梳理和总结,带着大家全面、科学地建立自己的技术体系和技术认知!



    作者:架构师springboot
    链接:https://www.jianshu.com/p/5ade5bf0dcd9
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    展开全文
  • RabbitMQ如何解决被重复消费和数据丢失的问题? 想想为什么要使用MQ? 1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦! 2.异步,将消息写入消息...

    RabbitMQ如何解决被重复消费和数据丢失的问题?

     

    想想为什么要使用MQ?

    1.解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

    2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

    3.削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

    使用了消息队列会有什么缺点?

    1.系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低

    2.系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

    如何保证消息队列是高可用的?

    使用集群的方式维持MQ的可高用性。

    如何保证消息不被重复消费?

    保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

    1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

    2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

    3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

    如何解决丢数据的问题?

    1.生产者丢数据

    生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

    transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

    然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

    2.消息队列丢数据

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

    ①、将queue的持久化标识durable设置为true,则代表是一个持久的队列

    ②、发送消息的时候将deliveryMode=2

    这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

    3.消费者丢数据

    启用手动确认模式可以解决这个问题

    ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

    ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

    如何保证消息的顺序性?

    针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息一定是顺序消息的。我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。例如B消息的业务应该保证在A消息后业务后执行,那么我们保证A消息先进queueA,B消息后进queueB就可以了。

     

     

    原文地址:https://www.jianshu.com/p/5ade5bf0dcd9

    展开全文
  • RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,重要的事情说三遍。 基本上说我很讨厌有人问这个问题,问这个问题首先你对消息的生命周期缺乏理解,其次RocketMQ的定位不是...

    RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,重要的事情说三遍。
    基本上说我很讨厌有人问这个问题,问这个问题首先你对消息的生命周期缺乏理解,其次RocketMQ的定位不是很清楚,RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节。换句话说单机RocketMQ的每分钟处理的请求是12W*60=720W,每小时处理的请求是720W*60=44400W。在这么大的信息量面前任何额外的要求不觉得都有点吹毛求疵。(所以如果有人坚持去要求你从RocketMQ的角度解决消息重复消费问题,你可以开始撕逼模式了。)

           然而消息重复的问题确实存在,我们该如何解决呢。
           消息的生命周期包括分为消息产生源,消息传播(消息发送,消息投递,消息消费),消息持久化这几个环节。在使用RocketMQ过程中,RocketMQ主要负责消息发送,消息的传递,消息消费的问题,然而RocketMQ解决不了消息重复消费的问题,实际上RocketMQ还有消息重复发送和消息重复投递的问题。
    1.1 生产者重发导致相同
            由于网络闪断或者客户端宕机,导致服务端对客户端应答失败。此时如果生产者再次发送消息,消费者就会出现内容相同并且MessageID也相同的消息。
    1.2服务端重推导致相同
            当客户端给服务端反馈应答的时候网络闪断或者客户端消费完成反馈前宕机,服务端会在网络恢复后重发一次。
    上面两条是RocketMQ本身存在的MessageID相同的问题,之前也有人说通过业务key来保证消息是不重复的。
    1.3消费者解决不了重复消费
           RocketMQ是分布式环境,消息系统自身解决消费重复问题,需要消费者端进行大量的确认。一方面这种确认会导致大量消息阻塞,另一方面分布式环节下需要网络确认,消息在网络传递过程中具有不可靠性。

         事实证明消息的传递具有不可靠性;网络不可靠性,只要通过网络传输的消息都具有网络不可靠性;或者说系统受到黑客的恶意篡改,导致的消息完全一致等等。只要消息经过传递,希望在传递层保证消息都无法100%保证消息的可靠性。传递过程无法确保消息不重复,那么消息源也就不需要关注了,因为即使消息源确保唯一,传递过程中还是会产生重复消息。 

           消息流从五个环节从消息源,消息发送,消息传递,消息消费都无法保证消息不重复,那么我们能做的只有在消息持久化环节保证消息不重复。其实所有的保证消息不重复的策略都需要一个消息持久化的位置供消息重复验证,然而不巧的是除非和消息最本源的位置做验证,其他环节的验证都具有不可靠性。
    消息持久层做消息唯一性的策略
           1.持久化过程中业务唯一标识验证,每个消息具有业务唯一标识,在消息最终持久化之前通过验证唯一性标识保证消息的唯一性。消息持久化位置如果出现同样的消息,系统就不做处理,期间无任何传递过程,保证消息的唯一性。
           2.使用过程中业务唯一标识验证,使用过程中如果出现同样的消息,系统进行相应的异常处理。

    备注:例如mysql去重表

    总结:
    消息的生命周期中只有在消息产生源和消息持久源才是具有意义的,在过程中不必太苛求。

    展开全文
  • 选择一个经典的同步问题(生产者-消费问题、读者-写者问题、哲学家就餐问题等),并模拟实现该同步问题的进程管理; 采用信号量机制及相应的P操作、V操作; 应避免出现死锁; 能够显示相关的状态。 我这里选择的是...
  • 消费者行为分析包含了哪些内容?

    万次阅读 2019-07-30 16:24:06
    消费者市场 指个人或家庭为满足生活需求而购买或租用商品的市场。 消费者市场特点 1.购买者众多,购买数量零星,对日用品的消费需要经常性购买,购买频率高且量小,支付的金额数也小。 2.需求差异性大,不同消费者对...
  • 搭建的canal是高可用模式,在IDEA里面进行消费的,但是在服务端进行切换时,出现了数据重复被消费问题。salve1:11111开启服务时,往数据库里面插入了一条数据,然后又删除了这条数据,这是Mysql的bin-log会产生两...
  • Linux进程间通信与生产者消费问题

    千次阅读 2013-01-20 20:43:48
    生产者消费问题(英语:Producer-consumerproblem),也称有限缓冲问题(英语:Bounded-bufferproblem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和...
  • 1、问题描述:一组生产者进程和一组消费者进程共享一个初始为空大小为n的缓冲区,只有缓冲区没满时,生产者才能给缓冲区投放信息,否则必须等待;只有缓冲区不空时,消费者才能继续取出消息,否则也必须等待。由于...
  • 2.系统复杂性增加:要多考虑很多方面问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。 如何保证消息队列是高可用的? 使用集群的...
  • Kafka中的消息的丢失和重复消费问题

    千次阅读 2019-04-08 19:38:23
    要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费 1、消息发送 Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置...
  • 2.系统复杂性增加:要多考虑很多方面问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。 如何保证消息队列是高可用的? 使用集群的...
  • 生产者消费问题:(经典) 一直生产 一直消费 中间有阀值 避免供求关系不平衡 #线程安全问题,要是线程同时来,听谁的 #锁:一种数据结构 队列:先进线出 栈:先进后出 #生产者消费者的优点(为什么经典的设计...
  • 实验要求我们用C语言在Linux操作系统下利用信号量函数和共享内存函数实现经典的生产者消费问题。也借此博客把所学知识记录下来。实验要求如下: 在Linux操作系统下用C实现经典同步问题:生产者—消费者,...
  • 这两天由于学习的需要,学了一下C#的多线程,并写了个经典生产者与消费者的案例,但从中也发现了一个问题,总体上感觉C#比java在各个方面来说,总是有些不如,也许是长期使用java和刚刚接触C#吧,存在些偏见,言归正...
  • 大家好,我是 Tom哥~马上要开启国庆小长假了,祝大家节日快乐,吃喝玩乐走起~为了便于大家查找问题,了解全貌,整理个目录,我们可以快速全局了解关于消息队列,面试官一般会问哪些问题。本篇文章的目录:消息队列的...
  • 昨天线上发现一个现象:kafka一个topic有数据生产,但是没有消费了,各种排查es集群、内存使用情况等都正常,但是就是消费线程死了一样,重启jboss也没有用,最后发现是因为消费该topic的kafka八的partition因为遇到...
  • 最近学习java多线程,遇到生产者,消费问题。记录一下,加以巩固 生产者和消费问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系: 1. 对缓冲区的访问是互斥的。由于两者都会修改缓冲...
  • 生产者/消费者模式的理解及实现

    万次阅读 多人点赞 2018-05-31 10:05:37
    ★简介 生产者消费者模式并不是GOF提出的23种设计模式之一,23种设计模式都是建立在面向对象的基础之上的,但其实面向过程的编程中也有很多高效的编程模式,生产者消费者模式便是其中之一,它是我们编程过程中最...
  • 我们使用 Flume 将数据从 Kafka 加载到 Hive 中。 由于启动一个 Flume 实例时,数据加载的...我们知道 Kafka 数据消费是以 Partition 为单位的,即一个 Partition 只能被一个 Flume 实例消费。当启动第二个 Flume ...
  • 2.rocketmq 吞吐量高,时效性高,但在大数据方面需要自己写代码支持 3.kafka 超高的吞吐量,消息较少时可能会有延迟(kafka是堆积一波消息后发送) 二.消息队列模型 1.常用的是topic订阅发布模型 同一个topic下,...
  • 好奇心驱使下,笔者就“00后收入”问题在百度上进行了搜索,得到的结果触目惊心,9102年了,“00后”已经可以自力更生了。 笔者认真读完并对比了其中几篇分析报告后发现,关于 “00后”具体收入情况,因为调研...
  • RocketMQ消费优化

    千次阅读 2019-01-03 20:42:18
    双十一前进行了低延时(毛刺)优化,保障了双十一万亿消息的流转如丝般顺滑,在2016年双十一种,MetaQ以接近万亿的消息总量支撑着全集团数千个应用,在系统解耦、削峰填谷、数据库同步、位点回滚消费等多种业务场景...
  • 在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还...
  • 消息队列:生产者/消费者模式

    万次阅读 多人点赞 2019-02-28 15:34:10
    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找...
  • 生产者-消费问题

    千次阅读 2015-06-14 18:54:14
    问题描述有一群生产者进程在生产产品,并将这些产品提供给消费者进程去消费.为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将它所生产的产品放入一个缓冲区中;消费者...
  •  消费者购买行为是指人们为满足需要和欲望而寻找、选择、购买、使用、评价及处置产品、服务时介入的过程活动,包括消费者的主观心理活动和客观物质活动两个方面消费者购买行为分析的环节  消费者购买行为...
  • RabbitMQ:消费者和生产者。

    千次阅读 2019-06-12 15:37:08
    如果我们为所有线程只使用一条TCP连接以满足性能方面的要求,但又能确保每个线程的私密性,就像拥有独立连接一样的话,那不就非常完美吗?这就是要引入信道概念的原因。线程启动后,会在现成的连接上创建一条信道,...
  • 生产者/消费者模式之深入理解

    万次阅读 多人点赞 2016-11-01 15:52:08
    ★简介  在实际的软件开发过程中,经常会碰到如下... 单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 212,130
精华内容 84,852
关键字:

关于消费方面的问题