精华内容
下载资源
问答
  • 来自公众号:京东技术消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法。应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用链接来连接它们。消息传递指的是程序之间通过在消息中发送数据进行...

    来自公众号:京东技术

    消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法。应用程序通过写入和检索出入列队的数据(消息)来通信,而无需通过专用链接来连接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用(Remote Procedure Call. RPC)的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求,这样天然的就实现了异步的目标。那么MQ还有哪些功能场景呢?下面逐一介绍。

    解耦

    5e1bbfd97ec9caa67f358b47e7308f88.png

    MQ最直接的使用场景就是可以将两个系统进行解耦,比如我们的货款抵扣业务场景,用户生成订单发送MQ后立即返回,结算系统去消费该MQ进行用户账户金额的扣款。这样订单系统只需要关注把订单创建成功,最大可能的提高订单量,并且生成订单后立即返回用户。而结算系统重点关心的是账户金额的扣减,保证账户金额最终一致。这个场景里面还会涉及到重试幂等性问题,后面有介绍。

    削峰填谷

    还是以订单系统和结算系统场景为例,如果订单系统通过RPC框架来调用结算系统,在有高峰促销的情况下生成订单的量会非常大,而且由于生成订单的速度也非常快,这样势必会给结算系统造成系统压力,服务器利用率则会偏高,但在不是高峰的时间点订单量比较小,结算系统的服务器利用率则会偏低。对于结算系统来说就会出现下面这样的高峰波谷现象图。

    6a966160773960fd302fa7b99cd45c36.png

    那么如果通过MQ的方式,将订单存储到MQ队列中,消费端通过拉取的方式,并且拉去速度有消费端来控制,则就可以控制流量趋于平稳。这样对于结算系统来讲,就达到了削峰填谷的目的。或者说起到了流控的目标。接下来,我们介绍一下拉取方式。

    拉取模式指用户在代码里主动调用pull方法,不需要在配置文件里面再配置,拉取的速度由用户控制,调用一次拉取一次消息进行消费,这里要重视消费的速度如果消费性能下降一定会造成积压,因此用户自己启用多线程控制并行度以提高消费速度。

    代码样例:

    messageConsumer.start();

    for (;;){ //手动拉取消息

    messageConsumer.pull(topic,messageListener);

    }

    method:pull(String topic,MessageListener listener)

    topic:指消费的主题名

    listener:是一个回调对象,当pull拉取到消息后会主动调用listener.onMessage(),

    与监听模式的区别是:

    监听模式由MQ客户端守护线程去不停的拉取消息进行消费,拉取模式由用户控制拉取的频率,不主动调用就不会消费消息。但是都不需要主动对消息进行确认。这种方式更适合写场景,保证最终结果落地即可,因为读是需要立即返回以免让用户长时间等待从而影响用户体验。

    最终一致性

    ae25fa6ecb72afc8e8cb3753e2835800.png

    一致性问题分为强一致性、弱一致性、最终一致性。大多数互联网业务要求实现最终一致性。还是以订单系统和结算系统业务场景举例,订单系统创建成功一个订单后给用户返回的结果即是成功并明确告诉用户会从账户中扣除相应的金额。那么结算系统需要保持跟订单系统相同的状态即从用账户中实际扣除一致的金额。订单系统会涉及两个动作,一个是创建成功订单,一个是发送成功通知到MQ,我们就可以把这两个动作放入到一个本地事务中,要么成功要么失败。当一次发送MQ失败之后,可以结合定时任务进行补偿,这样可以保证生成订单的结果可以落地到mq的存储中。同样结算系统消费端依靠MQ重试机制一直发送消息,直到消费端最终确认扣款业务成功处理完成。这样我们通过消息落地加补偿,消费端从业务上面考虑重复消费的保障,也就是做好幂等性操作,利用MQ实现了最终一致性。

    广播消费

    MQ有两种消息模式一种是点对点模式,一种是发布/订阅模式(最常用的模式)。同时发布/订阅模式按照消费类型又可以分为集群消费和广播消费。大部分情况下我们使用的是集群消费。

    集群消费:MQ发送任何一条消息,集群中只有一台服务器可随机消费到这条消息。如下图:

    a7fb1c164a9fb9b054c50f0be3af62c8.png

    广播消费:MQ发送每一条消息,集群中的每一台服务器至少消费到一次。如下图:

    ccb75e5172e7a8f6ec5cb738c6702d47.png

    广播消费举例:消息推送系统。首先某一个客户端与消息中心应用集群中的一台服务器建立长连接并将连接session信息保存到当前服务器内存中,集群在消费业务消息的时候,是不知道该客户端建立的长连接在哪一台服务器上面。这个时候通过广播消费,集群中的每一台服务器都可以消费到业务消息。在决定向用户推送通知之前会判断当前服务器内存中是否有该客户端的连接session信息,如果有则推送,进而客户端通过http协议拉取用户的消息实体。如果session信息不在当前服务器上面,则丢弃。如下图:

    c3009737bd3904ff2d9c783c1f581c68.png

    广播消费注意事项:

    1、消费进度在消费端管理,比如默认会在主目录下创建offset文件夹,偏移量文件存储在offset目录下,出现重复的概率要大于集群消费。

    2、MQ可以确保每条消息至少被每台消费方服务器消费一次,但是如果消费方消费失败,不会进入重试,因此业务方需要关注消费失败的情况。

    3、由于广播消费消息不会进行确认,所以管理端上显示的积压数会一直不变,需要以出对数为准。

    使用集群消费模拟广播

    在发布/订阅模式中,如果是集群消费,那么一条消息只能被集群中的随机一台服务器消费到,如果我们有需要集群中的每台服务器消费到比如上面的消息推送的例子,我们使用广播消费来实现。但是广播消费有一些弊端比如不支持顺序消息,消费进度在客户端维护出现重复的几率要大于集群模式,广播模式下不能维护消费进度所以管理端上面的积压数一直保持不变,我们就必须以出队数为准,也就是不能够支持消息堆积的查询。如果要规避这些弊端,那么我们可以利用集群消费来模拟广播,在集群消费中,我们的每台服务器上面的消费APPID是相同的,如果要达到广播的效果,那么每台服务器上面的消费APPID保持不同就可以了。

    715b187b9afd42456eb15121d690270c.png

    重试之坑

    f002d49b83ebfe3c827e988888f8cced.png

    MQ的重试功能可以保证数据结果最终得到处理,但同时也正因为有重试那么在业务处理的时候就需要格外注意幂等性的问题。比如货款抵扣业务,订单系统生成订单之后调用结算平台去扣除用户的账户金额。结算平台要根据流水号去计算,如果订单系统在调用结算平台的时候发生了网络异常,造成了结算平台实际上已经得到请求并且已处理。订单系统一侧认为发生异常需要重试,后续再发送到结算平台的订单就会造成重复扣款问题。所以流水号尤其要注意需要保证重试过程中每次发送的流水号是一致的,结算平台会根据流水号去做业务校验,如果已经处理,则丢弃,最终确保幂等性。

    总结

    我们介绍了MQ常见的使用场景,以及每种场景下的使用注意事项。尤其是在重试功能中,重试本来是MQ提供的一种保持数据最终可以得到确认的方法,但是如果业务使用上面不注意幂等性,则会带来业务数据的不一致甚至像重复扣款这样比较严重的后果。我们还介绍了发布/订阅模式下的广播消费的使用举例,也介绍了它的缺点以及可以使用集群消费来模拟广播。鉴于以上每种场景都给我们提供了很好的说明使得大家在以后使用MQ的过程中可以更好的发挥MQ的强大作用。

    转载至链接:https://my.oschina.net/zjllovecode/blog/1829065

    展开全文
  • 监听多个mq消息服务器 内容精选换一换创建后端云服务器组。将多个后端云服务器添加到后端云服务器组中后,请求会在后端云服务器间按后端云服务器组的负载均衡算法和后端云服务器的权重来做请求分发。指定session-...

    监听多个mq消息服务器 内容精选

    换一换

    c8a5a5028d2cabfeeee0907ef5119e7e.png

    创建后端云服务器组。将多个后端云服务器添加到后端云服务器组中后,请求会在后端云服务器间按后端云服务器组的负载均衡算法和后端云服务器的权重来做请求分发。指定session-persistence参数时,只有当type是APP_COOKIE时,才可以设置cookie_name。POST /v2.0/lbaas/pools请求样例1 创建后端云

    创建后端云服务器组。将多个后端云服务器添加到后端云服务器组中后,请求会在后端云服务器间按后端云服务器组的负载均衡算法和后端云服务器的权重来做请求分发。指定session-persistence参数时,只有当type是APP_COOKIE时,才可以设置cookie_name。POST /v2/{project_id}/elb/pools请求

    监听多个mq消息服务器 相关内容

    DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:https://www.rabbitmq.com/getstarted.html。本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端部署在弹性云服务器上。如果RabbitMQ实例开启了S

    DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:https://www.rabbitmq.com/getstarted.html本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端部署在弹性云服务器上。参考创建实例章节创建RabbitM

    监听多个mq消息服务器 更多内容

    e08a3c1d383ce0289aa478984d9adca2.png

    创建实例时开启SSL访问,则数据加密传输,安全性更高。DMS的RabbitMQ实例兼容开源协议,请参考RabbitMQ官网提供的不同语言的连接和使用向导:https://www.rabbitmq.com/getstarted.html。本节以DMS提供的demo为例,介绍VPC内访问与使用RabbitMQ的方法,假设RabbitMQ客户端

    0a0ca88b94963916c79cc35f28e2f501.png

    创建实例时开启SSL访问,则数据加密传输,安全性更高。本节介绍VPC内访问开启SSL的RabbitMQ实例的方法。参考购买实例章节创建RabbitMQ实例,并记录创建时输入的用户名和密码。创建完成后,单击实例名称,查看并记录实例详情中的“连接地址”。已创建弹性云服务器,并且弹性云服务器的VPC、子网、安全组与RabbitMQ实例的VPC、

    a0c42bb47a44c6ed1cd778f97e224009.png

    根据指定ID删除监听器。删除listener之前必须通过删除后端云服务器组删除与其关联的pool或通过更新监听器将监听器的default_pool_id更新为null,并且通过删除转发策略删除与其关联的l7policy。DELETE /v2.0/lbaas/listeners/{listener_id}无无请求样例 删除监听器DELETE

    784dc64e49dbbf1bc7916486d97eab2c.png

    根据指定ID删除监听器。删除listener之前必须通过删除后端云服务器组删除与其关联的pool或通过更新监听器将监听器的default_pool_id更新为null,并且通过删除转发策略删除与其关联的l7policy。DELETE /v2/{project_id}/elb/listeners/{listener_id}无无请求样例 删除

    6fc16b91fddf423fbce11d0989b79e5d.png

    负载均衡器支持两种类型的证书,服务器证书和CA证书。配置HTTPS监听器时,需要为监听器绑定服务器证书,如果开启双向认证功能,还需要绑定CA证书。服务器证书:在使用HTTPS协议时,服务器证书用于SSL握手协商,需提供证书内容和私钥。CA证书: 又称客户端CA公钥证书,用于验证客户端证书的签发者;在开启HTTPS双向认证功能时,只有当客户

    eb51cd3fd20e03ccff8238b899621069.png

    客户端可以连接同个RabbitMQ下多个vhost。vhost(Virtual Hosts)是RabbitMQ的基本特性,每个vhost相当于一个相对独立的RabbitMQ服务器,每个vhost数据目录不同,共用一个进程。性能上,连接多个vhost和单独使用一个vhost差别不大,只是RabbitMQ进程多一些对象,建议使用业务模型实测。

    b139ef593fb8558052cf7d856d8ac3a5.png

    共享型负载均衡下的弹性云服务器,一个Pool可以对应多个弹性云服务器。支持对云服务器进行增加删除。

    e8856eb4a33744e9e363326f14bf7b97.png

    根据后端云服务器组的ID查询后端云服务器组详情。GET /v2.0/lbaas/pools/{pool_id}无请求样例 查询后端云服务器组的详情GET https://{Endpoint}/v2.0/lbaas/pools/5a9a3e9e-d1aa-448e-af37-a70171f2a332响应样例{

    "pool": {

    55a2638139d68369d49b3058cd5d88e8.png

    七层负载均衡HTTP和HTTPS可以通过监控指标项可以查看ELB的平均响应时间,同时可以通过日志查看每一次请求的响应时间。登录控制台,并单击需要查询的负载均衡名称。切换到“监控”页签,并选择正确的七层监听器。查看“7层后端RT平均值”参数,可以得到负载均衡器到后端服务器的平均响应时间。平均响应时间参数名解释7层后端的RT平均值统计监听器当

    b5693ff7a1d6bd5360d83fe2bfafdfb3.png

    根据后端云服务器组的ID查询后端云服务器组详情。GET /v2/{project_id}/elb/pools/{pool_id}无请求样例1 查询后端云服务器组的详情GET https://{Endpoint}/v2/1867112d054b427e808cc6096d8193a1/elb/pools/5a9a3e9e-d1aa-448e

    展开全文
  • IBMMQ监听消息队列

    2021-03-21 08:35:42
    **IBMMQ发送和接收消息示例:pom.xml下载jar包: com.ibm.mqcom.ibm.mq.allclient9.1.0.0org.springframeworkspring-jms${spring.version}public class MessageByMQ {// 定义队列管理器和队列的名称private static ...

    **

    IBMMQ发送和接收消息示例:

    pom.xml下载jar包: com.ibm.mq

    com.ibm.mq.allclient

    9.1.0.0

    org.springframework

    spring-jms

    ${spring.version}

    public class MessageByMQ {

    // 定义队列管理器和队列的名称

    private static String qmName;

    private static String qName;

    private static MQQueueManager qMgr;

    public static void main(String args[]) {

    /* 下面两个方法可同时使用,也可以单独使用 */

    sendMessage("Hello Java MQ!");

    getMessage();

    }

    static {

    /**

    * 设置环境: MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量

    * MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用

    * 因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.

    */

    MQEnvironment.hostname = "127.0.0.1"; // MQ服务器的IP地址

    MQEnvironment.channel = "OUT"; // 服务器连接的通道

    // 服务器MQ服务使用的编码1381代表GBK、1208代表UTF-8

    MQEnvironment.CCSID = 1208;

    MQEnvironment.port = 8080; // MQ 端口

    qmName = "DGE"; // MQ 的队列管理器名称

    qName = "abc"; // MQ 远程队列的名称

    try {

    // 定义并初始化队列管理器对象并连接

    // MQQueueManager 可以被多线程共享,但是从MQ 获取信息的时候是同步的,任何时候只有一个线程可以和MQ 通信。

    qMgr = new MQQueueManager(qmName);

    } catch (MQException e) {

    // TODO Auto-generated catch block

    System.out.println("初使化MQ出错");

    e.printStackTrace();

    }

    }

    /**

    * 往MQ发送消息

    *

    * @param message

    * @return

    */

    public static int sendMessage(String message) {

    int result = 0;

    try {

    // 设置将要连接的队列属性

    // 目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性

    // int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;

    // 以下选项可适合远程队列与本地队列

    int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;

    // 连接队列

    // MQQueue provides inquire, set, put and get operations for WebSphere MQ

    // queues.

    // The inquire and set capabilities are inherited from MQManagedObject.

    /* 关闭了就重新打开 */

    if (qMgr == null || !qMgr.isConnected()) {

    qMgr = new MQQueueManager(qmName);

    }

    MQQueue queue = qMgr.accessQueue(qName, openOptions);

    // 定义一个简单的消息

    MQMessage putMessage = new MQMessage();

    // 将数据放入消息缓冲区

    putMessage.writeUTF(message);

    // 设置写入消息的属性(默认属性)

    MQPutMessageOptions pmo = new MQPutMessageOptions();

    // 将消息写入队列

    queue.put(putMessage, pmo);

    queue.close();

    System.out.println("*******success*********");

    } catch (MQException ex) {

    System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code "

    + ex.reasonCode);

    ex.printStackTrace();

    } catch (IOException ex) {

    System.out.println("An error occurred whilst writing to the message buffer: " + ex);

    } catch (Exception ex) {

    ex.printStackTrace();

    } finally {

    try {

    qMgr.disconnect();

    } catch (MQException e) {

    e.printStackTrace();

    }

    }

    return result;

    }

    /**

    * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息

    * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。

    *

    * @return

    */

    public static String getMessage() {

    String message = null;

    try {

    // 设置将要连接的队列属性

    int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;

    MQMessage retrieve = new MQMessage();

    // 设置取出消息的属性(默认属性)

    // 设置放置消息选项

    MQGetMessageOptions gmo = new MQGetMessageOptions();

    gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;

    // 在同步点控制下获取消息

    gmo.options = gmo.options + MQC.MQGMO_WAIT;

    // 如果在队列上没有消息则等待

    gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;

    // 如果队列管理器停顿则失败

    gmo.waitInterval = 1000; // 设置等待的毫秒时间限制

    /* 关闭了就重新打开 */

    if (qMgr == null || !qMgr.isConnected()) {

    qMgr = new MQQueueManager(qmName);

    }

    MQQueue queue = qMgr.accessQueue(qName, openOptions);

    // 从队列中取出消息

    queue.get(retrieve, gmo);

    message = retrieve.readUTF();

    System.out.println("The message is: " + message);

    queue.close();

    } catch (MQException ex) {

    System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code "

    + ex.reasonCode);

    } catch (IOException ex) {

    System.out.println("An error occurred whilst writing to the message buffer: " + ex);

    } catch (Exception ex) {

    ex.printStackTrace();

    } finally {

    try {

    qMgr.disconnect();

    } catch (MQException e) {

    e.printStackTrace();

    }

    }

    return message;

    }

    }

    IBMMQ消息监听:

    pom.xml下载jms依赖jar包:

    org.springframework

    spring-jms

    ${spring.version}

    spring 配置文件:

    java代码:

    com.csdn.mq.SGMessageListener类:

    public abstract class SGMessageListener implements MessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(SGMessageListener.class);

    private MessageConverter jmsConverter;

    public SGMessageListener() {

    }

    public void setJmsConverter(MessageConverter jmsConverter) {

    this.jmsConverter = jmsConverter;

    }

    public void onMessage(Message message) {

    try {

    //判断message中发送消息的类型,String还是byte[] ,根据类型进行转换

    byte[] fromMessage = (byte[]) jmsConverter.fromMessage(message);

    String xml = new String(fromMessage, "UTF-8");

    processMessage(xml);

    } catch (Throwable e) {

    LOG.error("Throwable {}", e.getMessage());

    }

    }

    public abstract void processMessage(String s);

    }

    com.csdn.mq.MQReceive类:

    public class MQReceive extends SGMessageListener{

    @Override

    public void processMessage(String msg)

    {

    System.out.println(msg);

    }

    }

    JMS只是一套规范和接口,IBM MQ是一种实现这个规范的产品,就像JDBC规范,每种数据库厂商会实现自己的JDBC JAR包,oracle的mysql的sqlserver的等等。

    展开全文
  • 每个人遇到的场景可能不同,我本次遇到的就是队列已存在,我只需要监听即可。话不多说,上码! application.properties server.port=9009 spring.rabbitmq.host=192.168.1.118 spring.rabbitmq.port=5672 spring....

    每个人遇到的场景可能不同,我本次遇到的就是队列已存在,我只需要监听即可。话不多说,上码!

    application.properties

    server.port=9009
    spring.rabbitmq.host=192.168.1.118
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=rabbit_dev
    spring.rabbitmq.password=Dd.1111

    pom.xml

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!--amqp-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.0.3.RELEASE</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-extra -->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-extra</artifactId>
                <version>5.5.6</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    RbMQReceiverHand.java
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author ...
     * @version 1.0
     * @date 2020/12/29 18:02
     * @Email ...@163.com
     * @DESC:
     */
    /**
     * 监听接收消息
     */
    @Component
    public class RbMQReceiverHand implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            System.out.println("====接收到One" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
            System.out.println(message.getMessageProperties().toString());
            System.out.println(new String(message.getBody()));
        }
    }
    RabbitMQConfig.java
    
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Import;
    
    import javax.annotation.Resource;
    
    /**
     * @author ...
     * @version 1.0
     * @date 2020/12/29 18:05
     * @Email ...@163.com
     * @DESC:
     */
    @Configuration
    @Import(cn.hutool.extra.spring.SpringUtil.class)
    public class RabbitMQConfig {
        @Resource
        RbMQReceiverHand RbMQReceiverHand;
    
        @Bean
        public Queue bw_Cabinet_OnOff() { return new Queue("Cabinet_OnOff",true); }
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("Cabinet_OnOff");
            container.setMessageListener(RbMQReceiverHand);
            return container;
        }
    }
    
    
    RbTestController.java
    package com.bw.Test;
    
    import cn.hutool.extra.spring.SpringUtil;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.web.bind.annotation.*;
    
    /**
     * @author ...
     * @version 1.0
     * @date 2020/12/30 17:59
     * @Email ...@163.com
     * @DESC:
     */
    @RestController
    @RequestMapping("/test")
    public class RbTestController {
        @PostMapping
        public String addQueue(@RequestParam String queueNmae) {
            SimpleMessageListenerContainer container = SpringUtil.getBean("messageListenerContainer");
            container.addQueueNames(queueNmae);
            return "添加" + queueNmae + " ok";
        }
    
        @DeleteMapping
        public String delQueue(@RequestParam String queueNmae) {
            SimpleMessageListenerContainer container = SpringUtil.getBean("messageListenerContainerTwo");
            container.removeQueueNames(queueNmae);
            return "删除 " + queueNmae + " ok";
        }
    }
    

    到此结束,就算完成了。

    我这里用getBean好处是可扩展

    展开全文
  • 我需要编写一个监听WebSphere MQ服务器的Java客户机。消息放入服务器的队列中。Java客户端正在监听WebSphere MQ服务器?我开发了这个代码,但我不确定它是否正确。如果正确,那我该如何测试它?这是一个独立的Java...
  • (其实类中已经判断如果不进行注入就设置一个默认的,但是自己注入的话,方便我们控制) listener-container是Spring提供的一个监听器容器,用于统一控制我们的监听类来接收处理消息。这里面有一些配置,schema有...
  • setMessageListener(MessageListener) 中 MessageListener 的方法 onMessage 需要自己自定义实现,这样就可以实现动态新增监听队列了,如不清楚可以看下我的源码地址 源码地址:...
  • 一、需求介绍后端使用spring boot2.0框架,要实现ibm mq的实时数据jms监听接收处理,并形成回执通过mq队列发送。二、引入依赖jar包org.springframeworkspring-jms4.3.18.releasejavax.jmsjavax.jms-api...
  • 最近soa项目要和官网系统对接,实现mq信息监听,保存等一些列操做。项目用的是Maven+SSM框架。而后学习和开发用了两天时间,算是搞定,趁加班时间作个总结。对于Maven工程的ssm框架,整合RabbitMq首先就是java1.引入...
  • 背景:消息队列中有非常多的消息需要处理,并且监听器onMessage()方法中的业务逻辑也相对比较复杂,为了加快队列消息的读取、处理速度。可以通过加快读取速度和加快处理速度来考虑。因此从这两个方面都使用多线程来...
  • rabbitmq可以动态的增减监听队列,目前我想到的使用场景是: 当有消息积压,或者预计不久的将来的某一段时间内,会有大量的消息需要消费时,可以增加监听队列,当恢复平常时候就减少监听队列。 先看消费者工程的...
  • IBM MQ消息侦听器

    2021-07-17 00:31:50
    Hi does anyone know how to create a message listener using IBM MQ? I know how to do it using the JMS spec but I am not sure how to do it for IBM MQ. Any links or pointers are greatly appreciated.解决...
  • 白开水<... 2016/7/11 10:27:33 UML中自动执行的用例怎么画,比如,系统自动发送邮件潘加宇...系统监听MQ队列中的消息。如果有消息进来的话,自动通知相关的人员进行处理。另外,潘老师,我想问一下怎么区分用来,还是
  • 前面三篇,第一篇讲了安装IBM MQ时遇到的一些问题。第二篇讲了点对点模式的调用。第三篇讲了发布订阅模式。本篇说一下监听模式。?监听模式只是在消费者端监听就可以了。对于消息发布者,代码不用做改动。改动的代码...
  • 软件方法(下)分析和设计第8章连载[20210518更新]>>白开水<... 2016/7/11 10:27:33UML中自动执行的用例怎么画,比如,系统自动发送邮件潘加宇(3504847) 17:...系统监听MQ队列中的消息。如果有消息进来的话,自动
  • 一、配置mq参数:.yml文件或是相似config配置文件javatest:host:127.0.0.1port:1414username:testpassword:testchannel:TEST_CHANNEL //通道queue:TEST_QUEUE //队列名queue.manager:MANAGER //队列管理器ccsid:1381...
  • 我需要编写一个监听WebSphere MQ Server的Java客户机.消息被放入服务器的队列中.我开发了这段代码,但不确定它是否正确.如果正确,那我该怎么测试呢?这是一个独立的Java项目,没有应用程序服务器支持.我应该把哪些罐放...
  • spring监听器+定时任务

    2021-03-08 21:49:40
    背景:在原SSM项目中,拟定在每晚的23:59:...定时任务由spring的监听器去启动。jdk版本:1.8.0上代码web.xml,添加监听com.test.listener.Listener添加监听类import com.test.timmer.TimmerTest;import javax.servl...
  • IBM MQ监控方法

    2021-10-16 19:42:18
    IBM MQ监控工具已开源,参见: https://github.com/zollty/IBM-MQ-Monitoring-Script 1、检查连接数 1)底层网络连接检测方法 netstat -tnp | grep amqrmppa | wc -l 或 netstat -tn | grep :14 | wc -l ...
  • 介绍rabbitmq默认有7个交换机,其中amq.rabbitmq.log为系统日志的交换机,这个日志为topic类型,会有三个等级的(routing_key)的日志发送到这个交换机上。代码如下#!/usr/bin/env python# -*- coding: utf-8 -*-...
  • xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> 并且监听消息的Java类如下... import javax.jms.MessageListener; public class MyJMSMessageListener implements MessageListener{ @Override public void onMessage(Message message) { // Do your work ...
  • 监听RabbitMQ日志

    2021-01-27 05:41:26
    win下,查看rabbitmq的exchange:.\rabbitmqctl list_exchanges,可以看到有个amq.rabbitmq.log的topic类型的交换器,这个就是mq的日志输出的exchange 1、创建连接、channelConnectionFactory factory = new ...
  • Java调用MQ队列

    2021-03-13 19:27:38
    IBM MQ 6.0中设置两个队列,(远程队列、通道之类都不设置)。队列管理器是XIR_QM_1502队列名称是ESBREQIP地址是10.23.117.134(远程的一台电脑,跟我的电脑不在一个局域网内)端口1414CCSID 1208MQ配置可以参考这个,有...
  • 监控mq队列深度

    2020-12-23 01:33:31
    大规模应用性能监控,工业物联网设备监控 边缘计算,立即前往 最新产品和实时动态重磅发布 ,立即查看","btn1":"立即开通","link2":"https://help.aliyun.com/product/54825.html","title":"时间序列数据库 TSDB"}],...
  • 通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。 二、实现 1、新建一个demo数据库并添加几条数据来进行测试 站点设备信息...
  • 本文主要介绍RocketMQ的多端口监听机制,通过本文,你可以了解到Broker端源码中remotingServer和fastRemotingServer的区别,以及客户端配置中,vipChannelEnabled的作用。1 多端口监听在RocketMQ中,可以通过broker....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,141
精华内容 13,656
关键字:

动态监听mq