精华内容
下载资源
问答
  • 分布式锁

    千次阅读 2021-04-16 18:13:03
    现在面试都会聊聊分布式系统,通常面试官都会从服务框架(Spring Cloud、Dubbo),一路聊到分布式事务、分布式锁、ZooKeeper等知识。今天就来聊聊分布式锁这块的知识,先具体的来看看Redis分布式锁的实现原理。 ...

    现在面试都会聊聊分布式系统,通常面试官都会从服务框架(Spring Cloud、Dubbo),一路聊到分布式事务、分布式锁、ZooKeeper等知识。今天就来聊聊分布式锁这块的知识,先具体的来看看Redis分布式锁的实现原理。

    如果在公司里落地生产环境用分布式锁的时候,一定是会用开源类库的,比如Redis分布式锁,一般就是用Redisson框架就好了,非常的简便易用。感兴趣可以去Redisson官网看看如何在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁。

    一段简单的使用代码片段,先直观的感受一下:

    是不是感觉简单的不行!此外,还支持Redis单实例、Redis哨兵、Redis Cluster、redis master-slave等各种部署架构,都可以完美实现。

    一、Redisson实现Redis分布式锁的底层原理

    现在通过一张手绘图,说说Redisson这个开源框架对Redis分布式锁的实现原理。

    1、加锁机制

    看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个Redis Cluster集群,他首先会根据Hash节点选择一台机器。

    注:仅仅只是选择一台机器!然后发送一段Lua脚本到Redis上,那段Lua脚本如下所示:

    为啥要用Lua脚本呢?因为一大坨复杂的业务逻辑,可以通过封装在Lua脚本中发送给Redis,保证这段复杂业务逻辑执行的原子性。

    那么,这段Lua脚本是什么意思呢?这里KEYS[1]代表的是你加锁的那个Key,比如说:RLock lock = redisson.getLock("myLock");这里你自己设置了加锁的那个锁Key就是“myLock”。

    • ARGV[1]代表的就是锁Key的默认生存时间,默认30秒。
    • ARGV[2]代表的是加锁的客户端的ID,类似于下面这样的:8743c9c0-0795-4907-87fd-6c719a6b4586:1。

    第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁Key不存在的话,你就进行加锁。如何加锁呢?很简单,用下面的命令:hset myLock。

    8743c9c0-0795-4907-87fd-6c719a6b4586:11,通过这个命令设置一个Hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

    上述内容就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端,已经对“myLock”这个锁Key完成了加锁。

    接着会执行“pexpiremyLock 30000”命令,设置myLock这个锁Key的生存时间是30秒,加锁完成。

    2、锁互斥机制

    这个时候,如果客户端2来尝试加锁,执行了同样的一段Lua脚本,会怎样?

    第一个if判断会执行“exists myLock”,发现myLock这个锁Key已经存在了。

    第二个if判断,判断myLock锁Key的Hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

    所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁Key的剩余生存时间。比如还剩15000毫秒的生存时间。此时客户端2会进入一个while循环,不停的尝试加锁。

    3、watch dog自动延期机制

    客户端1加锁的锁Key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

    只要客户端1加锁成功,就会启动一个watchdog看门狗,这个后台线程,会每隔10秒检查一下,如果客户端1还持有锁Key,就会不断的延长锁Key的生存时间。

    4、可重入加锁机制

    那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?如下代码:

    分析一下上面那段Lua脚本。第一个if判断肯定不成立,“exists myLock”会显示锁Key已经存在了。

    第二个if判断会成立,因为myLock的Hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”。

    此时就会执行可重入加锁的逻辑,incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:11,通过这个命令,对客户端1的加锁次数,累加1。

    此时myLock数据结构变为下面这样:

    myLock的Hash数据结构中的那个客户端ID,就对应着加锁的次数。

    5、释放锁机制

    如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。就是每次都对myLock数据结构中的那个加锁次数减1。

    如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从Redis里删除这个Key。

    而另外的客户端2就可以尝试完成加锁了。这就是所谓的分布式锁的开源Redisson框架的实现机制。

    一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于Redis进行分布式锁的加锁与释放锁。

    6、上述Redis分布式锁的缺点

    上面那种方案最大的问题,就是如果你对某个Redis Master实例,写入了myLock这种锁Key的Value,此时会异步复制给对应的Master Slave实例。

    但是这个过程中一旦发生Redis Master宕机,主备切换,Redis Slave变为了Redis Master。

    会导致客户端2尝试加锁时,在新的Redis Master上完成加锁,客户端1也以为自己成功加锁。

    此时就会导致多个客户端对一个分布式锁完成了加锁。这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。

    所以这个就是Redis Cluster,或者是redis master-slave架构的主从异步复制导致的Redis分布式锁的最大缺陷:在Redis Master实例宕机的时候,可能导致多个客户端同时完成加锁。

    二、七张图彻底讲清楚ZooKeeper分布式锁的实现原理

    下面再聊一下ZooKeeper实现分布式锁的原理。同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称ZK)分布式锁的实现。

    一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事的方式。

    ZooKeeper分布式锁机制

    看看多客户端获取及释放ZK分布式锁的整个流程及背后的原理。首先看看下图,如果现在有两个客户端一起要争抢ZK上的一把分布式锁,会是个什么场景?

    如果大家对ZK还不太了解的话,建议先自行百度一下,简单了解点基本概念,比如ZK有哪些节点类型等等。

    参见上图。ZK里有一把锁,这个锁就是ZK上的一个节点。两个客户端都要来获取这个锁,具体是怎么来获取呢?

    假设客户端A抢先一步,对ZK发起了加分布式锁的请求,这个加锁请求是用到了ZK中的一个特殊的概念,叫做“临时顺序节点”。简单来说,就是直接在"my_lock"这个锁节点下,创建一个顺序节点,这个顺序节点有ZK内部自行维护的一个节点序号。

    • 比如第一个客户端来搞一个顺序节点,ZK内部会给起个名字叫做:xxx-000001。
    • 然后第二个客户端来搞一个顺序节点,ZK可能会起个名字叫做:xxx-000002。
    • 注意,最后一个数字都是依次递增的,从1开始逐次递增。ZK会维护这个顺序。

    所以这个时候,假如说客户端A先发起请求,就会搞出来一个顺序节点,大家看下图,Curator框架大概会弄成如下的样子:

    客户端A发起一个加锁请求,先在要加锁的node下搞一个临时顺序节点,这列长名字都是Curator框架自己生成出来的。

    然后,那个最后一个数字是"1"。因为客户端A是第一个发起请求的,所以给他搞出来的顺序节点的序号是"1"。

    接着客户端A创建完一个顺序节点。还没完,他会查一下"my_lock"这个锁节点下的所有子节点,并且这些子节点是按照序号排序的,这个时候他大概会拿到这么一个集合:

    接着客户端A会走一个关键性的判断:这个集合里创建的顺序节点,是否排在首位?

    如果是的话,就可以加锁,因为明明我就是第一个来创建顺序节点的人,所以我就是第一个尝试加分布式锁的人啊!

    加锁成功!看下图,再来直观的感受一下整个过程:

    接着假如说,客户端A都加完锁了,客户端B过来想要加锁了,这个时候他会干一样的事儿:先是在"my_lock"这个锁节点下创建一个临时顺序节点,此时名字会变成类似于:

    下图:

    客户端B因为是第二个来创建顺序节点的,所以ZK内部会维护序号为"2"。

    接着客户端B会走加锁判断逻辑,查询"my_lock"锁节点下的所有子节点,按序号顺序排列,此时他看到的类似于:

    同时检查自己创建的顺序节点,是不是集合中的第一个?明显不是啊,此时第一个是客户端A创建的那个顺序节点,序号为"01"的那个。所以加锁失败!

    加锁失败了以后,客户端B就会通过ZK的API对他的顺序节点的上一个顺序节点加一个监听器。ZK天然就可以实现对某个节点的监听。

    如果大家还不知道ZK的基本用法,可以百度查阅,非常的简单。客户端B的顺序节点是:

    他的上一个顺序节点,不就是下面这个吗?

    即客户端A创建的那个顺序节点!所以,客户端B会对:

    这个节点加一个监听器,监听这个节点是否被删除等变化!大家看下图:

    接着,客户端A加锁之后,可能处理了一些代码逻辑,然后就会释放锁。那么,释放锁是个什么过程呢?

    其实就是把自己在ZK里创建的那个顺序节点,也就是:

    这个节点删除。删除了那个节点之后,ZK会负责通知监听这个节点的监听器,也就是客户端B之前加的那个监听器,说:你监听的那个节点被删除了,有人释放了锁。

    此时客户端B的监听器感知到了上一个顺序节点被删除,也就是排在他之前的某个客户端释放了锁。

    此时,就会通知客户端B重新尝试去获取锁,也就是获取"my_lock"节点下的子节点集合,此时为:

    集合里此时只有客户端B创建的唯一的一个顺序节点了!然后呢,客户端B判断自己居然是集合中的第一个顺序节点,Bingo!可以加锁了!直接完成加锁,运行后续的业务代码即可,运行完了之后再次释放锁。

    其实如果有客户端C、客户端D等N个客户端争抢一个ZK分布式锁,原理都是类似的:

    • 大家都是上来直接创建一个锁节点下的一个接一个的临时顺序节点。
    • 如果自己不是第一个节点,就对自己上一个节点加监听器。
    • 只要上一个节点释放锁,自己就排到前面去了,相当于是一个排队机制。
    • 而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,ZK感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。

    最后,咱们来看下用Curator框架进行加锁和释放锁的一个过程:

    其实用开源框架就是方便。这个Curator框架的ZK分布式锁的加锁和释放锁的实现原理,就是上面我们说的那样子。

    但是如果你要手动实现一套那个代码的话,要考虑到各种细节,异常处理等等。所以大家如果考虑用ZK分布式锁,可以参考下本文的思路。

    三、每秒上千订单场景下的分布式锁高并发优化实践

    接着聊一个有意思的话题:每秒上千订单场景下,如何对分布式锁的并发能力进行优化?

    首先,我们一起来看看这个问题的背景。前段时间有个朋友在外面面试,然后有一天找我聊说:有一个国内不错的电商公司,面试官给他出了一个场景题:

    假如下单时,用分布式锁来防止库存超卖,但是是每秒上千订单的高并发场景,如何对分布式锁进行高并发优化来应对这个场景?

    他说他当时没答上来,因为没做过没什么思路。其实我当时听到这个面试题心里也觉得有点意思,因为如果是我来面试候选人的话,给的范围会更大一些。比如,让面试的同学聊一聊电商高并发秒杀场景下的库存超卖解决方案,各种方案的优缺点以及实践,进而聊到分布式锁这个话题。

    因为库存超卖问题是有很多种技术解决方案的,比如悲观锁,分布式锁,乐观锁,队列串行化,Redis原子操作,等等吧。但是既然那个面试官兄弟限定死了用分布式锁来解决库存超卖,我估计就是想问一个点:在高并发场景下如何优化分布式锁的并发性能。

    面试官提问的角度还是可以接受的,因为在实际落地生产的时候,分布式锁这个东西保证了数据的准确性,但是他天然并发能力有点弱。

    刚好我之前在自己项目的其他场景下,确实是做过高并发场景下的分布式锁优化方案,因此正好是借着这个朋友的面试题,把分布式锁的高并发优化思路,给大家来聊一聊。

    1、库存超卖现象是怎么产生的?

    先来看看如果不用分布式锁,所谓的电商库存超卖是啥意思?大家看下图:

    这个图其实很清晰了,假设订单系统部署在两台机器上,不同的用户都要同时买10台iPhone,分别发了一个请求给订单系统。

    接着每个订单系统实例都去数据库里查了一下,当前iPhone库存是12台,大于了要买的10台数量。

    于是每个订单系统实例都发送SQL到数据库里下单,然后扣减了10个库存,其中一个将库存从12台扣减为2台,另外一个将库存从2台扣减为-8台。

    现在库存出现了负数!没有20台iPhone发给两个用户啊!怎么办?

    2、用分布式锁如何解决库存超卖问题?

    我们用分布式锁如何解决库存超卖问题呢?回忆一下上次我们说的那个分布式锁的实现原理:

    同一个锁Key,同一时间只能有一个客户端拿到锁,其他客户端会陷入无限的等待来尝试获取那个锁,只有获取到锁的客户端才能执行下面的业务逻辑。

    代码如上图,分析一下为什么这样可以避免库存超卖?

    大家可以顺着上面的那个步骤序号看一遍,马上就明白了。

    从上图可以看到,只有一个订单系统实例可以成功加分布式锁,然后只有他一个实例可以查库存、判断库存是否充足、下单扣减库存,接着释放锁。释放锁之后,另外一个订单系统实例才能加锁,接着查库存,一下发现库存只有2台了,库存不足,无法购买,下单失败。不会将库存扣减为-8的。

    3、有没其他方案解决库存超卖问题?

    当然有!比如悲观锁,分布式锁,乐观锁,队列串行化,异步队列分散,Redis原子操作,等等,很多方案,我们对库存超卖有自己的一整套优化机制。但是前面说过,这篇文章就聊一个分布式锁的并发优化,不是聊库存超卖的解决方案,所以库存超卖只是一个业务场景而已。

    4、分布式锁的方案在高并发场景下

    现在我们来看看,分布式锁的方案在高并发场景下有什么问题?分布式锁一旦加了之后,对同一个商品的下单请求,会导致所有客户端都必须对同一个商品的库存锁Key进行加锁。

    比如,对iPhone这个商品的下单,都必对“iphone_stock”这个锁Key来加锁。这样会导致对同一个商品的下单请求,就必须串行化,一个接一个的处理。大家再回去对照上面的图反复看一下,应该能想明白这个问题。

    假设加锁之后,释放锁之前,查库存→创建订单→扣减库存,这个过程性能很高吧,算他全过程20毫秒,这应该不错了。那么1秒是1000毫秒,只能容纳50个对这个商品的请求依次串行完成处理。如一秒钟50个请求,都是对iPhone下单的,那么每个请求处理20毫秒,逐个来,最后1000毫秒正好处理完50个请求。

    大家看下图,加深印象。

    所以看到这里,大家起码也明白了,简单的使用分布式锁来处理库存超卖问题,存在什么缺陷。

    同一商品多用户同时下单时,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单的请求。这种方案应对那种低并发、无秒杀场景的普通小电商系统,可能还可以接受。

    因为如果并发量很低,每秒就不到10个请求,没有瞬时高并发秒杀单个商品的场景的话,其实也很少会对同一个商品在1秒内瞬间下1000个订单,因为小电商系统没那场景。

    5、如何对分布式锁进行高并发优化?

    那么现在怎么办呢?面试官说,我现在就卡死,库存超卖就是用分布式锁来解决,而且一秒对一个iPhone下上千订单,怎么优化?

    现在按照刚才的计算,你1秒钟只能处理针对iPhone的50个订单。其实说出来也很简单,相信很多人看过Java里的Concurrent Hash Map的源码和底层原理,应该知道里面的核心思路,就是分段加锁!

    把数据分成很多个段,每个段是一个单独的锁,所以多个线程过来并发修改数据的时候,可以并发的修改不同段的数据。不至于说,同一时间只能有一个线程独占修改Concurrent Hash Map中的数据。

    另外,Java8中新增了一个Long Adder类,也是针对Java7以前的Atomic Long进行的优化,解决的是CAS类操作在高并发场景下,使用乐观锁思路,会导致大量线程长时间重复循环。Long Adder中也采用了类似的分段CAS操作,失败则自动迁移到下一个分段进行CAS的思路。

    其实分布式锁的优化思路也是类似的,之前我们是在另外一个业务场景下落地了这个方案到生产中,不是在库存超卖问题里用的。但是库存超卖这个业务场景不错,很容易理解,所以我们就用这个场景来说一下。

    大家看下图:

    这就是分段加锁。假如现在iPhone有1000个库存,完全可以给拆成20个库存段。

    要是你愿意,可以在数据库的表里建20个库存字段,比如stock_01,stock_02,类似这样的,也可以在Redis之类的地方放20个库存Key。

    总之,就是把你的1000件库存给他拆开,每个库存段是50件库存,比如stock_01对应50件库存,stock_02对应50件库存。

    接着,每秒1000个请求过来了!此时可以自己写一个简单的随机算法,每个请求都是随机在20个分段库存里,选择一个进行加锁。

    这样同时可以有最多20个下单请求一起执行,每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存→判断库存是否充足→扣减库存。

    这相当于一个20毫秒,可以并发处理掉20个下单请求,那么1秒,也就可以依次处理掉20*50=1000个对iPhone的下单请求了。

    一旦对某个数据做了分段处理之后,有一个坑大家一定要注意:就是如果某个下单请求,咔嚓加锁,然后发现这个分段库存里的库存不足了。这时你得自动释放锁,然后立马换下一个分段库存,再次尝试加锁后尝试处理。这个过程一定要实现。

    6、分布式锁并发优化方案有什么不足?

    最大的不足是很不方便,实现太复杂:

    • 首先,你得对一个数据分段存储,一个库存字段本来好好的,现在要分为20个库存字段。
    • 其次,你在每次处理库存的时候,还得自己写随机算法,随机挑选一个分段来处理。
    • 最后,如果某个分段中的数据不足了,你还得自动切换到下一个分段数据去处理。

    这个过程都是要手动写代码实现的,还是有点工作量。不过我们确实在一些业务场景里,因为用到了分布式锁,然后又必须要进行锁并发的优化,又进一步用到了分段加锁的技术方案,效果当然是很好的了,一下子并发性能可以增长几十倍。

    该优化方案的后续改进:以我们本文所说的库存超卖场景为例,你要是这么玩,会把自己搞的很痛苦!再次强调,我们这里的库存超卖场景,仅仅只是作为演示场景而已。

    展开全文
  • 分布式锁】三种分布式锁的实现【原创】

    万次阅读 多人点赞 2021-03-02 22:55:58
    三种分布式锁的实现,Redis分布式锁,数据库锁,Zookeeper分布式锁,主要介绍的是Redis分布式锁


    0x00 概述

    随着互联网技术的不断发展,用户量的不断增加,越来越多的业务场景需要用到分布式系统。


    分布式系统有一个著名的理论CAP,指在一个分布式系统中,最多只能同时满足下面三项中的两项

    • 一致性(Consistency):在分布式系统中的所有数据备份,在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)
    • 可用性(Availability):保证每个请求不管成功或者失败都有响应
    • 分区容错性(Partition tolerance):系统中任意信息的丢失或失败不会影响系统的继续运作

    所以在设计系统时,往往需要权衡,在CAP中作选择,要么AP,要么CP、要么AC。

    当然,这个理论也并不一定完美,不同系统对CAP的要求级别不一样,选择需要考虑方方面面。

    而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,这个时候就需要使用分布式锁。


    分布式锁:

    当在分布式模型下,数据只有一份(或有限制),此时需要利用锁技术来控制某一时刻修改数据的进程数。这种锁即为分布式锁。


    为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!


    分布式锁应该具备哪些条件:

    • 互斥性:在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
    • 高可用的获取锁与释放锁;
    • 高性能的获取锁与释放锁;
    • 可重入性:具备可重入特性,具备锁失效机制,防止死锁,即就算一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功
    • 非阻塞:具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

    分布式锁的业务场景:

    • 互联网秒杀(商品库存)
    • 抢优惠券

    0x02 实现方式

    分布式锁主要有几种实现方式:

    • 基于数据库实现
    • 基于Zookeeper实现
    • 基于Redis实现
    • 其他
      • Chubby:谷歌公司实现的粗粒度分布式锁服务,底层使用了Paxos一致性算法
      • Tair:淘宝的分布式Key/Value存储系统,主要是使用Tair的put()方法,原理和Redis类似
      • Memcached:利用Memcached的add命令,此命令是原子性操作,只有在key不存在的情况下才能add成功,也就意味着加锁成功

    如图:

    image-20210130195451985


    0x03 分布式锁:基于数据库

    1. 实现思想

    主要有两种方式:

    • 悲观锁
    • 乐观锁

    A. 悲观锁(排他锁)

    利用select … where xx=yy for update排他锁

    注意:这里需要注意的是where xx=yy,xx字段必须要走索引,否则会锁表。有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题。


    核心思想:以「悲观的心态」操作资源,无法获得锁成功,就一直阻塞着等待。

    注意:该方式有很多缺陷,一般不建议使用


    实现:

    创建一张资源锁表:

    CREATE TABLE `resource_lock` (
      `id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的资源名',
      `owner` varchar(64) NOT NULL DEFAULT '' COMMENT '锁拥有者',
      `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的资源';
    

    注意:resource_name 锁资源名称必须有唯一索引


    使用事务查询更新:

    @Transaction
    public void lock(String name) {
       ResourceLock rlock = exeSql("select * from resource_lock where resource_name = name for update");
         if (rlock == null) {
               exeSql("insert into resource_lock(reosurce_name,owner,count) values (name, 'ip',0)");
         }
    }
    

    使用 for update 锁定的资源。如果执行成功,会立即返回,执行插入数据库,后续再执行一些其他业务逻辑,直到事务提交,执行结束;如果执行失败,就会一直阻塞着。

    可以在数据库客户端工具上测试出来这个效果,当在一个终端执行了 for update,不提交事务。在另外的终端上执行相同条件的 for update,会一直卡着

    虽然也能实现分布式锁的效果,但是会存在性能瓶颈。


    优点:

    简单易用,好理解,保障数据强一致性。


    缺点

    1)在 RR 事务级别,select 的 for update 操作是基于间隙锁(gap lock) 实现的,是一种悲观锁的实现方式,所以存在阻塞问题

    2)高并发情况下,大量请求进来,会导致大部分请求进行排队,影响数据库稳定性,也会耗费服务的CPU等资源

    当获得锁的客户端等待时间过长时,会提示:

    [40001][1205] Lock wait timeout exceeded; try restarting transaction
    

    高并发情况下,也会造成占用过多的应用线程,导致业务无法正常响应。

    3)如果优先获得锁的线程因为某些原因,一直没有释放掉锁,可能会导致死锁的发生。

    4)锁的长时间不释放,会一直占用数据库连接,可能会将数据库连接池撑爆,影响其他服务。

    5)MySql数据库会做查询优化,即便使用了索引,优化时发现全表扫效率更高,则可能会将行锁升级为表锁,此时可能就更悲剧了。

    6)不支持可重入特性,并且超时等待时间是全局的,不能随便改动。


    B. 乐观锁

    所谓乐观锁与悲观锁最大区别在于基于CAS思想,表中添加一个时间戳或者是版本号的字段来实现,update xx set version=new_version where xx=yy and version=Old_version,通过增加递增的版本号字段实现乐观锁。

    不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才能觉察到。

    抢购、秒杀就是用了这种实现以防止超卖。

    如下图:

    1a588caeb5c544f3b55d54c4d0791750


    实现:

    创建一张资源锁表:

    CREATE TABLE `resource` (
      `id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '资源名',
      `share` varchar(64) NOT NULL DEFAULT '' COMMENT '状态',
      `version` int(4) NOT NULL DEFAULT '' COMMENT '版本号',
      `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='资源';
    

    为表添加一个字段,版本号或者时间戳都可以。通过版本号或者时间戳,来保证多线程同时间操作共享资源的有序性和正确性。


    伪代码实现:

    Resrouce resource = exeSql("select * from resource where resource_name = xxx");
    boolean succ = exeSql("update resource set version= 'newVersion' ... where resource_name = xxx and version = 'oldVersion'");
    
    if (!succ) {
        // 发起重试
    }
    

    实际代码中可以写个while循环不断重试,版本号不一致,更新失败,重新获取新的版本号,直到更新成功。


    2. 优缺点

    优点:

    • 实现简单,复杂度低
    • 保障数据一致性

    缺点:

    • 性能低,并且有锁表的风险
    • 可靠性差
    • 非阻塞操作失败后,需要轮询,占用CPU资源
    • 长时间不commit或者是长时间轮询,可能会占用较多的连接资源

    0x04 分布式锁:基于Zookeeper

    1. 实现思想

    ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。


    基于ZooKeeper实现分布式锁的步骤如下:

    1. 创建一个目录mylock;

    2. 线程A想获取锁就在mylock目录下创建临时顺序节点;

    3. 获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;

    4. 线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;

    5. 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。


    整个过程如图:

    image-20201220221737418

    业界推荐直接使用Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

    使用方式很简单:

    InterProcessMutex interProcessMutex = new InterProcessMutex(client,"/anyLock"); 
    interProcessMutex.acquire(); 
    interProcessMutex.release(); 
    

    其实现分布式锁的核心源码如下:

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception 
    { 
        boolean  haveTheLock = false; 
        boolean  doDelete = false; 
        try { 
            if ( revocable.get() != null ) { 
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath); 
            } 
     
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { 
                // 获取当前所有节点排序后的集合 
                List<String>        children = getSortedChildren(); 
                // 获取当前节点的名称 
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 
                // 判断当前节点是否是最小的节点 
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); 
                if ( predicateResults.getsTheLock() ) { 
                    // 获取到锁 
                    haveTheLock = true; 
                } else { 
                    // 没获取到锁,对当前节点的上一个节点注册一个监听器 
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 
                    synchronized(this){ 
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath); 
                        if ( stat != null ){ 
                            if ( millisToWait != null ){ 
                                millisToWait -= (System.currentTimeMillis() - startMillis); 
                                startMillis = System.currentTimeMillis(); 
                                if ( millisToWait <= 0 ){ 
                                    doDelete = true;    // timed out - delete our node 
                                    break; 
                                } 
                                wait(millisToWait); 
                            }else{ 
                                wait(); 
                            } 
                        } 
                    } 
                    // else it may have been deleted (i.e. lock released). Try to acquire again 
                } 
            } 
        } 
        catch ( Exception e ) { 
            doDelete = true; 
            throw e; 
        } finally{ 
            if ( doDelete ){ 
                deleteOurPath(ourPath); 
            } 
        } 
     return haveTheLock; 
    } 
    

    其实 Curator 实现分布式锁的底层原理和上面分析的是差不多的。如图详细描述其原理:

    image-20201220221805784

    另外,可基于Zookeeper自身的特性和原生Zookeeper API自行实现分布式锁。


    2. 优缺点

    优点:

    • 可靠性非常高
    • 性能较好
    • CAP模型属于CP,基于ZAB一致性算法实现

    缺点:

    • 性能并不如Redis(主要原因是在写操作,即获取锁释放锁都需要在Leader上执行,然后同步到follower)
    • 实现复杂度高

    0x05 分布式锁:基于Redis

    1. 实现思想

    主要是基于命令:SETNX key value

    命令官方文档:https://redis.io/commands/setnx

    用法可参考:Redis命令参考

    如图:

    image-20201213154237141

    实现思想的具体步骤:

    1. 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
    2. 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
    3. 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

    具体的分布式锁的实现可参考后面内容


    2. 优缺点

    优点:

    • 性能非常高
    • 可靠性较高
    • CAP模型属于AP

    缺点:

    • 复杂度较高
    • 无一致性算法,可靠性并不如Zookeeper
    • 锁删除失败 过期时间不好控制
    • 非阻塞,获取失败后,需要轮询不断尝试获取锁,比较消耗性能,占用cpu资源

    0x06 分布式锁对比

    • 从理解的难易程度角度(从低到高):数据库 > 缓存 > Zookeeper

    • 从实现的复杂性角度(从低到高):Zookeeper >= 缓存 > 数据库

    • 从性能角度(从高到低):缓存 > Zookeeper >= 数据库

    • 从可靠性角度(从高到低):Zookeeper > 缓存 > 数据库


    0x07 Redis分布式锁实现

    下面以减库存接口为例子,访问接口的时候自动减商品的库存

    一、方案一

    @Service
    public class RedisLockDemo {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
    
            //获取redis中的库存
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
    
            return "success";
        }
    }
    

    表示:

    • 先从Redis中读取stock的值,表示商品的库存
    • 判断商品库存是否大于0,如果大于0,则库存减1,然后再保存到Redis里面去,否则就报错

    1. 改进

    方案一这种简单的从Redis读取、判断值再减1保存到Redis的操作,很容易在并发场景下出问题:

    • 商品超卖

    比如:

    假设商品的库存有50个,有3个用户同时访问该接口,先是同时读取Redis中商品的库存值,即都是读取到了50,即同时执行到了这一行:

    int stock = Integer.valueOf(valueOperations.get("stock"));
    

    然后减1,即到了这一行:

    int newStock = stock - 1;
    

    此时3个用户的realStock都是49,然后3个用户都去设置stock为49,那么就会产生库存明明被3个用户抢了,理论上是应该减去3的,结果库存数只减去了1导致商品超卖。


    这种问题的产生原因是因为读取库存、减库存、保存到Redis这几步并不是原子操作

    那么可以使用加并发锁synchronized来解决:

    @Service
    public class RedisLockDemo {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
    
            synchronized (this) {
                //获取redis中的库存
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
            }
    
            return "success";
        }
    }
    

    注意:在Java中关键字synchronized可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块。


    2. 再改进

    以上的代码在单体模式下并没太大问题,但是在分布式或集群架构环境下存在问题,比如架构如下:

    image-20201203232558081

    在分布式或集群架构下,synchronized只能保证当前的主机在同一时刻只能有一个线程执行减库存操作,但如图同时有多个请求过来访问的时候,不同主机在同一时刻依然是可以访问减库存接口的,这就导致问题1(商品超卖)在集群架构下依然存在。

    注意:可以使用JMeter来模拟出高并发场景下访问Nginx来测试触发上面的问题


    解决方法

    使用如下的分布式锁进行解决


    注意:方案一并不能称之为分布式锁的


    二、方案二

    分布式锁的简单实现如图:

    image-20210130003116623

    代码实现如下:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            String lockKey = "product_001";
    
            //加锁: setnx
            Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }
    
            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
    
            //释放锁
            redisTemplate.delete(lockKey);
            return "success";
        }
    }
    

    其实就是对每一个商品加一把锁,代码里面是product_001

    • 使用setnx对商品进行加锁
    • 如成功说明加锁成功,如失败说明有其他请求抢占了该商品的锁,则当前请求失败退出
    • 加锁成功之后进行扣减库存操作
    • 删除商品锁

    1. 改进1

    上面的方式是有可能会造成死锁的,比如说加锁成功之后,扣减库存的逻辑可能抛异常了,即并不会执行到释放锁的逻辑,那么该商品锁是一直没有释放,会成为死锁的,其他请求完全无法扣减该商品的


    使用try...catch...finally的方式可以解决抛异常的问题,如下:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            String lockKey = "product_001";
    
            try {
                //加锁: setnx
                Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //释放锁
                redisTemplate.delete(lockKey);
            }
    
            return "success";
        }
    }
    

    把释放锁的逻辑放到finally里面去,即不管try里面的逻辑最终是成功还是失败都会执行释放锁的逻辑


    2. 改进2

    那么上面的方式是不是能够解决死锁的问题呢?

    其实不然,除了抛异常之外,比如程序崩溃、服务器宕机、服务器重启、请求超时被终止、发布、人为kill等都有可能导致释放锁的逻辑没有执行,比如对商品加分布式锁成功之后,在扣减库存的时候服务器正在执行重启,会导致没有执行释放锁。


    可以通过对锁设置超时时间来防止死锁的发生,使用Redis的expire命令可以对key进行设置超时时间,如图:

    代码实现如下:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            String lockKey = "product_001";
    
            try {
                //加锁: setnx
                Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
                //expire增加超时时间
                redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //释放锁
                redisTemplate.delete(lockKey);
            }
    
            return "success";
        }
    }
    

    加锁成功之后,把锁的超时时间设置为10秒,即10秒之后自动会释放锁,避免死锁的发生。


    3. 改进3

    但是上面的方式同样会产生死锁问题,加锁和对锁设置超时时间并不是原子操作,在加锁成功之后,即将执行设置超时时间的时候系统发生崩溃,同样还是会导致死锁。


    如图:

    image-20210130003525615


    对此,有两种做法:

    • lua脚本
    • set原生命令(Redis 2.6.12版本及以上)

    一般是推荐使用set命令,Redis官方在2.6.12版本对set命令增加了NX、EX、PX等参数,即可以将上面的加锁和设置时间放到一条命令上执行,通过set命令即可:

    命令官方文档:https://redis.io/commands/set

    用法可参考:Redis命令参考

    如图:

    image-20201213170150305

    SET key value NX 等同于 SETNX key value命令,并且可以使用EX参数来设置过期时间


    注意:其实目前在Redis 2.6.12版本之后,所说的setnx命令,并非单单指Redis的SETNX key value命令,一般是代指Redis中对set命令加上nx参数进行使用,一般不会直接使用SETNX key value命令了


    注意:Redis2.6.12之前的版本,只能通过lua脚本来保证原子性了。


    如图:

    image-20210130004248237

    代码实现如下:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            String lockKey = "product_001";
    
            try {
                //加锁: setnx 和 expire增加超时时间
                Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //释放锁
                redisTemplate.delete(lockKey);
            }
    
            return "success";
        }
    }
    

    4. 改进4

    以上的方式其实还是存在着问题,在高并发场景下会存在问题超时时间设置不合理导致的问题

    大概的流程图可参考: image-20201213213326193

    流程:

    • 进程A加锁之后,扣减库存的时间超过设置的超时时间,这里设置的锁是10秒
    • 在第10秒的时候由于时间到期了所以进程A设置的锁被Redis释放了(T5)
    • 刚好进程B请求进来了,加锁成功(T6)
    • 进程A操作完成(扣减库存)之后,把进程B设置的锁给释放了
    • 刚好进程C请求进来了,加锁成功
    • 进程B操作完成之后,也把进程C设置的锁给释放了
    • 以此类推…

    解决方法也很简单:

    • 加锁的时候,把值设置为唯一值,比如说UUID这种随机数
    • 释放锁的时候,获取锁的值判断value是不是当前进程设置的唯一值,如果是再去删除

    如图:

    image-20210130142138448


    代码如下:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            String lockKey = "product_001";
            String clientId = UUID.randomUUID().toString();
    
            try {
                //加锁: setnx 和 expire增加超时时间
                Boolean isSuccess = valueOperations.setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                if (clientId.equals(valueOperations.get(lockKey))) {
                    //释放锁
                    redisTemplate.delete(lockKey);
                }
            }
    
            return "success";
        }
    }
    

    5. 改进5

    上面的方式其实存在一个明显的问题,就是在finally代码块中,释放锁的时候,get和del并非原子操作,存在进程安全问题。

    那么删除锁的正确姿势是使用lua脚本,通过redis的eval/evalsha命令来运行:

    -- lua删除锁:
    -- KEYS和ARGV分别是以集合方式传入的参数,对应上文的Test和uuid。
    -- 如果对应的value等于传入的uuid。
    if redis.call('get', KEYS[1]) == ARGV[1] 
        then 
    	-- 执行删除操作
            return redis.call('del', KEYS[1]) 
        else 
    	-- 不成功,返回0
            return 0 
    end
    

    通俗一点的说,即lua脚本能够保证原子性,在lua脚本里执行是一个命令(eval/evalsha)去执行的,一条命令没有执行完,其他客户端是看不到的。


    到此,基本上Redis的分布式锁的实现思想如下:

    • 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
    • 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
    • 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

    6. 改进6

    虽然通过上面的方式解决了会删除其他进程的锁的问题,但是超时时间的设置依然是没有解决的,设置成多少依然是个比较棘手的问题,设置少了容易导致业务没有执行完锁就被释放了,而设置过大万一服务出现异常无法正常释放锁会导致出现异常锁的时间也很长。

    怎么解决这个问题呢?

    目前大公司的一个方案是这样子的:

    • 在加锁成功之后,启动一个守护线程
    • 守护线程每隔1/3的锁的超时时间就去延迟锁的超时时间,比如说锁设置为30秒,那就是每隔10秒就去延长锁的超时时间,重新设置为30秒
    • 业务代码执行完成,关闭守护线程

    在实际操作中,需要注意几点:

    • 只续对的:和释放锁一样,需要判断锁的对象有没有发生变化,否则会造成无论谁加锁,守护线程都会重新设置锁的超时时间
    • 不能动不动就续:守护线程要在合理的时间再去设置锁的超时时间,否则会造成资源的浪费
    • 及时销毁:如果加锁的线程/进程已经处理完业务了,那么守护进程应该被销毁,否则会造成资源的浪费

    三、方案三

    上面的方案还得考虑Redis的部署问题。

    众所周知,Redis有3种部署方式:

    • 单机模式
    • Master-Slave + Sentinel(哨兵)选举模式
    • Redis Cluster(集群)模式

    使用 Redis 做分布式锁的缺点在于:如果采用单机部署模式,会存在单点问题,只要 Redis 故障了。加锁就不行了。


    采用 Master-Slave 模式/集群模式,如下:

    线程1加了锁去执行业务了
    刚好Redis的 master 发生故障挂掉了,此时还没有将数据同步到 slave 上
    集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁
    线程2可以在新选举产生的 master 上去加锁,然后处理业务
    

    这样的话,就导致了两个线程同时持有了锁,锁就不再具有安全性。


    针对这个问题,有两个解决方案:

    • RedLock
    • Zookeeper【推荐】

    1. RedLock

    基于以上的考虑,Redis的作者提出了一个RedLock的算法。

    这个算法的意思大概是这样的:假设 Redis 的部署模式是 Redis Cluster,总共有 5 个 Master 节点。


    通过以下步骤获取一把锁:

    • 获取当前时间戳,单位是毫秒。
    • 轮流尝试在每个 Master 节点上创建锁,过期时间设置较短,一般就几十毫秒。
    • 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点(n / 2 +1)。
    • 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了。
    • 要是锁建立失败了,那么就依次删除这个锁。
    • 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。

    如图:

    image-20210204000845032

    但是这样的这种算法还是颇具争议的,可能还会存在不少的问题,无法保证加锁的过程一定正确,不太推荐


    更多关于RedLock的资料可参考:


    注意:除了RedLock之外目前并没有有效解决Redis主从切换导致锁失效的方法。在这种情况下(一致性要求非常高的情况下)一般是不会使用Redis,而推荐使用Zookeeper


    四、Redisson

    目前业界对于Redis的分布式锁有了现成的实现方案了,比较出名的是Redisson开源框架。

    Redisson 是 Redis 的 Java 实现的客户端,其 API 提供了比较全面的 Redis 命令的支持。

    Redission 通过 Netty 支持非阻塞 I/O。

    Redisson 封装了锁的实现,让我们像操作我们的本地 Lock一样来使用,除此之外还有对集合、对象、常用缓存框架等做了友好的封装,易于使用。

    除此之外,Redisson还实现了分布式锁的自动续期机制、锁的互斥自等待机制、锁的可重入加锁于释放锁的机制,可以说Redisson对分布式锁的实现是实现了一整套机制的。


    Redisson 可以便捷的支持多种Redis部署架构:

    • 单机模式
    • Master-Slave + Sentinel(哨兵)选举模式
    • Redis Cluster(集群)模式

    引入Redission之后,使用上非常简单,RedissonClient客户端提供了众多的接口实现,支持可重入锁、公平锁、读写锁、锁超时、RedLock等都提供了完整实现。


    使用如下:

    A. 引入maven

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.13.4</version>
    </dependency>
    

    B. 增加配置文件

    @Configuration
    public class RedissonConfig {
     
        @Bean
        public Redisson redisson() {
            Config config = new Config();
            //单机版
            //config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
     
            //集群版
            config.useClusterServers()
                    .addNodeAddress("redis://192.168.1.1:8001")
                    .addNodeAddress("redis://192.168.1.1:8002")
                    .addNodeAddress("redis://192.168.1.2:8001")
                    .addNodeAddress("redis://192.168.1.2:8002")
                    .addNodeAddress("redis://192.168.1.3:8001")
                    .addNodeAddress("redis://192.168.1.3:8002");
            return (Redisson) Redisson.create(config);
        }
    }
    

    C. 分布式锁的实现

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
     
        @Autowired
        private Redisson redisson;
     
        public String deduceStock() {
            String lockKey = "lockKey";
            RLock redissonLock = redisson.getLock(lockKey);
     
            try {
                //加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁)
                redissonLock.lock();
     
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    redisTemplate.opsForValue().set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //解锁
                redissonLock.unlock();
            }
            return "success";
        }
    }
    

    实现的原理如下:

    image-20210130142553091


    RedissonLock的使用介绍

    // 锁默认有效时间30秒,每10秒去检查并重新设置超时时间
    void lock(); 
     
    // 超过锁有效时间 leaseTime,就会释放锁
    void lock(long leaseTime, TimeUnit unit);
     
    // 尝试获取锁;成功则返回true,失败则返回false
    boolean tryLock();
     
    // 不会去启动定时任务;在 time 时间内还没有获取到锁,则返回false
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
     
    // 不会去启动定时任务;当 waitTime 的时间到了,还没有获取到锁则返回false;若获取到锁了,锁的有效时间设置为 leaseTime
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    

    也就是说,用法非常简单,但是内部上实现了方案二里面的所有细节:

    • 为了兼容老的Redis版本,Redisson 所有指令都通过 Lua 脚本执行,Redis 支持 Lua 脚本原子性执行。
    • Redisson 设置的Key 的默认过期时间为 30s,如果某个客户端持有一个锁超过了 30s 怎么办?Redisson 中有一个 Watchdog 的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔 10s 帮你把 Key 的超时时间设为 30s。
    • 如果获取锁失败,Redsson会通过while循环一直尝试获取锁(可自定义等待时间,超时后返回失败)

    这样的话,就算一直持有锁也不会出现 Key 过期了,其他线程获取到锁的问题了。

    另外,Redssion还提供了对Redlock算法的支持,用法也很简单:

    RedissonClient redisson = Redisson.create(config); 
    RLock lock1 = redisson.getFairLock("lock1"); 
    RLock lock2 = redisson.getFairLock("lock2"); 
    RLock lock3 = redisson.getFairLock("lock3"); 
    RedissonRedLock multiLock = new RedissonRedLock(lock1, lock2, lock3); 
    multiLock.lock(); 
    multiLock.unlock(); 
    

    Redisson里面关于加锁/获取锁的Lua脚本流程图如下:

    image-20210302224252360


    释放锁的Lua脚本流程图如下:

    image-20210302224435964


    强烈建议大家看一下Redisson里面关于分布式锁的源码,更多关于Redisson的资料可参考:


    注意:Redison并不能有效的解决Redis的主从切换问题的目前推荐使用Zookeeper分布式锁来解决。


    五、分段锁

    怎么在高并发的场景去实现一个高性能的分布式锁呢?


    电商网站在大促的时候并发量很大:

    (1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;

    (2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是没有用的。


    针对第二个问题,可以使用库存分段锁的方式去实现。


    分段锁

    假如产品1有200个库存,可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。

    每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。


    具体可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。

    高性能分布式锁具体可参考链接:每秒上千订单场景下的分布式锁高并发优化实践!【石杉的架构笔记】

    原理如图:

    image-20201220220247407


    0x05 总结

    总结:

    • 追求数据可靠性/强一致性:使用Zookeeper
    • 追求性能:选择Redis,推荐Redisson
    • Redis分布式锁目前最大问题在于:主从模式下/集群模式下,master节点宕机,异步同步数据导致锁丢失问题
    • Redis的RedLock算法具有很大争议性,一般不推荐使用

    0x06 附录

    Python代码实现

    注意:没有实现看门狗的逻辑,需要自己实现

    import redis
    import uuid
    import time
    
    
    class LockService:
        """
        基于Redis实现的分布式锁
        """
        host = 'localhost'
        port = 6379
        password = ''
        db = 1
    
        def __init__(self, conn=None):
            """
            如果不传连接池的话,默认读取配置的Redis作为连接池
            :param conn:
            """
            self.conn = conn if conn else self.get_redis_client()
    
        def get_redis_client(self):
            """
            获取Redis连接
            :return:
            """
            return redis.Redis(
                host=self.host,
                port=self.port,
                password=self.password,
                db=self.db
            )
    
        def acquire_lock(self, lock_name, acquire_timeout=10, expire_time=30):
            """
            加锁/获取锁
    
            如果不存在lock_name,则加锁,并且设置过期时间,避免死锁
            如果存在lock_name,则刷新过期时间
    
            :param lock_name:       锁的名称
            :param acquire_timeout: 加锁/获取锁的超时时间,默认10秒
            :param expire_time:     锁的超时时间,默认30秒
            :return:
            """
            lockname = f'lock:{lock_name}'
            value = str(uuid.uuid4())
            end_time = time.time() + acquire_timeout
            while time.time() < end_time:
                # 如果不存在这个锁则加锁并设置过期时间,避免死锁
                if self.conn.set(lockname, value, ex=expire_time, nx=True):
                    return value
                time.sleep(0.1)
            return False
    
        def release_lock(self, lock_name, value):
            """
            释放锁
    
            :param lock_name: 锁的名称
            :param value:     锁的值
            :return:
            """
            unlock_script = """
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1])
            else
                return 0
            end
            """
            lockname = f'lock:{lock_name}'
            unlock = self.conn.register_script(unlock_script)
            result = unlock(keys=[lockname], args=[value])
            if result:
                return True
            else:
                return False
    

    参考

    展开全文
  • 1、什么场景会用到分布式锁(用本地锁举例,分布式场景在并发情况下存在的问题) 2、redis为什么可以实现分布式锁 3、假设一分布式场景,运用redis分布式锁并剖析各种坑 结合视频157、158

    下一篇:Redis分布式锁原理(二)——Redisson分布式锁源码浅析

            虽然目前Redisson框架已经帮我们封装好了分布式锁的实现逻辑,我们可以直接像调用本地锁一样使用即可,但本文并不直接剖析Redisson源码,而是首先分析在分布式场景下实现redis分布式锁需要注意哪些问题,这样之后阅读Redisson源码也会变得更容易。

    目录

    一、什么场景会用到分布式锁?

    二、redis为什么可以实现分布式锁?

    三、实现redis分布式锁需要注意哪些问题?


    一、什么场景会用到分布式锁?

            在分布式场景下,我们不同的业务功能放在不同的服务器上,而这些不同的业务可能会去操作同一个数据库资源,如果这时候大并发进来,就可能会出现同时操作共享资源的情况,为了避免这种情况发生,我们想到的是加锁,如果这时候是在各自服务器的代码实现上加本地锁能够解决这种问题吗?

            答案当然是不能,来分析一下上述场景:

            整个系统的业务1、业务2、业务3放在不同的服务器上构成了分布式场景,这三个业务底层都会去操作同一个数据库,现在对这三个业务的代码实现上加上本地锁,即synchronized、ReentrantLock等,此时100个并发请求进入这个系统,假设到达这三个服务器的请求量分别为30、30、40,由于三个业务是在不同的服务器上,所以对于它们而言使用本地锁锁住的不会是同一个对象,因此能进入业务1的请求只有1条,进入业务2的请求也有1条,同理进入业务3的请求也是1条,最终数据库会同时接收到3条访问请求,出现了同时操作共享资源的情况。

            分布式锁是怎么解决这种问题的呢?就好比在这三个业务外边放一把大锁,这把锁也就脱离了“本地”的概念,三个服务器都可以去这个公共的地方抢这把锁,谁抢到了这把锁谁就可以去执行业务操作数据库,其他业务只能等待。

    二、redis为什么可以实现分布式锁?

            上面所提到的公共的地方可以用redis代替,也就是说所有的服务器都连接上同一个redis,redis是一个缓存数据库,我们通过往这个缓存中存取标记的手段达到锁的获取和释放的效果,说到这里就不得不引出这个关键的命令了:set NX,它是一个原子性的命令,它能保证如果缓存中如果没有这个key时才会对其进行设置,如果这个key已经存在了那么这个命令就会执行失败。

            下面可以用xshell连接虚拟机后进行进行一个简单的测试,需要提前在虚拟机中安装好redis。首先复制多份会话模拟多个用户去抢redis锁,在确保redis在虚拟中正常运行的前提下输入命令“docker exec -it redis redis-cli”并发送到全部会话,使所有的会话都进入到redis容器中,接着输入命令“set locktest 123 NX”并同时发送到全部会话,模拟同一时刻多个会话去抢同一把锁。结果如下,会话1抢占锁成功,而会话2和会话3抢锁失败,而打开redis可以看到的确只有一条锁记录插入成功。

            这样看来,redis的确是可以用来实现分布式锁,原理就是使用set nx命令进行设值,若返回成功则代表拿锁成功,返回失败则代表拿锁失败。

    三、实现redis分布式锁需要注意哪些问题?

            首先来看这样一段代码。

        @Override
        public List<Map<String, Object>> findList() {
            //尝试获取分布式锁,步骤一
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "齐天小圣");
            if (lock) {
                //加锁成功,执行业务,步骤二
                List<Map<String, Object>> resultList = findListByDB();
                //删除锁,步骤三
                redisTemplate.delete("lock");
                return resultList;
            } else {
                //加锁失败,自旋重试,即重新调用本方法。
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return findList();
            }
        }

            乍一看用redis完成一个分布式锁的整体逻辑好像也不是很复杂,无非就是先去获取分布式锁,如果拿到锁了就去执行相应的业务,没有拿到锁就自旋重试。但其实上面这段代码漏洞百出,实际想要完成一个完美的分布式锁是很复杂的,我们需要注意以下问题:

            1、死锁问题:如果某服务器拿锁成功,执行到步骤二时突然宕机没能正常的执行步骤三释放锁,那么其他正在等待锁的服务器则永远也拿不到锁了,这就是死锁问题。

            解决这个问题很简单,只需要在拿锁之后给锁上一个超时时间即可,即使业务过程中出现问题导致不能释放锁,到了过期时间redis也会自动帮我们去把这个锁删除。需要注意的是,千万不能把“获取锁”和“设置超时时间”在代码中分成两步执行,如下:

            原因在于这两个步骤分开执行没有保证原子性,拿锁到设置过期时间之间是存在时间差的,如果在这之间机器宕机了还是会存在上述问题,解决办法就是在占锁的同时设置过期时间。

            2、业务时间 > 超时时间:假设这样一个场景,业务1拿锁成功并设置过期时间30s,但业务1比较复杂需要花费40s,那么到了30s后业务1的锁就已经失效了,此时业务2抢到了锁也进来执行相应的逻辑,那么此时业务1和业务2都在执行各自的业务逻辑,可能会操作相同的数据资源造成违反资源互斥的现象。问题还没完,又过去了10s,业务1执行完了,理所当然的就去删锁,那么自然就会把业务2手里的锁删掉,业务2一脸懵逼。。。这就造成了锁误删的现象。

            先解决锁误删的问题,其实很简单,只需要保证谁拿的锁谁就有资格删就可以了,我们在获取锁的时候设置了一个value,此时就派上了用场,把每个业务的value都设置uuid,最后删锁的时候先去redis获取锁对应的值,如果这个值等于uuid才有资格执行删锁命令。需要注意的是这里也需要保证原子性,因为去远程redis获取锁对应的值再返回来也是有时间差的,如果业务1去远程获取到锁的value为“1111”,回来的过程中锁过期了,此时业务2拿到了锁开心的去执行它的业务去了,好景不长,业务1回来判断出锁的value是等于自己的uuid,于是又理所当然的把锁删掉了,业务2又一脸懵逼。。。。这里就需要使用lua脚本解锁,lua脚本就是为了保证这两段操作的原子性,解锁脚本内容如下:

    if redis.call("get",KEYS[1]) == ARGV[1]
    then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

            最后优化后的代码如下:

        @Override
        public List<Map<String, Object>> findList() {
            //尝试获取分布式锁,并设置过期时间
            String uuid = UUID.randomUUID().toString();
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
            if (lock) {
                //加锁成功,执行业务
                List<Map<String, Object>> resultList;
                try{
                    resultList = findListByDB();
                } finally {
                    //获取值进行对比,若对比成功则有资格进行删锁,需使用lua脚本保证原子性
                    String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                            "then\n" +
                            "    return redis.call(\"del\",KEYS[1])\n" +
                            "else\n" +
                            "    return 0\n" +
                            "end";
                    Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class)
                            , Arrays.asList("lock"), uuid);
                }
    
                return resultList;
            } else {
                //加锁失败,自旋重试,即重新调用本方法。
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return findList();
            }
        }

            其实我们还遗漏了一个问题,上面优化后的代码并没有解决业务时间 > 超时时间造成的违反资源互斥的现象,解决这个问题的办法是如果业务还没有结束,那么这期间每隔一段时间就给锁进行延时,即“锁续命”,但这涉及到定时任务,异步编排,所以就不再往下继续优化,到了这里不得不请Redisson分布式锁隆重登场了。

            在实际开发中我们并不会像上面这样用原生redis代码去实现分布式锁,而是使用已经封装好的框架——Redisson,Redisson已经为我们解决了上述细节问题,包括用定时任务实现“看门狗”机制为锁延时,我们只需要像平时使用本地锁一样进行调用即可。

            有了上述的思考,我们在下篇再来探析Redisson源码就会变得更加明朗了。

    下一篇:Redis分布式锁原理(二)——Redisson分布式锁源码浅析 

    展开全文
  • Redis分布式锁原理 分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的...

    Redis分布式锁原理

    分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

    使用setnx、getset、expire、del这4个redis命令实现

    1. setnx 是『SET if Not eXists』(如果不存在,则 SET)的简写。 命令格式:SETNX key value;使用:只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。
    2. getset 命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置之前的旧的value。返回值:如果键 key 没有旧值, 也即是说, 键 key 在被设置之前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
    3. expire 命令格式:EXPIRE key seconds,使用:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。返回值:设置成功返回 1 。 当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
    4. del 命令格式:DEL key [key …],使用:删除给定的一个或多个 key ,不存在的 key 会被忽略。返回值:被删除 key 的数量。

    redis 分布式锁原理一

    过程分析:

    1. A尝试去获取锁lockkey,通过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;
    2. 如果返回值为1,说明redis服务器中还没有lockkey,也就是没有其他用户拥有这个锁,A就能获取锁成功;
    3. 在进行相关业务执行之前,先执行expire(lockkey),对lockkey设置有效期,防止死锁。因为如果不设置有效期的话,lockkey将一直存在于redis中,其他用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;
    4. 执行相关业务;
    5. 释放锁,A完成相关业务之后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行重新设置锁新值。

    原理图如下
    在这里插入图片描述
    代码实现:

    public void redis1() {
            log.info("关闭订单定时任务启动");
            long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
            //这个方法的缺陷在这里,如果setnx成功后,锁已经存到Redis里面了,服务器异常关闭重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁
            Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
            if (setnxResult != null && setnxResult.intValue() == 1) {
                //如果返回值为1,代表设置成功,获取锁
                closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            } else {
                log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            }
            log.info("关闭订单定时任务结束");
        }
    private void closeOrder(String lockName) {
            //对锁设置有效期
            RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
            log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            //执行业务
            int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
            iOrderService.closeOrder(hour);
            //执行完业务后,释放锁
            RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            log.info("=================================");
        }
    
    

    缺陷:

    如果A在setnx成功后,A成功获取锁了,也就是锁已经存到Redis里面了,此时服务器异常关闭或是重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁。

    解决方法:

    关闭Tomcat有两种方式,一种通过温柔的执行shutdown关闭,一种通过kill杀死进程关闭,使用shutdown关闭tomcat,以下的方法会在关闭前执行,即关闭前删除锁

    //通过温柔的执行shutdown关闭时,以下的方法会在关闭前执行,即可以释放锁,而对于通过kill杀死
    //进程关闭时,以下方法不会执行,即不会释放锁
    //这种方式释放锁的缺点在于,如果关闭的锁过多,将造成关闭服务器耗时过长
      @PreDestroy
      public void delLock() {
          RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
      }
    
    

    redis 分布式锁原理2(优化版)

    为了解决原理1中会出现的死锁问题,提出原理2双重防死锁,可以更好解决死锁问题。
    过程分析:

    1. 当A通过setnx(lockkey,currenttime+timeout)命令能成功设置lockkey时,即返回值为1,过程与原理1一致;
    2. 当A通过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;
    3. A通过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA
    4. lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
    5. 步骤4返回结果为true后,通过getSet设置新的超时时间,并返回旧值lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
    6. lockvalueB == null || lockvalueA==lockvalueB,判断:若果lockvalueB为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的lockvalueB一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。

    原理图如下:
    在这里插入图片描述

    代码实现:

    public void redis2() {
            log.info("关闭订单定时任务启动");
            long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
            Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
            if (setnxResult != null && setnxResult.intValue() == 1) {
                //如果返回值为1,代表设置成功,获取锁
                closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            } else {
                //未获取到锁,继续判断,判断时间戳,看是否可以重置并获取到锁
                String lockValueStr = RedisShardedPoolUtil.get(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                //通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,不能获取该锁
                if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {
                    //通过getSet设置新的超时时间,并返回旧值,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
                    String getSetResult = RedisShardedPoolUtil.getSet(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
                    //再次用当前时间戳getset。
                    //返回给定的key的旧值,与旧值判断,是否可以获取锁
                    //当key没有旧值时,即key不存在时,返回nil ->获取锁
                    //这里我们set了一个新的value值,获取旧的值。
                    //若果getSetResult为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的getSetResult一致说明中间未被其他进程获取该锁,可以获取锁
                    if (getSetResult == null || (getSetResult != null && StringUtils.equals(lockValueStr, getSetResult))) {
                        //真正获取到锁
                        closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                    } else {
                        log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                    }
                } else {
                    log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
                }
            }
            log.info("关闭订单定时任务结束");
        }
        private void closeOrder(String lockName) {
            //对锁设置有效期
            RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
            log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            //执行业务
            int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
            iOrderService.closeOrder(hour);
            //执行完业务后,释放锁
            RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
            log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
            log.info("=================================");
        }
    
    

    优化点:

    加入了超时时间判断锁是否超时了,及时A在成功设置了锁之后,服务器就立即出现宕机或是重启,也不会出现死锁问题;因为B在尝试获取锁的时候,如果不能setnx成功,会去获取redis中锁的超时时间与当前的系统时间做比较,如果当前的系统时间已经大于锁超时时间,说明A已经对锁的使用权失效,B能继续判断能否获取锁,解决了redis分布式锁的死锁问题。

    Redisson实现Redis分布式锁的底层原理

    在这里插入图片描述

    (1)加锁机制

    咱们来看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。

    这里注意,仅仅只是选择一台机器!这点很关键!

    紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:
    在这里插入图片描述
    为啥要用lua脚本呢?

    因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。(lua脚本的原子性见文章最后补充内容)

    那么,这段lua脚本是什么意思呢?

    KEYS[1]代表的是你加锁的那个key,比如说:
    
    RLock lock = redisson.getLock("myLock");
    

    这里你自己设置了加锁的那个锁key就是“myLock”。

    **ARGV[1]**代表的就是锁key的默认生存时间,默认30秒。
    
    **ARGV[2]**代表的是加锁的客户端的ID,类似于下面这样:
    
    8743c9c0-0795-4907-87fd-6c719a6b4586:1
    

    给大家解释一下,第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

    如何加锁呢?很简单,用下面的命令:

    hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    

    通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
    在这里插入图片描述
    上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。

    接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

    好了,到此为止,ok,加锁完成了。

    (2)锁互斥机制

    那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

    很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

    接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

    所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的**剩余生存时间。**比如还剩15000毫秒的生存时间。

    此时客户端2会进入一个while循环,不停的尝试加锁。

    (3)watch dog自动延期机制

    客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

    简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

    (4)可重入加锁机制

    那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

    比如下面这种代码:
    在这里插入图片描述
    这时我们来分析一下上面那段lua脚本。

    第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

    第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

    此时就会执行可重入加锁的逻辑,他会用:

    incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
    

    通过这个命令,对客户端1的加锁次数,累加1。

    此时myLock数据结构变为下面这样:
    在这里插入图片描述
    大家看到了吧,那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数

    (5)释放锁机制

    如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

    其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。

    如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

    “del myLock”命令,从redis里删除这个key。

    然后呢,另外的客户端2就可以尝试完成加锁了。

    这就是所谓的分布式锁的开源Redisson框架的实现机制。

    一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

    Redis分布式锁缺点

    其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。

    但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。

    接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。

    此时就会导致多个客户端对一个分布式锁完成了加锁。

    这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。

    所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:

    在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。

    补充-lua脚本的原子性:

    Redis使用同一个Lua解释器来执行所有命令,同时,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。

    然而这也意味着,执行一个较慢的lua脚本是不建议的,由于脚本的开销非常低,构造一个快速执行的脚本并非难事。但是你要注意到,当你正在执行一个比较慢的脚本时,所以其他的客户端都无法执行命令。

    学习参考:
    https://blog.csdn.net/dazou1/article/details/88088223?ops_request_misc=%25257B%252522request%25255Fid%252522%25253A%252522160862098916780276355198%252522%25252C%252522scm%252522%25253A%25252220140713.130102334…%252522%25257D&request_id=160862098916780276355198&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_click~default-1-88088223.first_rank_v2_pc_rank_v29&utm_term=redis分布式锁

    https://blog.csdn.net/qq_43778308/article/details/111058472?ops_request_misc=&request_id=&biz_id=102&utm_term=redis%25E5%2588%2586%25E5%25B8%2583%25E5%25BC%258F%25E9%2594%2581&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-1-111058472.first_rank_v2_pc_rank_v29

    展开全文
  • 1.分布式缓存 1.1高并发下的分布式缓存 我们先从最开始的来说,我们现在一般都是B/S架构,一般都是中间挂一个服务器,后面有一个数据库。如下图: 如果客户端的访问量很大的话,那对于后端的服务来说就有一定压力了...
  • ZK(ZooKeeper)分布式锁实现

    多人点赞 2021-10-24 17:51:04
    分布式锁的实现方式主要以(ZooKeeper、Reids、Mysql)这三种为主 今天我们主要讲解的是使用 ZooKeeper来实现分布式锁,ZooKeeper的应用场景主要包含这几个方面: 服务注册与订阅(共用节点) 分布式通知(监听...
  • 一般实现分布式锁都有哪些方式?使用 redis 如何设计分布式锁?使用 zk 来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 其实一般问问题,都是这么问的,先问问你 zk,然后其实是要过渡到 zk 相关...
  • redis分布式锁详解(优化redis分布式锁的过程及Redisson使用) 1. redis在实际的应用中,不仅可以用来缓存数据,在分布式应用开发中,经常被用来当作分布式锁的使用,为什么要用到分布式锁呢? 在分布式的开发中,以...
  • 前言单体架构的应用可以直接使用synchronized或者ReentrantLock就可以...常见的分布式锁应用场景秒杀活动、优惠券抢购、接口幂等性校验等常用的分布式锁1. 基于数据库实现分布式锁1.1 悲观锁利用select … where … for
  • 注解式redission分布式锁原理概述适用场景引入依赖编写注解编写切面使用参考阅读 原理概述 利用aop特性,编制一个环绕切面给加了注解的方法体上,每次执行该方法时,首先进入切面进行加锁,执行完毕后回到切面,进行...
  • 为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁
  • 一、分布式锁简介 1,什么是分布式锁 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题...
  • 为什么需要分布式锁? 在多线程并发的情况下,可以使用java的synchronized以及Reentrantlock类来保证一个代码块在同一时间只能由一个线程访问。这种方式可以保证在同一个JVM进程内的多个线程同步执行。如果在分布式...
  • 各位Javaer都对锁应该都是不陌生的,无论工作还是面试的时候,...那么本文就是来给大家唠嗑唠嗑在分布式系统中常见的几种实现分布式锁的方式。数据库方式实现分布式锁首先从大家最最熟悉的数据库来说,这里使用的是M...
  • 分布式锁的痛点 关于分布式锁,有过javaEE开发经验的就会说了,系统为了应对高并发,会搭建一个比如tomcat集群,集群内服务都是访问的同一台数据库,有多台服务器同时修改同一条数据库数据的操作,但是我们并没有在...
  • 还有一种方式可以实现分布式锁:zookeeper,也有主从架构。 CAP理论:C表示一致性、A表示可用性、P表示分区容错性。 Redis集群满足:AP zookeeper集群满足:CP Redis中只要在主节点中加锁成功,马上返回给客户端...
  • 如何用Redis实现分布式锁

    千次阅读 2021-07-30 16:50:22
    为什么需要分布式锁 在聊分布式锁之前,有必要先解释一下,为什么需要分布式锁。 与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量...
  • zookeeper实现分布式锁

    2021-05-05 10:00:59
    一、分布式锁介绍分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。二、架构介绍在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图解释:...
  • 分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来...
  • 种方案前言日常开发中,秒杀下单、抢红包等等业务场景,都需要用到分布式锁。而Redis非常适合作为分布式锁使用。本文将分七个方案展开,跟大家探讨Redis分布式锁的正确使用方式。如果有不正确...
  • 分布式锁 由此可见分布式锁的目的其实很简单,就是为了保证多台服务器在执行某一段代码时保证只有一台服务器执行。 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性。在...
  • 分布式锁主流的实现方案: 基于数据库实现分布式锁 基于缓存(Redis等) 基于Zookeeper 每一种分布式锁解决方案都有各自的优缺点: 性能:redis最高 可靠性:zookeeper最高 这里,我们就基于redis实现分布式锁。...
  • 前言 前面的俩个章节分别介绍了如何通过数据库和zookeeper的方式实现分布式锁,本节我们将使用redis的方式实现分布式锁。该小节也是我们分布式锁三大技术方案实战的终极篇,是我们在实际开发环境中最为常用的一种...
  • 分布式锁解决并发的三种实现方式在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同 一时间内只能被同一个线程执行。在单机...
  • 前言在分布式系统中,由于redis分布式锁相对于更简单和高效,成为了分布式锁的首先,被我们用到了很多实际业务场景当中。但不是说用了redis分布式锁,就可以高枕无忧了,如果没有用好或者用对...
  • Redis分布式锁实现

    2021-11-06 10:35:43
    一、高效分布式锁 二、Redis分布式锁的缺点 三、使用Jedis set命令以及Lua脚本方式实现分布式锁 四、使用Redisson+RLock实现分布式锁 锁互斥机制 watch dog自动延期机制 可重入加锁机制 锁释放机制 相关...
  • 之前的文章中通过电商场景中秒杀的例子和大家分享了单体架构中的使用方式,但是现在很多应用系统都是相当庞大的,很多应用系统都是微服务的架构体系,那么在这种跨jvm的场景下,我们又该如何去解决并发。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 213,470
精华内容 85,388
关键字:

分布式锁