精华内容
下载资源
问答
  • java里面实现MQ的原理是什么
    2021-02-26 10:13:21

    62ba45842b374d39e7db782c0ed8bd42.png

    繁星点点滴滴

    一般数据的放和收对应的是2个线程或进程,以达到异步的目的使得吞吐量最大化。所以你说的是对的,一个不停的add数据,一个不停的get数据,但这样有一个问题,如果你数据添加过慢或过快,或者数据处理的过慢或过快,都会出现队列空或者队列满的情况,这种情况一旦出现,意味着一方要等另一方完成动作才能继续,降低吞吐量,所以一般是会有一个超时返回的。下面一个是无超时的消息队列getpublic Message receive() throws InterruptedException {                synchronized (queue) {                        if(queue.isEmpty()){                queue.wait(1000);            }            if(queue.isEmpty()){                return null;//timeout            }            Message message=queue.get(0);            queue.remove(0);            return message;        }    }

    更多相关内容
  • MQ JAVA HTTP SDK Alyun MQ 文档: : 阿里云MQ控制台: ://ons.console.aliyun.com 用 添加Maven依赖 <groupId>com.aliyun.mq</groupId> <artifactId>mq-http-sdk</artifactId> <version>1.0.3 或与依赖项 ...
  • IBM MQ经常被一些政府公共部门,银行等企业用来做数据传输和报文收发,在互联网应用的开发中较少见到,资源为MQjava 代码
  • JAVA连接IBM MQ代码

    2019-01-25 11:34:27
    JAVA连接IBM MQ,具体的详细说明,请再csdn上面搜索JAVA连接IBM MQ关键词
  • 描述了java程序代码去访问MQ的SSL加密的通道。如何配置JKS,如何配置MQ服务器的SSL秘钥库,如何配置证书制作证书和秘钥库。主要是如何编写java代码去访问SSL通道并取到数据。
  • Java下操作IBM Websphere MQ的项目案例, eclipse工程压缩包, 导入直接可用.
  • java代码利用本地的mq配置,发送消息。从A队列管理至B队列管理器。
  • java MqDemo

    2018-03-13 13:52:48
    这个是javaMqdemo, 在这里备份,防止以后能够用到,
  • 该资源主要是用于java关联Mq中所需要的jar其中包含这些jar CL3Export.jar CL3Nonexport.jar com.ibm.mq.commonservices.jar com.ibm.mq.defaultconfig.jar com.ibm.mq.fta.jar com.ibm.mq.headers.jar ...
  • java IBM MQ 7.5.0 生产者和消费者实例
  • 此包是总包,包含java连接IBMMQ所需要的所有类。直接引入就可以编写ibmmq代码,无需添加其他依赖。
  • rabbit mq demo spring java

    2017-10-10 17:40:26
    docker 安裝 rabbit mq 並測試 http://knight-black-bob.iteye.com/blog/2395713
  • MQ队列管理器,队列,通道的配置和使用,包含编写Java程序来实现消息的发送。
  • java调用ibmmq最全版本jar包,包含connectorjava调用ibmmq最全版本jar包,包含connectorjava调用ibmmq最全版本jar包,包含connector
  • MQ什么用?MQ的优点和缺点都有哪些?开始今天起,我们会陆续开始一些新的文章系列,和Spring系列并行。学习,也要换换脑子,一天天的看源码,万一看吐子怎么办?隔壁的小妹妹会嫌弃我的。啥是MQA:你能给我解释了...

    MQ适用于哪些业务场景?MQ有什么用?MQ的优点和缺点都有哪些?

    开始

    今天起,我们会陆续开始一些新的文章系列,和Spring系列并行。学习,也要换换脑子,一天天的看源码,万一看吐子怎么办?隔壁的小妹妹会嫌弃我的。

    啥是MQ

    A:你能给我解释了下MQ吗?

    B:MQ就是消息队列。

    A:嗯,不错,然后呢?

    B:没了。

    A:……

    MQ,英文全拼为message queue,直译为消息队列。一个队列,用来存放消息。纳尼?难道MQ就是一个容器吗?没错,简单的理解,它就是一个容器。但是,当它作为一门技术时,就有了一些展开的问题。比如说,怎么存放?谁往进放?放进去又有什么用呢?Java里,MQ代表的是一门完整的技术。

    那么,如何在深入技术之前,有逼格的介绍一下MQ呢?不妨从以下几个问题入手:

    1. MQ经常应用于哪些业务场景?

    2. MQ的缺点有哪些?

    3. 常用的MQ组件有哪些?

    MQ经常应用于哪些业务场景

    要深刻理解这个问题,还是有必要再深入探讨一下MQ的本质。

    假定现在有一个业务流程,客户端访问A服务,A服务又依赖于B服务,那么其示意图如下:

    40db0a28b623f3da532d774edf90ad15.png示例业务流程

    也就是说,假如我们把B服务的处理时间从A服务的处理时间中独立出来,那么,整个业务的处理时间就是A服务的处理时间加上B服务的处理时间。

    现在,有个问题是,B服务是一个日志插入业务,即将用户及其操作的相关信息,在日志表里插入一条记录。我们知道,日志其实是一个非核心业务,并且,我们确实对其的准确性要求不高。即偶尔有某条日志插入失败,也并不会影响我们的核心业务。那么,A调用B还有必要存在于主业务流程中吗?看下面的示意图吗?

    a691918ac453bd4136b4c22191c5513d.pngMQ异步后的业务流程

    通过加入了一个MQ中间件,把B服务的处理时间从主业务流程中剥离了出去。也就是说,整个业务的处理时间变成了A服务的处理时间加上MQ的业务处理时间。

    MQ,就是简单的在队列中插入一条消息,这个时间成本,显然比B服务的时间要低。而B服务,会自己去MQ读取相关消息,并进行相应的操作,如插入日志等。

    MQ,消息队列,消息可以理解为一个业务现场,而队列则是保存这个业务现场的容器,而B服务对消息的处理,则是一个对业务现场的异步处理。所以,消息队列的本质,就是将某个业务现场暂存下来,异步处理。

    有了以上对于MQ的本质认识,那么,接下来的MQ可应用的几个业务场景,就会很好理解了。

    1. 异步。正如上面的demo,异步就是MQ的第一个能力。可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验。

    2. 解耦。假设现在,日志不光要插入到数据库里,还要在硬盘中增加文件类型的日志,同时,一些关键日志还要通过邮件的方式发送给指定的人。那么,如果按照原来的逻辑,A可能就需要在原来的代码上做扩展,除了B服务,还要加上日志文件的存储和日志邮件的发送。但是,如果你使用了MQ,那么,A服务是不需要做更改的,它还是将消息放到MQ中即可,其它的服务,无论是原来的B服务还是新增的日志文件存储服务或日志邮件发送服务,都直接从MQ中获取消息并处理即可。这就是解耦,它的好处是提高系统灵活性,扩展性。

    3. 消峰。这个其实也很好理解,因为MQ的本质就是业务的排队。所以,面对突然到来的高并发,MQ也可以不用慌忙,先排好队,不要着急,一个一个来。消峰的好处就是避免高并发压垮系统的关键组件,如某个核心服务或数据库等。

    异步,解耦,消峰,MQ的三大主要应用场景。

    MQ有哪些缺点?

    了解了MQ的主要应用场景,那么,其缺点也是显而易见的。

    1. 系统复杂性增加。毕竟是增加了一个中间件MQ,那么系统变得更复杂,就是不可避免的。但是,与其说是系统复杂性增加,不如说是给相关开发人员带来的新的学习成本。但是,一项技术本身就是这样,学时很痛苦,学会了,它就会变成一把利剑,帮助您开疆辟土。

    2. 系统可用性降低。假设一个系统由若干个节点链式组成,每个节点出问题的概率是相同的,那么,20个节点的系统出问题的概率显然要高于10个节点的系统。所以,从这个角度来看,毕竟是增加了一个MQ中间件,出问题的概率显然会增大,系统可用性就会降低。

    常用的MQ中间件

    MQ中间件产品还是比较多的,甚至,你可以自己去完成一个。但是,最常见的解决方案是直接使用一个符合业务场景且相对成熟的产品,如ActiveMQ,RabbitMQ,RocketMQ,kafka等。一般,到这里,就该比较一下这些的MQ中间件了,但是,本文,我们暂不作比较,因为这个内容,我们准备往后放。我们先留个思考题:假如,你来开发一个MQ中间件,需要解决哪些问题?

    展开全文
  • MQ工具类java

    2017-04-16 11:42:52
    包含了IBM的MQ初始化,发送,接收的工具类,方便极了,可直接放入到项目中。
  • java连接MQ

    2017-10-27 11:16:06
    java连接mq 连接通道,打开队列获取消息,提交事务;打开队列发送消息,设置消息头。
  • java连接MQdemo

    2018-11-19 22:56:25
    简单的mq例子有利于新手用
  • WebSphereMQ,也称MQSeries,以一致的、可靠的和易于管理的方式来连接应用程序,并为跨部门、企业范围的集成提供了可靠的基础。通过为重要的消息和事务提供可靠的、一次且仅一次的传递,MQ可以处理复杂的通信协议,...
  • mq使用入门案例demo

    2018-05-27 17:16:44
    这是一个mq的入门使用案例包括一对一队列,和订阅者队列(一对多),这是一个maven项目,需要大家导入的时候导入maven项目
  • JAVA实现MQ发送接收消息详解 MQ配置文档 MQ配置
  • MQ6.0_JAVA编程

    2010-06-06 17:52:41
    MQ6.0_JAVA编程电子书,mq使用
  • horsemq-java-client 适用于Horse MQ Server的Java客户端
  • Key Lime Box 的简单 MQ - Java 客户端 这是一个 Java 客户端库,用于与Key Lime Box 的 Simple MQ服务器进行交互。 春天准备好了 Simple MQ Java Client 在构建时考虑到了 Spring。 如何使用 首先,您必须设置所需...
  • 使用JAVA语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色首先我们必须需要搞明白MQ (消息队列)中的三个基本角色ProducerBrokerConsumer整体架构如下所示 自定义协议首先从上一篇中介绍了协议的相关信息,...

    使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)

    330ea27ccd86bd3f7363da50221420c1.png

    主要角色

    首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

    Producer

    Broker

    Consumer

    整体架构如下所示

    10a96868b6362b5b60d0cf08dde1cf42.png

    自定义协议

    首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

    消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

    消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

    消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

    消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

    消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

    流程顺序

    项目构建流程

    下面将整个MQ的构建流程过一遍

    新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能

    新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;

    新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法

    测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

    具体使用流程

    生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.

    消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

    代码演示

    消息处理中心 Broker

    /**

    * 消息处理中心

    */

    public class Broker {

    // 队列存储消息的最大数量

    private final static int MAX_SIZE = 3;

    // 保存消息数据的容器

    private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);

    // 生产消息

    public static void produce(String msg) {

    if (messageQueue.offer(msg)) {

    System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());

    } else {

    System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");

    }

    System.out.println("=======================");

    }

    // 消费消息

    public static String consume() {

    String msg = messageQueue.poll();

    if (msg != null) {

    // 消费条件满足情况,从消息容器中取出一条消息

    System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());

    } else {

    System.out.println("消息处理中心内没有消息可供消费!");

    }

    System.out.println("=======================");

    return msg;

    }

    }

    消息处理中心服务 BrokerServer

    /**

    * 用于启动消息处理中心

    */

    public class BrokerServer implements Runnable {

    public static int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket) {

    this.socket = socket;

    }

    @Override

    public void run() {

    try (

    BufferedReader in = new BufferedReader(new InputStreamReader(

    socket.getInputStream()));

    PrintWriter out = new PrintWriter(socket.getOutputStream())

    )

    {

    while (true) {

    String str = in.readLine();

    if (str == null) {

    continue;

    }

    System.out.println("接收到原始数据:" + str);

    if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息

    //从消息队列中消费一条消息

    String message = Broker.consume();

    out.println(message);

    out.flush();

    } else if (str.contains("SEND:")){

    //接受到的请求包含SEND:字符串 表示生产消息放到消息队列中

    Broker.produce(str);

    }else {

    System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");

    }

    }

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    public static void main(String[] args) throws Exception {

    ServerSocket server = new ServerSocket(SERVICE_PORT);

    while (true) {

    BrokerServer brokerServer = new BrokerServer(server.accept());

    new Thread(brokerServer).start();

    }

    }

    }

    客户端 MqClient

    /**

    * 访问消息队列的客户端

    */

    public class MqClient {

    //生产消息

    public static void produce(String message) throws Exception {

    //本地的的BrokerServer.SERVICE_PORT 创建SOCKET

    Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);

    try (

    PrintWriter out = new PrintWriter(socket.getOutputStream())

    ) {

    out.println(message);

    out.flush();

    }

    }

    //消费消息

    public static String consume() throws Exception {

    Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);

    try (

    BufferedReader in = new BufferedReader(new InputStreamReader(

    socket.getInputStream()));

    PrintWriter out = new PrintWriter(socket.getOutputStream())

    ) {

    //先向消息队列发送命令

    out.println("CONSUME");

    out.flush();

    //再从消息队列获取一条消息

    String message = in.readLine();

    return message;

    }

    }

    }

    测试MQ

    public class ProduceClient {

    public static void main(String[] args) throws Exception {

    MqClient client = new MqClient();

    client.produce("SEND:Hello World");

    }

    }

    public class ConsumeClient {

    public static void main(String[] args) throws Exception {

    MqClient client = new MqClient();

    String message = client.consume();

    System.out.println("获取的消息为:" + message);

    }

    }

    我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

    接收到原始数据:SEND:Hello World

    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:1

    =======================

    接收到原始数据:SEND:Hello World

    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:2

    =======================

    接收到原始数据:SEND:Hello World

    成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:3

    =======================

    接收到原始数据:SEND:Hello World

    消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!

    =======================

    接收到原始数据:Hello World

    原始数据:Hello World没有遵循协议,不提供相关服务

    接收到原始数据:CONSUME

    已经消费消息:SEND:Hello World,当前暂存的消息数量是:2

    =======================

    接收到原始数据:CONSUME

    已经消费消息:SEND:Hello World,当前暂存的消息数量是:1

    =======================

    接收到原始数据:CONSUME

    已经消费消息:SEND:Hello World,当前暂存的消息数量是:0

    =======================

    接收到原始数据:CONSUME

    消息处理中心内没有消息可供消费!

    =======================

    小结

    本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 "生产者,消费者,消费处理中心,协议" 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

    展开全文
  • 高级JAVA开发 MQ部分

    千次阅读 2019-05-15 02:18:43
    高级JAVA开发 MQ部分MQMQ的作用、为什么要用MQ常见的MQ的优缺点使用MQ带来的问题以及处理办法MQ带来的问题列举消息重复消费(幂等)问题消息丢失问题消息顺序性问题消息过期丢失、大量积压等问题如何保证MQ高可用性...

    MQ

    参考和摘自:
    中华石杉 《Java工程师面试突击第1季》
    ActiveMQ—知识点整理
    消息总线真的能保证幂等
    ActiveMQ消息传送机制以及ACK机制详解

    MQ的作用、为什么要用MQ

    解耦、异步、消峰

    应用场景:

    1. 解耦:利用 发布/订阅(Publish/Subscribe)模型,多服务订阅同一queue,省去生产者主动调用并维护多个消费者(service),消费者可随时unSubscribe,生产者并不感知。
    2. 异步:利用 点对点( Point-to-Point)模型,将多个任务分别放到不同队列中,之后直接返回。消费者各自从不同队列取得任务并消费。这么做的好处是不用阻塞等待多个任务全部返回再响应用户操作,加速响应。
    3. 消峰:利用 点对点( Point-to-Point)模型,在系统请求高峰期不采用阻塞式调用,将任务全部打入MQ中,让系统调用链中消费者慢慢消化任务。防止系统被访问高峰打死(很大原因是直接访问数据库,数据库成为瓶颈,后面也会在 缓存 章节继续分析)。

    常见的MQ的优缺点

    摘自中华石杉老师的笔记

    特性ActiveMQRabbitMQRocketMQKafka
    单机吞吐量万级
    吞吐量比RocketMQ和Kafka要低了一个数量级
    万级
    吞吐量比RocketMQ和Kafka要低了一个数量级
    10万级
    RocketMQ也是可以支撑高吞吐的一种MQ
    10万级
    这是kafka最大的优点,就是吞吐量高。
    一般配合大数据类的系统来进行实时数据计算、日志采集等场景
    topic数量对吞吐量的影响topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降。
    这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic
    topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
    时效性ms级微秒级
    这是rabbitmq的一大特点,延迟是最低的
    ms级延迟在ms级以内
    可用性
    基于主从架构实现高可用性

    基于主从架构实现高可用性
    非常高
    分布式架构
    非常高
    kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    消息可靠性有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
    功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
    优劣势总结非常成熟,功能强大,在业内大量的公司以及项目中都有应用。
    偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本。
    而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用。
    erlang语言开发,性能极其好,延时很低;
    吞吐量到万级,MQ功能比较完备
    而且开源提供的管理界面非常棒,用起来很好用
    社区相对比较活跃,几乎每个月都发布几个版本分
    在国内一些互联网公司近几年用rabbitmq也比较多一些
    但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
    而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。
    而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。
    接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障
    日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景
    而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
    社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码
    还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的
    kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展
    同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量
    而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略
    这个特性天然适合大数据实时计算以及日志收集

    使用MQ带来的问题以及处理办法

    MQ带来的问题列举

    1. 系统可用性降低,需要保证MQ健康的运行,否则MQ挂掉会导致整个应用不可用,后果难以设想。
    2. 系统的复杂性变高,比如消息重复消费、消息丢失 等等,需要用一些手段来处理,怎么处理接下来细说,这里将会成为重要考点。
    3. 一致性问题(分布式事务)。A、B、C、D四个任务必须同时成功,放入MQ后A、B、C都成功了,偏偏D任务失败了该如何处理。这也是重要考点。

    消息重复消费(幂等)问题

    消费者拿到消息但是还没来得及ACK(或者还没来得及提交offset)就挂掉了,重启后会重新拿到已经消费但是还没通知(ACK)给MQ的消息,就产生了重复消费问题。(PS:这里如果MQ采用AUTO_ACK模式,消费者拿到消息后会第一时间给MQ做出ACK反馈,之后再去消费消息。但还没来得及消费完就挂掉了,那么MQ会认为此条消息处理过了,消费者重启后会继续从MQ拿消息,会产生消息丢失的问题,参看:<消息丢失问题>章节)
    解决办法:
    数据携带唯一id,基于内存Map或Set、Redis、数据库唯一键,保存消费成功的数据,重复数据来第二次时候可以感知到并直接丢弃处理。

    面试遇到过这么一个问题:
    面试官:有100w用户电话号码数据在数据库中,并不保证其中是否有重复,要求为每一个用户发一条短信并且不可重复发送,现提供一个基于http的短信发送微服务,该如何设计架构,最简单最快的把短信发送出去?

    我的思路:
    1.数据量过大,不能在数据库上做去重,并且用多线程读取更快。
    2.短信服务会成为瓶颈,需要缓冲。
    3.怎么在消费的过程中直接过滤掉重复消息。

    我:用多线程读取数据(计算好多少页,每个线程取多少次,一次取多少条)将用户数据灌入MQ,多个消费者从MQ取得数据先在Redis中读一下看看有没有消费过,并在Redis中存入消费过的电话号码,调用短信服务(多部署几个)发送。重复电话号码直接舍弃数据不消费。
    接着面试官问:如果就有那么几个消费者同时取到电话号码,巧了,他们取到的电话号码相同,查了一下Redis,没人发送过,就有重复发送的问题,怎么解决。
    我:呃…


    回来想了一下这个问题,因为先读取Redis再写入Redis并不能保证原子性,出现了并发问题。采用和分布式锁相同的思想来解决这个问题。参考下面文章:
    Redis分布式锁的正确实现方式
    用Redis的set加入NX(SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;)参数保证set原子性并且取得返回值。完美解决~~~


    PS:

    1. Redis是主从集群,上述分布式锁会有漏洞。Redisson分布式锁实现框架可以弥补漏洞。(slave节点异步同步master数据,有延时。master节点设置锁还未同步到slave时挂掉了,锁就失效了)
    2. 由于本人技术水平有限,上述的架构方案并不是最优的,读文章小伙伴有更好的解决方案请发邮件给我,不胜感激~~~~

    消息丢失问题

    1.生产者发送消息丢失:
    生产者发送消息到MQ过程中,网络原因丢了。
    MQ接收到消息后内部出错,没保存下来。
    解决办法:
    1.用 事务,发送时用try catch包裹发送消息代码块,如果发送失败可以拦截Exception再做进一步处理(重发或者…)
    事务机制是 同步阻塞的,影响发送消息吞吐量。
    2.(RabbitMq)利用 confirm机制。把channel设置成confirm模式,发送结果通知给本地实现的接口,如果通知失败再做进一步的处理(重发或者…) 异步不会阻塞,吞吐量高
    2.MQ本身问题丢失:
    MQ挂掉了导致内存中消息丢失
    解决办法: 配置持久化。但是也有一种可能,消息接收到了但是还没来得及持久化就挂掉了,还是会丢失一点点数据。
    3.MQ到消费者消费过程中导致的丢失,消费端弄丢了数据
    消费着拿到消息但是还没来得及消费就挂掉了,MQ以为消费者已经处理完了
    解决办法:关闭AUTO_ACK,采用CLIENT_ACK模式,客户端收到消息处理成功后再手动发送ACK给MQ。如果消费者挂掉了,MQ针对这条消息会ACK超时,重新发给别人继续消费。

    消息顺序性问题

    这里假设要处理一个订单的三个任务,三个任务为一组:

    1. RabbitMQ等非分布式MQ解决方案:
      思路:将同订单任务按顺序放到一个Queue中,一个消费者只从一个Queue消费。
      在这里插入图片描述
      这里不能建立无穷多个Queue,将订单号的hash值对Queue个数取余分发到对应Queue中即可。
    2. Kafka等分布式MQ解决方案:
      参考:kafka topic消息分配partition规则(Java源码)
      思路:由于Kafka是分布式的,每个Queue中的数据会分布到多个partition中。Kafka可以保证每个partition中的数据是有顺序的,那么指定key(比如订单ID),将需要保证顺序的任务放到同一partition中(kafka会把key值一样的任务放到一个partition中,和上述订单号对Queue个数取余原理相同),一个消费者只能从一个partition消费,这样就保证了按顺序执行。

    如果消费者是多线程并发的从partition取得数据,多线程不能保证执行顺序,那么上述模型就不好用了。
    解决办法: 在消费者端建立多个线程安全的队列(比如LinkedBlockingQueue,这里我认为采用阻塞队列比较好,省得消耗jvm内存过多导致oom异常),从MQ取得任务,将相同订单号的任务发送到同一个队列(也可以用hash取余的办法),再对每一个队列启动一个线程消费任务,保证顺序执行。
    在这里插入图片描述

    消息过期丢失、大量积压等问题

    1. 过期问题:
      一般不设置消息的过期时间。如果设置了过期时间,只能在事后从数据源头找出数据,写程序将数据重新发送到MQ中。
    2. 大量积压后如何快速消费:
      ActiveMQ、RabbitMQ等非分布式MQ单个Queue数据量过大增加临时MQ节点也不能解决问题(每个节点存储全量数据),需要增加消费者临时节点来加速消费。
      Kafka是分布式MQ,每个消费者只能指定一个partition消费,那么新建立一个更多节点的Kafka集群,增加临时服务将原Kafka集群中的数据直接分发到新Kafka集群,这样消息会平均分配到更多的机器中,减缓MQ压力,再针对新Kafka集群添加实际业务处理的消费者,增加消费速度即可。
    3. MQ积压导致磁盘空间不足:
      (Kafka)增加临时消费节点将消息写到临时MQ集群中。
      或者增加临时消费者拿到数据直接将数据扔掉,事后做补偿。防止MQ直接压垮掉导致整个系统不可用。

    如何保证MQ高可用性

    RabbitMQ高可用以及部署模式

    单机模式,普通集群模式,镜像集群模式

    1. 单机模式
    2. 普通集群模式
      集群中Master保存Queue元数据(Queue的属性之类的数据)和Queue数据,slave只保存Queue的元数据,在访问slave节点时,slave去和主节点通信取得Queue数据。
      总结:
      优点:提高消费吞吐量
      缺点:1. 性能开销大,在集群内部产生大量数据传输 2.可用性几乎没有保障,主节点挂掉了,整个MQ不可用。
    3. 镜像集群模式
      所有节点都保存Queue的所有数据。在写入数据时,各个节点自动同步。消费数据时同样。
      总结:
      优点:高可用,单个节点挂掉不影响整个集群
      缺点:1.性能开销大,需要所有机器同步所有数据。 2.不是分布式的,单Queue数据量超级大,超出单机最大容量时无法处理。 3.扩展性很差,和第2点一样,加机器也不能缓解数据量超级大的问题。

    如何开启镜像集群模式

    在管理控制台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定数量的节点,然后再次创建Queue的时候应用这个策略,就会自动将数据同步到其他的节点上去了。

    kafka的高可用性

    多个broker组成,每个broker是一个节点;创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition放一部分数据。(可以理解为每个Queue的数据被划分到不同机器上,换言说每个机器都持有同一个Queue的一部分数据)。在0.8版本以前没有HA机制,其中一台机器宕机则数据直接丢失。0.8版本后可以针对每个partition设置多个broker,其中一台是leader节点,其余是follower节点,只有leader节点提供对外读写服务,数据读写自动同步到follower节点,如果leader挂掉,follower们通过选举算法选举一个follower作为新的leader继续提供服务。

    摘自中华石杉《Java工程师面试突击第1季(可能是史上最好的Java面试突击课程)\06_引入消息队列之后该如何保证其高可用性?\视频\04》图片摘自中华石杉《Java工程师面试突击第1季(可能是史上最好的Java面试突击课程)\06_引入消息队列之后该如何保证其高可用性?\视频\04

    展开全文
  • java连接mq的demo

    2017-01-20 18:07:01
    项目介绍博客:http://blog.csdn.net/qq_17616169/article/details/54633005
  • JAVA IBM MQ 接收、发送

    热门讨论 2012-05-13 16:43:46
    JAVA IBM MQ 接收消息、发送消息例子
  • java-MQ学习

    2017-03-16 10:02:07
    java-MQ
  • Java获取MQ连接数的Demo.zip此为Java调用mq的demo,不多说看代码。。很简单

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 101,040
精华内容 40,416
关键字:

MQ是什么java

java 订阅