activemq 订阅
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。 展开全文
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
信息
支持系统
windows,linux
支持语言
Java,C,C++,C#,Python,Ruby,Perl
外文名
Apache ActiveMQ
出品厂商
Apache
中文名
ActiveMQ
支持规范
JMS1.1,J2EE 1.4 AMQP 1.0
应用协议
OpenWire,Stomp REST
ActiveMQ简介
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。 [1] 
收起全文
精华内容
参与话题
问答
  • ActiveMQ面试题总结

    万次阅读 多人点赞 2019-02-06 15:13:27
    是什么 消息中间件。可以在分布式系统的不同服务之间进行消息的发送和接收 它的出现解决了什么问题 可以让系统解耦 比如:使用消息中间件,某一个服务,可能依赖了其他好几个服务。比如课程里面的运营商后台...

    是什么

    • 消息中间件。可以在分布式系统的不同服务之间进行消息的发送接收

    它的出现解决了什么问题

    • 可以让系统解耦
    • 比如:使用消息中间件,某一个服务,可能依赖了其他好几个服务。比如课程里面的运营商后台依赖了4个服务,那不用mq就和4个服务耦合,用了mq,就只和1个mq耦合。参考下图:

    实际项目应用场景

    • 监听商品添加消息,接收消息,将对应的商品信息同步到索引库
    • 每次添加完商品并将同步商品到索引库如果,如果直接同步数据库,当数据库很大的时候,会影响服务器性能,这时我们,就使用ActiveMQ消息中间件,后台添加完消息后,搜索服务器发送一个消息【商品id】,并将接收到的商品id在数据库中查找跟商品id有关的信息,吧信息添加到索引库中

    ActiveMQ的特点如下

    • 完全支持JMS 1.1和J2EE 1.4规范(持久化,XA消息,事务)
    • 支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    • 可插拔的体系结构,可以灵活制定,如:消息存储方式,安全管理等
    • 很容易和Application Server集成使用
    • 多种语言和协议编写客户端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP
    • 从设计上保证了高性能的集群,客户端—服务器,点对点
    • 可以很容易的和Spring结合使用
    • 支持通过 JDBC 和 journal 提供高速的消息持久化
    • 支持和Axis的整合

    ActiveMQ消息发送失败

    • ActiveMQ有两种通信方式,点到点形式和发布订阅模式。
    • 如果是点到点模式的话,如果消息发送不成功,此消息默认会保存到ActiveMQ服务端直到有消费者将其消费,所以此消息是不会丢失的。
    • 如果是发布订阅模式的通信方式,默认情况只通知一次,如果接受不到此消息就没有了,这种场景使用于对消息发送率要求不高的情况,如果要求消息必须送达不可以丢失的话,需要配置持久订阅。每个订阅端定义一个id,在订阅是向ActiveMQ注册,发布消息和接受消息时需要配置发送模式为持久化,此时如果客户端接受不到消息,消息会持久化到服务端,直到客户端正常接收后为止。

    如何防止消息重复发送

    • 解决方法:增加消息状态表。
    • 通俗来说就是一个账本,用来记录消息的处理状态,每次处理消息之前,都去状态表中查询一次,如果已经有相同的消息存在,那么不处理,可以防止重复发送。

    什么情况下才使用ActiveMQ

    • 多个项目之间集成:
      • 跨平台
      • 多语言
      • 多项目
    • 优点:
      • 降低系统间模块的耦合度
      • 解耦
      • 软件扩展性

    丢消息怎么办

    • 解决方案:用持久化消息【可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB】,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

    持久化消息非常慢

    • 默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

    服务挂掉

    • 这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的<systemUsage>节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除

    Queue和Topic的区别

    • 点对点(point-to-point,简称PTP)Queue消息传递模型:
      • 在该消息传递模型下,一个消息生产者向消息服务器端一个特定的队列发送消息,一个消费者从该队列中读取消息。在这种模型下,消息生产者知道消息消费者的队列并直接将消息发送到消息消费者的队列。这种模型的特点是能够保证数据安全

    • 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:

      •  在该消息传递模型下,一个消息发布者向一个特定的消息主题发布消息,0或多个对此消息主题感兴趣的并且处于活动状态的消息订阅者或者建立了持久订阅的消息订阅者才可以接收到所发布的消息。可能造成数据丢失

    ActiveMQ【JMS的同步与异步】发送消息的方式有哪些

    • 同步方式
      • 两个通信应用服务之间必须要进行同步,两个服务之间必须都是正常运行的。发送程序和接收程序都必须一直处于运行状态,并且随时做好相互通信的准备。
      • 发送程序首先向接收程序发起一个请求,称之为发送消息,发送程序紧接着就会堵塞当前自身的进程,不与其他应用进行任何的通信以及交互,等待接收程序的响应,待发送消息得到接收程序的返回消息之后会继续向下运行,进行下一步的业务处理。
    • 异步方式
      • 两个通信应用之间可以不用同时在线等待,任何一方只需各自处理自己的业务,比如发送方发送消息以后不用登录接收方的响应,可以接着处理其他的任务。也就是说发送方和接收方都是相互独立存在的,发送方只管方,接收方只能接收,无须去等待对方的响应。

      • Java中JMS就是典型的异步消息处理机制,JMS消息有两种类型:点对点、发布/订阅

    有兴趣的同学可以关注我的个人公众号,期待我们共同进步!!!

    展开全文
  • 分布式面试之ActiveMQ面试题

    万次阅读 2020-10-11 12:42:13
    文章目录1、什么是ActiveMQ?2、ActiveMQ服务器宕机怎么办?3、丢消息怎么办?4.持久化消息非常慢。5、消息的不均匀消费。6、死信队列。7、ActiveMQ中的消息重发时间间隔和重发次数吗? 1、什么是ActiveMQ? activeMQ...

    1、什么是ActiveMQ?

    activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信

    2、ActiveMQ服务器宕机怎么办?

    这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的<systemUsage>节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。

    虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。
    那如果文件增大到达了配置中的最大限制的时候会发生什么?我做了以下实验:

    设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。

    设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。整个系统可连接,但是无法提供服务,就这样挂了。
    具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。

    3、丢消息怎么办?

    这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Softwarecausedconnectionabort:recvfailed错误。

    通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close。时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。

    解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit。方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

    4.持久化消息非常慢。

    默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

    5、消息的不均匀消费。

    有时在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费",如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这
    些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。

    解决方案:将prefetch设为1,每次处理1条消息,处理完再去取,这样也慢不了多少。

    6、死信队列。

    如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将ack交由程序自己处理。那如果使用了AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有!

    消费消息有2种方法,

    一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。

    另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。

    那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新拋异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回–获取-报错死循环了吗?在重试6次后,ActiveMQ认为这条消息是“有毒’‘的,将会把消息丢到死信队列里。如果你的消息不见了,去ActiveMQ.DLQ里找找,说不定就躺在那里。

    7、ActiveMQ中的消息重发时间间隔和重发次数吗?

    ActiveMQ:是Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。JMS(Java消息服务):是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

    首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。

    1. 如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发.

    2. 如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发!

    3. 如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。

    4. 说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。

    我们可以来对ActiveMQ的重发策略(RedeliveryPolicy)来进行自定义配置,其中的配置参数主要有以下几个:

    可用的属性

    属性默认值说明

    1. collisionAvoidanceFactor默认值0.15,设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。
      lmaximumRedeliveries默认值6,最大重传次数,达到最大重连次数后拋出异常。为-1时不限制次数,为0时表示不进行重传。

    2. maximumRedeliveryDelay默认值-1,最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。

    3. initialRedeliveryDelay默认值1000L,初始重发延迟时间
      lredeliveryDelay默认值1000L,重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)

    4. useCollisionAvoidance默认值false,启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。

    5. useExponentialBackOff默认值false,启用指数倍数递增的方式增加延迟时间。

    6. backOffMultiplier默认值5,重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

    展开全文
  • 【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题【BAT必备】activeMQ面试题...
  • ActiveMQ面试题

    千次阅读 2018-10-19 14:42:39
    1.什么是ActiveMQ?  activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信  2. ActiveMQ服务器宕机怎么办? 这得从ActiveMQ的...

    1.什么是ActiveMQ?

     activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信 

    2. ActiveMQ服务器宕机怎么办?

    这得从ActiveMQ的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的<systemUsage>节点中配置。但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。

    那如果文件增大到达了配置中的最大限制的时候会发生什么?我做了以下实验:

    设置2G左右的持久化文件限制,大量生产持久化消息直到文件达到最大限制,此时生产者阻塞,但消费者可正常连接并消费消息,等消息消费掉一部分,文件删除又腾出空间之后,生产者又可继续发送消息,服务自动恢复正常。

    设置2G左右的临时文件限制,大量生产非持久化消息并写入临时文件,在达到最大限制时,生产者阻塞,消费者可正常连接但不能消费消息,或者原本慢速消费的消费者,消费突然停止。整个系统可连接,但是无法提供服务,就这样挂了。

    具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。

    3. 丢消息怎么办?

    这得从java的java.net.SocketException异常说起。简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。

     

    通过抓包得知,ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。

     

    解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。

     

    4. 持久化消息非常慢。

     

    默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。

     

    5. 消息的不均匀消费。

     

    有时在发送一些消息之后,开启2个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。

     

    解决方案:将prefetch设为1,每次处理1条消息,处理完再去取,这样也慢不了多少。

     

    6. 死信队列。

     

    如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将ack交由程序自己处理。那如果使用了AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有!

     

    消费消息有2种方法,一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新抛异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回--获取--报错死循环了吗?

     

    在重试6次后,ActiveMQ认为这条消息是“有毒”的,将会把消息丢到死信队列里。如果你的消息不见了,去ActiveMQ.DLQ里找找,说不定就躺在那里。

     

    7. ActiveMQ中的消息重发时间间隔和重发次数吗?

    ActiveMQ:是Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。JMS(Java消息服务):是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。    

    首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。

    ① 如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发.

    ② 如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发!

    ③ 如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。

    ④ 说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。

    我们可以来对ActiveMQ的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:

    可用的属性

     属性 默认值 说明

    l  collisionAvoidanceFactor  默认值0.15 ,  设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。

    l  maximumRedeliveries  默认值6 ,  最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。

    l  maximumRedeliveryDelay  默认值-1,  最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。

    l  initialRedeliveryDelay  默认值1000L,  初始重发延迟时间

    l  redeliveryDelay  默认值1000L,  重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)

    l  useCollisionAvoidance  默认值false,  启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。

    l  useExponentialBackOff  默认值false,  启用指数倍数递增的方式增加延迟时间。

    l  backOffMultiplier  默认值5,  重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。
     

    展开全文
  • MQ之ActiveMQ

    千次阅读 2017-05-18 17:43:57
    ActiveMQ

    ActiveMQ

    什么是ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    主要特点:

    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    6. 支持通过JDBC和journal提供高速的消息持久化
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点
    8. 支持Ajax
    9. 支持与Axis的整合
    10. 可以很容易得调用内嵌JMS provider,进行测试

    ActiveMQ的消息形式

    • 对于消息的传递有两种类型:
      • 一种是点对点的,即一个生产者和一个消费者一一对应;
      • 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    • StreamMessage – Java原始值的数据流
    • MapMessage–一套名称-值对
    • TextMessage–一个字符串对象
    • ObjectMessage–一个序列化的 Java对象
    • BytesMessage–一个字节的数据流

    ActiveMQ的安装

    进入http://activemq.apache.org/ 下载ActiveMQ

    最新版本: 5.14.5

    安装环境:

    需要jdk
    安装Linux系统。生产环境都是Linux系统。

    安装步骤

    第一步: 把ActiveMQ 的压缩包上传到Linux系统。
    第二步:解压缩。
    第三步:启动。
    使用bin目录下的activemq命令启动:
    [root@localhost bin]# ./activemq start
    关闭:
    [root@localhost bin]# ./activemq stop
    查看状态:
    [root@localhost bin]# ./activemq status

    注意:如果ActiveMQ整合spring使用,不要使用activemq-all-5.14.5.jar包(spring 可能少方法)。建议使用5.11.2

    进入管理后台:

    http://192.168.25.168:8161/admin
    用户名:admin
    密码:admin

    可能出现的问题:

    405的问题:机器名和 ip 没有对上,修改 host 文件。
    查看 hostname,然后检查 hosts 文件中是否相同。
    /etc/sysconfig/network

    vim /etc/h/hosts

    ActiveMQ的使用方法

    Queue

    点对点(point-to-point,简称PTP)Queue消息传递模型

    Producer

    生产者:生产消息,发送端。
    把jar包添加到工程中。使用5.11.2版本的jar包。

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。

    @Test
    public void testQueueProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        //brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

    Consumer

    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    @Test
    public void testQueueConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }



    Topic

    发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型

    Producer

    使用步骤:
    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。

    @Test
    public void testTopicProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        // brokerURL服务器的ip及端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
        // 参数:话题的名称。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

    Consumer

    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    @Test
    public void testTopicConsumer() throws Exception {
        // 第一步:创建一个ConnectionFactory对象。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:从ConnectionFactory对象中获得一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启连接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象创建一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
    
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端03。。。。。");
        // 等待键盘输入
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
    展开全文
  • ActiveMQ 面试题(长期更新)

    万次阅读 2019-05-02 16:38:51
    什么是activemq activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。 activemq的作用以及原理 Activemq的作用就是系统之间进行...
  • 7道消息队列ActiveMQ面试题分享给你! 大家面试前,必须狠刷面试题,哥给你们整理了面试过程中必问的一些面试,希望对你们有帮助,祝你们早日找到满意的工作。
  • ActiveMq由浅入深讲解+面试题50道讲解

    千人学习 2018-07-04 15:21:23
    本课程共分36节,内容包括MQ概述和工作流程,启动过程与启动异常分析,消息的基本模型,基于...事务,死信队列,ACK策略,消息的丢失,重复重复消费,消息重发,springmvc集成,集群搭建,集群访问,50道面试精讲等。
  • 消息中间件之ActiveMq面试题

    千次阅读 2018-09-06 13:39:31
    ActiveMQ面试专题 什么是activemq activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。 activemq的作用以及原理 Activemq 的...
  • ActiveMq由浅入深讲解+面试题50道讲解 从事开发工作10余年,见证了...
  • 本课程共分36节,内容包括MQ概述和工作流程,启动过程与启动异常分析,消息的基本模型,...事务,死信队列,ACK策略,消息的丢失,重复重复消费,消息重发,springmvc集成,集群搭建,集群访问,50道面试精讲等。...
  • 面试题 1.为什么使用消息队列? 2.消息队列有什么优点和缺点? 3.Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: 第一,你知不知道你们系统...
  • docker安装activemq

    万次阅读 2020-04-28 14:39:37
    第一步下载镜像(我直接使用dockerhub上构建好的镜像)activeMq官网 docker pull rmohr/activemq 第二步创建数据卷 activemq_conf 和 activemq_data用来存储配置文件和数据 docker volume create activemq_conf...
  • ActiveMQ

    万次阅读 2019-03-16 14:34:36
    什么是消息中间件 面向消息的中间件,发送者将消息发送给消息服务器,消息服务器将消息存放在队列中,在合适的时候在将消息转发给接收者。 这种模式下,发送和接收是异步的,发送者无需等待,二者的生命周期未必...
  • ActiveMQ 学习

    千次阅读 2010-06-22 22:58:00
    ActiveMQ 学习
  • ActiveMQ系列之一:ActiveMQ简介

    千次阅读 2014-11-30 23:08:08
    ActiveMQ是什么 ActiveMQ是Apache推出的,一款开源的,完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现的消息中间件 (Message Oriented Middleware,MOM) ActiveMQ能干什么 最主要的功能就是:实现JMS ...
  • 经过一段时间对activeMQ的研究,首先我们觉得它无论从架构还是性能方面都应该可以承担起商业365*24的应用,但就像任何成熟的软件产品一样,尤其是这种分布式部署的消息中间件,在今天,如果没有一个好的可视化工具来...
  • 一、ActiveMQ安全机制 ActiveMQ是使用jetty部署的,修改密码需要到相应的配置文件 配置文件是这个: 在其第123行添加用户名和密码,添加配置如下: &lt;plugins&gt; &lt;... ...
  • ActiveMQ系列之五:ActiveMQ的Transport

    千次阅读 2014-11-30 23:55:34
    连接到ActiveMQ Connector:ActiveMQ提供的,用来实现连接通讯的功能。包括:client-to-broker、broker-to-broker。 ActiveMQ允许客户端使用多种协议来连接配置Transport Connector,在conf/activemq.xml里面,...
  • ActiveMQ系列之四:用ActiveMQ构建应用

    千次阅读 2014-11-30 23:46:45
    Broker:相当于一个ActiveMQ服务器实例 命令行启动参数示例如下: 1:activemq start :使用默认的activemq.xml来启动 2:activemq start xbean:file:../conf/activemq-2.xml :使用指定的配置文件来启动 3...
  • ActiveMQ使用教程

    万次阅读 2018-07-25 09:39:22
    一、下载 apache-activemq-5.13.2-bin.zip后解压,运行apache-activemq-5.13.2\bin\win64路径下的activemq.bat 运行成功如下 注:运行失败的话可以把apache-activemq-5.13.2\conf下的activemq.xml文件0.0.0.0:...
  • ActiveMQ—Windows操作系统中如何安装启动ActiveMQ
  • 在springboot项目中使用activemq,首先gradle(本人的项目是用gradle管理的,maven同理)中的build.gradle引入activemq依赖compile "org.springframework.boot:spring-boot-starter-activemq" compile "or
  • 1.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。 启动ActiveMQ以后,...
  • MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过...
  • 在前面的一篇ActiveMQ入门实例中我们实现了消息的异步传送,这篇博文将如何在spring环境下集成ActiveMQ。如果要在spring下集成ActiveMQ,那么就需要将如下jar包导入项目: 本文有两篇参考文献,因此有两个实例...
  • 下载最新的ActiveMQ 2:直接解压,然后拷贝到你要安装的位置就好了 启动运行 1:普通启动:到ActiveMQ/bin下面,./activemq start 2:启动并指定日志文件 ./activemq start > /tmp/activem

空空如也

1 2 3 4 5 ... 20
收藏数 30,627
精华内容 12,250
关键字:

activemq