原子操作 订阅
"原子操作(atomic operation)是不需要synchronized",这是多线程编程的老生常谈了。所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切 [1]  换到另一个线程)。 展开全文
"原子操作(atomic operation)是不需要synchronized",这是多线程编程的老生常谈了。所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切 [1]  换到另一个线程)。
信息
特    性
不可分割的
外文名
atomic operation
含    义
是多个操作 不可被打乱或切割
中文名
原子操作
原子操作定义
如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构,那么这个操作是一个原子(atomic)操作。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中的一部分。将整个操作视作一个整体是原子性的核心特征。
收起全文
精华内容
下载资源
问答
  • 2021-05-21 17:38:51

    所谓原子操作,就是"不可中断的一个或一系列操作" 。

    硬件级的原子操作:

    1)在单处理器系统(UniProcessor)中,能够在单条指令中完成的操作都可以认为是" 原子操作",因为中断只能发生于指令之间。这也是某些CPU指令系统中引入了test_and_set、test_and_clear等指令用于临界资源互斥的原因。

    2)在对称多处理器(Symmetric Multi-Processor)结构中就不同了,由于系统中有多个处理器在独立地运行,即使能在单条指令中完成的操作也有可能受到干扰。

    软件级的原子操作:

    软件级的原子操作实现依赖于硬件原子操作的支持。

    问题:GNU C中x++是原子操作吗?

    答案:不是。x++由3条指令完成。x++在单CPU下不是原子操作。对应3条汇编指令:

    movl x, %eax

    addl $1, %eax

    movl %eax, x

    ++x,x++等都不是原子操作。其步骤包括了从内存中取x值放入寄存器,加寄存器,再把结果值写入内存三个指令。

    如何实现x++的原子性?在单处理器上,如果执行x++时,禁止多线程调度,就可以实现原子。因为单处理的多线程并发是伪并发。

    在多处理器上,需要借助cpu提供的Lock功能,锁总线。读取内存值,修改,写回内存三步期间禁止别的CPU访问总线。

    在多处理器系统中存在潜在问题的原因是:不使用LOCK指令前缀锁定总线的话,在一次内存访问周期中有可能其他处理器会产生异常或中断,而在异常处理中有可能会修改尚未写入的地址,这样当INC操作完成后会产生无效数据(覆盖了前面的修改)。

    spinlock 用于CPU同步, 它的实现是基于CPU锁定数据总线的指令.

    当某个CPU锁住数据总线后, 它读一个内存单元(spinlock_t)来判断这个spinlock 是否已经被别的CPU锁住. 如果否, 它写进一个特定值, 表示锁定成功, 然后返回. 如果是, 它会重复以上操作直到成功, 或者spin次数超过一个设定值. 锁定数据总线的指令只能保证一个机器指令内, CPU独占数据总线.

    单CPU当然能用spinlock, 但实现上无需锁定数据总线.

    spinlock在锁定的时候,如果不成功,不会睡眠,会持续的尝试,单cpu的时候spinlock会让其它process动不了。

    对于linux而言,内核提供了两组原子操作接口:一组是针对整数进行操作;另一组是针对单独的位进行操作。

    1、原子整数操作

    原子操作通常针对int或bit类型的数据,但是Linux并不能直接对int进行原子操作,而只能通过atomic_t的数据结构来进行。

    原子整数操作最常见的用途就是实现计数器。

    针对整数的原子操作只能对atomic_t类型的数据处理。这里没有使用C语言的int类型,主要是因为:

    1) 让原子函数只接受atomic_t类型操作数,可以确保原子操作只与这种特殊类型数据一起使用

    2) 使用atomic_t类型确保编译器不对相应的值进行访问优化

    3) 使用atomic_t类型可以屏蔽不同体系结构上的数据类型的差异。尽管Linux支持的所有机器上的整型数据都是32位,但是使用atomic_t的代码只能将该类型的数据当作24位来使用。这个限制完全是因为在SPARC体系结构上,原子操作的实现不同于其它体系结构:32位int类型的低8位嵌入了一个锁,因为SPARC体系结构对原子操作缺乏指令级的支持,所以只能利用该锁来避免对原子类型数据的并发访问。

    常见的用法是:

    atomic_t use_cnt;

    atomic_set(&use_cnt, 2);

    atomic_add(4, &use_cnt);

    atomic_inc(use_cnt);

    2、原子位操作的实现

    位操作函数是对普通的内存地址进行操作的。原子位操作在多数情况下是对一个字长的内存访问,因而位号该位于0-31之间(在64位机器上是0-63之间),但是对位号的范围没有限制。

    编写内核代码,只要把指向了你希望的数据的指针给操作函数,就可以进行位操作了:

    unsigned long word = 0;

    set_bit(0, &word); /*第0位被设置*/

    set_bit(1, &word); /*第1位被设置*/

    clear_bit(1, &word); /*第1位被清空*/

    change_bit(0, &word); /*翻转第0位*/

    为什么关注原子操作?1)在确认一个操作是原子的情况下,多线程环境里面,我们可以避免仅仅为保护这个操作在外围加上性能开销昂贵的锁。2)借助于原子操作,我们可以实现互斥锁。3)借助于互斥锁,我们可以把一些列操作变为原子操作。

    更多相关内容
  • 1.认识原子操作 原子操作就是在多线程程序中“最小的且不可并行化的”操作,意味着多个线程访问同一个资源时,有且仅有一个线程能对资源进行操作。通常情况下原子操作可以通过互斥的访问方式来保证,例如Linux下的...
  • 串口方式:用串口接收中断方式接收,不是DMA. 遇到的问题:串口数据有帧丢失。 原因描述:在串口接收中断中接收到字节时变量size...实际的原因是对size的操作不是原子操作的,具体更改见文档。有相关程序和具体的分析。
  • MongoDB 原子操作

    2020-12-16 16:25:26
    MongoDB 原子操作 mongodb不支持事务,所以,在你的项目中应用时,要注意这点。无论什么设计,都不要要求mongodb保证数据的完整性。 但是mongodb提供了许多原子操作,比如文档的保存,修改,删除等,都是原子操作。 ...
  • 如两个线程操作同一变量过程中,一个线程执行过程中可能被内核临时挂起,这就是线程切换,当内核再次切换到该线程时,之前的数据可能已被修改,不能保证原子操作。 C++11提供了个原子的类和方法atomic,保证了多线程...
  • 分布式Redis原子操作示例,近期项目中遇到分布式项目中多节点大并发操作redis同一个key。此案例利用java调用LUA脚本实现redis操作的原子性。分享出来大家参考。
  • kotlinx.atomicfu:在Kotlin中使用原子操作的惯用方式
  • 主要介绍了Java原子操作CAS原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 在kotlin中使用原子操作的惯用方法。
  • 主要介绍了Golang使用lua脚本实现redis原子操作,本文通过实例代码给大家介绍的非常详细,对大家的工作或学习具有一定的参考借鉴价值,需要的朋友可以参考下
  • 没有原子操作的GPU上的SSSP
  • c++基础封装(线程、锁、定时器、原子操作等),c++封装,接口方便好用。
  • 原子操作 - linux内核锁(一)

    千次阅读 2022-03-31 17:25:38
    “原子”是不可分割的意思,原子操作是指一个实际运行的操作不可分割,这个运行必然会被执行并完成而不会被另外一个任务或者事件打断。也就说,它是最小的执行单位,不可能有比它更小的执行单位。linux原子操作的...

    “原子”是不可分割的意思,原子操作是指一个实际运行的操作不可分割,这个运行必然会被执行并完成而不会被另外一个任务或者事件打断。也就说,它是最小的执行单位,不可能有比它更小的执行单位。linux原子操作的问题来源于中断、进程抢占以及多核SMP系统中程序并发执行访问临界区。为了防止临界区数据的混乱,通过原子操作来保证其数据的原子操作。这里的临界区域分为全局或者局部静态变量和其他的混合临界区,对于混合临界区的原子操作需要通过复杂的锁机制保证,对于变量其原子操作以来实际的硬件平台完成其数据的原子操作。这篇博客主要讨论某一个变量的原子操作。

    对于变量的原子操作,在linux系统当中经典实例是实现资源的技术,很多引用计数(/linux/fork.c)都是通过原子操作实现(类似于C++中智能指针的引用)。当然,也可以通过原子操作来对某个操作或者某些资源上锁,具体操作只需要在资源开始计数加1,并做防护判定,结束的时候释放计数减1即可。不过这并不是原子操作常见的场景。

    1. 硬件原子操作

    1.1. 缘由

    我们的程序逻辑经常遇到这样的操作序列:

    • 读一个位于memory中的变量的值到寄存器中
    • 修改该变量的值(也就是修改寄存器中的值)
    • 将寄存器中的数值写回memory中的变量值

    如果这个操作序列是串行化的操作(在一个thread中串行执行),那么一切OK,然而,世界总是不能如你所愿。在多CPU体系结构中,运行在两个CPU上的两个内核控制路径同时并行执行上面操作序列,有可能发生下面的场景:

    CPU1上的操作CPU2上的操作
    读操作
    读操作
    修改修改
    写操作
    写操作

    多个CPUs和memory chip是通过总线互联的,在任意时刻,只能有一个总线master设备(例如CPU、DMA controller)访问该Slave设备(在这个场景中,slave设备是RAM chip)。因此,来自两个CPU上的读memory操作被串行化执行,分别获得了同样的旧值。完成修改后,两个CPU都想进行写操作,把修改的值写回到memory。但是,硬件arbiter的限制使得CPU的写回必须是串行化的,因此CPU1首先获得了访问权,进行写回动作,随后,CPU2完成写回动作。在这种情况下,CPU1的对memory的修改被CPU2的操作覆盖了,因此执行结果是错误的。
    不仅是多CPU,在单CPU上也会由于有多个内核控制路径的交错而导致上面描述的错误。一个具体的例子如下:

    系统调用的控制路径中断handler控制路径
    读操作
    读操作
    修改
    写操作
    修改
    写操作

    系统调用的控制路径上,完成读操作后,硬件触发中断,开始执行中断handler。这种场景下,中断handler控制路径的写回的操作被系统调用控制路径上的写回覆盖了,结果也是错误的。

    1.2. 硬件

    数据变量的原子操作是和具体的处理器架构有密切关系的,对于不同的架构linux内核中都有相应的文件完成特定的实现。处理器分为复杂指令集CISC和精简指令集RICS两种。对于复杂指令集的芯片,如X86,可以通过一条汇编指令完成原子的一个数据位的读取,修改、写入三个动作。但是,对于精简指令集,完成一个内存的修改需要先将数据加载到内存当中,计算之后再讲数据导入到内存当中。因此,像a++这样的操作在精简指令集架构芯片上操作不是原子的,必须翻译成rmw三个基本的操作。如果这个操作流程中途被打断,执行结果可能就不符合预期结果了。为了解决这个问题,常见的有两种芯片解决方式。一种芯片吧寄存器分类成了set和clear两个寄存器,需要修改的时候只需要将某个bit写为1,如果需要清除就使用clear命令完成,这样就可以保证操作的原子性。另外一种是bitband技术,对于一个32位寄存器,有一个32位影子寄存器对应。想改变一个这个寄存器的某一个bit值,只需要先修改影子寄存器,硬件完成两个寄存器的同步。

    对于ARM处理器,读取的时候调用ldrex,写的时候使用strex组合命令实现变量原子操作。当两个线程同时使用这一组命令时,就会把并行的序列变成串行序列,只有一个完成最后的保存后面的相同变量的操作才能够再进行。ARM编译器不具备将特定操作翻译成ldrex和strex组合,需要手动完成汇编代码编写。具体这部分汇编操作见博客原子操作 - ARM汇编指令实例(一)_生活需要深度的博客-CSDN博客

    2. 原子操作API

    位原子操作

      // arch/arm/include/asm/bitops.h
      set_bit(nr, void *addr)      // addr内存中的nr位置1
      clear_bit
      change_bit
      test_bit

    在linux内核系统当中原子操作是一个不需要数据编译优化的的整数结构体,直接完成对相关变量存储位置的硬件资源访问。其常用接口说明如表一所示。

    typedef struct { volatile int counter; } atomic_t;

    2.1. 接口API 

    宏或函数说明
    atomic_read(atomic_t * v);对原子类型的变量进行原子读操作,它返回原子类型的变量v的值。
    atomic_set(atomic_t *v,int i)设置原子变量结构体内部counter的值为i
    atomic_add(int i,atomic_t *v)给原子变量结构体内部counter值加i
    atomic_inc(atomic_t *v)给原子变量结构体内部counter值加1
    atomic_sub(int i,atomic_t *v)给原子变量结构体内部counter值减i
    atomic_dec(atomic_t *v)给原子变量结构体内部counter值减1
    atomic_add_return(int i, atomic_t *v)对原子变量结构体内部counter加i,返回加以后的结果
    atomic_inc_return(atomic_t *v)对原子变量结构体内部counter加1,返回加以后的结果
    atomic_sub_return(int i, atomic_t *v)对原子变量结构体内部的counter减i,返回减以后的结果
    atomic_dec_return(atomic_t * v)对原子变量结构体内部的counter减1,返回减以后的结果
    atomic_add_and_test(int i, atomic_t *v)对原子变量接头体内部的counter加i,并将结果与0进行比较,返回对应的逻辑比较结果
    atomic_inc_and_test(atomic_t *v)
    atomic_sub_and_test(int i, atomic_t *v)对原子变量结构体内部的counter减i,并将结果与0进行比较,返回对应的逻辑比较结果
    atomic_dec_and_test(atomic_t *v)对原子变量结构体内部的counter减1,并将结果与0进行比较,返回对应的逻辑比较结果

    2.3. 怎么使用

    原子操作通常用于实现资源的引用计数,在TCP/IP协议栈的IP碎片处理中,就使用了引用计数,碎片队列结构structipq描述了一个IP碎片,字段refcnt就是引用计数器,它的类型为atomic_t,当创建IP碎片时(在函数ip_frag_create中),使用atomic_set函数把它设置为1,当引用该IP碎片时,就使用函数atomic_inc把引用计数加1,当不需要引用该IP碎片时,就使用函数ipq_put来释放该IP碎片,ipq_put使用函数atomic_dec_and_test把引用计数减1并判断引用计数是否为0,如果是就释放Ip碎片。函数ipq_kill把IP碎片从ipq队列中删除,并把该删除的IP碎片的引用计数减1(通过使用函数atomic_dec实现)。

    2.4. 驱动原子操作(非经典使用)

    #include <linux/init.h>
    #include <linux/module.h>
    #include <linux/fs.h>
    #include <linux/device.h>
    #include <linux/cdev.h>
    
    MODULE_LICENSE("GPL");
    
    dev_t dev;
    struct cdev btn_cdev;
    struct class *cls = NULL;
    
    atomic_t btn_tv;       /*[1] 定义整型原子变量*/
    
    int btn_open(struct inode *inode, struct file *filp) 
    {
        if(!atomic_dec_and_test(&btn_tv)) { //保持减1操作的完整性
            atomic_inc(&btn_tv);			//[3] 保持为0
            return -EBUSY;
        }
        return 0;  //第一次进来return 0,打开成功且btn_tv=0;
    }
    
    int btn_close(struct inode *inode, struct file *filp)
    {
        atomic_inc(&btn_tv);  //btn_tv=1,保持可以打开状态
        return 0;
    }
    
    struct file_operations btn_fops =
    {
        .owner = THIS_MODULE,
        .open  = btn_open,
        .release = btn_close,
    };
    
    int __init btn_drv_init(void)
    {
        alloc_chrdev_region(&dev, 100, 1, "mybuttons");        /*设备号的动态申请注册*/
        cdev_init(&btn_cdev, &btn_fops);                       /*初始化cdev*/
        cdev_add(&btn_cdev, dev, 1);                           /*注册cdev*/
        cls = class_create(THIS_MODULE, "buttons");            /*设备文件的自动创建*/
        device_create(cls, NULL, dev, NULL,"mybuttons");
       
        atomic_set(&btn_tv, 1);                                /*[2] 整型原子变量赋值为1*/
        return 0;
    }
    
    void __exit btn_drv_exit(void)
    {
        /*销毁设备文件*/
        device_destroy(cls, dev);
        class_destroy(cls);
        /*注销cdev*/
        cdev_del(&btn_cdev);
        /*注销设备号*/
        unregister_chrdev_region(dev, 1);
    }
    
    module_init(btn_drv_init);
    module_exit(btn_drv_exit);
    obj-m  += btn_drv.o
    all:
           make -C /home/chuckchee/driver/kernel M=$(PWD) modules
           cp *.ko ../../rootfs
           arm-cortex_a9-linux-gnueabi-gcc test.c -o test
           cp test ../../rootfs
    clean:
           make -C /home/chuckchee/driver/kernel M=$(PWD) clean
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>
    
    int main(void)
    {
        int fd = open("/dev/mybuttons", O_RDWR); //读写方式打开
    
        if(fd < 0) {
            perror("open failed:");
            return -1;
        }
        printf("open successed: using device 20s...\n");
        sleep(20);
        printf("close device\n");
        close(fd);
        return 0;
    }

    原子操作主要实现当前某个设备驱动节点在同一时间只能背一个应用程序打开,只有当前驱动节点关闭以后下一个应用程序才能再次对这个文件节点打开并完成后期的操作。

    2.5. 原子操作实现互斥

    为便于比较,直接基于前篇文章:线程同步之互斥锁中的示例程序进行修改,用原子库取代互斥库的代码如下:

    //atomic1.cpp 使用原子库取代互斥库实现线程同步
    #include <chrono>
    #include <atomic>
    #include <thread>
    #include <iostream> 
    std::chrono::milliseconds interval(100);
    std::atomic<bool> readyFlag(false);     //原子布尔类型,取代互斥量
    std::atomic<int> job_shared(0); //两个线程都能修改'job_shared',将该变量特化为原子类型
    int job_exclusive = 0; //只有一个线程能修改'job_exclusive',不需要保护
    //此线程只能修改 'job_shared'
    void job_1()
    {   
        std::this_thread::sleep_for(5 * interval);
        job_shared.fetch_add(1);
        std::cout << "job_1 shared (" << job_shared.load() << ")\n";
        readyFlag.store(true);      //改变布尔标记状态为真
    }
    // 此线程能修改'job_shared'和'job_exclusive'
    void job_2()
    {
        while (true) {    //无限循环,直到可访问并修改'job_shared'
            if (readyFlag.load()) {     //判断布尔标记状态是否为真,为真则修改‘job_shared’
                job_shared.fetch_add(1);
                std::cout << "job_2 shared (" << job_shared.load() << ")\n";
                return;
            } else {      //布尔标记为假,则修改'job_exclusive'
                ++job_exclusive;
                std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
                std::this_thread::sleep_for(interval);
            }
        }
    }
    int main() 
    {
        std::thread thread_1(job_1);
        std::thread thread_2(job_2);
        thread_1.join();
        thread_2.join();
        getchar();
        return 0;
    }

    由示例程序可以看出,原子布尔类型可以实现互斥锁的部分功能,但在使用条件变量condition variable时,仍然需要mutex保护对condition variable的消费,即使condition variable是一个atomic object。

    3.  CAS无锁编程

    3.1. 什么是无锁编程

    在原子操作出现之前,对共享数据的读写可能得到不确定的结果,所以多线程并发编程时要对使用锁机制对共享数据的访问过程进行保护。但锁的申请释放增加了访问共享资源的消耗,且可能引起线程阻塞、锁竞争、死锁、优先级反转、难以调试等问题。

    现在有了原子操作的支持,对单个基础数据类型的读、写访问可以不用锁保护了,但对于复杂数据类型比如链表,有可能出现多个核心在链表同一位置同时增删节点的情况,这将会导致操作失败或错序。所以我们在对某节点操作前,需要先判断该节点的值是否跟预期的一致,如果一致则进行操作,不一致则更新期望值,这几步操作依然需要实现为一个RMW(Read-Modify-Write)原子操作,这就是前面提到的CAS(Compare And Swap)原子操作,它是无锁编程中最常用的操作。

    既然无锁编程是为了解决锁机制带来的一些问题而出现的,那么无锁编程就可以理解为不使用锁机制就可保证多线程间原子变量同步的编程。无锁(lock-free)的实现只是将多条指令合并成了一条指令形成一个逻辑完备的最小单元,通过兼容CPU指令执行逻辑形成的一种多线程编程模型。

    无锁编程是基于原子操作的,对基本原子类型的共享访问由load()与store(val)即可保证其并发同步,对抽象复杂类型的共享访问则需要更复杂的CAS来保证其并发同步,并发访问过程只是不使用锁机制了,但还是可以理解为有锁止行为的,其粒度很小,性能更高。对于某个无法实现为一个原子操作的并发访问过程还是需要借助锁机制来实现。

    3.1 CAS原子操作实现无锁编程

    CAS原子操作主要是通过函数a.compare_exchange(expected,desired)实现的,其语义为“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS算法的实现伪码如下:

    bool compare_exchange_strong(T& expected, T desired) 
    { 
        if( this->load() == expected ) { 
            this->store(desired); 
            return true; 
        } else {
            expected = this->load();
        	return false; 
        } 
    }

    下面尝试实现一个无锁栈,代码如下:

    //atomic3.cpp 使用CAS操作实现一个无锁栈
    #include <atomic>
    #include <iostream>
    template<typename T>
    class lock_free_stack
    {
    private:
        struct node
        {
            T data;
            node* next;
            node(const T& data) : data(data), next(nullptr) {}
        };
        std::atomic<node*> head;
     public:
        lock_free_stack(): head(nullptr) {}
        void push(const T& data)
        {
            node* new_node = new node(data);
            do{
                new_node->next = head.load();   //将 head 的当前值放入new_node->next
            }while(!head.compare_exchange_strong(new_node->next, new_node));
            // 如果新元素new_node的next和栈顶head一样,证明在你之前没人操作它,使用新元素替换栈顶退出即可;
            // 如果不一样,证明在你之前已经有人操作它,栈顶已发生改变,该函数会自动更新新元素的next值为改变后的栈顶;
            // 然后继续循环检测直到状态1成立退出;
        }
        T pop()
        {
            node* node;
            do{
                node = head.load();
            }while (node && !head.compare_exchange_strong(node, node->next));
            if(node) 
                return node->data;
        }
    };
     
    int main()
    {
        lock_free_stack<int> s;
        s.push(1);
        s.push(2);
        s.push(3);
        std::cout << s.pop() << std::endl;
        std::cout << s.pop() << std::endl;
        
        getchar();
        return 0;
    }

    程序注释中已经解释的很清楚了,在将数据压栈前,先通过比较原子类型head与新元素的next指向对象是否相等来判断head是否已被其他线程修改,根据判断结果选择是继续操作还是更新期望,而这一切都是在一个原子操作中完成的,保证了在不使用锁的情况下实现共享数据的并发同步。

    CAS 看起来很厉害,但也有缺点,最著名的就是 ABA 问题,假设一个变量 A ,修改为 B之后又修改为 A,CAS 的机制是无法察觉的,但实际上已经被修改过了。如果在基本类型上是没有问题的,但是如果是引用类型呢?这个对象中有多个变量,我怎么知道有没有被改过?聪明的你一定想到了,加个版本号啊。每次修改就检查版本号,如果版本号变了,说明改过,就算你还是 A,也不行。

    上面的例子节点指针也属于引用类型,自然也存在ABA问题,比如在线程2执行pop操作,将A,B都删掉,然后创建一个新元素push进去,因为操作系统的内存分配机制会重复使用之前释放的内存,恰好push进去的内存地址和A一样,我们记为A’,这时候切换到线程1,CAS操作检查到A没变化成功将B设为栈顶,但B是一个已经被释放的内存块。该问题的解决方案就是上面说的通过打标签标识A和A’为不同的指针,具体实现代码读者可以尝试实现。

    展开全文
  • redis原子操作

    千次阅读 2021-08-03 20:27:25
    为了保证并发访问的正确性,Redis 提供了两种方法,分别是加锁和原子操作。 加锁是一种常用的方法,在读取数据前,客户端需要先获得锁,否则就无法进行操作。当一个客户端获得锁后,就会一直持有这把锁,直到客户端...

    我们在使用 Redis 时,不可避免地会遇到并发访问的问题,比如说如果多个用户同时下单,就会对缓存在 Redis 中的商品库存并发更新。一旦有了并发写操作,数据就会被修改,如果我们没有对并发写请求做好控制,就可能导致数据被改错,影响到业务的正常使用(例如库存数据错误,导致下单异常)。

    为了保证并发访问的正确性,Redis 提供了两种方法,分别是加锁和原子操作。

    加锁是一种常用的方法,在读取数据前,客户端需要先获得锁,否则就无法进行操作。当一个客户端获得锁后,就会一直持有这把锁,直到客户端完成数据更新,才释放这把锁。
    看上去好像是一种很好的方案,但是,其实这里会有两个问题:一个是,如果加锁操作多,会降低系统的并发访问性能;第二个是,Redis 客户端要加锁时,需要用到分布式锁,而分布式锁实现复杂,需要用额外的存储系统来提供加解锁操作,我会在下节课向你介绍。

    原子操作是另一种提供并发访问控制的方法。原子操作是指执行过程保持原子性的操作,而且原子操作执行时并不需要再加锁,实现了无锁操作。这样一来,既能保证并发控制,还能减少对系统并发性能的影响。

    并发访问中需要对什么进行控制?
    我们说的并发访问控制,是指对多个客户端访问操作同一份数据的过程进行控制,以保证任何一个客户端发送的操作在 Redis 实例上执行时具有互斥性。例如,客户端 A 的访问操作在执行时,客户端 B 的操作不能执行,需要等到 A 的操作结束后,才能执行。

    并发访问控制对应的操作主要是数据修改操作。当客户端需要修改数据时,基本流程分成两步:

    1. 客户端先把数据读取到本地,在本地进行修改;
    2. 客户端修改完数据后,再写回 Redis。

    我们把这个流程叫做“读取 - 修改 - 写回”操作(Read-Modify-Write,简称为 RMW 操作)。当有多个客户端对同一份数据执行 RMW 操作的话,我们就需要让 RMW 操作涉及的代码以原子性方式执行。访问同一份数据的 RMW 操作代码,就叫做临界区代码。

    不过,当有多个客户端并发执行临界区代码时,就会存在一些潜在问题,接下来,我用一个多客户端更新商品库存的例子来解释一下。

    我们先看下临界区代码。假设客户端要对商品库存执行扣减 1 的操作,伪代码如下所示:

    current = GET(id)
    current--
    SET(id, current)
    

    可以看到,客户端首先会根据商品 id,从 Redis 中读取商品当前的库存值 current(对应 Read),然后,客户端对库存值减 1(对应 Modify),再把库存值写回 Redis(对应 Write)。当有多个客户端执行这段代码时,这就是一份临界区代码。

    如果我们对临界区代码的执行没有控制机制,就会出现数据更新错误。在刚才的例子中,假设现在有两个客户端 A 和 B,同时执行刚才的临界区代码,就会出现错误,你可以看下下面这张图。
    在这里插入图片描述

    可以看到,客户端 A 在 t1 时读取库存值 10 并扣减 1,在 t2 时,客户端 A 还没有把扣减后的库存值 9 写回 Redis,而在此时,客户端 B 读到库存值 10,也扣减了 1,B 记录的库存值也为 9 了。等到 t3 时,A 往 Redis 写回了库存值 9,而到 t4 时,B 也写回了库存值 9。

    如果按正确的逻辑处理,客户端 A 和 B 对库存值各做了一次扣减,库存值应该为 8。所以,这里的库存值明显更新错了。

    出现这个现象的原因是,临界区代码中的客户端读取数据、更新数据、再写回数据涉及了三个操作,而这三个操作在执行时并不具有互斥性,多个客户端基于相同的初始值进行修改,而不是基于前一个客户端修改后的值再修改。

    为了保证数据并发修改的正确性,我们可以用锁把并行操作变成串行操作,串行操作就具有互斥性。一个客户端持有锁后,其他客户端只能等到锁释放,才能拿锁再进行修改。

    下面的伪代码显示了使用锁来控制临界区代码的执行情况,你可以看下。

    LOCK()
    current = GET(id)
    current--
    SET(id, current)
    UNLOCK()
    

    虽然加锁保证了互斥性,但是加锁也会导致系统并发性能降低。

    如下图所示,当客户端 A 加锁执行操作时,客户端 B、C 就需要等待。A 释放锁后,假设 B 拿到锁,那么 C 还需要继续等待,所以,t1 时段内只有 A 能访问共享数据,t2 时段内只有 B 能访问共享数据,系统的并发性能当然就下降了。
    在这里插入图片描述

    和加锁类似,原子操作也能实现并发控制,但是原子操作对系统并发性能的影响较小,接下来,我们就来了解下 Redis 中的原子操作。

    Redis 的两种原子操作方法

    为了实现并发控制要求的临界区代码互斥执行,Redis 的原子操作采用了两种方法:

    1. 把多个操作在 Redis 中实现成一个操作,也就是单命令操作;
    2. 把多个操作写到一个 Lua 脚本中,以原子性方式执行单个 Lua 脚本。

    我们先来看下 Redis 本身的单命令操作。

    Redis 是使用单线程来串行处理客户端的请求操作命令的,所以,当 Redis 执行某个命令操作时,其他命令是无法执行的,这相当于命令操作是互斥执行的。当然,Redis 的快照生成、AOF 重写这些操作,可以使用后台线程或者是子进程执行,也就是和主线程的操作并行执行。不过,这些操作只是读取数据,不会修改数据,所以,我们并不需要对它们做并发控制。

    你可能也注意到了,虽然 Redis 的单个命令操作可以原子性地执行,但是在实际应用中,数据修改时可能包含多个操作,至少包括读数据、数据增减、写回数据三个操作,这显然就不是单个命令操作了,那该怎么办呢?

    别担心,Redis 提供了 INCR/DECR 命令,把这三个操作转变为一个原子操作了。INCR/DECR 命令可以对数据进行增值 / 减值操作,而且它们本身就是单个命令操作,Redis 在执行它们时,本身就具有互斥性。

    比如说,在刚才的库存扣减例子中,客户端可以使用下面的代码,直接完成对商品 id 的库存值减 1 操作。即使有多个客户端执行下面的代码,也不用担心出现库存值扣减错误的问题。

    DECR id 
    

    所以,如果我们执行的 RMW 操作是对数据进行增减值的话,Redis 提供的原子操作 INCR 和 DECR 可以直接帮助我们进行并发控制。

    但是,如果我们要执行的操作不是简单地增减数据,而是有更加复杂的判断逻辑或者是其他操作,那么,Redis 的单命令操作已经无法保证多个操作的互斥执行了。所以,这个时候,我们需要使用第二个方法,也就是 Lua 脚本。

    Redis 会把整个 Lua 脚本作为一个整体执行,在执行的过程中不会被其他命令打断,从而保证了 Lua 脚本中操作的原子性。如果我们有多个操作要执行,但是又无法用 INCR/DECR 这种命令操作来实现,就可以把这些要执行的操作编写到一个 Lua 脚本中。
    然后,我们可以使用 Redis 的 EVAL 命令来执行脚本。这样一来,这些操作在执行时就具有了互斥性。

    再举个例子,具体解释下 Lua 的使用。
    当一个业务应用的访问用户增加时,我们有时需要限制某个客户端在一定时间范围内的访问次数,比如爆款商品的购买限流、社交网络中的每分钟点赞次数限制等。

    那该怎么限制呢?我们可以把客户端 IP 作为 key,把客户端的访问次数作为 value,保存到 Redis 中。客户端每访问一次后,我们就用 INCR 增加访问次数。

    不过,在这种场景下,客户端限流其实同时包含了对访问次数和时间范围的限制,例如每分钟的访问次数不能超过 20。所以,我们可以在客户端第一次访问时,给对应键值对设置过期时间,例如设置为 60s 后过期。同时,在客户端每次访问时,我们读取客户端当前的访问次数,如果次数超过阈值,就报错,限制客户端再次访问。你可以看下下面的这段代码,它实现了对客户端每分钟访问次数不超过 20 次的限制。

    //获取ip对应的访问次数
    current = GET(ip)
    //如果超过访问次数超过20次,则报错
    IF current != NULL AND current > 20 THEN
        ERROR "exceed 20 accesses per second"
    ELSE
        //如果访问次数不足20次,增加一次访问计数
        value = INCR(ip)
        //如果是第一次访问,将键值对的过期时间设置为60s后
        IF value == 1 THEN
            EXPIRE(ip,60)
        END
        //执行其他操作
        DO THINGS
    END
    

    可以看到,在这个例子中,我们已经使用了 INCR 来原子性地增加计数。但是,客户端限流的逻辑不只有计数,还包括访问次数判断和过期时间设置。

    对于这些操作,我们同样需要保证它们的原子性。否则,如果客户端使用多线程访问,访问次数初始值为 0,第一个线程执行了 INCR(ip) 操作后,第二个线程紧接着也执行了 INCR(ip),此时,ip 对应的访问次数就被增加到了 2,我们就无法再对这个 ip 设置过期时间了。这样就会导致,这个 ip 对应的客户端访问次数达到 20 次之后,就无法再进行访问了。即使过了 60s,也不能再继续访问,显然不符合业务要求。

    所以,这个例子中的操作无法用 Redis 单个命令来实现,此时,我们就可以使用 Lua 脚本来保证并发控制。我们可以把访问次数加 1、判断访问次数是否为 1,以及设置过期时间这三个操作写入一个 Lua 脚本,如下所示:

    local current
    current = redis.call("incr",KEYS[1])
    if tonumber(current) == 1 then
        redis.call("expire",KEYS[1],60)
    end
    

    假设我们编写的脚本名称为 lua.script,我们接着就可以使用 Redis 客户端,带上 eval 选项,来执行该脚本。脚本所需的参数将通过以下命令中的 keys 和 args 进行传递。

    redis-cli  --eval lua.script  keys , args
    

    这样一来,访问次数加 1、判断访问次数是否为 1,以及设置过期时间这三个操作就可以原子性地执行了。即使客户端有多个线程同时执行这个脚本,Redis 也会依次串行执行脚本代码,避免了并发操作带来的数据错误。

    展开全文
  • 原子操作类总结

    千次阅读 多人点赞 2019-10-14 21:10:53
    文章目录原子操作类简介预备知识-CAS操作原子更新基本类型原子更新数组类型原子更新引用类型原子更新字段类型 原子操作类简介 在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比如...

    原子操作类简介

    在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比如多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过Synchronized进行控制来达到线程安全的目的(关于synchronized可以看这篇文章)。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。实际上,在J.U.C下的atomic包提供了一系列的操作简单,性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic包下的这些类都是采用的是乐观锁策略去原子更新数据,在java中则是使用CAS操作具体实现。

    预备知识-CAS操作

    能够弄懂atomic包下这些原子操作类的实现原理,就要先明白什么是CAS操作。

    什么是CAS?

    使用锁时,线程获取锁是一种悲观锁策略,即假设每一次执行临界区代码都会产生冲突,所以当前线程获取到锁的时候同时也会阻塞其他线程获取该锁。而CAS操作(又称为无锁操作)是一种乐观锁策略,它假设所有线程访问共享资源的时候不会出现冲突,既然不会出现冲突自然而然就不会阻塞其他线程的操作。因此,线程就不会出现阻塞停顿的状态。那么,如果出现冲突了怎么办?无锁操作是使用CAS(compare and swap)又叫做比较交换来鉴别线程是否出现冲突,出现冲突就重试当前操作直到没有冲突为止。

    CAS的操作过程

    CAS比较交换的过程可以通俗的理解为CAS(V,O,N),包含三个值分别为:V 内存地址存放的实际值;O 预期的值(旧值);N 更新的新值。当V和O相同时,也就是说旧值和内存中实际的值相同表明该值没有被其他线程更改过,即该旧值O就是目前来说最新的值了,自然而然可以将新值N赋值给V。反之,V和O不相同,表明该值已经被其他线程改过了,则该旧值O不是最新版本的值了,所以不能将新值N赋给V,返回V即可。当多个线程使用CAS操作一个变量时,只有一个线程会成功,并成功更新,其余会失败。失败的线程会重新尝试,当然也可以选择挂起线程。

    CAS的实现需要硬件指令集的支撑,在JDK1.5后虚拟机才可以使用处理器提供的CMPXCHG指令实现。

    Synchronized VS CAS

    元老级的Synchronized(未优化前)最主要的问题是:在存在线程竞争的情况下会出现线程阻塞和唤醒锁带来的性能问题,因为这是一种互斥同步(阻塞同步)。而CAS并不是武断的将线程挂起,当CAS操作失败后会进行一定的尝试,而非进行耗时的挂起唤醒的操作,因此也叫做非阻塞同步。这是两者主要的区别。

    CAS的问题

    1. ABA问题
      因为CAS会检查旧值有没有变化,这里存在这样一个有意思的问题。比如一个旧值A变为了成B,然后再变成A,刚好在做CAS时检查发现旧值并没有变化依然为A,但是实际上的确发生了变化。解决方案可以沿袭数据库中常用的乐观锁方式,添加一个版本号可以解决。原来的变化路径A->B->A就变成了1A->2B->3C。

    2. 自旋时间过长

      使用CAS时非阻塞同步,也就是说不会将线程挂起,会自旋(无非就是一个死循环)进行下一次尝试,如果这里自旋时间过长对性能是很大的消耗。如果JVM能支持处理器提供的pause指令,那么在效率上会有一定的提升。

    原子更新基本类型

    atomic包提高原子更新基本类型的工具类,主要有这些:

    1. AtomicBoolean:以原子更新的方式更新boolean;
    2. AtomicInteger:以原子更新的方式更新Integer;
    3. AtomicLong:以原子更新的方式更新Long;

    这几个类的用法基本一致,这里以AtomicInteger为例总结常用的方法

    1. addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
    2. incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果;
    3. getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
    4. getAndIncrement():以原子的方式将实例中的原值加1,返回的是自增前的旧值;

    还有一些方法,可以查看API,不再赘述。为了能够弄懂AtomicInteger的实现原理,以getAndIncrement方法为例,来看下源码:

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    

    可以看出,该方法实际上是调用了unsafe实例的getAndAddInt方法,unsafe实例的获取时通过UnSafe类的静态方法getUnsafe获取:

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    

    Unsafe类在sun.misc包下,Unsafer类提供了一些底层操作,atomic包下的原子操作类的也主要是通过Unsafe类提供的compareAndSwapInt,compareAndSwapLong等一系列提供CAS操作的方法来进行实现。下面用一个简单的例子来说明AtomicInteger的用法:

    public class AtomicDemo {
        private static AtomicInteger atomicInteger = new AtomicInteger(1);
    
        public static void main(String[] args) {
            System.out.println(atomicInteger.getAndIncrement());
            System.out.println(atomicInteger.get());
        }
    }
    

    输出结果

    1
    2
    

    例子很简单,就是新建了一个atomicInteger对象,而atomicInteger的构造方法也就是传入一个基本类型数据即可,对其进行了封装。对基本变量的操作比如自增,自减,相加,更新等操作,atomicInteger也提供了相应的方法进行这些操作。但是,因为atomicInteger借助了UnSafe提供的CAS操作能够保证数据更新的时候是线程安全的,并且由于CAS是采用乐观锁策略,因此,这种数据更新的方法也具有高效性。

    AtomicLong的实现原理和AtomicInteger一致,只不过一个针对的是long变量,一个针对的是int变量。而boolean变量的更新类AtomicBoolean类是怎样实现更新的呢?核心方法是compareAndSett方法,其源码如下:

    public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }
    

    可以看出,compareAndSet方法的实际上也是先转换成0,1的整型变量,然后是通过针对int型变量的原子更新方法compareAndSwapInt来实现的。可以看出atomic包中只提供了对boolean,int ,long这三种基本类型的原子更新的方法,参考对boolean更新的方式,原子更新char,doule,float也可以采用类似的思路进行实现。

    原子更新数组类型

    atomic包下提供能原子更新数组中元素的类有:

    1. AtomicIntegerArray:原子更新整型数组中的元素;
    2. AtomicLongArray:原子更新长整型数组中的元素;
    3. AtomicReferenceArray:原子更新引用类型数组中的元素

    这几个类的用法一致,就以AtomicIntegerArray来总结下常用的方法:

    1. addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
    2. getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
    3. compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新

    可以看出,AtomicIntegerArray与AtomicInteger的方法基本一致,只不过在AtomicIntegerArray的方法中会多一个指定数组索引位i。下面举一个简单的例子:

    public class AtomicIntegerArrayDemo {
    
        private static int[] value = new int[]{1, 2, 3};
        private static AtomicIntegerArray integerArray = new AtomicIntegerArray(value);
    
        public static void main(String[] args) {
            //对数组中索引为1的位置的元素加5
            int result = integerArray.getAndAdd(1, 5);
            System.out.println(result);
            System.out.println(integerArray.get(1));
        }
    }
    

    输出结果

    2
    7
    

    通过getAndAdd方法将位置为1的元素加5,从结果可以看出索引为1的元素变成了7,该方法返回的也是相加之前的数为2。

    原子更新引用类型

    如果需要原子更新引用类型变量的话,为了保证线程安全,atomic也提供了相关的类:

    1. AtomicReference:原子更新引用类型;
    2. AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
    3. AtomicMarkableReference:原子更新带有标记位的引用类型;

    这几个类的使用方法也是基本一样的,以AtomicReference为例,来说明这些类的基本用法。下面是一个demo

    public class AtomicReferenceDemo {
    
        private static AtomicReference<User> reference = new AtomicReference<>();
    
        public static void main(String[] args) {
            User user1 = new User("a", 1);
            reference.set(user1);
            User user2 = new User("b",2);
            User user = reference.getAndSet(user2);
            System.out.println(user);
            System.out.println(reference.get());
        }
    
        static class User {
            private String userName;
            private int age;
    
            public User(String userName, int age) {
                this.userName = userName;
                this.age = age;
            }
    
            @Override
            public String toString() {
                return "User{" +
                        "userName='" + userName + '\'' +
                        ", age=" + age +
                        '}';
            }
        }
    }
    

    输出结果

    User{userName='a', age=1}
    User{userName='b', age=2}
    

    首先将对象User1用AtomicReference进行封装,然后调用getAndSet方法,从结果可以看出,该方法会原子更新引用的user对象,变为User{userName='b', age=2},返回的是原来的user对象User{userName='a', age=1}

    原子更新字段类型

    如果需要更新对象的某个字段,并在多线程的情况下,能够保证线程安全,atomic同样也提供了相应的原子操作类:

    1. AtomicIntegeFieldUpdater:原子更新整型字段类;
    2. AtomicLongFieldUpdater:原子更新长整型字段类;
    3. AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号。而为什么在更新的时候会带有版本号,是为了解决CAS的ABA问题;

    要想使用原子更新字段需要两步操作:

    1. 原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性;
    2. 更新类的属性必须使用public volatile进行修饰;

    这几个类提供的方法基本一致,以AtomicIntegerFieldUpdater为例来看看具体的使用:

    public class AtomicDemo {
    
        private static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
        public static void main(String[] args) {
            User user = new User("a", 1);
            int oldValue = updater.getAndAdd(user, 5);
            System.out.println(oldValue);
            System.out.println(updater.get(user));
        }
    
        static class User {
            private String userName;
            public volatile int age;
    
            public User(String userName, int age) {
                this.userName = userName;
                this.age = age;
            }
    
            @Override
            public String toString() {
                return "User{" +
                        "userName='" + userName + '\'' +
                        ", age=" + age +
                        '}';
            }
        }
    } 
    

    输出结果

    1
    6
    

    从示例中可以看出,创建AtomicIntegerFieldUpdater是通过它提供的静态方法进行创建,getAndAdd方法会将指定的字段加上输入的值,并且返回相加之前的值。user对象中age字段原值为1,加5之后,可以看出user对象中的age字段的值已经变成了6。

    展开全文
  • JAVA操作REDIS执行原子操作

    千次阅读 2022-04-26 14:07:03
    JAVA操作REDIS执行原子操作JAVA操作REDIS执行原子操作为什么要使用原子操作 JAVA操作REDIS执行原子操作 为什么要使用原子操作 众所周知,redis 作为数据库的前置库,给数据库使用节省了很多请求,很多请求再查询缓存...
  • Linux--原子操作1、原子操作1.1、概念1.2、事例1.3、原子操作结构体介绍1.4、原子操作的使用1.4.1、定义1.4.2、初始化1.5、原子整形操作 API 函数1.5.1、事例1.6、原子位操作 API 函数 1、原子操作 1.1、概念 原子...
  • 测试了windows下原子操作api的使用,很简单的测试,还是比较有趣的
  • 操作系统之原子操作

    千次阅读 2020-12-09 10:47:49
    原子操作是指不会被线程调度机制打断的操作; 这种操作一旦开始,就一直运行到结束,中间不会有任何线程切换。 定义 如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构,那么这个操作是一个原子...
  • Linux原子操作

    千次阅读 2021-12-14 10:34:03
    可以使用原子操作实现资源互斥。 原子操作 原子操作指的是在执行过程中不会被别的代码路径所中断的操作。 1.设置原子变量的值 void atomic_set(atomic_t *v, int i); //设置原子变量的值为 i atomic_t v = ATOMIC...
  • 使用.NET提供的Interlocked类可以对一些数据进行原子操作,看起来似乎跟lock锁一样,但它并不是lock锁,它的原子操作是基于CPU本身的,非阻塞的,所以要比lock的效率高
  • int i =1 是原子操作吗?i++是原子操作吗?int i =1 是原子操作吗?i++是原子操作吗? int i =1 是原子操作吗?i++是原子操作吗? 原子操作(atomic operation)指的是由多步操作组成的一个操作。如果该操作不能原子...
  • 什么是原子操作

    千次阅读 2020-08-21 15:02:02
    原子操作就是: 不可中断的一个或者一系列操作, 也就是不会被线程调度机制打断的操作, 运行期间不会有任何的上下文切换(context switch). 二. 为什么关注原子操作? 1. 如果确定某个操作是原子的, 就不用为了去保护这...
  • 29 Redis 应对并发访问的无锁原子操作前言一、并发访问中需要对什么进行控制?二、Redis 的两种原子操作方法:总结 前言 在使用 Redis 时,不可避免地会遇到并发访问的问题,比如说如果多个用户同时下 单,就会对...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 430,366
精华内容 172,146
关键字:

原子操作