精华内容
下载资源
问答
  • 事务补偿机制
    2021-06-15 17:22:47

    事务补偿机制

    • 什么是事务补偿机制?
    • 针对每个操作,都要注册一个与其对应的补偿(撤销)操作
    • 在执行失败时,调用补偿操作,撤销之前的操作
    • A给B转账的例子,A和B在两家不同的银行
    • A账户减200元,B账户加200元
    • 两个操作要保证原子性,要么全成功、要么全失败
    • 由于A和B在两家不同的银行,所以存在分布式事务的问题
    • 转账接口需要提供补偿机制
    • 如果A在扣减的过程出现问题,直接抛出异常,事务回滚
    • B在增加余额的过程中,出现问题,要调用A的补偿接口
    • A之前的扣减操作,得到了补偿,进行了撤销
    • 保证了A和B的账是没有问题的

    在这里插入图片描述

    • 优点:逻辑清晰、流程简单
    • 缺点:数据一致性比XA还要差,可能出错的点比较多
    • TCC属于应用层的一种补偿方式,程序员需要写大量代码
    更多相关内容
  • Java中分布式事务补偿机制,当A服务调用B服务失败时,使用该异步注解则,会把失败调用数据保存到数据库中,进行重试,从而保证B服务调用成功,即使调用不成功,也可以拿到报错信息,留下对应的调用记录,代码如下: ...

    Java中分布式事务补偿机制,当A服务调用B服务失败时,使用该异步注解则,会把失败调用数据保存到数据库中,进行重试,从而保证B服务调用成功,即使调用不成功,也可以拿到报错信息,留下对应的调用记录,代码如下:

    annotation:
    package com.lx.annotation;
    
    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Inherited;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     *
     *类描述:异步任务注解
     *
     *@Author:liuweiping
     *@Version:1.0.0
     */
    @Target({ ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface AsyncTask {
    
    	/**
    	 *  功能描述
    	 */
    	String value() default "";
    	
    	/**
    	 * 
    	 * 延迟执行,当需要当前事务提交后才执行可设置
    	 */
    	long delay() default 3000;
    	
    	
    	/**
    	 * 过滤重复参数任务
    	 */
    	boolean filterRepeatCall() default true;
    
    }
    

    entiry:

    package com.lx.task.entity;
    
    
    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableId;
    import com.baomidou.mybatisplus.annotation.TableName;
    import com.lx.conmon.BaseEntity;
    
    import java.util.Date;
    
    /**
     * 
     *类描述:异步任务表
     *
     *@Author:liuweiping
     *@date:2021年07月09日
     *@Version:1.0.0
     */
    @TableName("t_async_task")
    public class AsyncTaskEntity extends BaseEntity {
    
    	private static final long serialVersionUID = 1L;
    	/**
    	 * ID
    	 */
        @TableId(type=IdType.AUTO)
        private Integer id;
    		    
        /**
    	 * 仓库编码
    	 */
        private String locno;
    		    
        /**
    	 * 任务名称
    	 */
        private String name;
    		    
        /**
    	 * 业务ID
    	 */
        private String sid;
    		    
        /**
    	 * 类
    	 */
        private String execBean;
    		    
        /**
    	 * 方法
    	 */
        private String execMethod;
    		    
        /**
    	 * 参数
    	 */
        private String execParams;
    		    
        /**
    	 * 执行结果
    	 */
        private String execResult;
    		    
        /**
    	 * 执行签名MD5(类,方法,参数)
    	 */
        private String execSign;
    		    
        /**
    	 * 执行时间
    	 */
        private Integer execTime;
    		    
        /**
    	 * 执行次数
    	 */
        private Integer tryCount;
    		    
        /**
    	 * 是否执行成功(0:否,1:是)
    	 */
        private Integer successFlag;
    		    
        /**
    	 * 首次执行时间
    	 */
        private Date firstTime;
    		    
        /**
    	 * 最后执行时间
    	 */
        private Date lastTime;
    		    
        /**
    	 * 下次重试时间
    	 */
        private Date nextTime;
    		    
        /**
    	 * 备注
    	 */
        private String remarks;
    									    	
    	/**
    	 * 获取:ID
    	 */
    	public Integer getId() {
    		return id;
    	}
    	
    	/**
    	 * 设置:ID
    	 */
    	public void setId(Integer id) {
    		this.id = id;
    	}
    	    	
    	/**
    	 * 获取:仓库编码
    	 */
    	public String getLocno() {
    		return locno;
    	}
    	
    	/**
    	 * 设置:仓库编码
    	 */
    	public void setLocno(String locno) {
    		this.locno = locno;
    	}
    	    	
    	/**
    	 * 获取:任务名称
    	 */
    	public String getName() {
    		return name;
    	}
    	
    	/**
    	 * 设置:任务名称
    	 */
    	public void setName(String name) {
    		this.name = name;
    	}
    	    	
    	/**
    	 * 获取:业务ID
    	 */
    	public String getSid() {
    		return sid;
    	}
    	
    	/**
    	 * 设置:业务ID
    	 */
    	public void setSid(String sid) {
    		this.sid = sid;
    	}
    	    	
    	/**
    	 * 获取:类
    	 */
    	public String getExecBean() {
    		return execBean;
    	}
    	
    	/**
    	 * 设置:类
    	 */
    	public void setExecBean(String execBean) {
    		this.execBean = execBean;
    	}
    	    	
    	/**
    	 * 获取:方法
    	 */
    	public String getExecMethod() {
    		return execMethod;
    	}
    	
    	/**
    	 * 设置:方法
    	 */
    	public void setExecMethod(String execMethod) {
    		this.execMethod = execMethod;
    	}
    	    	
    	/**
    	 * 获取:参数
    	 */
    	public String getExecParams() {
    		return execParams;
    	}
    	
    	/**
    	 * 设置:参数
    	 */
    	public void setExecParams(String execParams) {
    		this.execParams = execParams;
    	}
    	    	
    	/**
    	 * 获取:执行结果
    	 */
    	public String getExecResult() {
    		return execResult;
    	}
    	
    	/**
    	 * 设置:执行结果
    	 */
    	public void setExecResult(String execResult) {
    		this.execResult = execResult;
    	}
    	    	
    	/**
    	 * 获取:执行签名MD5(类,方法,参数)
    	 */
    	public String getExecSign() {
    		return execSign;
    	}
    	
    	/**
    	 * 设置:执行签名MD5(类,方法,参数)
    	 */
    	public void setExecSign(String execSign) {
    		this.execSign = execSign;
    	}
    	    	
    	/**
    	 * 获取:执行时间
    	 */
    	public Integer getExecTime() {
    		return execTime;
    	}
    	
    	/**
    	 * 设置:执行时间
    	 */
    	public void setExecTime(Integer execTime) {
    		this.execTime = execTime;
    	}
    	    	
    	/**
    	 * 获取:执行次数
    	 */
    	public Integer getTryCount() {
    		return tryCount;
    	}
    	
    	/**
    	 * 设置:执行次数
    	 */
    	public void setTryCount(Integer tryCount) {
    		this.tryCount = tryCount;
    	}
    	    	
    	/**
    	 * 获取:是否执行成功(0:否,1:是)
    	 */
    	public Integer getSuccessFlag() {
    		return successFlag;
    	}
    	
    	/**
    	 * 设置:是否执行成功(0:否,1:是)
    	 */
    	public void setSuccessFlag(Integer successFlag) {
    		this.successFlag = successFlag;
    	}
    	    	
    	/**
    	 * 获取:首次执行时间
    	 */
    	public Date getFirstTime() {
    		return firstTime;
    	}
    	
    	/**
    	 * 设置:首次执行时间
    	 */
    	public void setFirstTime(Date firstTime) {
    		this.firstTime = firstTime;
    	}
    	    	
    	/**
    	 * 获取:最后执行时间
    	 */
    	public Date getLastTime() {
    		return lastTime;
    	}
    	
    	/**
    	 * 设置:最后执行时间
    	 */
    	public void setLastTime(Date lastTime) {
    		this.lastTime = lastTime;
    	}
    	    	
    	/**
    	 * 获取:下次重试时间
    	 */
    	public Date getNextTime() {
    		return nextTime;
    	}
    	
    	/**
    	 * 设置:下次重试时间
    	 */
    	public void setNextTime(Date nextTime) {
    		this.nextTime = nextTime;
    	}
    	    	
    	/**
    	 * 获取:备注
    	 */
    	public String getRemarks() {
    		return remarks;
    	}
    	
    	/**
    	 * 设置:备注
    	 */
    	public void setRemarks(String remarks) {
    		this.remarks = remarks;
    	}
    	                                
    }
    
    package com.lx.task.entity;
    
    import java.util.Date;
    
    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableId;
    import com.baomidou.mybatisplus.annotation.TableName;
    import com.lx.conmon.BaseEntity;
    
    /**
     * 
     *类描述:异步任务历史表
     *
     *@Author:liuweiping
     *@date:2021年08月12日
     *@Version:1.0.0
     */
    @TableName("t_async_task_history")
    public class AsyncTaskHistoryEntity extends BaseEntity {
    
    	private static final long serialVersionUID = 1L;
    	/**
    	 * ID
    	 */
        @TableId(type=IdType.AUTO)
        private Integer id;
    		    
        /**
    	 * 仓库编码
    	 */
        private String locno;
    		    
        /**
    	 * 任务名称
    	 */
        private String name;
    		    
        /**
    	 * 业务ID
    	 */
        private String sid;
    		    
        /**
    	 * 类
    	 */
        private String execBean;
    		    
        /**
    	 * 方法
    	 */
        private String execMethod;
    		    
        /**
    	 * 参数
    	 */
        private String execParams;
    		    
        /**
    	 * 执行结果
    	 */
        private String execResult;
    		    
        /**
    	 * 执行签名MD5(类,方法,参数)
    	 */
        private String execSign;
    		    
        /**
    	 * 执行时间
    	 */
        private Integer execTime;
    		    
        /**
    	 * 执行次数
    	 */
        private Integer tryCount;
    		    
        /**
    	 * 是否执行成功(0:否,1:是)
    	 */
        private Integer successFlag;
    		    
        /**
    	 * 首次执行时间
    	 */
        private Date firstTime;
    		    
        /**
    	 * 最后执行时间
    	 */
        private Date lastTime;
    		    
        /**
    	 * 下次重试时间
    	 */
        private Date nextTime;
    		    
        /**
    	 * 备注
    	 */
        private String remarks;
    									    	
    	/**
    	 * 获取:ID
    	 */
    	public Integer getId() {
    		return id;
    	}
    	
    	/**
    	 * 设置:ID
    	 */
    	public void setId(Integer id) {
    		this.id = id;
    	}
    	    	
    	/**
    	 * 获取:仓库编码
    	 */
    	public String getLocno() {
    		return locno;
    	}
    	
    	/**
    	 * 设置:仓库编码
    	 */
    	public void setLocno(String locno) {
    		this.locno = locno;
    	}
    	    	
    	/**
    	 * 获取:任务名称
    	 */
    	public String getName() {
    		return name;
    	}
    	
    	/**
    	 * 设置:任务名称
    	 */
    	public void setName(String name) {
    		this.name = name;
    	}
    	    	
    	/**
    	 * 获取:业务ID
    	 */
    	public String getSid() {
    		return sid;
    	}
    	
    	/**
    	 * 设置:业务ID
    	 */
    	public void setSid(String sid) {
    		this.sid = sid;
    	}
    	    	
    	/**
    	 * 获取:类
    	 */
    	public String getExecBean() {
    		return execBean;
    	}
    	
    	/**
    	 * 设置:类
    	 */
    	public void setExecBean(String execBean) {
    		this.execBean = execBean;
    	}
    	    	
    	/**
    	 * 获取:方法
    	 */
    	public String getExecMethod() {
    		return execMethod;
    	}
    	
    	/**
    	 * 设置:方法
    	 */
    	public void setExecMethod(String execMethod) {
    		this.execMethod = execMethod;
    	}
    	    	
    	/**
    	 * 获取:参数
    	 */
    	public String getExecParams() {
    		return execParams;
    	}
    	
    	/**
    	 * 设置:参数
    	 */
    	public void setExecParams(String execParams) {
    		this.execParams = execParams;
    	}
    	    	
    	/**
    	 * 获取:执行结果
    	 */
    	public String getExecResult() {
    		return execResult;
    	}
    	
    	/**
    	 * 设置:执行结果
    	 */
    	public void setExecResult(String execResult) {
    		this.execResult = execResult;
    	}
    	    	
    	/**
    	 * 获取:执行签名MD5(类,方法,参数)
    	 */
    	public String getExecSign() {
    		return execSign;
    	}
    	
    	/**
    	 * 设置:执行签名MD5(类,方法,参数)
    	 */
    	public void setExecSign(String execSign) {
    		this.execSign = execSign;
    	}
    	    	
    	/**
    	 * 获取:执行时间
    	 */
    	public Integer getExecTime() {
    		return execTime;
    	}
    	
    	/**
    	 * 设置:执行时间
    	 */
    	public void setExecTime(Integer execTime) {
    		this.execTime = execTime;
    	}
    	    	
    	/**
    	 * 获取:执行次数
    	 */
    	public Integer getTryCount() {
    		return tryCount;
    	}
    	
    	/**
    	 * 设置:执行次数
    	 */
    	public void setTryCount(Integer tryCount) {
    		this.tryCount = tryCount;
    	}
    	    	
    	/**
    	 * 获取:是否执行成功(0:否,1:是)
    	 */
    	public Integer getSuccessFlag() {
    		return successFlag;
    	}
    	
    	/**
    	 * 设置:是否执行成功(0:否,1:是)
    	 */
    	public void setSuccessFlag(Integer successFlag) {
    		this.successFlag = successFlag;
    	}
    	    	
    	/**
    	 * 获取:首次执行时间
    	 */
    	public Date getFirstTime() {
    		return firstTime;
    	}
    	
    	/**
    	 * 设置:首次执行时间
    	 */
    	public void setFirstTime(Date firstTime) {
    		this.firstTime = firstTime;
    	}
    	    	
    	/**
    	 * 获取:最后执行时间
    	 */
    	public Date getLastTime() {
    		return lastTime;
    	}
    	
    	/**
    	 * 设置:最后执行时间
    	 */
    	public void setLastTime(Date lastTime) {
    		this.lastTime = lastTime;
    	}
    	    	
    	/**
    	 * 获取:下次重试时间
    	 */
    	public Date getNextTime() {
    		return nextTime;
    	}
    	
    	/**
    	 * 设置:下次重试时间
    	 */
    	public void setNextTime(Date nextTime) {
    		this.nextTime = nextTime;
    	}
    	    	
    	/**
    	 * 获取:备注
    	 */
    	public String getRemarks() {
    		return remarks;
    	}
    	
    	/**
    	 * 设置:备注
    	 */
    	public void setRemarks(String remarks) {
    		this.remarks = remarks;
    	}
    	                                
    }
    

    mapper:

    package com.lx.task.mapper;
    
    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.lx.task.entity.AsyncTaskEntity;
    import org.apache.ibatis.annotations.Mapper;
    
    /**
     * 
     *类描述:异步任务表
     *
     *@Author:liuweiping
     *@date:2021年04月11日
     *@Version:1.0.0
     */
    @Mapper
    public interface AsyncTaskDao extends BaseMapper<AsyncTaskEntity> {
    	
    }
    
    
    
    
    package com.lx.task.mapper;
    
    
    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.lx.task.entity.AsyncTaskHistoryEntity;
    import org.apache.ibatis.annotations.Mapper;
    
    
    /**
     * 
     *类描述:异步任务历史表
     *
     *@Author:liu wei ping    
     *@date:2019年08月12日
     *@Version:1.0.0
     */
    @Mapper
    public interface AsyncTaskHistoryDao extends BaseMapper<AsyncTaskHistoryEntity> {
    	
    }
    

    service:

    package com.lx.task.service.impl;
    
    
    import java.lang.reflect.Method;
    import java.util.Date;
    import java.util.UUID;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    
    import com.alibaba.fastjson.JSON;
    import com.lx.annotation.AsyncTask;
    import com.lx.conmon.UserInfo;
    import com.lx.task.entity.AsyncTaskEntity;
    import com.lx.task.mapper.AsyncTaskDao;
    import com.lx.utils.IpUtils;
    import com.lx.utils.Md5Utils;
    
    
    /**
     * 异步任务
     * @author liu wei ping
     */
    @Aspect
    @Configuration
    public class AsyncTaskAspect {
    
        private static final Logger logger = LoggerFactory.getLogger(AsyncTaskAspect.class);
    
        @Autowired
        private AsyncTaskExecutor asyncTaskExecutor;
    
        @Autowired
        private AsyncTaskDao asyncTaskDao;
    
        @Pointcut("@annotation(com.lx.annotation.AsyncTask)")
        public void asyncTaskPointcut() {
        }
    
        @Around("asyncTaskPointcut()")
        public Object asyncTaskRunning(ProceedingJoinPoint point) throws Throwable {
            if (asyncTaskExecutor.isJobRunning()) {
                return point.proceed();
            }
            MethodSignature methodSignature = (MethodSignature) point.getSignature();
            Method method = methodSignature.getMethod();
            Class<?> targetClass = point.getTarget().getClass();
            Method thisMethod = targetClass.getMethod(method.getName(), method.getParameterTypes());
            AsyncTask asyncTask = thisMethod.getAnnotation(AsyncTask.class);
            String taskName = asyncTask.value();
            String sid = UUID.randomUUID().toString();
            AsyncTaskEntity entity = new AsyncTaskEntity();
            entity.setName(taskName);
            entity.setExecBean(targetClass.getName());
            entity.setExecMethod(method.getName());
            entity.setExecParams(JSON.toJSONString(point.getArgs()));
            entity.setExecSign(asyncTask.filterRepeatCall() ? Md5Utils.encryption(JSON.toJSONString(entity)) : sid);
            entity.setSid(sid);
            UserInfo dto = UserInfo.getUserInfo();
            entity.setLocno(dto.getLocno());
            entity.setCreateUserId(dto.getCreateUserId());
            entity.setCreateUserName(dto.getCreateUserName());
            entity.setUpdateUserId(dto.getUpdateUserId());
            entity.setUpdateUserName(dto.getCreateUserName());            
            
            entity.setCreateTime(new Date());
            entity.setNextTime(new Date(System.currentTimeMillis() + asyncTask.delay()));
            entity.setTryCount(0);
            entity.setSuccessFlag(0);
            entity.setRemarks(IpUtils.getLocalIP());
            asyncTaskDao.insert(entity);
            return null;
        }
    
    }
    
    package com.lx.task.service.impl;
    
    
    import com.alibaba.dubbo.common.utils.CollectionUtils;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.util.ParameterizedTypeImpl;
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.lx.task.entity.AsyncTaskEntity;
    import com.lx.task.entity.AsyncTaskHistoryEntity;
    import com.lx.task.mapper.AsyncTaskDao;
    import com.lx.task.mapper.AsyncTaskHistoryDao;
    import com.lx.utils.IpUtils;
    import com.lx.utils.RedisLock;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeanUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.lang.reflect.Method;
    import java.lang.reflect.Type;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;
    
    
    /**
     * 异步任务执行service
     * @author liuweiping
     */
    @Service
    public class AsyncTaskExecutor {
    
        private static final Logger logger = LoggerFactory.getLogger(AsyncTaskExecutor.class);
    
        /**
         * 定时任务调度线程池
         */
        private ScheduledExecutorService mainExecutorService = new ScheduledThreadPoolExecutor(1);
    
        /**
         * 异步任务执行线程池
         */
        private ExecutorService workExecutorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                Runtime.getRuntime().availableProcessors() * 2, 10L,
                TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1024));
    
        /**
         * 启动后几秒开始检查
         */
        private final static Integer INIT_DELAY_SEC = 30;
    
        /**
         * 每1分钟检查一次
         */
        private final static Integer PERIOD_SEC = 60;
    
        /***
         * 最大锁定时间(30分钟) - 任务执行
         */
        private static final int TIMEOUT_FOR_LOCK_JOB_SECONDS = 60 * 30;
    
        /**
         * 执行失败延迟执行时间
         */
        private static final int TIME_OUT_FOR_NEXT = 1000 * 60 * 1;
    
        /**
         * 任务最大重试次数
         */
        private static final int MAX_JOB_TRY_COUNT = 5;
    
        /**
         * 备注字段最大长度
         */
        private int MAX_REMARKS_LEN = 1000;
    
        /**
         * 标记是否执行任务
         */
        private ThreadLocal<Boolean> jobRunning = new ThreadLocal<>();
    
        /**
         * 标记是否转移异步任务数据
         */
        private ThreadLocal<Boolean> storeJob = new ThreadLocal<>();
    
        /**
         * 标记是否转移异步任务数据
         */
        private ThreadLocal<Integer> jobId = new ThreadLocal<>();
    
        /**
         * spring context
         */
        private final ApplicationContext applicationContext;
    
        @Value("${spring.application.name}")
        private String appName;
    
        @Resource
        private AsyncTaskDao asyncTaskDao;
    
        @Resource
        private AsyncTaskHistoryDao asyncTaskHistoryDao;
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Autowired
        public AsyncTaskExecutor(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
    
        @PostConstruct
        public void init() {
            mainExecutorService.scheduleAtFixedRate(() -> {selectJobs();},
                    INIT_DELAY_SEC, PERIOD_SEC, TimeUnit.SECONDS);
        }
    
        public boolean isJobRunning() {
            return null != jobRunning.get() && jobRunning.get();
        }
    
        public Integer getJobId() {
            return jobId.get();
        }
    
        private void selectJobs() {
            try {
           	
                QueryWrapper<AsyncTaskEntity> wrapper = new QueryWrapper<AsyncTaskEntity>();           		
                wrapper.lt("next_time", new Date());
                wrapper.eq("success_flag", 0);
                wrapper.le("try_count", MAX_JOB_TRY_COUNT);
                wrapper.last(" limit 0,20");
                List<AsyncTaskEntity> jobs = asyncTaskDao.selectList(wrapper);
                if (jobs != null && jobs.size() > 0) {
                    jobs.forEach(item -> {
                        workExecutorService.submit(() -> {
                            doSingleJobWithLock(item);
                        });
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private void doSingleJobWithLock(AsyncTaskEntity job) {
            String lockKey = appName + ":async:lock:job:" + job.getId();
            logger.info("doSingleJob , lock key {} ,id:{} ", lockKey, job.getId());
            RedisLock lock = new RedisLock(stringRedisTemplate, lockKey);
            if (lock.tryLock(TIMEOUT_FOR_LOCK_JOB_SECONDS)) {
                try {
                    // 防止重复执行
                    QueryWrapper<AsyncTaskEntity> wrapper = new QueryWrapper<AsyncTaskEntity>();           		
                    wrapper.lt("next_time", new Date());
                    wrapper.eq("success_flag", 0);
                    wrapper.le("try_count", MAX_JOB_TRY_COUNT);
                    wrapper.eq("id", job.getId());
                    List<AsyncTaskEntity> asyncTaskEntityList = asyncTaskDao.selectList(wrapper);
                    if (CollectionUtils.isNotEmpty(asyncTaskEntityList)) {
                        execSingleJob(asyncTaskEntityList.get(0));
                    } else {
                        logger.info("doSingleJob already run id:{} ", job.getId());
                    }
                } finally {
                    lock.unlock();
                }
            } else {
                logger.info("doSingleJob , lock failed for key {} ,id:{} ", lockKey, job.getId());
            }
        }
    
        private void execSingleJob(AsyncTaskEntity job) {
            jobRunning.set(true);
            storeJob.set(false);
            jobId.set(job.getId());
            logger.info("exec async job id {} , sid:{} , name {} , sign {} ", job.getId(), job.getSid(),
                    job.getName(), job.getExecSign());
            Date startTime = new Date();
            try {
                if (job.getFirstTime() == null) {
                    job.setFirstTime(startTime);
                }
                job.setLastTime(startTime);
                job.setTryCount(job.getTryCount() + 1);
                JSONArray paramArray = JSON.parseArray(job.getExecParams());
                Class<?> clazz = Class.forName(job.getExecBean());
                Object bean = applicationContext.getBean(clazz);
                Method method = null;
                for (Method m : clazz.getMethods()) {
                    if (m.getName().equals(job.getExecMethod())
                            && m.getParameterCount() == paramArray.size()) {
                        method = m;
                        break;
                    }
                }
                Class<?>[] parameterTypes = method.getParameterTypes();
                int paramsLen = paramArray.size();
                Object result = null;
                if (null != parameterTypes && 0 != paramsLen) {
                    Object[] params = new Object[paramsLen];
                    Type[] types = method.getGenericParameterTypes();
                    for (int i = 0; i < paramsLen; i++) {
                        params[i] = caseParam(paramArray, parameterTypes[i], types[i], i);
                    }
                    result = method.invoke(bean, params);
                } else {
                    result = method.invoke(bean);
                }
                job.setExecResult(JSON.toJSONString(result));
                job.setRemarks(IpUtils.getLocalIP() + "执行成功");
                job.setSuccessFlag(1);
                job.setExecTime((int) (System.currentTimeMillis() - startTime.getTime()));
                storeJob(job);
            } catch (Exception e) {
                logger.error("exec async job failed job-id-" + job.getId() + ", sid " + job.getSid()
                        + " " + e.getMessage(), e);
                job.setNextTime(new Date(System.currentTimeMillis() + TIME_OUT_FOR_NEXT * job.getTryCount()));
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                job.setRemarks(IpUtils.getLocalIP() + "执行失败:" + cause.toString());
                if (job.getRemarks().length() > MAX_REMARKS_LEN) {
                    job.setRemarks(job.getRemarks().substring(0, MAX_REMARKS_LEN));
                }
                asyncTaskDao.updateById(job);
            } finally {
                jobRunning.remove();
                storeJob.remove();
                jobId.remove();
            }
        }
    
        public void storeJob(AsyncTaskEntity job) {
            if (Boolean.FALSE.equals(storeJob.get())) {
                if (null != job.getId()) {
                    asyncTaskDao.deleteById(job.getId());
                }
                AsyncTaskHistoryEntity history = new AsyncTaskHistoryEntity();
                BeanUtils.copyProperties(job, history);
                history.setId(null);
                asyncTaskHistoryDao.insert(history);
                storeJob.set(true);
            }
        }
    
        private Object caseParam(JSONArray paramArray, Class<?> ctype, Type type, int index) {
            if (ctype.isAssignableFrom(List.class) && type instanceof ParameterizedTypeImpl) {
                ParameterizedTypeImpl typeImpl = (ParameterizedTypeImpl) type;
                return JSON.parseArray(JSON.toJSONString(paramArray.get(index)),
                        (Class) typeImpl.getActualTypeArguments()[0]);
            } else {
                return paramArray.getObject(index, ctype);
            }
        }
    
    }
    
    package com.lx.task.service.impl;
    
    import java.util.Date;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    
    import com.alibaba.fastjson.JSON;
    import com.lx.task.entity.AsyncTaskEntity;
    import com.lx.task.mapper.AsyncTaskDao;
    import com.lx.utils.IpUtils;
    
    /**
     * 异步任务
     * @author liu wei ping
     */
    @Aspect
    @Configuration
    public class TransactionalAspect {
    
        private static final Logger logger = LoggerFactory.getLogger(TransactionalAspect.class);
    
        @Autowired
        private AsyncTaskExecutor asyncTaskExecutor;
    
        @Autowired
        private AsyncTaskDao asyncTaskDao;
    
        @Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)")
        public void transactionalPointcut() {
        }
    
        @Around("transactionalPointcut()")
        public Object transactionalRunning(ProceedingJoinPoint point) throws Throwable {
            if (asyncTaskExecutor.isJobRunning()) {
                Date startTime = new Date();
                Object result = point.proceed();
                AsyncTaskEntity job = asyncTaskDao.selectById(asyncTaskExecutor.getJobId());
                job.setExecResult(JSON.toJSONString(result));
                job.setRemarks(IpUtils.getLocalIP() + " 执行成功");
                job.setSuccessFlag(1);
                job.setExecTime((int) (System.currentTimeMillis() - startTime.getTime()));
                asyncTaskExecutor.storeJob(job);
                return result;
            } else {
                return point.proceed();
            }
        }
    
    }
    

    sql脚本:

    CREATE TABLE `t_async_task` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
      `locno` varchar(10) NOT NULL DEFAULT '' COMMENT '仓库编码',
      `name` varchar(500) NOT NULL DEFAULT '' COMMENT '任务名称',
      `sid` varchar(50) NOT NULL DEFAULT '' COMMENT '业务ID',
      `exec_bean` varchar(500) NOT NULL DEFAULT '' COMMENT '类',
      `exec_method` varchar(500) NOT NULL DEFAULT '' COMMENT '方法',
      `exec_params` longtext COMMENT '参数',
      `exec_result` varchar(3000) DEFAULT '' COMMENT '执行结果',
      `exec_sign` varchar(50) NOT NULL DEFAULT '' COMMENT '执行签名MD5(类,方法,参数)',
      `exec_time` int(11) NOT NULL DEFAULT '0' COMMENT '执行时间',
      `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行次数',
      `success_flag` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否执行成功(0:否,1:是)',
      `first_time` datetime DEFAULT NULL COMMENT '首次执行时间',
      `last_time` datetime DEFAULT NULL COMMENT '最后执行时间',
      `next_time` datetime NOT NULL COMMENT '下次重试时间',
      `remarks` varchar(3000) NOT NULL DEFAULT '' COMMENT '备注',
      `trace_id` varchar(128) DEFAULT NULL COMMENT '日志跟踪id',
      `create_user_id` varchar(50) DEFAULT NULL COMMENT '创建人ID',
      `create_user_name` varchar(50) DEFAULT NULL COMMENT '创建人名称',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_user_id` varchar(50) DEFAULT NULL COMMENT '更新人ID',
      `update_user_name` varchar(50) DEFAULT NULL COMMENT '更新人名称',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      `del_flag` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识(0-正常,1-删除)',
      PRIMARY KEY (`id`),
      KEY `next_time` (`next_time`),
      KEY `sid` (`sid`),
      KEY `success_flag` (`success_flag`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COMMENT='异步任务表';
    
    
    
    
    CREATE TABLE `t_async_task_history` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
      `locno` varchar(10) NOT NULL DEFAULT '' COMMENT '仓库编码',
      `name` varchar(500) NOT NULL DEFAULT '' COMMENT '任务名称',
      `sid` varchar(50) NOT NULL DEFAULT '' COMMENT '业务ID',
      `exec_bean` varchar(500) NOT NULL DEFAULT '' COMMENT '类',
      `exec_method` varchar(500) NOT NULL DEFAULT '' COMMENT '方法',
      `exec_params` longtext COMMENT '参数',
      `exec_result` varchar(3000) DEFAULT '' COMMENT '执行结果',
      `exec_sign` varchar(50) NOT NULL DEFAULT '' COMMENT '执行签名MD5(类,方法,参数)',
      `exec_time` int(11) NOT NULL DEFAULT '0' COMMENT '执行时间',
      `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行次数',
      `success_flag` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否执行成功(0:否,1:是)',
      `first_time` datetime DEFAULT NULL COMMENT '首次执行时间',
      `last_time` datetime DEFAULT NULL COMMENT '最后执行时间',
      `next_time` datetime NOT NULL COMMENT '下次重试时间',
      `remarks` varchar(3000) NOT NULL DEFAULT '' COMMENT '备注',
      `trace_id` varchar(128) DEFAULT NULL COMMENT '日志跟踪id',
      `create_user_id` varchar(50) DEFAULT NULL COMMENT '创建人ID',
      `create_user_name` varchar(50) DEFAULT NULL COMMENT '创建人名称',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_user_id` varchar(50) DEFAULT NULL COMMENT '更新人ID',
      `update_user_name` varchar(50) DEFAULT NULL COMMENT '更新人名称',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      `del_flag` tinyint(4) NOT NULL DEFAULT '0' COMMENT '删除标识(0-正常,1-删除)',
      PRIMARY KEY (`id`),
      KEY `next_time` (`next_time`),
      KEY `sid` (`sid`),
      KEY `success_flag` (`success_flag`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='异步任务表';

    测试如下:

     

    这样就实现了分布式事务失败补偿机制,当重试6次失败,则需要人为去处理失败异常!

    展开全文
  • 1.什么是事务补偿机制 例子: A的补偿就相当于+200元。

    1.什么是事务补偿机制

    在这里插入图片描述
    例子:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    A的补偿就相当于+200元。

    但是A也可能出错,这时候就需要对A进行判断,出错了进行循环操作,循环一定次数还不行,那只能人为进行处理。

    优缺点
    在这里插入图片描述

    展开全文
  • 在亿级流量架构之分布式事务解决方案对比中, 已经简单阐明了从本机事务到分布式事务的演变过程, 文章的最后简单说明了TCC事务, 这儿将会深入了解TCC事务是原理, 以及理论支持, 最后会用Demo举例实现。 XA协议 在...

    在亿级流量架构之分布式事务解决方案对比中, 已经简单阐明了从本机事务到分布式事务的演变过程, 文章的最后简单说明了TCC事务, 这儿将会深入了解TCC事务是原理, 以及理论支持, 最后会用Demo举例实现。

    XA协议

    在上面提到的文章中, 分布式事务直接将二阶段提交, 思维逻辑有些断层, 但是那毕竟是比较解决方案, 在这儿从理论上推导分布式事务的根基, 也就是为什么要二阶段提交。

    在单体应用中, 往往由自己来保证事务的一致性, 但是分布式中, 涉及到跨网络调用就难以保证, 从理论上讲两台机器理论上无法达到一致的状态, 所以专门从服务角色上将事务操作抽象出一个服务用来协调事务, 叫做协调者, 或者说事务管理者。由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程。全局事务管理器一般使用 XA 二阶段提交协议与数据库进行交互。

    分布式事务之LCN、TCC特点、事务补偿机制缘由以及设计重点

    而XA协议, 就是事务管理者与各个服务模块(也叫服务者、资源管理者)之间的通讯遵守的协议就是XA协议, 简单来说就是规范了接口, 这个协议由X/Open组织提出, 是分布式事务的规范。 XA规范主要定义了全局事务管理器(TM)和局部资源管理器(RM)之间的接口。除此之外, XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁,如上图。

    二阶段提交协议

    二阶段协议,一句话说就是, 先进行一个复杂度低的询问操作, 看看各个服务模块(也叫参与者、资源管理者、RM)是否可以进行事务操作, 一方面检验网络是否通畅, 另一方面看看对应的资源是否被占用 , 如果可以得到的回应是所有的服务可以进行事务操作, 那么这时候再通知所有服务提交事务。详细地说, 二阶段提交(2PC:Two-Phase Commit), 该协议将一个分布式的事务过程拆分成两个阶段: 投票 和 事务提交 。为了让整个数据库集群能够正常的运行,该协议指定了一个 协调者(事务管理器) 单点,用于协调整个数据库集群各节点的运行。为了简化描述,我们将数据库集群中的各个节点称为 参与者(也叫服务者, 资源管理者) 。

    第一阶段:投票

    该阶段的主要目的在于打探数据库集群中的各个参与者是否能够正常的执行事务,具体步骤如下:

    1. 协调者向所有的参与者发送事务执行请求,并等待参与者反馈事务执行结果;
    2. 事务参与者收到请求之后,执行事务但不提交,并记录事务日志;
    3. 参与者将自己事务执行情况反馈给协调者,同时阻塞等待协调者的后续指令。

    第二阶段:事务提交

    在经过第一阶段协调者的询盘之后,各个参与者会回复自己事务的执行情况,这时候存在 3 种可能性:

    1. 所有的参与者都回复能够正常执行事务。
    2. 一个或多个参与者回复事务执行失败。
    3. 协调者等待超时。

    对于第 1 种情况,协调者将向所有的参与者发出提交事务的通知,具体步骤如下:

    1. 协调者向各个参与者发送 commit 通知,请求提交事务;
    2. 参与者收到事务提交通知之后执行 commit 操作,然后释放占有的资源;
    3. 参与者向协调者返回事务 commit 结果信息。

    分布式事务之LCN、TCC特点、事务补偿机制缘由以及设计重点

    除此之外, 还有2种情况, 囿于篇幅, 详情参考: 亿级流量架构之分布式事务思路及方法后面的二阶段提交协议

    今天要聊的TCC就是二阶段提交的具体事务实现。

    LCN

    详情参考:官网(中文版)

    有了前面的XA协议以及二阶段提交的知识, 就不难理解LCN框架了, 这个框架可以理解成就是上面所说的协调者, 不生产事务, 只负责协调事务。5.0以后框架兼容了LCN、TCC、TXC三种事务模式。

    LCN中各个字母依次代表:锁定事务单元(lock)、确认事务模块状态(confirm)、通知事务(notify)。

    在一个分布式系统下存在多个模块协调来完成一次业务。那么就存在一次业务事务下可能横跨多种数据源节点的可能。TX-LCN目的是解决这样的问题。

    例如存在服务模块A 、B、 C。A模块是mysql作为数据源的服务,B模块是基于redis作为数据源的服务,C模块是基于mongo作为数据源的服务。若需要解决他们的事务一致性就需要针对不同的节点采用不同的方案,并且统一协调完成分布式事务的处理。

    在LCN中, 协调者称之为TxManager , 参与者称之为 TxClient, TxManager作为分布式事务的控制方, 事务发起方或者参与方都由TxClient端来控制决定。

    时序图(来源官网):

    分布式事务之LCN、TCC特点、事务补偿机制缘由以及设计重点

    LCN核心步骤

    • 创建事务组
      是指在事务发起方开始执行业务代码之前先调用TxManager创建事务组对象,然后拿到事务标示GroupId的过程。
    • 加入事务组
      添加事务组是指参与方在执行完业务方法以后,将该模块的事务信息通知给TxManager的操作。
    • 通知事务组
      是指在发起方执行完业务代码以后,将发起方执行结果状态通知给TxManager,TxManager将根据事务最终状态和事务组的信息来通知相应的参与模块提交或回滚事务,并返回结果给事务发起方。

    TCC

    详情参考: Github(中文版)

    TCC事务机制相对于二阶段提交,其特征在于它不依赖资源管理器(RM)对XA协议的支持,而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务, 将事务分成 Try 和 Confirm/ Cancel两个阶段。

    三种操作作用: Try: 尝试执行业务、 Confirm:确认执行业务、 Cancel: 取消执行业务。

    整体流程如图

    分布式事务之LCN、TCC特点、事务补偿机制缘由以及设计重点

    Try 从执行阶段来看,与传统事务机制(二阶段提交)中业务逻辑相同。但从业务角度来看,却不一样。TCC机制中的Try仅是一个初步操作,它和后续的确认一起才能真正构成一个完整的业务逻辑。TCC机制将传统事务机制(2PC)中的业务逻辑一分为二:

    拆分后保留的部分为初步操作(Try);

    而分离出的部分即为验证操作(Confirm/cancel),被延迟到事务提交阶段执行。

    三阶段主要特点:

    Try 阶段

    Confirm阶段

    Cancel阶段

    主要含义

    尝试执行业务

    确认执行业务

    取消执行业务

    执行操作

    完成所有业务检查( 一致性 )

    不做任务业务检查

    释放 Try 阶段预留的业务资源

    预留必需业务资源( 准隔离性 )

    Confirm 操作满足幂等性

    Cancel 操作满足幂等性

    真正执行业务

    TCC补偿机制

    在很多情况下,我们是无法做到强一致的 ACID 的。特别是我们需要跨多个系统的时候,而且这些系统还不是由一个公司所提供的。比如,在我们的日常生活中,我们经常会遇到这样的情况,就是要找很多方协调很多事,而且要保证我们每一件事都成功,否则整件事就做不到。

    比如,要出门旅游, 我们需要干这么几件事。

    第一,向公司请假,拿到相应的假期;

    第二,订飞机票或是火车票;

    第三,订酒店;

    第四,租车。

    这四件事中,前三件必须完全成功,我们才能出行,而第四件事只是一个锦上添花的事,但第四件事一旦确定,那么也会成为整个事务的一部分。这些事都是要向不同的组织或系统请求。我们可以并行地做这些事,而如果某个事有变化,其它的事都会跟着出现一些变化。

    设想下面的几种情况。

    1. 我没有订到返程机票,那么我就去不了了。我需要把订到的去程机票,酒店、租到的车都给取消了,并且把请的假也取消了。
    2. 如果我假也请好了,机票,酒店也订好了,只是车没租到,那么并不影响我出行这个事,整个事还是可以继续的。
    3. 如果我的飞机因为天气原因取消或是晚点了,那么我被迫要去调整和修改我的酒店预订和租车的预订。

    从人类的实际生活当中,我们可以看出,上述的这些情况都是天天在发生的事情。所以,我们的分布式系统也是一样的,也是需要处理这样的事情——就是当条件不满足,或是有变化的时候,需要从业务上做相应的整体事务的补偿。

    对于业务补偿来说,首先需要将服务做成幂等性的,如果一个事务失败了或是超时了,我们需要不断地重试,努力地达到最终我们想要的状态。然后,如果我们不能达到这个我们想要的状态,我们需要把整个状态恢复到之前的状态。另外,如果有变化的请求,我们需要启动整个事务的业务更新机制。

    业务补偿机制特点

    由上可知,一个好的业务补偿机制需要做到下面这几点。

    1. 要能清楚地描述出要达到什么样的状态(比如:请假、机票、酒店这三个都必须成功,租车是可选的),以及如果其中的条件不满足,那么,我们要回退到哪一个状态。这就是所谓的整个业务的起始状态定义。
    2. 当整条业务跑起来的时候,我们可以串行或并行地做这些事。对于旅游订票是可以并行的,但是对于网购流程(下单、支付、送货)是不能并行的。总之,我们的系统需要努力地通过一系列的操作达到一个我们想要的状态。如果达不到,就需要通过补偿机制回滚到之前的状态。这就是所谓的状态拟合
    3. 对于已经完成的事务进行整体修改,可以考虑成一个修改事务。

    其实,在纯技术的世界里也有这样的事。比如,线上运维系统需要发布一个新的服务或是对一个已有的服务进行水平扩展,我们需要先找到相应的机器,然后初始化环境,再部署上应用,再做相应的健康检查,最后接入流量。这一系列的动作都要完全成功,所以,我们的部署系统就需要管理好整个过程和相关的运行状态。

    业务补偿的设计重点

    业务补偿主要做两件事。

    1. 努力地把一个业务流程执行完成。
    2. 如果执行不下去,需要启动补偿机制,回滚业务流程。

    所以,下面是几个重点。

    • 因为要把一个业务流程执行完成,需要这个流程中所涉及的服务方支持幂等性。并且在上游有重试机制。
    • 我们需要小心维护和监控整个过程的状态,所以,千万不要把这些状态放到不同的组件中,最好是一个业务流程的控制方来做这个事,也就是一个工作流引擎。所以,这个工作流引擎是需要高可用和稳定的。这就好像旅行代理机构一样,我们把需求告诉它,它会帮我们搞定所有的事。如果有问题,也会帮我们回滚和补偿的。
    • 补偿的业务逻辑和流程不一定非得是严格反向操作。有时候可以并行,有时候,可能会更简单。总之,设计业务正向流程的时候,也需要设计业务的反向补偿流程。
    • 我们要清楚地知道,业务补偿的业务逻辑是强业务相关的,很难做成通用的。
    • 下层的业务方最好提供短期的资源预留机制。就像电商中的把货品的库存预先占住等待用户在 15 分钟内支付。如果没有收到用户的支付,则释放库存。然后回滚到之前的下单操作,等待用户重新下单。

    原文链接:
    https://www.cnblogs.com/Courage129/p/14528981.html

    如果觉得本文对你有帮助,就点赞关注支持一下吧!

    展开全文
  • 分布式事务补偿机制

    千次阅读 2019-01-15 14:25:21
    关键字:事务补偿机制 淘宝梁飞分析分布式 文章:http://javatar.iteye.com/blog/981787 可以设想一个最简单的分布式事务场景,对于跨银行的转账操作,该操作涉及到调用两个异地的Service服务,一个是本地提供的取款...
  • dubbo-tcc 关于dubbo结合buyetcc事务补偿机制的内容
  • 如果事务号没有结束,事务超时,则按照回滚日志,反向操作,对事务进行补偿,补偿步骤如下: 第1步:对用户2进行事务补偿,检查数据库2的用户2的事务是否成功; 第2步:如果成功,则认为事务完成,在事务回滚日志表...
  • 1.XA XA是由X/Open组织提出的分布式事务的规范。...XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。XA之所以需要引入事
  • ...XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务
  • 分布式事务.补偿事务

    2022-04-10 09:35:39
    TCC Saga 基于事务事务 业务模型分 2 阶段设计 允许空回滚 防悬挂控制 幂等控制 Bytetcc TCC-transaction DTX EasyTransaction Hmily TX-LCN
  • 重试补偿机制完善

    千次阅读 2021-03-03 14:19:32
    最近上线了一个下单平台项目,需要定时将线下的订单也推送过去,但对于补偿机制,当时是简单的进行定时,每隔15分钟拉取所有的未推送记录(包含上次未推成功的),放入MQ中,在消费端进行数据推送。---在消费端有另...
  • 核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。分为三个阶段: Try 阶段:主要是对业务系统做检测(一致性)及资源预留(准隔离性) Confirm 阶段:主要是对业务系统做确认提交,Try...
  • 补偿事务(TCC) TCC 将事务提交分为 Try(method1) - Confirm(method2) - Cancel(method3) 3个操作。其和两阶段提交有点类似,Try为第一阶段,Confirm - Cancel为第二阶段,是一种应用层面侵入业务的两阶段提交。 ...
  • 简单聊聊消息队列的事务补偿机制

    万次阅读 2018-05-08 10:26:17
    因为一直学习与尝试负责公司的推送相关业务,包括整个应用的实现,其中就采用了基于消息队列的异步事件驱动模型来做解耦异步处理,所以就要去做了解一些相关的知识点,这边稍作总结,并整理一下消息补偿机制的一套...
  • 分布式事务产生的原因 数据库分库分表 微服务化 在微服务架构中,每个服务在用本地事务的时候,知道自己执行的事务是成功还是失败,但是无法知道其他服务节点的... 如何理解最终一致性和它的事务补偿机制呢? 刚...
  • TCC补偿机制 业务补偿机制特点 业务补偿的设计重点 XA协议 在上面提到的文章中, 分布式事务直接讲二阶段提交, 思维逻辑有些断层, 但是那毕竟是比较解决方案, 在这儿从理论上推导分布式事务的根基, 也就是为...
  • 补偿事务(TCC)? 补偿事务(TCC) 针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段: Try 阶段主要是对业务系统做检测及资源预留 Confirm 阶段主要是对业务系统做确认提交,Try阶段...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,451
精华内容 6,580
关键字:

事务补偿机制

友情链接: mymusic.rar