精华内容
下载资源
问答
  • 用来保存处理失败或者过期的消息。...当一个消息被重发超过最大重发次数(缺省为6次,消费者端可以修改)时,会给broker发送一个"有毒标记“,这个消息被认为是有问题,这时broker将这个消息发送到...

    用来保存处理失败或者过期的消息。 

    出现下面情况时,消息会被重发: 

    1. 事务会话被回滚。
    2. 事务会话在提交之前关闭。
    3. 会话使用CLIENT_ACKNOWLEDGE模式,并且Session.recover()被调用。 
    4. 自动应答失败

    当一个消息被重发超过最大重发次数(缺省为6次,消费者端可以修改)时,会给broker发送一个"有毒标记“,这个消息被认为是有问题,这时broker将这个消息发送到死信队列,以便后续处理。 

    在配置文件(activemq.xml)来调整死信发送策略。

    <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue=">" >
    <deadLetterStrategy>
    <individualDeadLetterStrategy  queuePrefix="DLQ." useQueueForQueueMessages="true" />
    </deadLetterStrategy>
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>

    可以单独使用死信消费者处理这些死信。参见代码

    生产者:

    public class DlqProducer {
    
        //默认连接用户名
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL
                = ActiveMQConnection.DEFAULT_BROKER_URL;
        //发送的消息数量
        private static final int SENDNUM = 1;
    
        public static void main(String[] args) {
            ActiveMQConnectionFactory connectionFactory;
            ActiveMQConnection connection = null;
            Session session;
            ActiveMQDestination destination;
            MessageProducer messageProducer;
    
            connectionFactory
                    = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
    
            try {
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                connection.start();
    
                session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
    
                destination = (ActiveMQDestination) session.createQueue("TestDlq2");
                messageProducer = session.createProducer(destination);
                for(int i=0;i<SENDNUM;i++){
                    String msg = "发送消息"+i+" "+System.currentTimeMillis();
                    TextMessage message = session.createTextMessage(msg);
    
                    System.out.println("发送消息:"+msg);
                    messageProducer.send(message);
                }
                session.commit();
    
    
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
    
        }
    
    
    }

    消费者:

    public class DlqConsumer {
    
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL
                = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) {
            ActiveMQConnectionFactory connectionFactory;
            ActiveMQConnection connection = null;
            Session session;
            ActiveMQDestination destination;
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂
            connectionFactory
                    = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
            //限制了重发次数策略
            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
            redeliveryPolicy.setMaximumRedeliveries(1);
    
            try {
                //通过连接工厂获取连接
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                //启动连接
                connection.start();
                // 拿到消费者端重复策略map
                RedeliveryPolicyMap redeliveryPolicyMap
                        = connection.getRedeliveryPolicyMap();
                //创建session
                session
                        = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                destination = (ActiveMQDestination) session.createQueue("TestDlq2");
                // 将消费者端重发策略配置给消费者
                redeliveryPolicyMap.put(destination,redeliveryPolicy);
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println("Accept msg : "
                                    +((TextMessage)message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        throw new RuntimeException("test");
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }

    消费死信队列

    public class ProcessDlqConsumer {
    
        private static final String USERNAME
                = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD
                = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL
                = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Destination destination;//消息的目的地
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(ProcessDlqConsumer.USERNAME,
                    ProcessDlqConsumer.PASSWORD, ProcessDlqConsumer.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接HelloWorld的消息队列
                //destination = session.createTopic("TestDlq");
                destination = session.createQueue("DLQ.>");
    
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            System.out.println("Accept DEAD msg : "
                                    +((TextMessage)message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }

    注意,该代码中展示了如何配置重发策略。同时,重试策略属于ActiveMQ的部分,所以有部分connectionFactory,connection的声明等等不能使用接口,必须使用ActiveMQ的实现。

     

    展开全文
  • MQTT作为一种消息中间件,是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。...

    MQTT作为一种消息中间件,是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。

    MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性

    (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

    (2)对负载内容屏蔽的消息传输。

    (3)使用TCP/IP提供网络连接。

    主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

    (4)有三种消息发布服务质量:

    "至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

    "至少一次",确保消息到达,但消息重复可能会发生。

    "只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

    (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

    这就是为什么在介绍里说它非常适合"在物联网领域,传感器与服务器的通信,信息的收集",要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

    (6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

    Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

    Testament:遗嘱机制,功能类似于Last Will。

    展开全文
  • 死信队列:DLX,Dead- Letter- Exchange 利用DLX,当消息在一个队列中变成死信( dead message)之后它能被重新 publish到另一个 Exchange,这个 Exchange就是DLX 死信队列消息变成死信有一下几种...队列达到最大长度...

    死信队列:DLX,Dead- Letter- Exchange

    • 利用DLX,当消息在一个队列中变成死信( dead message)之后它能被重新 publish到另一个 Exchange,这个 Exchange就是DLX

    死信队列消息变成死信有一下几种情况

    • 消息被拒绝( basic. reject/ basic nack)并且 requeue= false
    • 消息TTL过期
    • 队列达到最大长度

    死信队列

    • DLX也是一个正常的 Exchange,和一般的 Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
    • 当这个队列中有死信时, Rabbitmq就会自动的将这个消息重新发布设置的 Exchange上去,进而被路由到另一个队列
    • 可以监听这个队列中消息做相应的处理,这个特性可以弥补 rabbitmq3.0以前支持的 Immediate参数的功能
    • 死信队列设置
    • 首先需要设置死信队列的 exchange和 queue,然后进行绑定
    • Exchange: dix. exchange
    • Queue: dix.queue
    • Routingkey:#
    • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可: arguments;put(“x- dead-letter-exchangedlx exchange”)

    消费端代码实现

    public class Consumer {
    
        public static void main(String[] args) throws Exception{
            //1 创建一个connectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.0.159");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2通过连接工场创建连接
            Connection connection = connectionFactory.newConnection();
            //3通过connection创建channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            String queueName = "test_dlx_queue";
    
            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            Map<String,Object> arguments = new HashMap<>();
            //进行死信队列参数的配置
            arguments.put("x-dead-letter-exchange","dlx.exchange");
            //将死信队列绑定到一个队列上
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);
    
            //进行死信队列的申明
            channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
            channel.queueDeclare("dlx.queue",true,false,false,null);
            channel.queueBind("dlx.queue","dlx.exchange","#");
    
    
            //如果要使用限流方式  必须关闭自动签收下面的false
            channel.basicConsume(queueName,true,new MyConsumer(channel));
        }
    }
    
    

    生产端代码

    public class Producter {
        public static void main(String[] args) throws Exception{
            //1 创建一个connectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.0.159");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2通过连接工场创建连接
            Connection connection = connectionFactory.newConnection();
            //3通过connection创建channel
            Channel channel = connection.createChannel();
            //开启消息的确认模式
            channel.confirmSelect();
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.save";
    
            //发送消息
    
    //        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes());
            for(int i=0;i<1;i++){
                String msg = "hello";
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                AMQP.BasicProperties properties = builder.expiration("10000")
                        .expiration("10000")
                        .contentEncoding("UTF-8")
                        .deliveryMode(2).build();
                channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes());
            }
        }
    }
    

    自定义消费者

    public class MyConsumer extends DefaultConsumer {
        private Channel channel;
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel=channel;
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("consumerTag:  "+consumerTag);
            System.err.println("envelope  "+envelope);
            System.err.println("properties  "+properties);
            System.err.println("body  "+new String(body));
        }
    }
    
    展开全文
  • 消息中间件对比

    2020-09-28 10:38:59
    特性 ActiveMQ RabbitMQ RocketMQ kafka 单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 10万级,RocketMQ也是可以支撑高吞吐的...
    特性 ActiveMQ RabbitMQ RocketMQ kafka
    单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 10万级,RocketMQ也是可以支撑高吞吐的一种MQ 10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景
    topic数量对吞吐量的影响     topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十个到几百个的时候,吞吐量会大幅度下降所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
    时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
    可用性 高,基于主从架构实现高可用性 高,基于主从架构实现高可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    消息可靠性 有较低的概率丢失数据   经过参数优化配置,可以做到0丢失 经过参数优化配置,消息可以做到0丢失
    功能支持 MQ领域的功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
    优劣势总结 非常成熟,功能强大,在业内大量的公司以及项目中都有应用偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用 erlang语言开发,性能极其好,延时很低;吞吐量到万级,MQ功能比较完备而且开源提供的管理界面非常棒,用起来很好用社区相对比较活跃,几乎每个月都发布几个版本分在国内一些互联网公司近几年用rabbitmq也比较多一些但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。 接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的 kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集
    展开全文
  • 引子 随着2017双十一落下帷幕,notify和metaq这两个集团最大规模的消息中间件依然保持着超跑般顺畅的一贯作风,在0点峰值的秒级消息收发量达1亿,当天的消息收发总量更是突破了2万亿。随着notify和metaq的功能特性...
  • Redis是什么? 全称:REmote DIctionary Server Redis是一种key-value形式的NoSQL内存...Redis最大特性是它会将所有数据都放在内存中,所以读写速度性能非常好。当然,它也支持将内存中的数据以快照和日...
  • Sentinel 专门为这种场景提供了匀速器的特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高...
  • 本章重点: 1.消息发送源码分析 ... 使用MQ不仅满足多线程这些特点,因为它具有:异步,解耦,流量削峰,数据持久化存储,高性能,高可用,可伸缩特性,并且在弱一致性事务中,可以通过最大努力通知实现分...
  • 中间件能为客户解决什么问题

    千次阅读 2008-11-18 17:16:00
    像其中使用量最大的应用服务器功能逐渐包括消息和对象交易的功能,同时兼顾了Java的跨平台,及Web接入的特性,使互联网真正的动起来了。主要的产品是金蝶的APUSIC等。这是基础中间件的作用。主要是做一些基础的事物...
  • 转载自 Redis 的 4 大法宝,2018 必学中间件!Redis是什么?全称:REmote DIctionary ServerRedis是一种key-value形式的NoSQL...Redis最大特性是它会将所有数据都放在内存中,所以读写速度性能非常好。当然,它...
  • Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时的处理大量数据以满足各种需求场景: hadoop的...
  • Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时的处理大量数据以满足各种需求场景:比如基于...
  • 介绍如何最大化地发挥oracle数据库、oracle databasevault、oracle identity management、oracle application express以及oracle business intelligence suite的安全特性。本书通过独家的权威资源,为您提供经过验证...
  • Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop...
  • Redis是什么? 全称:REmote DIctionary Server Redis是一种key-value形式的NoSQL内存数据库,由...Redis最大特性是它会将所有数据都放在内存中,所以读写速度性能非常好。当然,它也支持将内存中的数据以快照...
  • Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的...
  • 简介Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop...
  • 本文作者:明成AS6咪咕二级用户中心 开发工程师招聘大事放首位:亚信招聘后端开发,欢迎有识之士投递简历,邮箱:liubao@asiainfo....它的最大特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hado...
  • 我们在做具体的选型时难免会纠结,在这里阐述点粗浅的看法,其实每个中间件在其设计上,都有其独有的特点或优化点,这些恰好应该是我们所关注的,这样才能做到物尽其用,将其特性发挥到最大;同时还要了解它们各自的...
  • RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用。作为一名合格的开发者,有必要了解一下相关知识,RabbitMQ(一):RabbitMQ快速入门已经入门RabbitMQ,本文介绍RabbitMQ的高级...
  • 作者简介:钱文品(老钱),互联网分布式高并发技术十年老兵,目前任掌阅科技资深后端工程师。熟练使用 Java、Python、Golang 等多种计算机语言,开发过游戏...而Redis5.0最大的新特性就是多出了一个数据结构Stream...
  • 作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。其主要特点有:灵活可扩展性RocketMQ 天然支持集群,...
  • [转] Erlang十分钟快速入门

    千次阅读 2008-05-02 11:08:00
    Erlang概述Erlang不但是一种编程语言,而且它具有比编程语言...这个Erlang初始开源版本包含了Erlang的实现,同时它也是用于构建分布式高可用性系统的Ericsson中间件最大组成部分。Erlang具有以下特性:并发性 - Er

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 136
精华内容 54
关键字:

中间件最大特性