精华内容
下载资源
问答
  • 更多相关内容
  • #资源达人分享计划#
  • WebSphereMQ,也称MQSeries,以一致的、可靠的和易于管理的方式来连接应用程序,并为跨部门、企业范围的集成提供了可靠的基础。通过为重要的消息和事务提供可靠的、一次且仅一次的传递,MQ可以处理复杂的通信协议,...
  • 利用Websphere MQ实现大文件交换,断点续传,可以将文件分割多段传输
  • 1.MQ生产集群

    1.MQ生产集群

    前面经过了解RocketMQ的核心架构原理,还有小规模集群的部署和压测,以及最终生产环境的集群部署,全部都搞定了。

    如图,目前已经有一套3台NameServer机器+6台Broker机器的生产集群,而且经过对集群的生产参数都进行了适当优化,足以抗下每秒十多万消息的处理。

    进入下一步,基于MQ对订单系统架构做改造。

    2.订单系统改造方向

    订单系统面临的技术问题包括以下几个环节:

    1. 下单核心流程环节太多,性能较差
    2. 订单退款的流程可能面临退款失败的风险
    3. 关闭过期订单的时候,存在扫描大量订单数据的问题
    4. 跟第三方物流系统耦合在一起,性能存在抖动的风险
    5. 大数据团队要获取订单数据,存在不规范直接查询订单数据库的问题
    6. 做秒杀活动时订单数据库压力过大

    这里优先解决第一个问题,因为下单流程性能差比较明显,且直接影响用户体验。而订单退款失败是小概率问题,可以先通过人工处理。

    关闭过期订单存在大量订单数据扫描的问题在订单数据量不大的情况下并不凸显严重。

    跟第三方物流系统的耦合导致系统性能抖动,也是小概率出现的,并不是经常出现的。

    大数据团队直接查询订单数据库跑报表出来虽然会造成压力,但影响还不大。

    至于秒杀是订单数据库压力过大由于活动不是经常有,即使压力过大,只要将MYSQL部署在高配置物理机上,基本上也能扛住了。

    3.引入MQ实现订单核心流程的异步化改造

    3.1 支付订单的核心流程:

    如上图,每次支付完一个订单后,都需要执行一系列的动作,包括:

    • 更新订单状态
    • 扣减库存
    • 增加积分
    • 发优惠券
    • 发短信
    • 通知发货

    上述的一系列动作会导致整个订单支付的核心链路执行时间过长,可能长达好几秒。

    3.2 MQ优化改造:

    核心业务不变:在用户支付完毕后,只要执行最核心的更新订单状态和扣减库存就可以了,保证速度足够快。

    支付后的动作异步化:诸如增加积分、发送优惠券、发送短信、通知发货的操作,都可以通过MQ实现异步化执行。

    如图,订单系统仅同步执行更新订单状态和扣减库存两个最关键的操作,一旦支付成功,只要保证订单状态变为“已支付”,库存扣减掉,就可以保证核心数据不错乱。

    然后订单系统会发送一个订单支付的消息到RocketMQ中去,然后积分系统会从RocketMQ里获取到消息,然后根据消息去累加积分。

    营销系统会从RocketMQ里获取到消息然后发送优惠券,推送系统会从MQ里获取到信息然后推送短信,仓储系统会从MQ里获取消息然后生产物流单和发货单,去通知仓库管理员打包商品,准备交接给物流公司去发货。

    以上,是改造后的业务执行流程。

    3.3 优化后的效果

    案例:原有架构中,更新订单状态耗费30ms,调用库存服务的接口扣减库存耗费80ms,增加积分需要耗费50ms,派发优惠券耗费60ms,发送短信耗费100ms(最高可能1s),通知发货耗费500ms(与第三方交互最高耗时可达1秒+)。总计每次订单核心链路的执行需要接近1秒钟,甚至更长时间。

    MQ优化后,耗时为更新订单状态(30ms) +扣减库存(80ms)+发送订单消息到RocketMQ(10ms) ,一共120ms就可以了。

    说明:这里不再出现一个圆圈不停的选择提醒用户等待后台检查订单是否支付成功的界面了,而是一旦支付成功就退回到App界面,在用户反映过来之前,就显示给用户订单支付成功的界面。

    这里可以考虑一点:如果支付返回结果太慢怎么办?其实这种情况发生概率很低,sucess基本都是秒回,如果因为网络延迟原因导致结果回调太慢,也可以直接跳到订单模块主页,由前端处理。

    4.需要落地的实现

    一个是订单系统自身的改造,他需要去除掉调用积分系统、营销系统、推送系统以及仓储系统的逻辑,而改成发送一个订单支付消息到RocketMQ里去。

    另外一个是积分系统、营销系统、推送系统以及仓储系统的改造,需要从RocketMQ里获取消息,然后根据消息执行自己的业务逻辑。

    5.在订单系统中如何发送消息到RocketMQ?

    想要发送消息到RocketMQ,首先要在项目里引入相关依赖:

                    <dependency>
    			<groupId>org.apache.rocketmq</groupId>
    			<artifactId>rocketmq-client</artifactId>
    			<version>4.3.0</version>
    		</dependency>

    编写RockeMQ生产者的类

    /**
     * @ClassName RocketMQProducer
     * @Description TODO
     * @Author wushaopei
     * @Date 2021/6/21 11:20
     * @Version 1.0
     */
    public class RocketMQProducer {
    
        // 生产者类
        private static DefaultMQProducer producer;
    
        public RocketMQProducer(){
    
        }
    
        /**
         * @Description TODO 实例化生产者
         */
        @PostConstruct
        public void defaultMQProducer(){
    
            if(this.producer == null){
                // 实例化消息生产者Producer
                this.producer = new DefaultMQProducer("order_producer_group");
                // 这个是为Producer设置NameServer的地址,让他可以拉取路由信息
                // 这样才知道每个Topic的数据分散在哪些Broker机器上
                // 然后才可以把消息发送到Broker上去
                this.producer.setNamesrvAddr("localhost:9876");
            }
            try {
                // 这里启动一个Producer
                this.producer.start();
                System.out.println("--------producer start--------");
            }catch (MQClientException e){
                e.printStackTrace();
            }
        }
    
        /**
         * @Description TODO 主方法:生产者发送消息用的
         * @param topic
         * @param message
         * @return
         */
        public static void send(String topic,  String message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
            // 这里是构建一条消息对象
            Message msg = new Message(
                    topic,      // 这是指定发送消息到哪个topic上去
                    "",
                    message.getBytes(RemotingHelper.DEFAULT_CHARSET));    // 这是消息
    
            // 利用Producer发送消息并接收返回结果,可以不接收
            SendResult result = producer.send(msg);
            System.out.println(result.getMsgId());
            System.out.println(result.getSendStatus());
        }
    }
    

    通过上述代码就可以让订单系统把订单消息发送到RocketMQ的一个Topic里去了。

    6.订单消息会进入哪个Broker里去呢?

    这里要明确一点,MQ集群中,Master Broker有两台,此时生产者的消息会进入到哪个Broker里去呢?

    Topic的数据是分布式存储在多个Master Broker中的。此时有“TopicOrderPaySuccess” 这个Topic,那么它的数据会分散在两个Broker中。

    当生产者发送一个订单消息过去的时候,会根据一定的负载均衡算法和容错算法把消息发送到一个Broker中去。

    7.消费者从RocketMQ中获取订单消息

    消费者代码如下:

    package com.rocketmq.payOrder;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @ClassName RocketMQConsumer
     * @Description TODO
     * @Author wushaopei
     * @Date 2021/6/21 12:41
     * @Version 1.0
     */
    public class RocketMQConsumer {
    
        public static void start(){
            new Thread(){
    
                public void run(){
                    try {
                        // 这是RocketMQ消费者实例对象呢
                        // "credit_group"之类的就是消费者分组
                        // 一般来说比如积分系统就用“credis_consumer_group”
                        // 比如营销系统就用“marketing_consumer_group”
                        // 以此类推,不同的系统给自己取不同的消费组名字
                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("credit_group");
    
                        // 这是给消费者设置NameServer的地址
                        // 这样就可以拉取到路由信息,知道Topic的数据在哪些broker上
                        // 然后可以从对应的broker上拉取数据
                        consumer.setNamesrvAddr("localhost:9876");
    
                        // 选择订阅“TopicOrderPaySuccess”的消息
                        // 这样会从这个Topic的broker机器上拉取订单消息过来
                        // 消费者订阅的主题,topic代表主题名字、* 代表所有消息
                        consumer.subscribe("TopicOrderPaySuccess","*");
    
                        // 注册监听器来处理拉取到的订单消息
                        // 如果consumer拉取到了订单消息,就会回调这个方法交给你处理
                        consumer.registerMessageListener(new MessageListenerConcurrently() {
                            @Override
                            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                                // 在这里对获取到的msgs订单进行处理
                                // 比如增加积分、发送优惠券、通知发货,等等
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        });
                        // 启动消费者实例
                        consumer.start();
                        System.out.println("Consumer Started.%n");
                        while (true){  // 别让线程退出,就让创建好的consumer不停消费数据
                            Thread.sleep(1000);
                        }
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
    

    通过上述代码,积分系统、营销系统、推送系统、仓储系统,就可以从RocketMQ里消费“TopicOrderPaySuccess” 中的订单消息,然后根据订单消息执行增加积分、发送优惠券、发送短信、通知发货之类的业务逻辑了。

    8.第三方系统对接解耦

    (1)第三方系统

    在订单核心流程中,订单系统间接耦合了两个第三方系统,分别是:第三方短信系统、第三方物流系统。

    • 第三方短信系统,是用来推送短信给用户的;
    • 第三方物流系统,用来生成物流单通知物流公司来收货和配送的。

    同步调用流程: 订单系统会同步调用推送系统,然后推送系统调用第三方短信系统去发送短信给用户,接着订单系统会同步调用仓储系统,然后仓储系统调用第三方物流系统去生成物流单以及通知发货。

    (2)第三方系统性能抖动的影响

    订单系统是间接性的跟第三方短信系统和第三方物流系统耦合在一起的,这样的话,一旦第三方系统出现了性能抖动就会影响到订单系统的性能。

    场景:比如正常第三方短信系统发送一个短信,只需要100ms,结果某一天突然性能下降变成发送短信需要1s了,此时会连带导致订单系统的性能也急剧下降。

    (3)第三方系统性能抖动解耦

    由于订单系统已经跟仓储系统和推送系统用MQ实现一步了。也就只是仓储系统自己跟第三方物流系统耦合,推送系统自己跟第三方短信系统耦合而已。

    此时,即使第三方系统出现了严重的性能抖动,甚至是接口故障无法访问,也给跟订单系统没有关系。最多就是导致仓储系统调用第三方物流系统的接口时会出现短暂性的速度较慢的问题罢了。

    展开全文
  • MQ实现分布式事务

    2019-01-04 17:21:51
    分布式事务不能保证强一致性,只能保证最终一致性,我使用mq实现不同系统之间的通讯,但是mq也会出现错误的时候,还有不方便保存mq消息,所以再做一个消息服务来储存、定时发送消息,失败消费消息时进行重发消息,...

    目录

    detail

    项目地址:

    总结


    分布式事务不像单体应用一样使用@transactional进行事务管理即可

    分布式事务不能保证强一致性,只能保证最终一致性,我使用mq来实现不同系统之间的通讯,但是mq也会出现错误的时候,还有不方便保存mq消息,所以再做一个消息服务来储存、定时发送消息,失败消费消息时进行重发消息,记录死亡消息等等。

    这个项目让a系统减少一,b系统也减少一,activitymq作为通讯

    a,b系统基本的操作数据库,不做解释,下面讲下消息服务:

    detail

    sql:

    CREATE TABLE `message`  (
      `id` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `message` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '消息内容\r\n',
      `sendcount` bigint(20) NULL DEFAULT NULL COMMENT '重复发送消息次数',
      `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '队列名称',
      `sendsystem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '发送消息的系统',
      `status` int(255) NULL DEFAULT NULL COMMENT '状态:0等待消费 1已消费 2 已经死亡',
      `customerdate` timestamp(0) NULL DEFAULT NULL COMMENT '消费时间',
      `customersystem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '消费消息的系统',
      `cdate` timestamp(0) NULL DEFAULT NULL COMMENT '创建时间',
      `diecount` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '死亡次数',
      `diedate` datetime(0) NULL DEFAULT NULL COMMENT '死亡时间',
      PRIMARY KEY (`id`) USING BTREE,
      INDEX `id`(`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
    
    SET FOREIGN_KEY_CHECKS = 1;

    service类

    package com.example.demo.service;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.core.PreparedStatementSetter;
    import org.springframework.stereotype.Service;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;
    
    /**
     * @program: demoA
     * @description
     * @author: dajitui
     * @create: 2019-01-03 02:20
     **/
    @Service
    public class messageService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public List getMessage(){
            return jdbcTemplate.queryForList("SELECT * FROM message WHERE STATUS = 0 AND sendcount<=10");
        }
    
        public int updateMessage(String id){
            //SQL+结果`
            int resRow = jdbcTemplate.update("UPDATE message SET sendcount=sendcount+1 WHERE id=?", new PreparedStatementSetter() {
                //映射
                // 数据
                @Override
                public void setValues(PreparedStatement ps) throws SQLException {
                    ps.setString(1, id);
                }
            });
            //返回结果
            return resRow;
        }
    
        public int updateMessageStatusAndDate(){
            //SQL+结果`
            int resRow = jdbcTemplate.update("UPDATE message SET status = 2 , diedate = NOW() where sendcount=10 ", new PreparedStatementSetter() {
                //映射
                // 数据
                @Override
                public void setValues(PreparedStatement ps) throws SQLException {
                    //ps.setString(1, id);
                }
            });
            //返回结果
            return resRow;
        }
    
        public int updateMessageCustomerDate(String id){
            //SQL+结果`
            int resRow = jdbcTemplate.update("UPDATE message SET customerdate=NOW() , STATUS=1 WHERE id = ?", new PreparedStatementSetter() {
                //映射
                // 数据
                @Override
                public void setValues(PreparedStatement ps) throws SQLException {
                    ps.setString(1, id);
                }
            });
            //返回结果
            return resRow;
        }
    }
    

    Controller,为了使用fegin进行访问

    package com.example.demo.controller;
    
    import com.example.demo.service.messageService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.core.PreparedStatementSetter;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * @program: demoA
     * @description
     * @author: dajitui
     * @create: 2018-12-31 17:04
     **/
    @RestController
    public class bController {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        @Autowired
        private messageService messageService;
    
        @RequestMapping(value = "/message")
        public int b(@RequestParam(value = "message")String message,@RequestParam(value = "queue")String queue,@RequestParam(value = "sendsystem")String sendsystem,@RequestParam(value = "customersystem")String customersystem){
    
                String id= UUID.randomUUID().toString();
    
                String sendcount="0";
    
                //等待消费
                String status="0";
    
                //创建时间
            //创建时间
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            String date=format.format(new Date());
    
                String diecount="0";
    
                //SQL+结果
                int resRow = jdbcTemplate.update("INSERT INTO `message`(`id`, `message`, `sendcount`, `queue`, `sendsystem`, `status`, `customersystem`, `cdate`, `diecount`) VALUES (?, ?, ?, ?, ?, ?, ?, ?,?);", new PreparedStatementSetter() {
                    //映射
                    // 数据
                    @Override
                    public void setValues(PreparedStatement ps) throws SQLException {
                        ps.setString(1, id);
                        ps.setString(2,message);
                        ps.setString(3,sendcount);
                        ps.setString(4,queue);
                        ps.setString(5,sendsystem);
                        ps.setString(6,status);
                        ps.setString(7,customersystem);
                        ps.setString(8, date);
                        ps.setString(9,diecount);
    
                    }
                });
                //返回结果
                return resRow;
    
        }
    
        @RequestMapping(value = "/message/update")
        public int updatemessage(@RequestParam(value = "id")String id){
            return messageService.updateMessage(id);
        }
    
        @RequestMapping(value = "/message/custom")
        public int updatemessagedate(@RequestParam(value = "id")String id){
            return messageService.updateMessageCustomerDate(id);
        }
    }
    

    定时任务进行查看未消费且重复次数不超过10的消息,还有如果超过10次则设置死亡时间,消息状态

    package com.example.demo.task;
    
    
    import com.example.demo.entity.message;
    import com.example.demo.mq.JMSProducer;
    import com.example.demo.service.messageService;
    import com.netflix.discovery.converters.Auto;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Component
    public class ScheduledTasks {
    
        @Autowired
        private messageService messageService;
    
        @Autowired
        private JMSProducer jmsProducer;
    
        private static SimpleDateFormat dateFormat=new SimpleDateFormat("yyyy-MMM-ddd HH:mm:ss");
    
    
        /**
         * 定时查看未消费且重发次数少于10次的消息,并发送到mq
         */
        @Scheduled(fixedRate = 60000,initialDelay = 100)
        public void a(){
            System.out.println("现在时间:"+dateFormat.format(new Date())+"开始查找未消费且重新发送次数不超过10次的消息......");
            List<Map<String,Object>> list=messageService.getMessage();
            if(list.size()!=0) {
                for (Map map : list) {
                    String queue = map.get("queue").toString();
                    String message = map.get("message").toString();
                    String id = map.get("id").toString();
                    Map me = new HashMap();
                    me.put("message", "\"" + message + "\"");
                    me.put("id", "\"" + id + "\"");
    
                    try {
                        Destination destination = new ActiveMQQueue(queue);
                        jmsProducer.sendMessage(destination, me.toString());
                        int result = messageService.updateMessage(id);
                        if (result == 0) {
                            System.out.println("更新message发送次数失败!");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
            }
        }
    
        /**
         * 定时查看未消费且重发次数已经等于10次的消息,修改消息状态以及死亡时间
         */
        @Scheduled(fixedRate = 60000,initialDelay = 5000)
        public void b(){
            System.out.println("现在时间:"+dateFormat.format(new Date())+"开始查找并更新已死亡的消息......");
            messageService.updateMessageStatusAndDate();
        }
    }
    
    

    在启动类加上

    @EnableScheduling

    将获取的消息队列,通过mq进行发送

    application.properties

    spring.activemq.broker-url=tcp://127.0.0.1:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    spring.activemq.pool.enabled=true
    spring.activemq.pool.max-connections=50
    spring.activemq.pool.expiry-timeout=10000
    spring.activemq.pool.idle-timeout=30000
    

    生产消息

    package com.example.demo.mq;
    
    import com.example.demo.service.messageService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    
    
    /**
     * @program: demoA
     * @description
     * @author: dajitui
     * @create: 2019-01-03 14:17
     **/
    @Component
    public class JMSProducer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private messageService messageService;
    
        public void sendMessage(Destination destination, String message) {
            this.jmsTemplate.convertAndSend(destination,message);
        }
    }
    

    消费消息

    package com.example.demo.mq;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @program: demoA
     * @description
     * @author: dajitui
     * @create: 2019-01-03 14:18
     **/
    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
    
        @JmsListener(destination = "springboot.queue.test")
        public void receiveQueue(String msg) {
            logger.info("接收到消息:{}",msg);
        }
    }

    项目地址:

    https://github.com/dajitui/MQ-

     

    如果有多个消息服务并行进行,需要使用分布式锁进行控制https://blog.csdn.net/weixin_38336658/article/details/85786818

     

    总结

    像现在很多公司比较使用分布式事务框架去实现,应该维护开发代价都很大。像mq可以满足很多场景,主要实现:重试(补偿机制),幂等性处理!!!

    TCC框架:ByteTcc,Hmilu

    XA:LCN一般很少人用,缺陷很多

     

    展开全文
  • 本文实例说明如何使用IBM WebSphere MQ实现两个应用系统之间通信。两个应用系统都必须安装MQ服务器,只有双方都安装了MQ同时也创建队列管理器,而且双方都启动MQ的队列管理器才可以实现通
  • 主要介绍了Spring Boot基于Active MQ实现整合JMS,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 基于mq和redis实现的秒杀系统基于mq和redis实现的秒杀系统
  • 如何利用MQ实现大文件传输和交换具有可靠、安全、断点传输等特点(java 实现概述)
  • 本地事务表+普通mq实现最终一致性

    千次阅读 2021-12-15 19:06:04
    主要流程步骤 1,本地业务数据处理 2.1本地消息数据处理(1+2.1需要在同一个事务),要么成功,要么失败 2.2 发送消息给mq 3,消息投递到Consumer消费端 4,消费端服务处理业务数据 5,业务端业务数据消费成功,反馈给...

    主要流程步骤

    1. 1,本地业务数据处理
    2. 2.1本地消息数据处理(1+2.1需要在同一个事务),要么成功,要么失败
    3. 2.2 发送消息给mq
    4. 3,消息投递到Consumer消费端
    5. 4,消费端服务处理业务数据
    6. 5,业务端业务数据消费成功,反馈给comsumer
    7. 6,comsumer返回ack非消息中间件MQ
    8. 7,发送ack消息,更新本地消息表,表示已经完成或者消费成功,修改消息状态
    9. 8,恢复系统定时轮询消息表,找出没有消费成功ack的消息
    10. 9,重新发送没有消费成功的消息

    bad case

    1. 如果1,2.1出错,本地事务回滚,业务数据,消息都没有变化,啥都没做
    2. 如果2.2 发送失败,定时任务系统会查询消息表,依据状态重新发送
    3. 3-7步骤出错,也没法ack本地消息表,都会重新发送消息
    4. 既然会重新发送消息,那消费端就要考虑重复消息的问题,也就是幂等,尤其是在7.1/7.2确认过程中失败了,一旦重复发送,如果不幂等,业务逻辑就会重复执行,如果是减库存或者支付场景,就会出现多加或者多减数据,出现不一致

    展开全文
  • 基于MQ实现实时监控系统(一) 最近项目开发一个实时监控的系统。该项目的实现主要是基于mq,实时消费对应的最新数据,对一些指标进行监控。对应的监控规则下,如果产生对应告警,就会发送通知给用户,同时实现一些...
  • 传统的事务是基于项目耦合并且是单数据库的本地事务,简单的来说,分布式事务就是实现跨服务器和数据库的事务支持 CAP 定理,又被叫作布鲁尔定理。对于设计分布式系统(不仅仅是分布式事务),CAP 就是你的入门理论。 ...
  • 基于Redis实现下单时精准扣减库存 秒杀活动中,大量请求进入订单系统调用库存系统的接口,然后直接访问库存数据库去扣减,那么势必导致瞬时压力过大,可能让库存系统的压力很大。 解决方案,将每个秒杀商品的库存...
  • spring+mq实现异步处理

    千次阅读 2018-12-15 12:49:47
    业务逻辑是这样的,有一个功能(比如用户注册)运行十分缓慢,但是实际需要发起请求之后马上提示“处理成功”而不关心是否真的处理成功,这种情况就可以交给MQ来处理。 本文章不再介绍springMVC的搭建与activeMq的...
  • NETTY+ACTIVITYMQ,NETTY+ACTIVITYMQ,NETTY+ACTIVITYMQ 基于spring集成netty的实现
  • 阿里部分业务是用Mq实现了最终一致性,也有一部分业务用了tcc事务,但是tcc事务用的比较少,因为会侵染业务,开发成本比较高,如果体量不大的话直接用jta或mq支持事务就好,其实在分布式事务这一块还有一种最大努力...
  • 服务之间的传递常用的调用就是直接调用(RPC框架)和消息MQ推送两种,但是都有一个缺点,下游消息接收方无法控制到达自己的流量,如果调用方不限速,很有可能把下游压垮。 举个例子,秒杀业务: 上游发起下单操作 ...
  • Spring log4j mq实现日志记录

    千次阅读 2018-04-12 16:23:56
    Spring log4j mq实现日志记录,对于一般系统还是很实用的,简单方便。记得之前做过一个项目是解析的log日志文件,今天突发奇想log4j能不能将信息推送到mq于是百度查阅资料写了这个例子。1、导入对应jar&lt;!-- ...
  • NULL 博文链接:https://bijian1013.iteye.com/blog/2317339
  • Websphere MQ实现应用程序通信
  • MQ实现事务消息

    2021-10-14 17:14:31
    下单成功之后送积分的操作,我们使用 mq实现 下单成功之后,投递一条消息到 mq,积分系统消费消息,给用户增加积分 我们主要讨论一下,下单及投递消息到 mq 的操作,如何实现?每种方式优缺点? 2.2、...
  • mq实现分布式事务-补偿事务一致性CAP原则Rocket mq实现思路Rabbit mq实现思路需要考虑的问题后记 严格的来说,消息中间件并不能实现分布式事务,而是通过事后补偿机制,达到和分布式事务一样的数据一致性。这里主要...
  • mq简单实现

    2020-08-16 14:44:52
    mq简单实现(为网络简单版) public class MqServer { public static void main(String[] args) throws InterruptedException { //定义消息队列容器 LinkedBlockingDeque<String> blockingDeque = new ...
  • JMS IBM MQ 订阅模式

    2018-08-04 11:42:15
    JMS 操作IBM MQ实现订阅发布模式!包含完整的JAR包以及测试文件。
  • 3种使用MQ实现分布式事务的方式

    千次阅读 2019-09-17 00:22:13
    撸了今年阿里、头条和美团的面试,我有一个重要发现.......>>> 1.保证消息传递与一致性 1.1生产者确保消息自主性 ...消息标记为持久化,MQ将会利用保存并转发机制,来履行它与发送者之间的契...
  • rabbit Mq 实现定向消费,设置Ip白名单: 初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种...
  • JAVA实现MQ发送接收消息详解 MQ配置文档 MQ配置
  • IBM WEBSPHERE MQ实现本地队列消息传送接收。websphere资源管理器中队列,通道等的创建过程,以及java代码实现数据的传输

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 103,894
精华内容 41,557
关键字:

mq如何实现

友情链接: 103_9.ZIP