精华内容
下载资源
问答
  • 常见并发问题的解决方案

    千次阅读 2021-04-02 10:41:31
    这里写目录标题序业务逻辑准备工作环境数据库表结构数据填充安装go-...现在网上关于秒杀,抢票,超卖等并发场景的文章已经烂大街了。之前看过很多,但从来没自己测试过。今天心血来潮,想落地一下。 虽然解决的方法很

    现在网上关于秒杀,抢票,超卖等并发场景的文章已经烂大街了。之前看过很多,但从来没自己测试过。今天心血来潮,想落地一下。

    虽然解决的方法很多,可不一定都适合各种具体场景,所以过一遍流程,也能更好的把握哪些场景更适合怎样的方法,此篇文章的目的就是如此。

    再啰嗦一句:并发和大流量是两码事,小流量也可以有并发。

    业务逻辑

    老板发福利,400个奖,不能发重,不能发超,大家快来抢啊!

    准备工作

    环境

    脚本:PHP,框架:Laravel,web服务器:Nginx,数据库:MySQL,NoSQL:Redis,并发压测工具:Go-stress-testing-linux,系统:CentOS7。

    具体的脚本不重要,这里用的是自己比较熟悉的。

    数据库表结构

    code

    字段类型说明
    idint11 unsigned not null自增主键
    codechar14 not null14位Char unique
    statusbit1 not null0未发放 1已发放
    update_timedatetime发放时间 未发放为null

    code_out

    字段类型说明
    idint11 unsigned not null自增主键
    code_idnt11 unsigned not nullcode表主键
    create_timedatetime not null发放时间 默认CURRENT_TIMESTAMP

    code_out表主要用来表现并发问题。

    正常情况下,code_out表数据量和code表status=1的数据量必须一样,且code_out表一定没有code_id相同的记录,否则同一code肯定被发给了多个用户。

    这里补充下,时间为什么没有用timestamp。

    其实以前我也喜欢用timestamp类型的,可自从有一次遇到有记录的实际创建时间是18xx年,导致客户劈头盖脸来骂了一顿这种情况之后,就改掉了这个习惯。当然我也不是说timestamp不好,而是人总是有惯性思维。

    再补充一下,为什么很多字段要可以不允许为null。

    字段为null是很危险的,它可能导致查询的数据和实际逻辑要求的不一致,并且null比空字符串会占用更多的空间。所以,除非业务要求区分"0"和"没有",都建议字段不允许null,怎么算都不划算对吧。

    数据填充
    use Illuminate\Support\Str;
    
    // 原谅我放纵不羁爱自由,懒得建模型了,直接用DB类走起
    for ($i = 0; $i < 100; $i++) {
        \DB::table('code')
            ->insert([
                'code' => Str::random(14),
            ]);
    }
    
    安装go-stress-testing-linux

    go-stress-testing-linux是Go写的压测工具。

    git上有打成二进制的可执行文件,下载即可(github搜索link1st/go-stress-testing)。

    下载后记得赋予文件可执行权限哦。想偷懒的话,就直接拷贝到/usr/bin下吧。如果使用二进制文件的话,不需要装go环境。

    为什么选择go-stress-testing-linux?

    它的运行原理是利用Go的携程发起并发,是真正意义上的多线程并发。

    安装Redis

    不再赘述,网上教程很多。

    安装php redis扩展

    这一步可选,php有很多种方式可以和redis互通,个人更喜欢这种原始的方法。

    让游戏开始吧

    压测参数
    go-stress-testing-linux -c 1500 -n 2 -u {url}
    

    模拟1500个用户,每个用户请求2次。看上去数字并不大对吧?

    压测过程
    没有任何保护措施
    开抢咯
    $remain = \DB::table('code')
        ->where('status', 0)
        ->select('id', 'code')
        ->first();
    if (null == $remain) {
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    }
    \DB::table('code')
        ->where('id', $remain->id)
        ->update([
            'status' => 1,
            'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
        ]);
    \DB::table('code_out')
        ->insert([
            'code_id' => $remain->id
        ]);
    return [
        'code' => 200,
        'msg' => 'congratulations',
        'data' => $remain->code
    ];
    
    结果
    ┬────┬──────┬──────┬──────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┬
    │ 耗时│ 并发数│ 成功数│ 失败数│   qps  │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节│ 字节每秒 │ 错误码  │
    ┼────┼──────┼──────┼──────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
    │  1s│    818102080.321000.70389.09721.04│        │        │  200:81│
    │  2s│   31031001173.301971.56389.091278.44│        │        │ 200:310│
    │  3s│   5455450835.092949.67389.091796.22│        │        │ 200:545│
    │  4s│   7787780657.163924.38389.092282.54│        │        │ 200:778│
    │  5s│  100510050545.644908.34389.092749.07│        │        │200:1005│
    │  6s│  123312330464.195949.70389.093231.45│        │        │200:1233│
    │  7s│  145114530404.716909.48389.093706.35│        │        │200:1453│
    │  8s│  150016800365.777277.43389.094100.99│        │        │200:1680│
    │  9s│  150019020341.607277.43389.094391.14│        │        │200:1902│
    │ 10s│  150021280324.087277.43389.094628.53│        │        │200:2128│
    │ 11s│  150023360311.627277.43389.094813.55│        │        │200:2336│
    │ 12s│  150025580301.017277.43389.094983.29│        │        │200:2558│
    │ 13s│  150027940292.187277.43389.095133.82│        │        │200:2794│
    │ 14s│  150030000286.167277.43389.095241.89│        │        │200:3000
    数据验证
    select count(*) from `code` where `status` = 1;
    # 400
    select count(*) from code_out;
    # 3000
    select count(*), code_id from code_out group by code_id having count(*) > 1;
    # 竟然有216条记录,其中吉尼斯记录获取者是code_id=2的奖项,它被发了43次!
    # 当然,其他很多code也被重复发了很多次
    
    结论

    可以看到,不加任何保护措施的情况下,代码造成了同一code发给了多个用户的情况,一上线那就是事故!

    为什么会造成这种情况呢?其实原因很简单:MySQL查询和更新都需要一定时间的,更新过程中,后来的线程读到的还是老数据!代码可不会管这么多,拿到就继续用咯。

    同时,这也证明压测工具确实模拟出了并发场景。

    版本控制
    准备
    # 给code加一个version列
    alter table `code` add version bit(1) not null default 0;
    
    开抢咯
    $remain = \DB::table('code')
        ->where('status', 0)
        ->select('id', 'code')
        ->first();
    if (null == $remain) {
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    }
    $res = \DB::table('code')
        ->where('id', $remain->id)
        ->where('version', 0)
        ->update([
            'status' => 1,
            'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME']),
            'version' => 1
        ]);
    if (0 == $res) {
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    }
    \DB::table('code_out')
        ->insert([
            'code_id' => $remain->id
        ]);
    return [
        'code' => 200,
        'msg' => 'congratulations',
        'data' => $remain->code
    ];
    
    结果
    ┼────┬──────┬──────┬──────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┼
    │ 耗时│ 并发数│ 成功数│ 失败数│  qps  │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节│ 字节每秒 │  错误码 │
    ┼────┼──────┼──────┼──────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼
    │  1s│   10410402049.70993.69395.58731.81│        │        │ 200:104│
    │  2s│   33833801179.551988.44395.581271.67│        │        │ 200:338│
    │  3s│   5575570853.742935.61395.581756.98│        │        │ 200:557│
    │  4s│   8038030662.973952.94395.582262.55│        │        │ 200:803│
    │  5s│  103610360549.074917.70395.582731.88│        │        │200:1036│
    │  6s│  128312830463.215912.17395.583238.26│        │        │200:1283│
    │  7s│  149615240402.646887.29395.583725.45│        │        │200:1524│
    │  8s│  150017740366.777060.28395.584089.79│        │        │200:1774│
    │  9s│  150020150345.617060.28395.584340.16│        │        │200:2015│
    │ 10s│  150022520330.467060.28395.584539.15│        │        │200:2252│
    │ 11s│  150024910319.097060.28395.584700.83│        │        │200:2491│
    │ 12s│  150027330310.397060.28395.584832.66│        │        │200:2733│
    │ 13s│  150029930302.997060.28395.584950.65│        │        │200:2993│
    │ 13s│  150030000302.827060.28395.584953.50│        │        │200:3000
    数据验证
    select count(*) from `code` where `status` = 1;
    # 333
    select count(*) from code_out;
    # 333
    select count(*), code_id from code_out group by code_id having count(*) > 1;
    # 无记录
    
    结论

    很遗憾,奖没发完呢,因为部分线程抢到了同一个记录,但由于受到了版本控制,所以那些没有更新到数据的线程只能怪自己运气不好咯。

    这里用到了MySQL默认的MVCC,不知道的童鞋赶紧Google一下吧。

    其实,利用InnoDB的事务隔离也可以达到目的哦,但是如果没有深刻理解的话,搞不好会玩火自焚呢(如果造成死锁,无论行表,都会严重影响业务)。

    顺便说一句,大名鼎鼎的Elasticsearch也是用的这种方式解决这种问题的哦。

    使用缓存
    准备
    // redis稍微封装一下
    private function redis(): \Redis {
        $redis = new \Redis();
        $redis->connect('{host}', {port});
        $redis->auth('{password}');
        return $redis;
    }
    
    // 预热数据,将code放入Redis set中
    $code = \DB::table('code')
        ->select('code')
        ->get();
    $redis = $this->redis();
    foreach ($code as $v) {
        $redis->sAdd('code', $v);
    }
    
    开抢咯
    $redis = $this->redis();
    $code = $redis->spop('code');
    if (null == $code) {
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    }
    $exist = \DB::table('code')
        ->where('code', $code)
        ->where('status', 0)
        ->select('id')
        ->first();
    if (null == $exist) {
        return [
            'code' => 500,
            'msg' => 'invalid code',
            'data' => null
        ];
    }
    \DB::table('code')
        ->where('id', $exist->id)
        ->update([
            'status' => 1,
            'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
        ]);
    \DB::table('code_out')
        ->insert([
            'code_id' => $exist->id
        ]);
    return [
        'code' => 200,
        'msg' => 'congratulations',
        'data' => $code
    ];
    
    结果
    ┼────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┼
    │ 耗时│ 并发数 │ 成功数 │ 失败数 │  qps   │ 最长耗时 │ 最短耗时 │ 平均耗时│ 下载字节 │ 字节每秒│ 错误码  │
    ┼────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
    │  1s│     686801880.27955.80704.57797.76│        │        │ 200:68 │
    │  2s│    27827801146.861979.88704.571307.92│        │        │ 200:278│
    │  3s│    5405400795.132928.10704.571886.49│        │        │ 200:540│
    │  4s│    6976970687.853467.25704.572180.72│        │        │ 200:697│
    │  5s│   105810580509.594935.67704.572943.54│        │        │200:1058│
    │  6s│   120712070464.165791.64704.573231.65│        │        │200:1207│
    │  7s│   150016820377.436835.16704.573974.30│        │        │200:1682│
    │  8s│   150019660359.366835.16704.574174.10│        │        │200:1966│
    │  9s│   150022770349.386835.16704.574293.34│        │        │200:2277│
    │ 10s│   150025600344.166835.16704.574358.40│        │        │200:2560│
    │ 11s│   150028480341.156835.16704.574396.88│        │        │200:2848│
    │ 11s│   150030000339.306835.16704.574420.93│        │        │200:3000
    数据验证
    select count(*) from `code `where `status` = 1;
    # 400
    select count(*) from code_out;
    # 400
    select count(*), code_id from code_out group by code_id having count(*) > 1;
    # 无记录
    
    结论

    可以看到,利用Redis单线程特性,并发问题已经解决啦。

    并发锁
    开抢咯
    $redis = $this->redis();
    if (false === $redis->setnx('lock', 1)) {
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    }
    // 避免死锁
    $redis->expire('lock', 10);
    try {
        $remain = \DB::table('code')
            ->where('status', 0)
            ->select('id', 'status')
            ->first();
        if (null == $remain) {
            return [
                'code' => 500,
                'msg' => 'no code available',
                'data' => null
            ];
        }
        \DB::table('code')
            ->where('id', $remain->id)
            ->update([
                'status' => 1,
                'out_time' => date('Y-m-d H:i:s', $_SERVER['REQUEST_TIME'])
            ]);
        \DB::table('code_out')
            ->insert([
                'code_id' => $remain->id
            ]);
        return [
            'code' => 200,
            'msg' => 'congratulations',
            'data' => $remain->code
        ];
    } catch (\Exception $e) {
        // 异常
        return [
            'code' => 500,
            'msg' => 'no code available',
            'data' => null
        ];
    } finally {
        // 释放锁
        $redis->del('lock');
    }
    
    结果
    ┼────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┼
    │ 耗时│ 并发数 │ 成功数 │ 失败数 │   qps  │ 最长耗时 │ 最短耗时│ 平均耗时 │ 下载字节 │ 字节每秒│  错误码 │
    │────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────┼
    │  1s│      0000.000.000.000.00│        │        │        │
    │  2s│     39390814.371886.711754.721841.90│        │        │  200:39│
    │  3s│    2872870577.952974.691754.722595.40│        │        │ 200:287│
    │  6s│    9229220434.784880.621754.723450.04│        │        │ 200:922│
    │  5s│    6956950483.453675.151754.723102.72│        │        │ 200:695│
    │  6s│   135213520363.115881.571754.724130.97│        │        │200:1352│
    │  7s│   145314890352.776302.321754.724252.01│        │        │200:1489│
    │  8s│   150020460345.427439.631754.724342.48│        │        │200:2046│
    │  9s│   150023040344.517439.631754.724354.06│        │        │200:2304│
    │ 10s│   150025590345.937439.631754.724336.18│        │        │200:2559│
    │ 11s│   150028180342.977439.631754.724373.58│        │        │200:2818│
    │ 12s│   150030000340.217439.631754.724409.07│        │        │200:3000
    数据验证
    select count(*) from `code` where `status` = 1;
    # 61
    select count(*) from code_out;
    # 61
    select count(*), code_id from code_out group by code_id having count(*) > 1;
    # 无记录
    
    结论

    虽然这里也用到了Redis的特性,但重点是并发锁的原理,用PHP的文件锁也可以实现这个功能。

    在这个例子中,很遗憾,3000个请求只完成了61个奖的发放。因为锁住的时候就直接返回了结果,导致很多请求被拒绝了。但重点是避免了重发的问题!

    总结

    这里通过几个简单的例子,验证了用不同方法解决并发问题。虽然实际业务会更加复杂,但解决问题的方式,原理就是这些啦。

    这里根据我的项目经验,给出一些建议:

    Redis虽然是单线程(新版本的Redis已经是多线程的啦),但是连续的Redis操作可不一定了哦。例子:先get一个key,再set它,在并发情况下,结果可不一定是你想要的啦。

    • 如果是数字的话,可以使用Redis的incr/decr这种连续操作的方法。
    • 其他类型的话,可以使用Lua脚本一并发送命令,特殊语言如Java,可以用自己的锁来锁住代码块。

    使用并发锁一定要注意死锁的问题,不管什么情况,都要及时释放锁,否则万一出现死锁问题,那就是重大事故!

    好了,就说这么多了,希望对你有所帮助。

    展开全文
  • JAVA如何解决并发问题

    2021-03-17 15:48:19
    并发问题的根源在哪首先,我们要知道并发要解决的是什么问题?并发要解决的是单进程情况下硬件资源无法充分利用的问题。而造成这一问题的主要原因是CPU-内存-磁盘三者之间速度差异实在太大。如果将CPU的速度比作火箭...

    并发问题的根源在哪

    首先,我们要知道并发要解决的是什么问题?并发要解决的是单进程情况下硬件资源无法充分利用的问题。而造成这一问题的主要原因是CPU-内存-磁盘三者之间速度差异实在太大。如果将CPU的速度比作火箭的速度,那么内存的速度就像火车,而最惨的磁盘,基本上就相当于人双腿走路。

    这样造成的一个问题,就是CPU快速执行完它的任务的时候,很长时间都会在等待磁盘或是内存的读写。

    计算机的发展有一部分就是如何重复利用资源,解决硬件资源之间效率的不平衡,而后就有了多进程,多线程的发展。并且演化出了各种为多进程(线程)服务的东西:

    CPU增加缓存机制,平衡与内存的速度差异

    增加了多个概念,CPU时间片,程序计数器,线程切换等,用以更好得服务并发场景

    编译器的指令优化,希望在内部充分利用硬件资源

    但是这样一来,也会带来新的并发问题,归结起来主要有三个。

    由于缓存导致的可见性问题

    线程切换带来的原子性问题

    编译器优化带来的有序性问题

    我们分别介绍这几个:

    缓存导致的可见性

    CPU为了平衡与内存之间的性能差异,引入了CPU缓存,这样CPU执行指令修改数据的时候就可以批量直接读写CPU缓存的内存,一个阶段后再将数据写回到内存。

    但由于现在多核CPU技术的发展,各个线程可能运行在不同CPU核上面,每个CPU核各有各自的CPU缓存。前面说到对变量的修改通常都会先写入CPU缓存,再写回内存。这就会出现这样一种情况,线程1修改了变量A,但此时修改后的变量A只存储在CPU缓存中。这时候线程B去内存中读取变量A,依旧只读取到旧的值,这就是可见性问题。

    线程切换带来的原子性

    为了更充分得利用CPU,引入了CPU时间片时间片的概念。进程或线程通过争用CPU时间片,让CPU可以更加充分得利用。

    比如在进行读写磁盘等耗时高的任务时,就可以将宝贵的CPU资源让出来让其他线程去获取CPU并执行任务。

    但这样的切换也会导致问题,那就是会破坏线程某些任务的原子性。比如java中简单的一条语句count += 1。

    映射到CPU指令有三条,读取count变量指令,变量加1指令,变量写回指令。虽然在高级语言(java)看来它就是一条指令,但实际上确是三条CPU指令,并且这三条指令的原子性无法保证。也就是说,可能在执行到任意一条指令的时候被打断,CPU被其他线程抢占了。而这个期间变量值可能会被修改,这里就会引发数据不一致的情况了。所以高并发场景下,很多时候都会通过锁实现原子性。而这个问题也是很多并发问题的源头。

    编译器优化带来的有序性

    展开全文
  • 面试必问!多线程并发问题

    千次阅读 2021-01-22 11:32:33
    多线程并发问题,基本是面试必问的。 大部分同学应该都知道Synchronized,Lock,部分同学能说到volatile、并发包,优秀的同学则能在前面的基础上,说出Synchronized、volatile的原理,以及并发包中常用的数据结构,...

    多线程并发问题,基本是面试必问的。

    大部分同学应该都知道SynchronizedLock,部分同学能说到volatile并发包,优秀的同学则能在前面的基础上,说出Synchronized、volatile的原理,以及并发包中常用的数据结构,例如ConcurrentHashMap的原理。

    这篇文章将总结多线程并发的各种处理方式,希望对大家有所帮助。

    一、多线程为什么会有并发问题

    为什么多线程同时访问(读写)同个变量,会有并发问题?

    1. Java 内存模型规定了所有的变量都存储在主内存中,每条线程有自己的工作内存。
    2. 线程的工作内存中保存了该线程中用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。
    3. 线程访问一个变量,首先将变量从主内存拷贝到工作内存,对变量的写操作,不会马上同步到主内存。
    4. 不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量的传递均需要自己的工作内存和主存之间进行数据同步进行。

    二、Java 内存模型(JMM)

    Java 内存模型(JMM) 作用于工作内存(本地内存)和主存之间数据同步过程,它规定了如何做数据同步以及什么时候做数据同步,如下图。

    img

    三、并发三要素

    原子性:在一个操作中,CPU 不可以在中途暂停然后再调度,即不被中断操作,要么执行完成,要么就不执行。

    可见性:多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

    有序性:程序执行的顺序按照代码的先后顺序执行。

    四、怎么做,才能解决并发问题?(重点)

    下面结合不同场景分析解决并发问题的处理方式。

    一、volatile

    1.1 volatile 特性

    保证可见性,不保证原子性

    1. 当写一个volatile变量时,JVM会把本地内存的变量强制刷新到主内存中
    2. 这个写操作导致其他线程中的缓存无效,其他线程读,会从主内存读。volatile的写操作对其它线程实时可见。

    禁止指令重排序 指令重排序是指编译器和处理器为了优化程序性能对指令进行排序的一种手段,需要遵守一定规则:

    1. 不会对存在依赖关系的指令重排序,例如 a = 1;b = a; a 和b存在依赖关系,不会被重排序
    2. 不能影响单线程下的执行结果。比如:a=1;b=2;c=a+b这三个操作,前两个操作可以重排序,但是c=a+b不会被重排序,因为要保证结果是3

    1.2 使用场景

    对于一个变量,只有一个线程执行写操作,其它线程都是读操作,这时候可以用 volatile 修饰这个变量。

    1.3 单例双重锁为什么要用到volatile?

    public class TestInstance {
    
    private static volatile TestInstance mInstance;
    
    public static TestInstance getInstance(){       //1
        if (mInstance == null){                     //2
            synchronized (TestInstance.class){      //3
                if (mInstance == null){             //4
                    mInstance = new TestInstance(); //5
                }
            }
        }
        return mInstance;
    }
    复制代码
    

    }

    假如没有用volatile,并发情况下会出现问题,线程A执行到注释5 new TestInstance() 的时候,分为如下几个几步操作:

    1. 分配内存
    2. 初始化对象
    3. mInstance 指向内存

    这时候如果发生指令重排,执行顺序是132,执行到第3的时候,线程B刚好进来了,并且执行到注释2,这时候判断mInstance 不为空,直接使用一个未初始化的对象。所以使用volatile关键字来禁止指令重排序。

    1.4 volatile 原理

    在JVM底层volatile是采用内存屏障来实现的,内存屏障会提供3个功能:

    1. 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
    2. 它会强制将缓存的修改操作立即写到主内存
    3. 写操作会导致其它CPU中的缓存行失效,写之后,其它线程的读操作会从主内存读。

    1.5 volatile 的局限性

    volatile 只能保证可见性,不能保证原子性写操作对其它线程可见,但是不能解决多个线程同时写的问题。

    二、Synchronized

    2.1 Synchronized 使用场景

    多个线程同时写一个变量。

    例如售票,余票是100张,窗口A和窗口B同时各卖出一张票, 假如余票变量用 volatile 修饰,是有问题的。
    A窗口获取余票是100,B窗口获取余票也是100,A卖出一张变成99,刷新回主内存,同时B卖出一张变成99,也刷新回主内存,会导致最终主内存余票是99而不是98。

    前面说到 volatile 的局限性,就是多个线程同时写的情况,这种情况一般可以使用Synchronized

    Synchronized 可以保证同一时刻,只有一个线程可执行某个方法或某个代码块。

    2.2 Synchronized 原理

    public class SynchronizedTest {
    
    public static void main(String[] args) {
        synchronized (SynchronizedTest.class) {
            System.out.println("123");
        }
        method();
    }
    
    private static void method() {
    }
    }
    复制代码
    

    将这段代码先用javac命令编译,再java p -v SynchronizedTest.class命令查看字节码,部分字节码如下

    public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class com/lanshifu/opengldemo/test/SynchronizedTest
         2: dup
         3: astore_1
         4: monitorenter
         5: getstatic     #3                  // Field java/lang/System.out:Ljava/io/PrintStream;
         8: ldc           #4                  // String 123
        10: invokevirtual #5                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        13: aload_1
        14: monitorexit
        15: goto          23
        18: astore_2
        19: aload_1
        20: monitorexit
        21: aload_2
        22: athrow
        23: invokestatic  #6                  // Method method:()V
        26: return
    复制代码
    

    可以看到 4: monitorenter14: monitorexit,中间是打印的语句。

    执行同步代码块,首先会执行monitorenter指令,然后执行同步代码块中的代码,退出同步代码块的时候会执行monitorexit指令 。

    使用Synchronized进行同步,其关键就是必须要对对象的监视器monitor进行获取,当线程获取monitor后才能继续往下执行,否则就进入同步队列,线程状态变成BLOCK,同一时刻只有一个线程能够获取到monitor,当监听到monitorexit被调用,队列里就有一个线程出队,获取monitor。详情参考:www.jianshu.com/p/d53bf830f…

    每个对象拥有一个计数器,当线程获取该对象锁后,计数器就会加一,释放锁后就会将计数器减一,所以只要这个锁的计数器大于0,其它线程访问就只能等待。

    2.3 Synchronized 锁的升级

    大家对Synchronized的理解可能就是重量级锁,但是Java1.6对 Synchronized 进行了各种优化之后,有些情况下它就并不那么重,Java1.6 中为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁。

    偏向锁: 大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。

    当一个线程A访问加了同步锁的代码块时,会在对象头中存 储当前线程的id,后续这个线程进入和退出这段加了同步锁的代码块时,不需要再次加锁和释放锁。

    轻量级锁: 在偏向锁情况下,如果线程B也访问了同步代码块,比较对象头的线程id不一样,会升级为轻量级锁,并且通过自旋的方式来获取轻量级锁。

    重量级锁: 如果线程A和线程B同时访问同步代码块,则轻量级锁会升级为重量级锁,线程A获取到重量级锁的情况下,线程B只能入队等待,进入BLOCK状态。

    2.4 Synchronized 缺点

    1. 不能设置锁超时时间
    2. 不能通过代码释放锁
    3. 容易造成死锁

    三、ReentrantLock

    上面说到Synchronized的缺点,不能设置锁超时时间和不能通过代码释放锁,ReentranLock就可以解决这个问题。

    在多个条件变量和高度竞争锁的地方,用ReentrantLock更合适,ReentrantLock还提供了Condition,对线程的等待和唤醒等操作更加灵活,一个ReentrantLock可以有多个Condition实例,所以更有扩展性。

    3.1 ReentrantLock 的使用

    lock 和 unlock

            ReentrantLock reentrantLock = new ReentrantLock();
            System.out.println("reentrantLock->lock");
            reentrantLock.lock();
            try {
    
                System.out.println("睡眠2秒...");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                reentrantLock.unlock();
                System.out.println("reentrantLock->unlock");
            }
    复制代码
    

    实现可定时的锁请求:tryLock

        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock();
            Thread thread1 = new Thread_tryLock(reentrantLock);
            thread1.setName("thread1");
            thread1.start();
            Thread thread2 = new Thread_tryLock(reentrantLock);
            thread2.setName("thread2");
            thread2.start();
    }
    
        static class Thread_tryLock extends Thread {
            ReentrantLock reentrantLock;
    
            public Thread_tryLock(ReentrantLock reentrantLock) {
                this.reentrantLock = reentrantLock;
            }
    
            @Override
            public void run() {
                try {
                    System.out.println("try lock:" + Thread.currentThread().getName());
                    boolean tryLock = reentrantLock.tryLock(3, TimeUnit.SECONDS);
                    if (tryLock) {
                        System.out.println("try lock success :" + Thread.currentThread().getName());
                        System.out.println("睡眠一下:" + Thread.currentThread().getName());
                        Thread.sleep(5000);
                        System.out.println("醒了:" + Thread.currentThread().getName());
                    } else {
                        System.out.println("try lock 超时 :" + Thread.currentThread().getName());
                    }
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("unlock:" + Thread.currentThread().getName());
                    reentrantLock.unlock();
                }
            }
        }
    
    复制代码
    

    打印的日志:

    try lock:thread1
    try lock:thread2
    try lock success :thread2
    睡眠一下:thread2
    try lock 超时 :thread1
    unlock:thread1
    Exception in thread "thread1" java.lang.IllegalMonitorStateException
        at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
        at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
        at com.lanshifu.demo_module.test.lock.ReentranLockTest$Thread_tryLock.run(ReentranLockTest.java:60)
    醒了:thread2
    unlock:thread2
    
    复制代码
    

    上面演示了trtLock的使用,trtLock设置获取锁的等待时间,超过3秒直接返回失败,可以从日志中看到结果。 有异常是因为thread1获取锁失败,不应该调用unlock。

    3.2 Condition 条件

    public static void main(String[] args) {
    
            Thread_Condition thread_condition = new Thread_Condition();
            thread_condition.setName("测试Condition的线程");
            thread_condition.start();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            thread_condition.singal();
    
        }
    
    static class Thread_Condition extends Thread {
    
            @Override
            public void run() {
                await();
            }
    
            private ReentrantLock lock = new ReentrantLock();
            public Condition condition = lock.newCondition();
    
            public void await() {
                try {
                    System.out.println("lock");
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + ":我在等待通知的到来...");
                    condition.await();//await 和 signal 对应
                    //condition.await(2, TimeUnit.SECONDS); //设置等待超时时间
                    System.out.println(Thread.currentThread().getName() + ":等到通知了,我继续执行>>>");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("unlock");
                    lock.unlock();
                }
            }
    
            public void singal() {
                try {
                    System.out.println("lock");
                    lock.lock();
                    System.out.println("我要通知在等待的线程,condition.signal()");
                    condition.signal();//await 和 signal 对应
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("unlock");
                    lock.unlock();
                }
            }
        }
    复制代码
    

    运行打印日志

    lock
    测试Condition的线程:我在等待通知的到来...
    lock
    我要通知在等待的线程,condition.signal()
    unlock
    测试Condition的线程:等到通知了,我继续执行>>>
    unlock
    复制代码
    

    上面演示了Condition的 await 和 signal 使用,前提要先lock。

    3.3 公平锁与非公平锁

    ReentrantLock 构造函数传true表示公平锁。

    公平锁表示线程获取锁的顺序是按照线程加锁的顺序来分配的,即先来先得的顺序。而非公平锁就是一种锁的抢占机制,是随机获得锁的,可能会导致某些线程一致拿不到锁,所以是不公平的。

    3.4 ReentrantLock 注意点

    1. ReentrantLock使用lock和unlock来获得锁和释放锁
    2. unlock要放在finally中,这样正常运行或者异常都会释放锁
    3. 使用condition的await和signal方法之前,必须调用lock方法获得对象监视器

    四、并发包

    通过上面分析,并发严重的情况下,使用锁显然效率低下,因为同一时刻只能有一个线程可以获得锁,其它线程只能乖乖等待。

    Java提供了并发包解决这个问题,接下来介绍并发包里一些常用的数据结构。

    4.1 ConcurrentHashMap

    我们都知道HashMap是线程不安全的数据结构,HashTable则在HashMap基础上,get方法和put方法加上Synchronized修饰变成线程安全,不过在高并发情况下效率底下,最终被ConcurrentHashMap替代。

    ConcurrentHashMap 采用分段锁,内部默认有16个桶,get和put操作,首先将key计算hashcode,然后跟16取余,落到16个桶中的一个,然后每个桶中都加了锁(ReentrantLock),桶中是HashMap结构(数组加链表,链表过长转红黑树)。

    所以理论上最多支持16个线程同时访问。

    4.2 LinkBlockingQueue

    链表结构的阻塞队列,内部使用多个ReentrantLock

        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();
    
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        /**
         * Signals a waiting put. Called only from take/poll.
         */
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    
    复制代码
    

    源码不贴太多,简单说一下LinkBlockingQueue 的逻辑:

    1. 从队列获取数据,如果队列中没有数据,会调用notEmpty.await();进入等待。
    2. 在放数据进去队列的时候会调用notEmpty.signal();,通知消费者,1中的等待结束,唤醒继续执行。
    3. 从队列里取到数据的时候会调用notFull.signal();,通知生产者继续生产。
    4. 在put数据进入队列的时候,如果判断队列中的数据达到最大值,那么会调用notFull.await();,等待消费者消费掉,也就是等待3去取数据并且发出notFull.signal();,这时候生产者才能继续生产。

    LinkBlockingQueue 是典型的生产者消费者模式,源码细节就不多说。

    4.3 原子操作类:AtomicInteger

    内部采用CAS(compare and swap)保证原子性

    举一个int自增的例子

            AtomicInteger atomicInteger = new AtomicInteger(0);
            atomicInteger.incrementAndGet();//自增
    复制代码
    

    源码看一下

       /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final int incrementAndGet() {
            return U.getAndAddInt(this, VALUE, 1) + 1;
        }
    复制代码
    

    U 是 Unsafe,看下 Unsafe#getAndAddInt

        public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }
    复制代码
    

    通过compareAndSwapInt保证原子性。

    五、小结

    面试中问到多线程并发问题,可以这么答:

    1. 当只有一个线程写,其它线程都是读的时候,可以用volatile修饰变量
    2. 当多个线程写,那么一般情况下并发不严重的话可以用Synchronized,Synchronized并不是一开始就是重量级锁,在并发不严重的时候,比如只有一个线程访问的时候,是偏向锁;当多个线程访问,但不是同时访问,这时候锁升级为轻量级锁;当多个线程同时访问,这时候升级为重量级锁。所以在并发不是很严重的情况下,使用Synchronized是可以的。不过Synchronized有局限性,比如不能设置锁超时,不能通过代码释放锁。
    3. ReentranLock 可以通过代码释放锁,可以设置锁超时。
    4. 高并发下,Synchronized、ReentranLock 效率低,因为同一时刻只有一个线程能进入同步代码块,如果同时有很多线程访问,那么其它线程就都在等待锁。这个时候可以使用并发包下的数据结构,例如ConcurrentHashMapLinkBlockingQueue,以及原子性的数据结构如:AtomicInteger

    面试的时候按照上面总结的这个思路回答基本就ok了。既然说到并发包,那么除了ConcurrentHashMap,其它一些常用的数据结构的原理也需要去了解下,例如HashMap、HashTable、TreeMap原理,ArrayList、LinkedList对比,这些都是老生常谈的,自己去看源码或者一些博客。

    关于多线程并发就先总结到这里,如果是应付面试的话按照这篇文章的思路来准备应该是没太大问题的。

    六、最后

    为了方便有面试需要的朋友们,更加系统方便的学习刷题,拿到自己理想的offer。我将各大厂历年的常见、难点面试题做了系列整理,有需要的朋友,可以点赞+关注后,点击这里直接获取!

    img

    展开全文
  • 线程安全是并发编程中的重要关注点,应该注意到的是,造成线程安全问题的主要诱因有一下两点:1. 存在共享数据 (也称临界资源)2. 存在多条线程共同操作共享数据只用同时满足上面两个条件,才会引发线程安全问题。...

    线程安全是并发编程中的重要关注点,应该注意到的是,造成线程安全问题的主要诱因有一下两点:

    1. 存在共享数据 (也称临界资源)

    2. 存在多条线程共同操作共享数据

    只用同时满足上面两个条件,才会引发线程安全问题。

    因此,我们可以使用以下两种常用的加锁方式来解决线程安全问题:

    当存在多个线程并发访问共享数据时,需要保证同一时刻有且只有一个线程在操作共享数据,其他线程必须等到这个线程结束了对共享数据的操作(访问)才可以进行对数据的操作(访问)。这种安排多线程访问共享数据的方案被称为互斥锁,即能达到互斥访问共享数据目的的锁,具体的就是,当一个线程访问一个共享数据时,该线程必须获得共享数据的互斥锁,这样在同一时刻,其他线程因为无法活得该共享数据的互斥锁,只能处于等待状态,直到当前线程操作完共享数据,释放了互斥锁,其他等待的线程才可以通过获得这个互斥锁来获得对共享数据的操作权利。

    1. 在 Java 中,关键字 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块(主要是对方法或者代码块中存在共享数据的操作),同时我们还应该注意到 synchronized 另外一个重要的作用,synchronized 可保证一个线程的变化(主要是共享数据的变化)被其他线程所看到(保证可见性,完全可以替代 volatile 功能,但是 volatile 没有 synchronized 保证操作的原子性的特性),这点确实也是很重要的。

    2. synchronized 属于隐式锁,即锁的持有与释放都是隐式的,我们无需干预,还有一种显式锁 Lock,即锁的持有和释放都必须由我们手动编写。在Java 1.5中,官方在concurrent并发包中加入了Lock接口,该接口中提供了 lock() 方法和 unLock() 方法对显式加锁和显式释放锁操作进行支持。

    展开全文
  • springmvc+hibernate+jdbctemplate+mysql 原文链接:... 如内容是第一次请求的内容,并且让第一个请求跑完后,第二个请求到断线处的content正确时,可以确定不会出现并发问题
  • Java static并发问题

    2021-03-13 09:40:32
    只要你的静态方法不访问全局变量的话,就不会有并发问题访问全局变量肯定会出现并发问题,这是毫无疑问的静态方法内部的变量,都是局部变量,每次调用静态方法时都会重新分配内存空间,所以是安全的。 也就是是说...
  • 解决并发下的另外一个方法,隔离级别!!首先创建一个account的表,插入数据,引擎为Innodb1. 未提交读(Read uncommitted)允许脏读,事务中的修改,即时没有提交,对其它事务也是可见的A查询执行操作:123set ...
  • 序列号自增并发问题

    2021-11-08 10:43:19
    序列号自增并发问题使用场景使用 for update 和 直接update的区别功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容...
  • Quartz 调度任务并发问题 假设 A任务没3秒执行一次,但执行周期为10秒,就会出现多个任务同时在跑的现象,如果只是查询不进行更新操作的话这个问题不大,但是如果有更新操作就会导致数据并发异常 以下是具体的测试...
  • Random高并发问题

    2021-06-11 09:42:32
    // 这样的写法在高并发下会有问题 int random = new Random().nextInt(10); // 生成0~9的随机数 正确写法: int random = ThreadLocalRandom.current().nextInt(10); // 生成0~9的随机数 原因解析: // nextInt(10...
  • redis为什么会有高并发问题 redis的出身决定 redis是一种单线程机制的nosql数据库,基于key-value,数据可持久化落盘。由于单线程所以redis本身并没有锁的概念,多个客户端连接并不存在竞争关系,但是利用jedis等...
  • 避免token并发问题

    2021-07-12 16:57:59
    Redis可以用删除操作来判断Token是否有效, 删除成功代表Token校验通过,采用select+del来校验Token 会存在并发问题
  • 本文讲述了Redis高并发问题的解决办法。分享给大家供大家参考,具体如下:redis为什么会有高并发问题redis的出身决定redis是一种单线程机制的nosql数据库,基于key-value,数据可持久化落盘。由于单线程所以redis...
  • php redis处理并发问题

    2021-03-23 21:56:09
    最近在弄并发问题,看到都是用redis队列处理并发的,所以我也弄弄看,用Linux的webbench模拟并发请求,但我发现使用redis速度很慢,只是简单的读取数据操作lget()操作都要1秒钟,循环入队1000次的时间也只是1秒多...
  • 1,问题引入: 在单核cpu系统中;进程有个全局量 intg_i = 0,在进程中开10个线程,每个线程都不对 g_i 加锁的情况下做1亿次自增操作 (g_i++) ;主线程等待所有的线程结束后,再打印 g_i 的值能保证是 10 亿吗? ...
  • (一)redis技术的使用:redis真的是一个很好的技术,它可以很好的在一定程度上解决网站一瞬间的并发量,例如商品抢购秒杀等活动。。。redis之所以能解决高并发的原因是它可以直接访问内存,而以往我们用的是数据库...
  • 特点 无序的键值对集合,使用key来获取,无法通过index来获取,返回顺序未知,因此每次打印的顺序可能不一样。...非并发安全的。 声明与初始化 var m1 map[string]int // 只是声明,依然是 nil m2 := make(map
  • 2、每天的工单生成量是30W,所以会存在并发问题解决思路:1、首先乐观的认为redis不会宕机,对应的缓存不会被清除(除非人为操作,人为操作会有独立的补救办法)2、将工单编码存到缓存中(redis),其值只存“WT”+...
  • 在开发中遇到过这样一个问题一个看视频记录,更新到100就表示看完了,后面再有请求不继续更新了.结果是:导致,里面很多数据出现问题.推测是以下的情况才会导致第一条请求 事务在执行中,还未提交(因为本地有时候比较难...
  • 并发问题的解决一直是广大程序员的心病,而乐观锁便是能够有效解决高并发问题的一种模式,下面就让我们一起看看要如何在乐观锁下解决高并发问题吧。乐观锁下解决高并发问题例:在一银行中,如若A、B操作员同时读取一...
  • 在高并发的场景下,经常会遇到这种情况:A请求过来,查询出来一条数据,进行update操作,与此同时B请求也在这个时候过来,对这条数据进行查询,并进行操作。此时就会出现B在A之后进行查询操作,但是实际B的数据却被A...
  • 问题复现 客户端并发请求扣费接口,导致重复扣费; 服务端的加锁逻辑不正确,锁加到了非...这个思路无法规避并发问题 public function agoraToken(Request $request) { . . . try { DB::connection('footpri
  • 学习笔记1、并发问题的产生1.1 内存模型1.2 CPU内存模型1.3 Java内存模型1.4 基于内存模型分析并发问题产生的根因2、并发问题的解决方案 1、并发问题的产生 1.1 内存模型 想要了解并发问题产生的根源,首先我们需要...
  • /** * 解决ArrayList的三种方法 */ //1. 线程安全 add 方法使用的是 synchronized 修饰,但其效率并不高 //List<String> list = new Vector<>();... //2.... list = Collections.synchronizedList(new...
  • sqlite数据库并发问题

    2021-01-08 14:50:22
    目录 ...缺点:既然是轻量级,那功能和性能上就有点跟不上了,尤其是多线程操作时的并发问题。 sqlite的锁 用过sqlite的小伙伴都知道,sqlite使用的是库锁,当然也可以理解成“文件锁”。sqlite...
  • 所谓服务器大流量高并发指的是:在同时或极短时间内,有大量的请求到达服务端,每个请求都需要服务端耗费资源进行处理,并做出...高并发问题的本质就是:资源的有限性高并发带来的问题服务端的处理和响应会越来越...
  • java的hashmap高并发问题 java 的 HashMap 高并发问题2010 年 1 月 9 日 114 views 评论 发表评论 今天不知为什么服务器卡死了把所有线程的堆栈打印出来是这样的java.util.HashMap.get……………….ReceiveWorker....
  • nodejs处理高并发问题

    2021-01-19 14:35:10
    function (err, res) {if (!err) {var insert = "insert into testlog(testid,num,`desc`) values ('" + id + "','" + num + "','" + dateFormat(new Date()) + "')";connection.query(insert, function (err, res) ...
  • 这里我们主要利用Redis的setnx的命令来处理高并发。setnx 有两个参数。第一个参数表示键。第二个参数表示值。如果当前键不存在,那么会插入当前键,将第二个参数做为值。返回 1。如果当前键存在,那么会返回0。创建...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,010,882
精华内容 404,352
关键字:

并发问题