精华内容
下载资源
问答
  • Redisson分布式锁原理

    2021-02-09 16:29:03
    本文只介绍Redisson如何实现分布式锁原理。其它的会在接下来的博客讲,最后有关Redisson实现分布式锁的项目代码的博客中会放上项目源码到GitHub上。 一、高效分布式锁 当我们在设计分布式锁的时候,我们应该考虑...

    文章目录

    Redisson分布式锁原理

    RedissonLock介绍

    分布式锁项目落地实战

    Redisson原理

    本文只介绍Redisson如何实现分布式锁的原理。其它的会在接下来的博客讲,最后有关Redisson实现分布式锁的项目代码的博客中会放上项目源码到GitHub上。

    一、高效分布式锁

    当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。

    1、互斥

    在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。

    2、防止死锁

    在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。

    所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。

    3、性能

    对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。

    所以在锁的设计时,需要考虑两点。

    1、锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。

    2、锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。

    4、重入

    我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。

    针对以上Redisson都能很好的满足,下面就来分析下它。

    二、Redisson原理分析

    为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。

    1、加锁机制

    线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。

    线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

    2、watch dog自动延期机制

    这个比较难理解,找了些许资料感觉也并没有解释的很清楚。这里我自己的理解就是:

    在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

    但在实际开发中会有下面一种情况:

    		//设置锁1秒过去
            redissonLock.lock("redisson", 1);
            /**
             * 业务逻辑需要咨询2秒
             */
            redissonLock.release("redisson");
    
          /**
           * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
           * 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
           * 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
           */
    


    所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。

    注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。

    3、为啥要用lua脚本呢?

    这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

    4、可重入加锁机制

    Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:

    1、Redis存储锁的数据类型是 Hash类型
    2、Hash数据类型的key值包含了当前线程信息。
    

    下面是redis存储的数据

    这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 ‘redisson’

    它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:

    guid + 当前线程的ID。后面的value是就和可重入加锁有关。

    举图说明

    上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。

    5、Redis分布式锁的缺点

    Redis分布式锁会有个缺陷,就是在Redis哨兵模式下:

    客户端1 对某个master节点写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。

    这时客户端2 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。

    这时系统在业务语义上一定会出现问题,导致各种脏数据的产生

    缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。

    展开全文
  • redisson分布式锁原理

    2019-08-01 22:10:13
    Redisson中,使用key来作为是否上的标志,当通过getLock(String key)方法获得相应的之后,这个key即作为一个存储到Redis集群中,在接下来如果有其他的线程尝试获取名为key的时,便会向集群中进行查询,...

    锁的原理

    在Redisson中,使用key来作为是否上锁的标志,当通过getLock(String key)方法获得相应的锁之后,这个key即作为一个锁存储到Redis集群中,在接下来如果有其他的线程尝试获取名为key的锁时,便会向集群中进行查询,如果能够查到这个锁并发现相应的value的值不为0,则表示已经有其他线程申请了这个锁同时还没有释放,则当前线程进入阻塞,否则由当前线程获取这个锁并将value值加一,如果是可重入锁的话,则当前线程每获得一个自身线程的锁,就将value的值加一,而每释放一个锁则将value值减一,直到减至0,完全释放这个锁。因为底层是基于分布式的Redis集群,所以Redisson实现了分布式的锁机制。

    redisson 的最基本的用法,

    RLock lock = redisson.getLock(“lockName”);
    try{
    // 1. 最常见的使用方法
    //lock.lock();
    // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
    //lock.lock(10, TimeUnit.SECONDS);
    // 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁
    boolean res = lock.tryLock(2, 8, TimeUnit.SECONDS);
    if(res){ //成功
    //处理业务
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    //释放锁
    lock.unlock();
    }

    展开全文
  • Redis分布式锁原理 分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的...

    Redis分布式锁原理

    分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

    使用setnx、getset、expire、del这4个redis命令实现

    1. setnx 是『SET if Not eXists』(如果不存在,则 SET)的简写。 命令格式:SETNX key value;使用:只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。
    2. getset 命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置之前的旧的value。返回值:如果键 key 没有旧值, 也即是说, 键 key 在被设置之前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
    3. expire 命令格式:EXPIRE key seconds,使用:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。返回值:设置成功返回 1 。 当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
    4. del 命令格式:DEL key [key …],使用:删除给定的一个或多个 key ,不存在的 key 会被忽略。返回值:被删除 key 的数量。

    redis 分布式锁原理一

    过程分析:

    1. A尝试去获取锁lockkey,通过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;
    2. 如果返回值为1,说明redis服务器中还没有lockkey,也就是没有其他用户拥有这个锁,A就能获取锁成功;
    3. 在进行相关业务执行之前,先执行expire(lockkey),对lockkey设置有效期,防止死锁。因为如果不设置有效期的话,lockkey将一直存在于redis中,其他用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;
    4. 执行相关业务;
    5. 释放锁,A完成相关业务之后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行重新设置锁新值。

    原理图如下
    在这里插入图片描述
    代码实现:

    public void redis1() {
            log.info("关闭订单定时任务启动");
            long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
            //这个方法的缺陷在这里,如果setnx成功后,锁已经存到Redis里面了,服务器异常关闭重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁
            Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
            if (setnxResult != null && setnxResult.intValue() == 1) {
                //如果返回值为1,代表设置成功,获取锁
                closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            } else {
                log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            }
            log.info("关闭订单定时任务结束");
        }
    private void closeOrder(String lockName) {
            //对锁设置有效期
            RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
            log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            //执行业务
            int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
            iOrderService.closeOrder(hour);
            //执行完业务后,释放锁
            RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            log.info("=================================");
        }
    
    

    缺陷:

    如果A在setnx成功后,A成功获取锁了,也就是锁已经存到Redis里面了,此时服务器异常关闭或是重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁。

    解决方法:

    关闭Tomcat有两种方式,一种通过温柔的执行shutdown关闭,一种通过kill杀死进程关闭,使用shutdown关闭tomcat,以下的方法会在关闭前执行,即关闭前删除锁

    //通过温柔的执行shutdown关闭时,以下的方法会在关闭前执行,即可以释放锁,而对于通过kill杀死
    //进程关闭时,以下方法不会执行,即不会释放锁
    //这种方式释放锁的缺点在于,如果关闭的锁过多,将造成关闭服务器耗时过长
      @PreDestroy
      public void delLock() {
          RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
      }
    
    

    redis 分布式锁原理2(优化版)

    为了解决原理1中会出现的死锁问题,提出原理2双重防死锁,可以更好解决死锁问题。
    过程分析:

    1. 当A通过setnx(lockkey,currenttime+timeout)命令能成功设置lockkey时,即返回值为1,过程与原理1一致;
    2. 当A通过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;
    3. A通过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA
    4. lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
    5. 步骤4返回结果为true后,通过getSet设置新的超时时间,并返回旧值lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
    6. lockvalueB == null || lockvalueA==lockvalueB,判断:若果lockvalueB为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的lockvalueB一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。

    原理图如下:
    在这里插入图片描述

    代码实现:

    public void redis2() {
            log.info("关闭订单定时任务启动");
            long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
            Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
            if (setnxResult != null && setnxResult.intValue() == 1) {
                //如果返回值为1,代表设置成功,获取锁
                closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            } else {
                //未获取到锁,继续判断,判断时间戳,看是否可以重置并获取到锁
                String lockValueStr = RedisShardedPoolUtil.get(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                //通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,不能获取该锁
                if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {
                    //通过getSet设置新的超时时间,并返回旧值,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
                    String getSetResult = RedisShardedPoolUtil.getSet(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
                    //再次用当前时间戳getset。
                    //返回给定的key的旧值,与旧值判断,是否可以获取锁
                    //当key没有旧值时,即key不存在时,返回nil ->获取锁
                    //这里我们set了一个新的value值,获取旧的值。
                    //若果getSetResult为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的getSetResult一致说明中间未被其他进程获取该锁,可以获取锁
                    if (getSetResult == null || (getSetResult != null && StringUtils.equals(lockValueStr, getSetResult))) {
                        //真正获取到锁
                        closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                    } else {
                        log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                    }
                } else {
                    log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                }
            }
            log.info("关闭订单定时任务结束");
        }
        private void closeOrder(String lockName) {
            //对锁设置有效期
            RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
            log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            //执行业务
            int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
            iOrderService.closeOrder(hour);
            //执行完业务后,释放锁
            RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            log.info("=================================");
        }
    
    
    • 优化点:

    加入了超时时间判断锁是否超时了,及时A在成功设置了锁之后,服务器就立即出现宕机或是重启,也不会出现死锁问题;因为B在尝试获取锁的时候,如果不能setnx成功,会去获取redis中锁的超时时间与当前的系统时间做比较,如果当前的系统时间已经大于锁超时时间,说明A已经对锁的使用权失效,B能继续判断能否获取锁,解决了redis分布式锁的死锁问题。

    Redisson实现Redis分布式锁的底层原理

    在这里插入图片描述

    (1)加锁机制

    咱们来看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。

    这里注意,仅仅只是选择一台机器!这点很关键!

    紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:
    在这里插入图片描述
    为啥要用lua脚本呢?

    因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。(lua脚本的原子性见文章最后补充内容)

    那么,这段lua脚本是什么意思呢?

    KEYS[1]代表的是你加锁的那个key,比如说:
    
    RLock lock = redisson.getLock("myLock");
    

    这里你自己设置了加锁的那个锁key就是“myLock”。

    **ARGV[1]**代表的就是锁key的默认生存时间,默认30秒。
    
    **ARGV[2]**代表的是加锁的客户端的ID,类似于下面这样:
    
    8743c9c0-0795-4907-87fd-6c719a6b4586:1
    

    给大家解释一下,第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

    如何加锁呢?很简单,用下面的命令:

    hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    

    通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
    在这里插入图片描述
    上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。

    接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

    好了,到此为止,ok,加锁完成了。

    (2)锁互斥机制

    那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

    很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

    接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

    所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的**剩余生存时间。**比如还剩15000毫秒的生存时间。

    此时客户端2会进入一个while循环,不停的尝试加锁。

    (3)watch dog自动延期机制

    客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

    简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

    (4)可重入加锁机制

    那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

    比如下面这种代码:
    在这里插入图片描述
    这时我们来分析一下上面那段lua脚本。

    第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

    第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

    此时就会执行可重入加锁的逻辑,他会用:

    incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
    

    通过这个命令,对客户端1的加锁次数,累加1。

    此时myLock数据结构变为下面这样:
    在这里插入图片描述
    大家看到了吧,那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数

    (5)释放锁机制

    如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

    其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。

    如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

    “del myLock”命令,从redis里删除这个key。

    然后呢,另外的客户端2就可以尝试完成加锁了。

    这就是所谓的分布式锁的开源Redisson框架的实现机制。

    一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

    Redis分布式锁缺点

    其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。

    但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。

    接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。

    此时就会导致多个客户端对一个分布式锁完成了加锁。

    这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。

    所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:

    在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。

    补充-lua脚本的原子性:

    Redis使用同一个Lua解释器来执行所有命令,同时,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。

    然而这也意味着,执行一个较慢的lua脚本是不建议的,由于脚本的开销非常低,构造一个快速执行的脚本并非难事。但是你要注意到,当你正在执行一个比较慢的脚本时,所以其他的客户端都无法执行命令。

    展开全文
  • Redisson - Redis Java clientwith features of an in-memory data grid Quick start | Documentation | Javadocs | Changelog | Code examples | FAQs | Report an issue Based on high-performance async and ...
  • ReentrantLock 重入在说 Redisson 之前我们先来说一下 JDK 可重入: ReentrantLockReentrantLock 保证了 JVM 共享资源同一时刻只允...

    ReentrantLock 重入锁

    在说 Redisson 之前我们先来说一下 JDK 可重入锁: ReentrantLock

    ReentrantLock 保证了 JVM 共享资源同一时刻只允许单个线程进行操作

    实现思路

    ReentrantLock 内部公平锁与非公平锁继承了 AQS[AbstractQueuedSynchronizer]

    1、AQS 内部通过 volatil 修饰的 int 类型变量 state 控制并发情况下线程安全问题及锁重入

    2、将未竞争到锁的线程放入 AQS 的队列中通过 LockSupport#park、unPark 挂起唤醒

    简要描述哈, 详情可以查看具体的文章

    Redisson

    可以直接查看 Github Redisson官网 介绍, 没有了解过的小伙伴, 看一下 Redisson 的 WIKI 目录, 仔细瞅瞅 Redis 是如何被 Redisson 武装到牙齿的


    上下滚动查看更多

    这里先过一下和文章有关的一部分内容

    通过项目简介可以看出来, 写这个项目介绍的人水平非常哇塞哈, 从第一段咱们就知道了两个问题

    Redisson 是什么

    Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格框架, 充分利用 Redis 键值数据库提供的一系列优势, 基于 Java 实用工具包中常用接口, 为使用者提供了 一系列具有分布式特性的常用工具类

    Redisson 的优势

    使得原本作为协调单机多线程并发程序的工具包 获得了协调分布式多机多线程并发系统的能力, 大大降低了设计和研发大规模分布式系统的难度

    同时结合各富特色的分布式服务, 更进一步 简化了分布式环境中程序相互之间的协作

    了解到这里就差不多了, 就不向下扩展了, 想要了解详细用途的, 翻一下上面的目录

    Redisson 重入锁

    由于 Redisson 太过于复杂, 设计的 API 调用大多用 Netty 相关, 所以这里只对 如何加锁、如何实现重入锁进行分析以及如何锁续时进行分析

    创建锁

    我这里是将 Redisson 的源码下载到本地了

    下面这个简单的程序, 就是使用 Redisson 创建了一个非公平的可重入锁

    lock() 方法加锁成功 默认过期时间 30 秒, 并且支持 "看门狗" 续时功能

    public static void main(String[] args) {
        Config config = new Config();
        config.useSingleServer()
                .setPassword("123456")
                .setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
    
        RLock lock = redisson.getLock("myLock");
    
        try {
            lock.lock();
            // 业务逻辑
        } finally {
            lock.unlock();
        }
    }
    

    我们先来看一下 RLock 接口的声明

    public interface RLock extends Lock, RLockAsync {}
    

    RLock 继承了 JDK 源码 JUC 包下的 Lock 接口, 同时也继承了 RLockAsync

    RLockAsync 从字面意思看是 支持异步的锁, 证明获取锁时可以异步获取

    看了 Redisson 的源码会知道, 注释比黄金贵 ????️

    由于获取锁的 API 较多, 我们这里以 lock() 做源码讲解, 看接口定义相当简单

    /**
     * lock 并没有指定锁过期时间, 默认 30 秒
     * 如果获取到锁, 会对锁进行续时
     */
    void lock();
    

    获取锁实例

    根据上面的小 Demo, 看下第一步获取锁是如何做的

    RLock lock = redisson.getLock("myLock");
    
    // name 就是锁名称
    public RLock getLock(String name) {
       // 默认创建的同步执行器, (存在异步执行器, 因为锁的获取和释放是有强一致性要求, 默认同步)
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
    

    Redisson 中所有 Redis 命令都是通过 ...Executor 执行的

    获取到默认的同步执行器后, 就要初始化 RedissonLock

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
       // 唯一ID
        this.id = commandExecutor.getConnectionManager().getId();
       // 等待获取锁时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
       // ID + 锁名称
        this.entryName = id + ":" + name;
       // 发布订阅, 后面关于加、解锁流程会用到
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    

    尝试获取锁

    我们来看一下 RLock#lock()  底层是如何获取锁的

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    

    leaseTime: 加锁到期时间, -1 使用默认值 30 秒

    unit: 时间单位, 毫秒、秒、分钟、小时...

    interruptibly: 是否可被中断标示

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 获取当前线程ID
        long threadId = Thread.currentThread().getId();
        // ???? 尝试获取锁, 下面重点分析
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // 成功获取锁, 过期时间为空
        if (ttl == null) {
            return;
        }
    
        // 订阅分布式锁, 解锁时进行通知
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }
    
        try {
            while (true) {
                // 再次尝试获取锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // 成功获取锁, 过期时间为空, 成功返回
                if (ttl == null) {
                    break;
                }
    
                // 锁过期时间如果大于零, 则进行带过期时间的阻塞获取
                if (ttl >= 0) {
                    try {
                        // 获取不到锁会在这里进行阻塞, Semaphore, 解锁时释放信号量通知
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                    // 锁过期时间小于零, 则死等, 区分可中断及不可中断
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 取消订阅
            unsubscribe(future, threadId);
        }
    }
    

    这一段代码是用来执行加锁, 继续看下方法实现

    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    

    lock() 以及 tryLock(...) 方法最终都会调用此方法, 分为两个流程分支

    1、tryLock(...) API 异步加锁返回

    2、lock() & tryLock() API 异步加锁并进行锁续时

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        // 执行 tryLock(...) 才会进入
        if (leaseTime != -1) {
            // 进行异步获取锁
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 尝试异步获取锁, 获取锁成功返回空, 否则返回锁剩余过期时间
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // ttlRemainingFuture 执行完成后触发此操作
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
           // ttlRemaining == null 代表获取了锁
            // 获取到锁后执行续时操作
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    

    继续看一下 tryLockInnerAsync(...) 详细的加锁流程, 内部采用的 Lua 脚本形式, 保证了原子性操作

    到这一步大家就很明了了, 将 Lua 脚本被 Redisoon 包装最后通过 Netty 进行传输

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
    
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    evalWriteAsync(...) 是对 Eval 命令的封装以及 Netty 的应用就不继续跟进了

    加锁 Lua

    执行 Redis 加锁的 Lua 脚本, 截个图让大家看一下参数以及具体含义

    KEYS[1]: myLock

    ARGV[1]: 36000... 这个是过期时间, 自己测试的, 单位毫秒

    ARGV[2]: UUID + 线程 ID

    # KEYS[1] 代表上面的 myLock
    # 判断 KEYS[1] 是否存在, 存在返回 1, 不存在返回 0
    if (redis.call('exists', KEYS[1]) == 0) then
      # 当 KEYS[1] == 0 时代表当前没有锁
      # 使用 hincrby 命令发现 KEYS[1] 不存在并新建一个 hash
      # ARGV[2] 就作为 hash 的第一个key, val 为 1
      # 相当于执行了 hincrby myLock 91089b45... 1
     redis.call('hincrby', KEYS[1], ARGV[2], 1);
      # 设置 KEYS[1] 过期时间, 单位毫秒
     redis.call('pexpire', KEYS[1], ARGV[1]);
     return nil;
    end;
    # 查找 KEYS[1] 中 key ARGV[2] 是否存在, 存在回返回 1
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
      # 同上, ARGV[2] 为 key 的 val +1
     redis.call('hincrby', KEYS[1], ARGV[2], 1);
      # 同上
     redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
    # 返回 KEYS[1] 过期时间, 单位毫秒
    return redis.call('pttl', KEYS[1]);
    

    整个 Lua 脚本加锁的流程画图如下:

    现在回过头看一下获取到锁之后, 是如何为锁进行延期操作的

    锁续时

    之前有和军哥聊过这个话题, 他说的思路和 Redisson 中体现的基本一致

    说一下 Redisson 的具体实现思路吧, 中文翻译叫做 "看门狗"

    1、获取到锁之后执行 "看门狗" 流程

    2、使用 Netty 的 Timeout 实现定时延时

    3、比如锁过期 30 秒, 每过 1/3 时间也就是 10 秒会检查锁是否存在, 存在则更新锁的超时时间

    可能会有小伙伴会提出这么一个疑问, 如果检查返回存在, 设置锁过期时刚好锁被释放了怎么办?

    有这样的疑问, 代表确实用心去考虑所有可能发生的情况了, 但是不必担心哈

    Redisson 中使用的 Lua 脚本做的检查及设置过期时间操作, 本身是原子性的不会出现上面情况

    如果不想要引用 Netty 的包, 使用延时队列等包工具也是可以完成 "看门狗"

    这里也贴一哈相关代码, 能够让小伙伴更直观的了解如何锁续时的

    我可真是个暖男, 上代码 RedissonLock#tryAcquireAsync(...)

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        // ...
        // 尝试异步获取锁, 获取锁成功返回空, 否则返回锁剩余过期时间
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // ttlRemainingFuture 执行完成后触发此操作
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
            // 获取到锁后执行续时操作
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    

    可以看到续时方法将 threadId 当作标识符进行续时

    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
    

    知道核心理念就好了, 没必要研究每一行代码哈

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
    
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
    
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
    
                    if (res) {
                        // 调用本身
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
        ee.setTimeout(task);
    }
    

    解锁操作

    解锁时的操作相对加锁还是比较简单的

    @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
    

    解锁成功后会将之前的"看门狗" Timeout 续时取消, 并返回成功

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    
        future.onComplete((opStatus, e) -> {
           // 取消自动续时功能
            cancelExpirationRenewal(threadId);
    
            if (e != null) {
               // 失败
                result.tryFailure(e);
                return;
            }
    
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
        // 解锁成功
            result.trySuccess(null);
        });
    
        return result;
    }
    

    又是一个精髓点, 解锁的 Lua 脚本定义

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    

    还是来张图理解哈, Lua 脚本会详细分析

    解锁 Lua

    老规矩, 图片加参数说明

    KEYS[1]: myLock

    KEYS[2]: redisson_lock_channel:{myLock}

    ARGV[1]: 0

    ARGV[2]: 360000... (过期时间)

    ARGV[3]: 7f0c54e2...(Hash 中的锁 Key)

    # 判断 KEYS[1] 中是否存在 ARGV[3]
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
    end;
    # 将 KEYS[1] 中 ARGV[3] Val - 1
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    # 如果返回大于0 证明是一把重入锁
    if (counter > 0) then
      # 重制过期时间
     redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
    else
      # 删除 KEYS[1]
     redis.call('del', KEYS[1]);
      # 通知阻塞等待线程或进程资源可用
     redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
    end;
    return nil;
    

    Redlock 算法

    不可否认, Redisson 设计的分布式锁真的很 NB, 但是还是没有解决 主从节点下异步同步数据导致锁丢失问题

    所以 Redis 作者 Antirez 推出 红锁算法, 这个算法的精髓就是: 没有从节点, 如果部署多台 Redis, 各实例之间相互独立, 不存在主从复制或者其他集群协调机制

    如何使用

    创建多个 Redisson Node, 由这些无关联的 Node 组成一个完整的分布式锁

    public static void main(String[] args) {
        String lockKey = "myLock";
        Config config = new Config();
        config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6379");
        Config config2 = new Config();
        config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6380");
        Config config3 = new Config();
        config.useSingleServer().setPassword("123456").setAddress("redis://127.0.0.1:6381");
    
        RLock lock = Redisson.create(config).getLock(lockKey);
        RLock lock2 = Redisson.create(config2).getLock(lockKey);
        RLock lock3 = Redisson.create(config3).getLock(lockKey);
    
        RedissonRedLock redLock = new RedissonRedLock(lock, lock2, lock3);
    
        try {
            redLock.lock();
        } finally {
            redLock.unlock();
        }
    }
    

    当然, 对于 Redlock 算法不是没有质疑声, 大家可以去 Redis 官网查看Martin Kleppmann 与 Redis 作者Antirez 的辩论

    CAP 原则之间的取舍

    CAP 原则又称 CAP 定理, 指的是在一个分布式系统中,  Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性), 三者不可得兼

    一致性(C) : 在分布式系统中的所有数据备份, 在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)

    可用性(A): 在集群中一部分节点故障后, 集群整体是否还能响应客户端的读写请求(对数据更新具备高可用性)

    分区容忍性(P): 以实际效果而言, 分区相当于对通信的时限要求. 系统如果不能在时限内达成数据一致性, 就意味着发生了分区的情况, 必须就当前操作在 C 和 A 之间做出选择

    分布式锁选型

    如果要满足上述分布式锁之间的强一致性, 可以采用 Zookeeper 的分布式锁, 因为它底层的 ZAB协议(原子广播协议), 天然满足 CP

    但是这也意味着性能的下降, 所以不站在具体数据下看 Redis 和 Zookeeper, 代表着性能和一致性的取舍

    如果项目没有强依赖 ZK, 使用 Redis 就好了, 因为现在 Redis 用途很广, 大部分项目中都引用了 Redis

    没必要对此再引入一个新的组件, 如果业务场景对于 Redis 异步方式的同步数据造成锁丢失无法忍受, 在业务层处理就好了

    
    往期推荐
    

    Redis为什么变慢了?一文详解Redis性能问题 | 万字长文


    Redis 消息队列的三种方案(List、Streams、Pub/Sub)


    硬核Redis总结,看这篇就够了!


    展开全文
  • Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,这篇文章主要介绍了Java使用Redisson分布式锁实现原理,非常具有实用价值,需要的朋友可以参考下
  • 在这里插入图片描述ReentrantLock 重入在说 Redisson 之前我们先来说一下 JDK 可重入: ReentrantLockReentrantLock 保证了 JVM 共享资源同一时刻只允许单个线程进行操作实现思路ReentrantLock 内部公平与非...
  • } // 订阅分布式锁, 解锁时进行通知 RFuture future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription...
  • 在说 Redisson 之前我们先来说一下 JDK 可重入: ReentrantLock ReentrantLock 保证了 JVM 共享资源同一时刻只允许单个线程进行操作 实现思路 ReentrantLock 内部公平与非公平继承了 AQS...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 443
精华内容 177
关键字:

redisson分布式锁原理

redis 订阅