精华内容
下载资源
问答
  • 数据库同步锁实现

    2020-07-14 16:17:19
    对于多线程的集群环境,同一段代码逻辑我们不希望多个线程同时执行的时候,我们可以使用数据库锁实现线程的控制 二.场景例子 最近做了一个数据同步的逻辑,就是将第三方接口的数据同步到本地,可以手动同步,可以...

    一.使用场景

    对于多线程的集群环境,同一段代码逻辑我们不希望多个线程同时执行的时候,我们可以使用数据库锁来实现线程的控制

    二.场景例子

    最近做了一个数据同步的逻辑,就是将第三方接口的数据同步到本地,可以手动同步,可以定时自动同步,这时候我需要考虑在同一个时间点不可以有两个线程同时执行这个同步逻辑。
    同时 我们不仅仅需要考虑同一台服务器是否会存在同时运行这条线程,我们还需要考虑集群服务器是否会同时运行这条线程,这时候我们需要引入数据库锁
    原因

    1. 一个线程已经执行逻辑,不需要重复执行,可能会造成数据库数据的重复读脏读

    2. 因为调用接口获取的数据量比较大,同时同步会占用服务器的内存和cpu性能,影响应用的正常使用速度

    3. 第三方接口调用有频率限制,多次调用可能会出现接口调用频率超时导致无法同步的情况

    三.具体实现

    1.数据建一张同步状态表 如 sync_status_set 表
    关键字段
    sync_status 同步状态 此字段来当数据库锁,来控制线程的执行
    如 0-未同步 1-正在同步 2-同步结束
    基本字段
    create_time 创建时间
    update_time 修改时间
    create_user_id 创建人id
    update_user_id 更新人id

    2.代码逻辑处理
    1.开启线程,获取同步状态设置
    2.判断同步状态

    • sync_status == null || sync_status == 0 则插入同步数据 设置sync_stauts = 1
    • sync_status == 1 代表有线程正在同步 停止当前线程 更新修改时间 修改人员id
    • sync_status == 2 同步结束,创建新建的同步状态 设置创建时间

    结语:此为集群环境,保证一段逻辑在一个时间段只能单线程执行的实现方式之一 可能不是完美的处理方式,如果有更好的方式欢迎提出讨论

    展开全文
  • 业务场景:该服务需要每隔固定时间同步一次数据库,服务部署在多台机器,我们只希望同时有一台机器在做同步操作。 实现:用一个scheduler.go跑线程 //scheduler.go type Scheduler struct { } var scheduler *...

    业务场景:该服务需要每隔固定时间同步一次数据库,服务部署在多台机器,我们只希望同时有一台机器在做同步操作。
    实现:包含scheduler.go和util.go两个文件。scheduler.go跑线程,util.go通过一个锁表实现分布式。

    //scheduler.go
    
    type Scheduler struct {
    }
    
    var scheduler *Scheduler
    var scheduler_once sync.Once
    
    func GetSchedulerInstance() *Scheduler {
    	scheduler_once.Do(func() {
    		scheduler = &Scheduler{}
    	})
    	return scheduler
    }
    
    func (scheduler *Scheduler) Run() {
    	go scheduler.TaskRunner()
    }
    
    func (scheduler *Scheduler) TaskRunner() (int, error) {
    	var util Util
    	var conf *BaseConf
    	conf = GetConfInstance()
    	db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s",
    		conf.GetDBConf()[0].User,
    		conf.GetDBConf()[0].Password,
    		conf.GetDBConf()[0].Host,
    		conf.GetDBConf()[0].Port,
    		conf.GetDBConf()[0].Database,
    		conf.GetDBConf()[0].Charset))
    	if err != nil {
    		tars.TLOG.Error("Open database0 error: ", err)
    		return -1, err
    	}
    	defer db.Close()
    
    	for {
    		tx, err := db.Begin()
    		if err != nil {
    			tars.TLOG.Error(fmt.Sprintf("loop EXPAND_FIRST_LOOPER tx error: %s", err))
    			time.Sleep(60 * time.Second)
    			continue
    		}
    		if !util.GetLockFromLooperLocks("LOCK_DATA_LOOPER", "UPDATE_TASK_STATUS_ACCESS", tx) {
    			util.LooperLocksCommitOrRollBack(tx, -1)
    			time.Sleep(60 * time.Second)
    			continue
    		}
    		
    		//业务逻辑代码
    		//业务逻辑代码
    		//业务逻辑代码
    		
    		if  发生错误{
    			tars.TLOG.Error("failed")
    			util.LooperLocksCommitOrRollBack(tx, -1)
    			return 0, nil
    		}
    		util.LooperLocksCommitOrRollBack(tx, 0)
    		time.Sleep(300 * time.Second)//定时间隔
    	}
    	return 0, nil
    }
    
    //util,go
    
    func (util *Util) GetLockFromLooperLocks(loop_name string, lock_name string, tx *sql.Tx) bool {
    	//TODO) tx null point
    	if tx == nil {
    		tars.TLOG.Error("tx is nil")
    		return false
    	}
    	//TODO mdb not support nowait|wait|skip locked
    	sql_str := "select * from running_loop_locks where loop_name=? and lock_name=? for update"
    	results, err := tx.Query(sql_str, loop_name, lock_name)
    	defer results.Close()
    	if err != nil {
    		tars.TLOG.Error(fmt.Sprintf("loop_name: %s,get lock error: %s", loop_name, err))
    		return false
    	}
    	for results.Next() {
    	}
    	return true
    }
    
    func (util *Util) LooperLocksCommitOrRollBack(tx *sql.Tx, ret int) (int, error) {
    	if tx == nil {
    		return 1, nil
    	} else if 0 == ret {
    		tx.Commit()
    	} else {
    		tx.Rollback()
    	}
    	return 0, nil
    }
    

    这里的分布式锁利用了for update的特性:当select * from … for update语句没有commit或rollback时,其他进程(同机器或不同机器)会阻塞于这条语句,因此保证了同一时间只有一台机器上的一个进程进行业务逻辑的运行。

    展开全文
  • 分布式数据库实现

    万次阅读 2020-09-16 14:31:16
    分布式数据库实现 什么是分布式 在单实例单进程的系统中,当有多个线程同时修改某个共享变量时,为了保证线程安全,就需要对变量或者代码做同步处理,这种同步操作在java中可以使用synchronized、JUC包下的...

    分布式锁之数据库实现

    什么是分布式锁

    在单实例单进程的系统中,当有多个线程同时修改某个共享变量时,为了保证线程安全,就需要对变量或者代码做同步处理,这种同步操作在java中可以使用synchronized、JUC包下的显式锁、cas+volatile来实现。

    而目前大部分系统都是分布式部署的,使用synchronized等手动只能保证单个进程内的线程安全,多个进程多个实例下的线程安全就需要分布式锁来实现。

    目前主流的分布式锁解决方案有四种:

    • 基于数据库实现(悲观+乐观)
    • 基于Redis实现
    • 基于ZooKeeper实现
    • 基于Etcd实现

    数据库之悲观锁实现分布式锁

    建表sql:

    CREATE TABLE `t_order` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `amount` int(11) NOT NULL,
      `status` int(11) NOT NULL,
      `version` int(11) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    INSERT INTO `t_order` (`amount`, `status`, `version`) VALUES ('100', '1', '1');
    

    先借助sql来模拟分布锁的实现:

    步骤 SessionA SessionB
    1 begin; begin;
    2 select * from t_order where id=1 for update;
    3 select * from t_order where id=1 for update; – 阻塞
    4 update t_order set status=2 where id=1 and status=1;
    5 commit; 返回查询结果
    6 update t_order set status=2 where id=1 and status=1; – 状态变了未更新成功
    7 commit;

    说明:

    1. 客户端A和客户端B同时执行前面两行sql,客户端A返回数据,而客户端B阻塞等待获取行锁。
    2. 客户端A执行后面两行sql,提交事务,客户端B获得行锁,立刻返回数据。
    3. 客户端B执行后面两行sql,提交事务,释放行锁。

    注意for update语句一定要走主键索引,否则没走索引会锁住整个表,走了其他索引会产生间隙锁,可能会锁住多条记录。

    Java代码实现:

    package com.morris.distribute.lock.database.exclusive;
    
    import com.morris.distribute.entity.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.concurrent.TimeUnit;
    
    @Service
    @Slf4j
    public class OrderService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        /**
         * 数据库分布式锁之悲观锁
         *
         * @param id
         */
        @Transactional
        public void updateStatus(int id) {
            log.info("updateStatus begin, {}", id);
    
            Integer status = jdbcTemplate.queryForObject("select status from t_order where id=? for update", new Object[]{id}, Integer.class);
    
            if (Order.ORDER_STATUS_NOT_PAY == status) {
    
                try {
                    // 模拟耗时操作
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                int update = jdbcTemplate.update("update t_order set status=? where id=? and status=1", new Object[]{2, id, Order.ORDER_STATUS_NOT_PAY});
    
                if (update > 0) {
                    log.info("updateStatus success, {}", id);
                } else {
                    log.info("updateStatus failed, {}", id);
                }
            } else {
                log.info("updateStatus status already updated, ignore this request, {}", id);
            }
            log.info("updateStatus end, {}", id);
        }
    }
    

    注意开启事务@Transactional

    使用多个线程模拟竞争锁:

    package com.morris.distribute.lock.database.exclusive;
    
    import com.morris.distribute.config.JdbcConfig;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.stream.IntStream;
    
    public class Demo {
    
        public static void main(String[] args) {
            AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
            applicationContext.register(JdbcConfig.class);
            applicationContext.register(OrderService.class);
            applicationContext.refresh();
    
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
            IntStream.rangeClosed(1, 3).forEach((i) -> new Thread(() -> {
                OrderService orderService = applicationContext.getBean(OrderService.class);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                orderService.updateStatus(1);
            }, "t" + i).start());
        }
    }
    

    运行结果如下:

    2020-09-16 14:16:53,248  INFO [t2] (OrderService.java:26) - updateStatus begin, 1
    2020-09-16 14:16:53,248  INFO [t1] (OrderService.java:26) - updateStatus begin, 1
    2020-09-16 14:16:53,248  INFO [t3] (OrderService.java:26) - updateStatus begin, 1
    2020-09-16 14:16:56,289  INFO [t2] (OrderService.java:42) - updateStatus success, 1
    2020-09-16 14:16:56,289  INFO [t2] (OrderService.java:49) - updateStatus end, 1
    2020-09-16 14:16:56,290  INFO [t3] (OrderService.java:47) - updateStatus status already updated, ignore this request, 1
    2020-09-16 14:16:56,290  INFO [t3] (OrderService.java:49) - updateStatus end, 1
    2020-09-16 14:16:56,291  INFO [t1] (OrderService.java:47) - updateStatus status already updated, ignore this request, 1
    2020-09-16 14:16:56,291  INFO [t1] (OrderService.java:49) - updateStatus end, 1
    

    从运行结果可以看出,同一时间只有一个线程持有了锁。

    数据库之乐观锁实现分布式锁

    乐观锁每次通过版本号来判断记录是否被更新过。

    package com.morris.distribute.lock.database.share;
    
    import com.morris.distribute.entity.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.TimeUnit;
    
    @Service
    @Slf4j
    public class OrderService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        /**
         * 数据库分布式锁之乐观锁
         *
         * @param id
         */
        public void updateStatus(int id) {
    
            log.info("updateStatus begin, {}", id);
            for (;;) { // 自旋,有可能对订单做其他操作,导致version变了,所以需要自旋
                Order order = jdbcTemplate.queryForObject("select status, version from t_order where id=?",
                        new Object[]{id}, (rs, row) -> {
                            Order o = new Order();
                            o.setStatus(rs.getInt(1));
                            o.setVersion(rs.getInt(2));
                            return o;
                        });
    
                if (Order.ORDER_STATUS_NOT_PAY == order.getStatus()) {
    
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    int update = jdbcTemplate.update("update t_order set status=?,version=? where id=? and version=? and status=?",
                            new Object[]{Order.ORDER_STATUS_PAY_SUCCESS, order.getVersion() + 1, id, order.getVersion(), Order.ORDER_STATUS_NOT_PAY});
    
                    if (update > 0) {
                        log.info("updateStatus success, {}", id);
                        break;
                    } else {
                        log.info("updateStatus failed, {}", id);
                    }
                } else {
                    log.info("updateStatus status already updated, ignore this request, {}", id);
                    break;
                }
            }
            log.info("updateStatus end, {}", id);
        }
    
    }
    

    运行结果如下:

    2020-09-16 14:21:08,934  INFO [t3] (OrderService.java:25) - updateStatus begin, 1
    2020-09-16 14:21:08,934  INFO [t2] (OrderService.java:25) - updateStatus begin, 1
    2020-09-16 14:21:08,934  INFO [t1] (OrderService.java:25) - updateStatus begin, 1
    2020-09-16 14:21:12,110  INFO [t1] (OrderService.java:50) - updateStatus failed, 1
    2020-09-16 14:21:12,110  INFO [t2] (OrderService.java:50) - updateStatus failed, 1
    2020-09-16 14:21:12,111  INFO [t3] (OrderService.java:47) - updateStatus success, 1
    2020-09-16 14:21:12,111  INFO [t3] (OrderService.java:57) - updateStatus end, 1
    2020-09-16 14:21:12,117  INFO [t2] (OrderService.java:53) - updateStatus status already updated, ignore this request, 1
    2020-09-16 14:21:12,117  INFO [t1] (OrderService.java:53) - updateStatus status already updated, ignore this request, 1
    2020-09-16 14:21:12,117  INFO [t1] (OrderService.java:57) - updateStatus end, 1
    2020-09-16 14:21:12,117  INFO [t2] (OrderService.java:57) - updateStatus end, 1
    

    总结

    悲观锁:会锁住整行记录,导致对数据的其他业务操作也无法进行,效率低,如果sql没写好,可能会产生间隙锁,锁住多条记录,甚至锁住全表。

    乐观锁:每个表都需要增加与业务无关的version字段。

    优点:直接基于数据库实现,实现简单。

    缺点:IO开销大,连接数有限,无法满足高并发的需求。

    展开全文
  • 悲观实现,往往依靠数据库提供的机制(也只有数据库层提供的机制才能真正保证数据访问的排他性,否则即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。一个典型的倚赖数据库的悲观调用:...

    测试需要:本地开两个测试窗口

    悲观锁

    悲观锁它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。 
    一个典型的倚赖数据库的悲观锁调用: 
    select * from table where name='who am i' for update;
    这条 sql 语句锁定了table表中所有符合检索条件( name='who am i' )的记录。本次事务提交之前(事务提交时会释放事务过程中的锁),外界无法修改这些记录。 Hibernate 的悲观锁,也是基于数据库的锁机制实现。

    一个窗口开启一个事务但是不提交事务,执行select语句其中包含for update子语句。需要注意的是for update要放到mysql的事务中,即begin和commit中,否则不起作用。

    另一窗口尝试更新表

    我们可以看到报错了内容是:锁等待超时超过;试着重新启动事务

     这个时候我们提交事务

     再次尝试更新时更新成功了

     优点与不足

    悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会;另外,在只读型事务处理中由于不会产生冲突,也没必要使用锁,这样做只能增加系统负载;还有会降低了并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数

     乐观锁

    参考http://chenzhou123520.iteye.com/blog/1863407

    转载于:https://www.cnblogs.com/BINGJJFLY/p/7488460.html

    展开全文
  • 小编在最近的项目中遇到了要对数据库中同一个字段进行操作的一个功能,少数人操作的话,还体现不出来线程的问题,当很多人同时使用,数据量变大,就会出现线程的问题。如何保持线程同步,是小编这篇博客要达到的目的...
  • Mysql数据库如何实现分布式

    千次阅读 2018-03-07 14:10:11
    分布式是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来...
  • (lock)或互斥(mutex)是一种同步机制,用于在多线程环境中控制各线程对资源的访问权限。旨在强制实施互斥排他、并发控制策略。 1.1、单体应用   JDK中的只能在一个JVM进程内有效,我们把这种叫做单体...
  • 简单的秒杀实现方式比较多,常见的有可重入锁、redis分布式锁、线程同步锁数据库锁等,其中最简单也最容易实现的就是数据库乐观锁了,下面的demo以springboot+Data JPA框架为基础,利用mysql乐观锁实现了一个简单...
  • 由于之前同步是全量同步,每天都需要定时全量同步,不仅时间消耗长,也影响下游业务(查询慢,有时还会表)。全量同步使用的java的定时任务,多线程的方式进行同步,发现某一天同步任务执行不完的情况,修改为增量...
  • 主要是讲解的分布式的三种实现的过程和方式。 分布式诞生的背景 我们在开发单机应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的Java多线程的18般武艺进行处理,并且可以...
  • 数据库锁

    2019-11-29 17:31:16
    悲观:总是假设最坏的情况,每次去拿数据的时候都认为...再比如Java里面的同步原语synchronized关键字的实现也是悲观。 乐观:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上...
  • 银行转账问题: A在上海,B在北京同时在建行转账给杭州C,A转账时,会修改C处服务器的表,B不能在此刻转账,同理,B转账时,A不能做处理,A,B的转账操作时同步,必须保证数据的一致性,这就需要分布式来进行处理. ...
  • 重启mysql服务,查看是否生效3.master数据库上创建主从同步账户:4.master表5.拷贝数据库至从库6.配置从数据库的配置文件7.新建数据库并且合并8.配置主库信息,实现主从同步9.检测是否实现主从同步 1.修改master配置...
  • 上来就住,把事情考虑的比较悲观,它是采用数据库机制实现的,数据库之后其它用户将无法查看,直到提交或者回滚,释放之后才可查看。所以悲观具有其霸道性。 简单说其悲观的功能就是,住读取的记录,...
  • 数据库

    2021-05-20 14:05:19
    在计算机科学中,是在执行多线程时用于强行限制资源访问的同步机制,即用于在并发控制中保证对互斥要求的满足。 加锁是实现数据库并发控制的一个非常重要的技术。当事务在对某个数据对象进行操作前,先向系统发出...
  • 数据库锁简介

    2021-01-28 15:57:46
    悲观 悲观并发控制实际上是“先取再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的... Java 里面的同步 synchronized 关键字的实现 共享【shared locks】又称为读,简称S...
  • 在高并发的场景中我们经常会让线程同步,如在秒杀商品时,我们需要对资源上锁来确保不发生超卖等问题,在单体应用中java已经为我们提供了相应的同步锁。然而,在分布式应用中这些锁将无能为力。 分布式锁的实现方式...
  • 学习了淘宝的订单系统设计,将订单拆分为买家库、卖家库,消息中间件进行订单同步(类似于方案1、2的对买家、卖家分表么),似乎是不关注订单数据冗余,但没想明白如何按照订单查询,所以借鉴设计了方案3: ...
  • 在原本的项目中往往是通过锁数据库的表来实现的。 在高并发的需求下,我们往往要需要对非常多的表进行加锁。加锁的表越少,资源越少,竞争越严重。 不过好在mysql数据还提供了一种不需要表的加锁方式。 加锁:...
  • 一、quartz数据库锁 其中,QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,其表结构如下: 点击(此处)折叠或打开 --QRT...
  • 上来就住,把事情考虑的比较悲观,它是采用数据库机制实现的,数据库之后其它用户将无法查看,直到提交或者回滚,释放之后才可查看。所以悲观具有其霸道性。  简单说其悲观的功能就是,住读取的记录...
  • 其中,QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,其表结构如下: --QRTZ_LOCKS表结构 CREATE TABLE `QRTZ_LOCKS` (  `LOCK_NAME` varchar(40) NOT NULL,  PRIMARY KEY (`LOCK_NAME`) ) ...
  • 分布式实现前言分布式应该具备哪些条件分布式实现方式基于数据库实现方式基于缓存(Redis)的实现方式基于Zookeeper的实现方式三种方案的比较 前言     我们在开发应用时,如果需要...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 927
精华内容 370
关键字:

数据库实现同步锁