精华内容
下载资源
问答
  • 请求队列

    千次阅读 2013-11-10 19:22:04
    ------------------------------------------ 一:前言 文件的读写是文件系统中最核心也是最复杂的一部份,...二:I/O请求的概述 如之前所提到的,为了提高文件的操作效率,文件系统中的内容都是缓存在内存里的.每当发
    ------------------------------------------
    一:前言
    文件的读写是文件系统中最核心也是最复杂的一部份,它牵涉到了很多的概念.之前分析文件系统其它操作的时候,遇到与文件系统相关的读写部份都忽略过去了.在这一节里,来讨论一下文件的读写是怎样实现的.
    二:I/O请求的概述
    如之前所提到的,为了提高文件的操作效率,文件系统中的内容都是缓存在内存里的.每当发起一个Rear/Write请求的时候,都会到页面高速缓存中寻找具体的页面.如果页面不存在,则在页面高速缓存中建立相关页面的缓存.如果当前的页面不是最新的.那就必须要到具体的文件系统中读取数据了.一般来说,内核提供了这样的界面:它产生一个I/O请求.这个界面为上层隐藏了下层的不同实现.在这个界面中,将产生的I/O请求提交给I/O调度.再与I/O调度调用具体的块设备驱动程序.
    整个过程如下图所示:

    上图中的Generic Block Layer就是上面描述中所说的I/O的界面.
    接下来我们以上图从下到上的层次进行讨论.
    三:块设备驱动
    块设备与字符设备的区别在于:块设备可以随机的访问,例如磁盘.正是因为它可以随机访问,内核才需要一个高效的手段去管理每一个块设备.例如对磁盘的操作,每次移动磁针都需要花不少的时候,所以尽量让其处理完相同磁道内的请求再将磁针移动到另外的磁道.而对于字符设备来说,不存在这样的顾虑,只需按顺序从里面读/写就可以了.
    先来看一下块设备驱动所涉及到的数据结构.
    3.1: block_device结构:
    struct block_device {
         //主次驱备号
         dev_t              bd_dev;  /* not a kdev_t - it's a search key */
         //指向bdev文件系统中块设备对应的文件索引号
         struct inode *         bd_inode; /* will die */
         //计数器,统计块驱备被打开了多少次
         int           bd_openers;
         // 块设备打开和关闭的信号量
         struct semaphore   bd_sem;  /* open/close mutex */
         //禁止在块设备上建行新安装的信号量
         struct semaphore   bd_mount_sem; /* mount mutex */
         //已打开的块设备文件inode链表
         struct list_head   bd_inodes;
         //块设备描述符的当前拥有者
         void *             bd_holder;
         //统计字段,统计对bd_holder进行更改的次数
         int           bd_holders;
         //如果当前块设备是一个分区,此成员指向它所属的磁盘的设备
         //否则指向该描述符的本身
         struct block_device *  bd_contains;
         //块大小
         unsigned      bd_block_size;
         //指向分区描述符的指针
         struct hd_struct * bd_part;
         /* number of times partitions within this device have been opened. */
         //统计字段,统计块设备分区被打开的次数
         unsigned      bd_part_count;
         //读取块设备分区表时设置的标志
         int           bd_invalidated;
         //指向块设备所属磁盘的gendisk
         struct gendisk *   bd_disk;
         //指向块设备描述符链表的指针
         struct list_head   bd_list;
         //指向块设备的专门描述符backing_dev_info
         struct backing_dev_info *bd_inode_backing_dev_info;
         /*
          * Private data.  You must have bd_claim'ed the block_device
          * to use this.  NOTE:  bd_claim allows an owner to claim
          * the same device multiple times, the owner must take special
          * care to not mess up bd_private for that case.
          */
          //块设备的私有区
         unsigned long      bd_private;
    }
    通常,对于块设备来说还涉及到一个分区问题.分区在内核中是用hd_struct来表示的.
    3.2: hd_struct结构:
    struct hd_struct {
         //磁盘分区的起始扇区
         sector_t start_sect;
         //分区的长度,即扇区的数目
         sector_t nr_sects;
         //内嵌的kobject
         struct kobject kobj;
         //分区的读操作次数,读取扇区数,写操作次数,写扇区数
         unsigned reads, read_sectors, writes, write_sectors;
         //policy:如果分区是只读的,置为1.否则为0
         //partno:磁盘中分区的相对索引
         int policy, partno;
    }
    每个具体的块设备都会都应一个磁盘,在内核中磁盘用gendisk表示.
    3.3: gendisk结构:
    struct gendisk {
         //磁盘的主驱备号
         int major;             /* major number of driver */
         //与磁盘关联的第一个设备号
         int first_minor;
         //与磁盘关联的设备号范围
         int minors;                     /* maximum number of minors, =1 for
                                             * disks that can't be partitioned. */
         //磁盘的名字
         char disk_name[32];         /* name of major driver */
         //磁盘的分区描述符数组                                       
         struct hd_struct **part;    /* [indexed by minor] */
         //块设备的操作指针
         struct block_device_operations *fops;
         //指向磁盘请求队列指针
         struct request_queue *queue;
         //块设备的私有区
         void *private_data;
         //磁盘内存区大小(扇区数目)
         sector_t capacity;
         //描述磁盘类型的标志
         int flags;
         //devfs 文件系统中的名字
         char devfs_name[64];        /* devfs crap */
         //不再使用
         int number;            /* more of the same */
         //指向磁盘中硬件设备的device指针
         struct device *driverfs_dev;
         //内嵌kobject指针
         struct kobject kobj;
         //记录磁盘中断定时器
         struct timer_rand_state *random;
         //如果只读,此值为1.否则为0
         int policy;
         //写入磁盘的扇区数计数器
         atomic_t sync_io;      /* RAID */
         //统计磁盘队列使用情况的时间戳
         unsigned long stamp, stamp_idle;
         //正在进行的I/O操作数
         int in_flight;
         //统计每个CPU使用磁盘的情况
    #ifdef   CONFIG_SMP
         struct disk_stats *dkstats;
    #else
         struct disk_stats dkstats;
    #endif
    }
    以上三个数据结构的关系,如下图所示:

    如上图所示:
    每个块设备分区的bd_contains会指它的总块设备节点,它的bd_part会指向它的分区表.bd_disk会指向它所属的磁盘.
    从上图中也可以看出:每个磁盘都会对应一个request_queue.对于上层的I/O请求就是通过它来完成的了.它的结构如下:
    3.4:request_queue结构:
    struct request_queue
    {
         /*
          * Together with queue_head for cacheline sharing
          */
          //待处理请求的链表
         struct list_head   queue_head;
         //指向队列中首先可能合并的请求描述符
         struct request         *last_merge;
         //指向I/O调度算法指针
         elevator_t         elevator;

         /*
          * the queue request freelist, one for reads and one for writes
          */
          //为分配请请求描述符所使用的数据结构
         struct request_list    rq;

         //驱动程序策略例程入口点的方法
         request_fn_proc        *request_fn;
         //检查是否可能将bio合并到请求队列的最后一个请求的方法
         merge_request_fn   *back_merge_fn;
         //检查是否可能将bio合并到请求队列的第一个请求中的方法
         merge_request_fn   *front_merge_fn;
         //试图合并两个相邻请求的方法
         merge_requests_fn  *merge_requests_fn;
         //将一个新请求插入请求队列时所调用的方法
         make_request_fn        *make_request_fn;
         //该方法反这个处理请求的命令发送给硬件设备
         prep_rq_fn         *prep_rq_fn;
         //去掉块设备方法
         unplug_fn     *unplug_fn;
         //当增加一个新段时,该方法驼回可插入到某个已存在的bio  结构中的字节数
         merge_bvec_fn      *merge_bvec_fn;
         //将某个请求加入到请求队列时,会调用此方法
         activity_fn        *activity_fn;
         //刷新请求队列时所调用的方法
         issue_flush_fn         *issue_flush_fn;

         /*
          * Auto-unplugging state
          */
          //插入设备时所用到的定时器
         struct timer_list  unplug_timer;
         //如果请求队列中待处理请求数大于该值,将立即去掉请求设备
         int           unplug_thresh;     /* After this many requests */
         //去掉设备之间的延迟
         unsigned long      unplug_delay; /* After this many jiffies */
         //去掉设备时使用的操作队列
         struct work_struct unplug_work;
         //
         struct backing_dev_info backing_dev_info;

         /*
          * The queue owner gets to use this for whatever they like.
          * ll_rw_blk doesn't touch it.
          */
          //指向块设备驱动程序中的私有数据
         void          *queuedata;
         //activity_fn()所用的参数
         void          *activity_data;

         /*
          * queue needs bounce pages for pages above this limit
          */
          //如果页框号大于该值,将使用回弹缓存冲
         unsigned long      bounce_pfn;
         //回弹缓存区页面的分配标志
         int           bounce_gfp;

         /*
          * various queue flags, see QUEUE_* below
          */
          //描述请求队列的标志
         unsigned long      queue_flags;

         /*
          * protects queue structures from reentrancy
          */
          //指向请求队列锁的指针
         spinlock_t         *queue_lock;

         /*
          * queue kobject
          */
          //内嵌的kobject
         struct kobject kobj;

         /*
          * queue settings
          */
          //请求队列中允许的最大请求数
         unsigned long      nr_requests;  /* Max # of requests */
         //如果待请求的数目超过了该值,则认为该队列是拥挤的
         unsigned int       nr_congestion_on;
         //如果待请求数目在这个阀值下,则认为该队列是不拥挤的
         unsigned int       nr_congestion_off;

         //单个请求所能处理的最大扇区(可调的)
         unsigned short         max_sectors;
         //单个请求所能处理的最大扇区(硬约束)
         unsigned short         max_hw_sectors;
         //单个请求所能处理的最大物理段数
         unsigned short         max_phys_segments;
         //单个请求所能处理的最大物理段数(DMA的约束)
         unsigned short         max_hw_segments;
         //扇区中以字节 为单位的大小
         unsigned short         hardsect_size;
         //物理段的最大长度(以字节为单位)
         unsigned int       max_segment_size;
         //段合并的内存边界屏弊字
         unsigned long      seg_boundary_mask;
         //DMA缓冲区的起始地址和长度的对齐
         unsigned int       dma_alignment;
         //空闲/忙标记的位图.用于带标记的请求
         struct blk_queue_tag   *queue_tags;
         //请求队列的引用计数
         atomic_t      refcnt;
         //请求队列中待处理的请求数
         unsigned int       in_flight;

         /*
          * sg stuff
          */
          //用户定义的命令超时
         unsigned int       sg_timeout;
         //Not Use
         unsigned int       sg_reserved_size;
    }
    request_queue表示的是一个请求队列,每一个请求都是用request来表示的.
    3.5: request结构:
    struct request {
         //用来形成链表
         struct list_head queuelist; /* looking for ->queue? you must _not_
                            * access it directly, use
                            * blkdev_dequeue_request! */
         //请求描述符的标志               
         unsigned long flags;        /* see REQ_ bits below */

         /* Maintain bio traversal state for part by part I/O submission.
          * hard_* are block layer internals, no driver should touch them!
          */
         //要传送的下一个扇区
         sector_t sector;       /* next sector to submit */
         //要传送的扇区数目
         unsigned long nr_sectors;   /* no. of sectors left to submit */
         /* no. of sectors left to submit in the current segment */
         //当前bio段传送扇区的数目
         unsigned int current_nr_sectors;
         //要传送的下一个扇区号
         sector_t hard_sector;       /* next sector to complete */
         //整个过程中要传送的扇区号
         unsigned long hard_nr_sectors;   /* no. of sectors left to complete */
         /* no. of sectors left to complete in the current segment */
         //当前bio段要传送的扇区数目
         unsigned int hard_cur_sectors;

         /* no. of segments left to submit in the current bio */
         //
         unsigned short nr_cbio_segments;
         /* no. of sectors left to submit in the current bio */
         unsigned long nr_cbio_sectors;

         struct bio *cbio;      /* next bio to submit */
         //请求中第一个没有完成的bio
         struct bio *bio;       /* next unfinished bio to complete */
         //最后的bio
         struct bio *biotail;
         //指向I/O调度的私有区
         void *elevator_private;
         //请求的状态
         int rq_status;     /* should split this into a few status bits */
         //请求所引用的磁盘描述符
         struct gendisk *rq_disk;
         //统计传送失败的计数
         int errors;
         //请求开始的时间
         unsigned long start_time;

         /* Number of scatter-gather DMA addr+len pairs after
          * physical address coalescing is performed.
          */
          //请求的物理段数
         unsigned short nr_phys_segments;

         /* Number of scatter-gather addr+len pairs after
          * physical and DMA remapping hardware coalescing is performed.
          * This is the number of scatter-gather entries the driver
          * will actually have to deal with after DMA mapping is done.
          */
          //请求的硬段数
         unsigned short nr_hw_segments;
         //与请求相关的标识
         int tag;
         //数据传送的缓冲区,如果是高端内存,此成员值为NULL
         char *buffer;
         //请求的引用计数
         int ref_count;
         //指向包含请求的请求队列描述符
         request_queue_t *q;
         struct request_list *rl;
         //指向数据传送终止的completion
         struct completion *waiting;
         //对设备发达“特殊请求所用到的指针”
         void *special;

         /*
          * when request is used as a packet command carrier
          */
          //cmd中的数据长度
         unsigned int cmd_len;
         //请求类型
         unsigned char cmd[BLK_MAX_CDB];
         //data中的数据长度
         unsigned int data_len;
         //为了跟踪所传输的数据而使用的指针
         void *data;
         //sense字段的数据长度
         unsigned int sense_len;
         //指向输出sense缓存区
         void *sense;
         //请求超时
         unsigned int timeout;

         /*
          * For Power Management requests
          */
          //指向电源管理命令所用的结构
         struct request_pm_state *pm;
    }
    请求队列描述符与请求描述符都很复杂,为了简化驱动的设计,内核提供了一个API,供块设备驱动程序来初始化一个请求队列.这就是blk_init_queue().它的代码如下:
    //rfn:驱动程序自动提供的操作I/O的函数.对应请求队列的request_fn
    //lock:驱动程序提供给请求队列的自旋锁
    request_queue_t *blk_init_queue(request_fn_proc *rfn, spinlock_t *lock)
    {
         request_queue_t *q;
         static int printed;
         //申请请求队列描述符
         q = blk_alloc_queue(GFP_KERNEL);
         if (!q)
             return NULL;
         //初始化q->request_list
         if (blk_init_free_list(q))
             goto out_init;

         if (!printed) {
             printed = 1;
             printk("Using %s io scheduler\n", chosen_elevator->elevator_name);
         }

         //初始化请求队列描述符中的各项操作函数
         q->request_fn      = rfn;
         q->back_merge_fn       = ll_back_merge_fn;
         q->front_merge_fn      = ll_front_merge_fn;
         q->merge_requests_fn   = ll_merge_requests_fn;
         q->prep_rq_fn      = NULL;
         q->unplug_fn       = generic_unplug_device;
         q->queue_flags         = (1
         q->queue_lock      = lock;

         
         blk_queue_segment_boundary(q, 0xffffffff);
         //设置q->make_request_fn函数,初始化等待队对列的定时器和等待队列
         blk_queue_make_request(q, __make_request);
         //设置max_segment_size,max_hw_segments,max_phys_segments
         blk_queue_max_segment_size(q, MAX_SEGMENT_SIZE);
         blk_queue_max_hw_segments(q, MAX_HW_SEGMENTS);
         blk_queue_max_phys_segments(q, MAX_PHYS_SEGMENTS);

         /*
          * all done
          */
          //设置等待队列的I/O调度程序
         if (!elevator_init(q, chosen_elevator))
             return q;
         //失败的处理
         blk_cleanup_queue(q);
    out_init:
         kmem_cache_free(requestq_cachep, q);
         return NULL;
    }
    这个函数中初始化了很多操作指针,这个函数在所有块设备中都是一样的,这样就为通用块设备层提供了一个统一的接口.对于块设备驱动的接口就是我们在blk_init_queue中设置的策略例程了.留意一下关于请求队列的各操作的设置,这在后续的分析中会用到.
    另外,在请求结构中涉及到了bio结构.bio表示一个段.目前内核中关于I/O的所有操作都是由它来表示的.它的结构如下所示:
    struct bio {
         //段的起始扇区
         sector_t      bi_sector;
         //下一个bio
         struct bio         *bi_next; /* request queue link */
         //段所在的块设备
         struct block_device    *bi_bdev;
         //bio的标志
         unsigned long      bi_flags; /* status, command, etc */
         //Read/Write
         unsigned long      bi_rw;        /* bottom bits READ/WRITE,
                                 * top bits priority
                                 */
         //bio_vec的项数
         unsigned short         bi_vcnt; /* how many bio_vec's */
         //当前正在操作的bio_vec
         unsigned short         bi_idx;       /* current index into bvl_vec */

         /* Number of segments in this BIO after
          * physical address coalescing is performed.
          */
          //结合后的片段数目
         unsigned short         bi_phys_segments;

         /* Number of segments after physical and DMA remapping
          * hardware coalescing is performed.
          */
          //重映射后的片段数目
         unsigned short         bi_hw_segments;
         //I/O计数
         unsigned int       bi_size; /* residual I/O count */

         /*
          * To keep track of the max hw size, we account for the
          * sizes of the first and last virtually mergeable segments
          * in this bio
          */
          //第一个可以合并的段大小
         unsigned int       bi_hw_front_size;
         //最后一个可以合并的段大小
         unsigned int       bi_hw_back_size;
         //最大的bio_vec项数
         unsigned int       bi_max_vecs;  /* max bvl_vecs we can hold */
         //bi_io_vec数组
         struct bio_vec         *bi_io_vec;   /* the actual vec list */
         //I/O完成的方法
         bio_end_io_t       *bi_end_io;
         //使用计数
         atomic_t      bi_cnt;       /* pin count */
         //拥有者的私有区
         void          *bi_private;
         //销毁此bio的方法
         bio_destructor_t   *bi_destructor;    /* destructor */
    }
    bio_vec的结构如下:
    struct bio_vec {
         //bi_vec所表示的页面
         struct page   *bv_page;
         //数据区的长度
         unsigned int  bv_len;
         //在页面中的偏移量
         unsigned int  bv_offset;
    }
    关于bio与bio_vec的关系,用下图表示:

    现在,我们来思考一个问题:
    当一个I/O请求提交给请求队列后,它是怎么去调用块设备驱动的策略例程去完成这次I/O的呢?还有,当一个I/O请求被提交给请求队列时,会不会立即调用驱动中的策略例程去完成这次I/O呢?
    实际上,为了提高效率,所有的I/O都会在一个特定的延时之后才会调用策略例程去完成本次I/O.我们来看一个反面的例子,假设I/O在被提交后马上得到执行.例如.磁盘有磁针在磁盘12.现在有一个磁道1的请求.就会将磁针移动到磁道1.操作完后,又有一个请求过来了,它要操作磁道11.然后又会将磁针移到磁道11.操作完后,又有一个请求过来,要求操作磁道4.此时会将磁针移到磁道4.这个例子中,磁针移动的位置是:12->1->11->4.实际上,磁针的定位是一个很耗时的操作.这样下去,毫无疑问会影响整个系统的效率.我们可以在整个延时内,将所有I/O操作按顺序排列在一起,然后再调用策略例程.于是上例的磁针移动就会变成12->11->4->1.此时磁针只会往一个方向移动.
    至于怎么样排列请求和选取哪一个请求进行操作,这就是I/O调度的任务了.这部份我们在通用块层再进行分析.
    内核中有两个操作会完成上面的延时过程.即:激活块设备驱动程序和撤消块设备驱动程序.
    3.6:块设备驱动程序的激活和撤消
    激活块设备驱动程序和撤消块设备驱动程序在内核中对应的接口为blk_plug_device()和blk_remove_plug().分别看下它们的操作:
    void blk_plug_device(request_queue_t *q)
    {
         WARN_ON(!irqs_disabled());

         /*
          * don't plug a stopped queue, it must be paired with blk_start_queue()
          * which will restart the queueing
          */

         //如果设置了QUEUE_FLAG_STOPPED.直接退出
         if (test_bit(QUEUE_FLAG_STOPPED, &q->queue_flags))
             return;

         //为请求队列设置QUEUE_FLAG_PLUGGED.
         if (!test_and_set_bit(QUEUE_FLAG_PLUGGED, &q->queue_flags))
             //如果之前请求队列的状态不为QUEUE_FLAG_PLUGGED,则设置定时器超时时间
             mod_timer(&q->unplug_timer, jiffies + q->unplug_delay);
    }

    int blk_remove_plug(request_queue_t *q)
    {
         WARN_ON(!irqs_disabled());

         //将队列QUEUE_FLAG_PLUGGED状态清除
         if (!test_and_clear_bit(QUEUE_FLAG_PLUGGED, &q->queue_flags))
             //如果请求队列之前不为QUEUE_FLAG_PLUGGED标志,直接返回
             return 0;
    //如果之前是QUEUE_FLAG_PLUGGED标志,则将定时器删除
         del_timer(&q->unplug_timer);
         return 1;
    }
    如果请求队列状态为QUEUE_FLAG_PLUGGED,且定时器超时,会有什么样的操作呢?
    回忆在请求队列初始化函数中,blk_init_queue()会调用blk_queue_make_request().它的代码如下:
    void blk_queue_make_request(request_queue_t * q, make_request_fn * mfn)
    {
         ……
         ……
         q->unplug_delay = (3 * HZ) / 1000;   /* 3 milliseconds */
         if (q->unplug_delay == 0)
             q->unplug_delay = 1;

         INIT_WORK(&q->unplug_work, blk_unplug_work, q);

         q->unplug_timer.function = blk_unplug_timeout;
         q->unplug_timer.data = (unsigned long)q;
         ……
         ……
    }
    上面设置了定时器的时间间隔为(3*HZ)/1000.定时器超时的处理函数为blk_unplug_timeout().参数为请求队列本身.
    blk_unplug_timeout()的代码如下:
    static void blk_unplug_timeout(unsigned long data)
    {
         request_queue_t *q = (request_queue_t *)data;

         kblockd_schedule_work(&q->unplug_work);
    }
    从上面的代码看出,定时器超时之后,会唤醒q->unplug_work这个工作对列.
    在blk_queue_make_request()中,对这个工作队列的初始化为:
    INIT_WORK(&q->unplug_work, blk_unplug_work, q)
    即工作队列对应的函数为blk_unplug_work().对应的参数为请求队列本身.代码如下:
    static void blk_unplug_work(void *data)
    {
         request_queue_t *q = data;

         q->unplug_fn(q);
    }
    到此,就会调用请求队列的unplug_fn()操作.
    在blk_init_queue()对这个成员的赋值如下所示:
         q->unplug_fn       = generic_unplug_device;
    generic_unplug_device()对应的代码如下:
    void __generic_unplug_device(request_queue_t *q)
    {
         //如果请求队列是QUEUE_FLAG_STOPPED 状态,返回
         if (test_bit(QUEUE_FLAG_STOPPED, &q->queue_flags))
             return;
         //如果请求队列的状态是QUEUE_FLAG_PLUGGED.就会返回1
         if (!blk_remove_plug(q))
             return;

         /*
          * was plugged, fire request_fn if queue has stuff to do
          */
          //如果请求对列中的请求,则调用请求队列的reauest_fn函数.也就是驱动程序的
          //策略例程
         if (elv_next_request(q))
             q->request_fn(q);
    }
    blk_remove_plug()在上面已经分析过了.这里不再赘述.
    归根到底,最后的I/O完成操作都会调用块设备驱动的策略例程来完成.
    四:I/O调度层
    I/O调度对应的结构如下所示:
    struct elevator_s
    {
         //当要插入一个bio时会调用
         elevator_merge_fn *elevator_merge_fn;
         elevator_merged_fn *elevator_merged_fn;
         elevator_merge_req_fn *elevator_merge_req_fn;
         //取得下一个请求
         elevator_next_req_fn *elevator_next_req_fn;
         //往请求队列中增加请求
         elevator_add_req_fn *elevator_add_req_fn;
         elevator_remove_req_fn *elevator_remove_req_fn;
         elevator_requeue_req_fn *elevator_requeue_req_fn;

         elevator_queue_empty_fn *elevator_queue_empty_fn;
         elevator_completed_req_fn *elevator_completed_req_fn;

         elevator_request_list_fn *elevator_former_req_fn;
         elevator_request_list_fn *elevator_latter_req_fn;

         elevator_set_req_fn *elevator_set_req_fn;
         elevator_put_req_fn *elevator_put_req_fn;

         elevator_may_queue_fn *elevator_may_queue_fn;
         
         //初始化与退出操作
         elevator_init_fn *elevator_init_fn;
         elevator_exit_fn *elevator_exit_fn;

         void *elevator_data;

         struct kobject kobj;
         struct kobj_type *elevator_ktype;
         //调度算法的名字
         const char *elevator_name;
    }
    我们以最简单的NOOP算法为例进行分析.
    NOOP算法只是做简单的请求合并的操作.的定义如下:
    elevator_t elevator_noop = {
         .elevator_merge_fn     = elevator_noop_merge,
         .elevator_merge_req_fn      = elevator_noop_merge_requests,
         .elevator_next_req_fn       = elevator_noop_next_request,
         .elevator_add_req_fn        = elevator_noop_add_request,
         .elevator_name              = "noop",
    }
    挨个分析里面的各项操作:
    elevator_noop_merge():在请求队列中寻找能否有可以合并的请求.代码如下:
    int elevator_noop_merge(request_queue_t *q, struct request **req,
                  struct bio *bio)
    {
         struct list_head *entry = &q->queue_head;
         struct request *__rq;
         int ret;

         //如果请求队列中有last_merge项.则判断last_merge项是否能够合并
         //在NOOP中一般都不会设置last_merge
         if ((ret = elv_try_last_merge(q, bio))) {
             *req = q->last_merge;
             return ret;
         }

         //遍历请求队列中的请求
         while ((entry = entry->prev) != &q->queue_head) {
             __rq = list_entry_rq(entry);

             if (__rq->flags & (REQ_SOFTBARRIER | REQ_HARDBARRIER))
                  break;
             else if (__rq->flags & REQ_STARTED)
                  break;
             //如果不是一个fs类型的请求?
             if (!blk_fs_request(__rq))
                  continue;
             //判断能否与这个请求合并   
             if ((ret = elv_try_merge(__rq, bio))) {
                  *req = __rq;
                  q->last_merge = __rq;
                  return ret;
             }
         }

         return ELEVATOR_NO_MERGE;
    }
    Elv_try_merge()用来判断能否与请求合并,它的代码如下:
    inline int elv_try_merge(struct request *__rq, struct bio *bio)
    {
         int ret = ELEVATOR_NO_MERGE;

         /*
          * we can merge and sequence is ok, check if it's possible
          */
          //判断rq与bio是否为同类型的请求
         if (elv_rq_merge_ok(__rq, bio)) {
             //如果请求描述符中的起始扇区+ 扇区数= bio的起始扇区
             //则将bio加到_rq的后面.
             //返回ELEVATOR_BACK_MERGE
             if (__rq->sector + __rq->nr_sectors == bio->bi_sector)
                  ret = ELEVATOR_BACK_MERGE;
             //如果请求描述符中的起始扇区- 扇区数=bio的起始扇区
             //则将bio加到_rq的前面
              //返回ELEVATOR_FRONT_MERGE
             else if (__rq->sector - bio_sectors(bio) == bio->bi_sector)
                  ret = ELEVATOR_FRONT_MERGE;
         }

         //如果不可以合并,返回ELEVATOR_NO_MERGE (值为0)
         return ret;
    }
    elv_rq_merge_ok()代码如下:
    inline int elv_rq_merge_ok(struct request *rq, struct bio *bio)
    {
         //判断rq是否可用
         if (!rq_mergeable(rq))
             return 0;

         /*
          * different data direction or already started, don't merge
          */
          //操作是否相同
         if (bio_data_dir(bio) != rq_data_dir(rq))
             return 0;

         /*
          * same device and no special stuff set, merge is ok
          */
          //要操作的对象是否一样
         if (rq->rq_disk == bio->bi_bdev->bd_disk &&
             !rq->waiting && !rq->special)
             return 1;

         return 0;
    }
    注意:如果检查成功返回1.失败返回0.

    elevator_noop_merge_requests():将next 从请求队列中取出.代码如下:
    void elevator_noop_merge_requests(request_queue_t *q, struct request *req,
                         struct request *next)
    {
         list_del_init(&next->queuelist);
    }
    从上面的代码中看到,NOOP算法从请求队列中取出请求,只需要取链表结点即可.不需要进行额外的操作.

    elevator_noop_next_request():取得下一个请求.代码如下:
    struct request *elevator_noop_next_request(request_queue_t *q)
    {
         if (!list_empty(&q->queue_head))
             return list_entry_rq(q->queue_head.next);

         return NULL;
    }
    很简单,取链表的下一个结点.

    elevator_noop_add_request():往请求队列中插入一个请求.代码如下:
    void elevator_noop_add_request(request_queue_t *q, struct request *rq,
                         int where)
    {
         //默认是将rq插和到循环链表末尾
         struct list_head *insert = q->queue_head.prev;
         //如果要插到请求队列的前面
         if (where == ELEVATOR_INSERT_FRONT)
             insert = &q->queue_head;

         //不管是什么样的操作,都将新的请求插入到请求队列的末尾
         list_add_tail(&rq->queuelist, &q->queue_head);

         /*
          * new merges must not precede this barrier
          */
         if (rq->flags & REQ_HARDBARRIER)
             q->last_merge = NULL;
         else if (!q->last_merge)
             q->last_merge = rq;
    }

    五:通用块层的处理
    通用块层的入口点为generic_make_request().它的代码如下:
    void generic_make_request(struct bio *bio)
    {
         request_queue_t *q;
         sector_t maxsector;
         //nr_sectors:要操作的扇区数
         int ret, nr_sectors = bio_sectors(bio);

         //可能会引起睡眠
         might_sleep();
         /* Test device or partition size, when known. */
         //最大扇区数目
         maxsector = bio->bi_bdev->bd_inode->i_size >> 9;
         if (maxsector) {
             //bio操作的起始扇区
             sector_t sector = bio->bi_sector;

             //如果最大扇区数
             //非法的情况
             if (maxsector
                 maxsector - nr_sectors
                  char b[BDEVNAME_SIZE];
                  /* This may well happen - the kernel calls
                   * bread() without checking the size of the
                   * device, e.g., when mounting a device. */
                  printk(KERN_INFO
                         "attempt to access beyond end of device\n");
                  printk(KERN_INFO "%s: rw=%ld, want=%Lu, limit=%Lu\n",
                         bdevname(bio->bi_bdev, b),
                         bio->bi_rw,
                         (unsigned long long) sector + nr_sectors,
                         (long long) maxsector);

                  set_bit(BIO_EOF, &bio->bi_flags);
                  goto end_io;
             }
         }

         /*
          * Resolve the mapping until finished. (drivers are
          * still free to implement/resolve their own stacking
          * by explicitly returning 0)
          *
          * NOTE: we don't repeat the blk_size check for each new device.
          * Stacking drivers are expected to know what they are doing.
          */
         do {
             char b[BDEVNAME_SIZE];
             //取得块设备的请求对列
             q = bdev_get_queue(bio->bi_bdev);
         if (!q) {
                  //请求队列不存在
                  printk(KERN_ERR
                         "generic_make_request: Trying to access "
                       "nonexistent block-device %s (%Lu)\n",
                       bdevname(bio->bi_bdev, b),
                       (long long) bio->bi_sector);
    end_io:
                  //最终会调用bio->bi_end_io
                  bio_endio(bio, bio->bi_size, -EIO);
                  break;
             }

             //非法的情况
             if (unlikely(bio_sectors(bio) > q->max_hw_sectors)) {
                  printk("bio too big device %s (%u > %u)\n",
                       bdevname(bio->bi_bdev, b),
                       bio_sectors(bio),
                       q->max_hw_sectors);
                  goto end_io;
             }

             //如果请求队列为QUEUE_FLAG_DEAD
             //退出
             if (test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))
                  goto end_io;

             /*
              * If this device has partitions, remap block n
              * of partition p to block n+start(p) of the disk.
              */
              //如果当前块设备是一个分区,则转到分区所属的块设备
             blk_partition_remap(bio);
             //调用请求队列的make_request_fn()
             ret = q->make_request_fn(q, bio);
         } while (ret);
    }

    在blk_init_queue()中对请求队列的make_request_fn的设置如下所示:
    blk_init_queue()—> blk_queue_make_request(q, __make_request)
    void blk_queue_make_request(request_queue_t * q, make_request_fn * mfn)
    {
         ……
         ……
         q->make_request_fn = mfn;
         ……
    }
    这里,等待队对的make_request_fn就被设置为了__make_request.这个函数的代码如下:
    static int __make_request(request_queue_t *q, struct bio *bio)
    {
         struct request *req, *freereq = NULL;
         int el_ret, rw, nr_sectors, cur_nr_sectors, barrier, err;
         sector_t sector;

         //bio的起始扇区
         sector = bio->bi_sector;
         //扇区数目
         nr_sectors = bio_sectors(bio);
         //当前bio中的bio_vec的扇区数目
         cur_nr_sectors = bio_cur_sectors(bio);
         //读/写
         rw = bio_data_dir(bio);

         /*
          * low level driver can indicate that it wants pages above a
          * certain limit bounced to low memory (ie for highmem, or even
          * ISA dma in theory)
          */
          //建立一个弹性回环缓存
         blk_queue_bounce(q, &bio);

         spin_lock_prefetch(q->queue_lock);

         barrier = bio_barrier(bio);
         if (barrier && !(q->queue_flags & (1
             err = -EOPNOTSUPP;
             goto end_io;
         }

    again:
         spin_lock_irq(q->queue_lock);

         //请求队列是空的
         if (elv_queue_empty(q)) {
             //激活块设备驱动
             blk_plug_device(q);
             goto get_rq;
         }
         if (barrier)
             goto get_rq;
         //调用I/O调度的elevator_merge_fn方法,判断这个bio能否和其它请求合并
         //如果可以合并,req参数将返回与之合并的请求描述符
         el_ret = elv_merge(q, &req, bio);
         switch (el_ret) {
             //可以合并.且bio加到req的后面
             case ELEVATOR_BACK_MERGE:
                  BUG_ON(!rq_mergeable(req));

                  if (!q->back_merge_fn(q, req, bio))
                       break;

                  req->biotail->bi_next = bio;
                  req->biotail = bio;
                  req->nr_sectors = req->hard_nr_sectors += nr_sectors;
                  drive_stat_acct(req, nr_sectors, 0);
                  if (!attempt_back_merge(q, req))
                       elv_merged_request(q, req);
                  goto out;
             //可以合并.且bio加到req的前面
             case ELEVATOR_FRONT_MERGE:
                  BUG_ON(!rq_mergeable(req));

                  if (!q->front_merge_fn(q, req, bio))
                       break;

                  bio->bi_next = req->bio;
                  req->cbio = req->bio = bio;
                  req->nr_cbio_segments = bio_segments(bio);
                  req->nr_cbio_sectors = bio_sectors(bio);

                  /*
                   * may not be valid. if the low level driver said
                   * it didn't need a bounce buffer then it better
                   * not touch req->buffer either...
                   */
                  req->buffer = bio_data(bio);
                  req->current_nr_sectors = cur_nr_sectors;
                  req->hard_cur_sectors = cur_nr_sectors;
                  req->sector = req->hard_sector = sector;
                  req->nr_sectors = req->hard_nr_sectors += nr_sectors;
                  drive_stat_acct(req, nr_sectors, 0);
                  if (!attempt_front_merge(q, req))
                       elv_merged_request(q, req);
                  goto out;

             /*
              * elevator says don't/can't merge. get new request
              */
              //不可以合并.申请一个新的请求,将且加入请求队列
             case ELEVATOR_NO_MERGE:
                  break;

             default:
                  printk("elevator returned crap (%d)\n", el_ret);
                  BUG();
         }

         /*
          * Grab a free request from the freelist - if that is empty, check
          * if we are doing read ahead and abort instead of blocking for
          * a free slot.
          */
    get_rq:
         //freereq:是新分配的请求描述符
         if (freereq) {
             req = freereq;
             freereq = NULL;
         } else {
             //分配一个请求描述符
             spin_unlock_irq(q->queue_lock);
             if ((freereq = get_request(q, rw, GFP_ATOMIC)) == NULL) {
                  /*
                   * READA bit set
                   */
                   //分配失败
                   err = -EWOULDBLOCK;
                  if (bio_rw_ahead(bio))
                       goto end_io;
         
                  freereq = get_request_wait(q, rw);
             }
             goto again;
         }

         req->flags |= REQ_CMD;

         /*
          * inherit FAILFAST from bio (for read-ahead, and explicit FAILFAST)
          */
         if (bio_rw_ahead(bio) || bio_failfast(bio))
             req->flags |= REQ_FAILFAST;

         /*
          * REQ_BARRIER implies no merging, but lets make it explicit
          */
         if (barrier)
             req->flags |= (REQ_HARDBARRIER | REQ_NOMERGE);

         //初始化新分配的请求描述符
         req->errors = 0;
         req->hard_sector = req->sector = sector;
         req->hard_nr_sectors = req->nr_sectors = nr_sectors;
         req->current_nr_sectors = req->hard_cur_sectors = cur_nr_sectors;
         req->nr_phys_segments = bio_phys_segments(q, bio);
         req->nr_hw_segments = bio_hw_segments(q, bio);
         req->nr_cbio_segments = bio_segments(bio);
         req->nr_cbio_sectors = bio_sectors(bio);
         req->buffer = bio_data(bio);     /* see ->buffer comment above */
         req->waiting = NULL;
         //将bio 关联到请求描述符
         req->cbio = req->bio = req->biotail = bio;
         req->rq_disk = bio->bi_bdev->bd_disk;
         req->start_time = jiffies;
         //请将求描述符添加到请求队列中
         add_request(q, req);
    out: (R)
         if (freereq)
             __blk_put_request(q, freereq);
         //如果定义了BIO_RW_SYNC.
         //将调用__generic_unplug_device将块设备驱动,它会直接调用驱动程序的策略例程
         if (bio_sync(bio))
             __generic_unplug_device(q);

         spin_unlock_irq(q->queue_lock);
         return 0;

    end_io:
         bio_endio(bio, nr_sectors
         return 0;
    }
    这个函数的逻辑比较简单,它判断bio能否与请求队列中存在的请求合并,如果可以合并,将其它合并到现有的请求.如果不能合并,则新建一个请求描述符,然后把它插入到请求队列中.上面的代码可以结合之前分析的NOOP算法进行理解.
    重点分析一下请求描述符的分配过程:
    分配一个请求描述符的过程如下所示:
             if ((freereq = get_request(q, rw, GFP_ATOMIC)) == NULL) {
                  /*
                   * READA bit set
                   */
                   //分配失败
                   err = -EWOULDBLOCK;
                  if (bio_rw_ahead(bio))
                       goto end_io;
         
                  freereq = get_request_wait(q, rw);
             }
    在分析这段代码之前,先来讨论一下关于请求描述符的分配方式.记得我们在分析请求队列描述符的时候,request_queue中有一个成员:struct request_list  rq;
    它的数据结构如下:
    struct request_list {
         //读/写请求描述符的分配计数
         int count[2];
         //分配缓存池
         mempool_t *rq_pool;
         //如果没有空闲内存时.读/写请求的等待队列
         wait_queue_head_t wait[2];
    };
    如果当前空闲内存不够.则会将请求的进程挂起.如果分配成功,则将请求队列的rl字段指向这个分配的request_list.
    释放一个请求描述符,将会将其归还给指定的内存池.
    request_list结构还有一个避免请求拥塞的作用:
    每个请求队列都有一个允许处理请求的最大值(request_queue->nr_requests).如果队列中的请求超过了这个数值,则将队列置为QUEUE_FLAG_READFULL/QUEUE_FLAG_WRITEFULL.后续试图加入到队列的进程就会被放置到request_list结构所对应的等待队列中睡眠.如果一个队列中的睡眠进程过程也多也会影响系统的效率.如果待处理的请求大于request_queue-> nr_congestion_on就会认为这个队列是拥塞的.就会试图降低新请求的创建速度.如果待处理请求小于request_queue->nr_congestion_off.则会认为当前队列是不拥塞的.
    get_request()的代码如下:
    static struct request *get_request(request_queue_t *q, int rw, int gfp_mask)
    {
         struct request *rq = NULL;
         struct request_list *rl = &q->rq;
         struct io_context *ioc = get_io_context(gfp_mask);

         spin_lock_irq(q->queue_lock);
         //如果请求数超过了请求队列允许的最大请求值(q->nr_requests)
         //就会将后续的请求进程投入睡眠
         
         if (rl->count[rw]+1 >= q->nr_requests) {
             /*
              * The queue will fill after this allocation, so set it as
              * full, and mark this process as "batching". This process
              * will be allowed to complete a batch of requests, others
              * will be blocked.
              */
              //判断是否将队列置为了QUEUE_FLAG_READFULL/QUEUE_FLAG_WRITEFULL
              //如果没有,则置此标志.并且设置当前进程为batching
             if (!blk_queue_full(q, rw)) {
                  ioc_set_batching(ioc);
                  blk_set_queue_full(q, rw);
             }
         }

         //如果队列满了,进程不为batching 且I/O调度程序不能忽略它
         //不能分配.直接返回
         if (blk_queue_full(q, rw)
                  && !ioc_batching(ioc) && !elv_may_queue(q, rw)) {
             /*
              * The queue is full and the allocating process is not a
              * "batcher", and not exempted by the IO scheduler
              */
             spin_unlock_irq(q->queue_lock);
             goto out;
         }

         //要分配请求描述符了,递增计数
         rl->count[rw]++;
         //如果待请求数量超过了request_queue-> nr_congestion_on
         //则队列是阻塞的,设置阻塞标志
         if (rl->count[rw] >= queue_congestion_on_threshold(q))
             set_queue_congested(q, rw);
         spin_unlock_irq(q->queue_lock);

         //分配请求描述符
         rq = blk_alloc_request(q, gfp_mask);
         if (!rq) {
             /*
              * Allocation failed presumably due to memory. Undo anything
              * we might have messed up.
              *
              * Allocating task should really be put onto the front of the
              * wait queue, but this is pretty rare.
              */
             spin_lock_irq(q->queue_lock);
             //分配失败了,要减小分配描述的引用计数
             freed_request(q, rw);
             spin_unlock_irq(q->queue_lock);
             goto out;
         }

         if (ioc_batching(ioc))
             ioc->nr_batch_requests--;

         //初始化请求的各字段
         INIT_LIST_HEAD(&rq->queuelist);

         /*
          * first three bits are identical in rq->flags and bio->bi_rw,
          * see bio.h and blkdev.h
          */
         rq->flags = rw;

         rq->errors = 0;
         rq->rq_status = RQ_ACTIVE;
         rq->bio = rq->biotail = NULL;
         rq->buffer = NULL;
         rq->ref_count = 1;
         rq->q = q;
         rq->rl = rl;
         rq->waiting = NULL;
         rq->special = NULL;
         rq->data_len = 0;
         rq->data = NULL;
         rq->sense = NULL;

    out:
         //减少ioc的引用计数
         put_io_context(ioc);
         return rq;
    }
    由于在分配之前递增了统计计数,所以在分配失败后,要把这个统计计数减下来,这是由freed_request()完成的.它的代码如下:
    static void freed_request(request_queue_t *q, int rw)
    {
         struct request_list *rl = &q->rq;

         rl->count[rw]--;
         //如果分配计数小于request_queue->nr_congestion_off.队列已经不拥塞了
         if (rl->count[rw]
             clear_queue_congested(q, rw);
         //如果计数小于允许的最大值.那可以分配请求了,将睡眠的进程唤醒
         if (rl->count[rw]+1 nr_requests) {
             //唤醒等待进程
             if (waitqueue_active(&rl->wait[rw]))
                  wake_up(&rl->wait[rw]);
             //清除QUEUE_FLAG_READFULL/QUEUE_FLAG_WRITEFULL
             blk_clear_queue_full(q, rw);
         }
    }
    在这里我们可以看到,如果待处理请求小于请求队列所允许的最大值,就会将睡眠的进程唤醒.
    如果请求描述符分配失败,会怎么样呢?我们接着看__make_request()中的代码:
             if ((freereq = get_request(q, rw, GFP_ATOMIC)) == NULL) {
                  /*
                   * READA bit set
                   */
                   //分配失败
                   err = -EWOULDBLOCK;
                  //如果此次操作是一次预读,且不阻塞
                  if (bio_rw_ahead(bio))
                       goto end_io;
                  //挂起进程
                  freereq = get_request_wait(q, rw);
             }
    如果分配失败,会调用get_request_wait()将进程挂起.它的代码如下:
    static struct request *get_request_wait(request_queue_t *q, int rw)
    {
         //初始化一个等待队列
         DEFINE_WAIT(wait);
         struct request *rq;
         struct io_context *ioc;

         //撤消块设备驱动.这里会直接调用块设备驱动的策略例程
         generic_unplug_device(q);
         ioc = get_io_context(GFP_NOIO);
         do {
             struct request_list *rl = &q->rq;

             //将当前进程加入等待队列.并设置进程状态为TASK_UNINTERRUPTIBLE
             prepare_to_wait_exclusive(&rl->wait[rw], &wait,
                       TASK_UNINTERRUPTIBLE);
             //再次获得等待队列
             rq = get_request(q, rw, GFP_NOIO);

             if (!rq) {
                  
                  //如果还是失败了,睡眠
                  io_schedule();

                  /*
                   * After sleeping, we become a "batching" process and
                   * will be able to allocate at least one request, and
                   * up to a big batch of them for a small period time.
                   * See ioc_batching, ioc_set_batching
                   */
                   //这里是被唤醒之后运行
                  ioc_set_batching(ioc);
             }
             //将进程从等待队列中删除
             finish_wait(&rl->wait[rw], &wait);
         } while (!rq);
         put_io_context(ioc);

         return rq;
    }
    这段代码比较简单,相似的代码我们在之前已经分析过很多次了.这里不做重点分析.

    此外.在__make_request()中还需要注意一件事情.在bio中的内存可能是高端内存的.但是内核不能直接访问,这里就必须要对处理高端内存的bio_vec做下处理.即将它临时映射之后copy到普通内存区.这就是所谓的弹性回环缓存.相关的操作是在blk_queue_bounce()中完成的.这个函数比较简单,可以自行分析.
    到这里,通用块层的处理分析就结束了.我们继续分析其它的层次.
    展开全文
  • AJAX请求队列实现

    2020-10-21 02:34:58
    主要为大家详细介绍了AJAX请求队列的实现代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • 2.2.3 请求通道的请求队列和响应队列 创建SocketServer也会创建一个请求通道(RequestChannel),在KafkaServer中,会将SocketServer的请求通道传给Kafka请求处理线程(KafkaRequestHandler,下文简称“请求处理线程...

    2.2.3 请求通道的请求队列和响应队列

    创建SocketServer也会创建一个请求通道(RequestChannel),在KafkaServer中,会将SocketServer的请求通道传给Kafka请求处理线程(KafkaRequestHandler,下文简称“请求处理线程”)和KafkaApis。在上一节中客户端的请求已经到达服务端的处理器(processor),那么请求通道就是处理器与请求处理线程和KafkaApis交换数据的地方:如果处理器往请求通道添加请求,请求处理线程和KafkaApis都可以获取到请求通道中的请束;如果请求处理线程和KafkaApis往请求通道添加响应,处理器也可以从请求通道获取晌应。

    处理器会将客户端发送的请求放到全局的请求队列(requestQueue)中,供请求处理线程获取,请求处理线程会将请求转发给KafkaApis处理。最后KafkaApis会将处理完成的响应结果放到响应队列(responseQueue)中,供处理器获取后发送给客户端。相关代码如下:

    requestChannel.addResponseListener(id => processors(id).wakeup())
    
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
    
      newGauge(
        "RequestQueueSize",
        new Gauge[Int] {
          def value = requestQueue.size
        }
      )
    
      newGauge("ResponseQueueSize", new Gauge[Int]{
        def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
      })
    
      for (i <- 0 until numProcessors) {
        newGauge("ResponseQueueSize",
          new Gauge[Int] {
            def value = responseQueues(i).size()
          },
          Map("processor" -> i.toString)
        )
      }
    
      /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
      def sendRequest(request: RequestChannel.Request) {
        requestQueue.put(request)
      }
    
      /** Send a response back to the socket server to be sent over the network */
      def sendResponse(response: RequestChannel.Response) {
        responseQueues(response.processor).put(response)
        for(onResponse <- responseListeners)
          onResponse(response.processor)
      }
    
      /** Get the next request or block until specified time has elapsed */
      def receiveRequest(timeout: Long): RequestChannel.Request =
        requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
    
      /** Get the next request or block until there is one */
      def receiveRequest(): RequestChannel.Request =
        requestQueue.take()
    
      /** Get a response for the given processor if there is one */
      def receiveResponse(processor: Int): RequestChannel.Response = {
        val response = responseQueues(processor).poll()
        if (response != null)
          response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
        response
      }
    
      def addResponseListener(onResponse: Int => Unit) {
        responseListeners ::= onResponse
      }
    
      def shutdown() {
        requestQueue.clear()
      }
    }
    

    如图2-20所示,因为请求通道保存了请求和响应两种类型的队列,它的各个方法中关于请求和响应的接收和发送是有顺序的:发送请求→接收请求→发送n向应→接收响应。

    (1)sendRequest():处理器接收到客户端请求后,将请求放入请求队列。
    (2)receiveRequest():请求处理线程从队列中获取请求,并交给KafkaApis处理。
    (3)sendResponse():KafkaApis处理完,将响应结果放入响应队列。
    (4)receiveResponse():处理器从响应队列中获取响应结果发送给客户端。

    在这里插入图片描述

    上面只是一个请求和响应在请求通道上的调用顺序,下面以服务端同时处理多个客户端请求为例,并结合其他相关的组件,来说明处理器将请求放入请求通道,一直到从请求通道获取响应的过程(图中的编号和图2-20编号的含义相同)。如图2-21所示,由于一个SocketServer有多个处理器,每个处理器都负责一部分客户端的请求。如果请求l发送给处理器l,那么请求l对应的响应也只能发送给处理器l,所以每个处理器都有一个响应队列。虽然请求队列是所有处理器全局共享的,不过会有多个请求处理线程同时处理请求队列中的客户端请求。假设处理器3有两个客户端请求,这两个请求进入全局的请求队列后可能被不同的请求处理线程处理,最后KafkaApis会将这两个请求的响应都放入处理器3对应的响应队列中。

    在这里插入图片描述
    从图2-21处理器使用请求通道的方式也可以看到,处理器的processCollpletedReceives()会往请求通道的请求队列添加请求,ProcessNewResponses()则从请求通道的响应队列获取响应。与之相对应的获取请求和添加响应的操作,则属于请求处理线程(KafkaRequestHandler)和KafkaApis的功能。

    展开全文
  • 网络请求队列

    千次阅读 2017-05-07 19:55:40
    当前网络请求完成时(成功、失败、或者超时),才能进行下一个网络请求,这就是网络请求队列。 使用场景 客户端短时间内产生的网络请求次数过多,造成服务器压力过大,需要限制 下一个网络请求的参数依赖上一个...

    定义

    当前网络请求完成时(成功、失败、或者超时),才能进行下一个网络请求,这就是网络请求队列。

    使用场景

    • 客户端短时间内产生的网络请求次数过多,造成服务器压力过大,需要限制
    • 下一个网络请求的参数依赖上一个网络请求的结果
    • 每次网络请求的参数结构和返回结果基本一样
    • 解决小程序只能同时并发5个请求的限制

    网络请求队列的代码实现

    let currentRequestQueue = [];  //当前网络请求队列
    let currentRequestResponse = true;  //当前网络请求响应状态
    let requestTimes = 0;   //网络请求次数
    let responseFailTimes = 0;   //网络反馈失败次数
    let responseSuccessTimes = 0;   //网络反馈成功次数
    
    /** 网络请求队列函数  */
    function requestQueue(){   
        //网络请求队列里还有网络请求,当前请求反馈有结果时才进行下一个请求
        if(this.currentRequestQueue.length > 0 && this.currentRequestResponse){
            this.updateData(this.currentRequestQueue.shift());
        }
    }
    
    /** 请求更新服务器上的数据 */
    function updateData(params){
        requestTimes++;   //网络请求次数加1
        currentRequestResponse = false;  //已发送请求,但未响应	
        fetch('http://127.0.0.1:8080/app/update',{
    	    method:'POST',
    	    headers:{"Content-Type": "application/json;charset=UTF-8"},
    	    body:JSON.stringify(params)   //通常需要转换成字符串后服务器才能解析
        }).then((response) => response.json())
            .then( res=>{
                if(res.header.statusCode == 'success'){  //请求成功
                    responseSuccessTimes++;   //网络反馈成功次数加1            
                }else{  //请求失败   
                    currentRequestQueue.unshift(params);  //添加到队列的开头再次请求          
                    responseFailTimes++;  //网络反馈失败次数加1
                }
                currentRequestResponse = true;  //当前网络请求已响应
                requestQueue();   //进行下一个请求
                requestResult();  // 所有网络请求结果
            }).catch( err=>{  //请求出错
                currentRequestQueue.unshift(params);  //添加到队列的开头再次请求  
                responseFailTimes++;   //网络反馈失败次数加1
                currentRequestResponse = true;  //当前网络请求已响应
                requestQueue();   //进行下一个请求
                requestResult();  // 所有网络请求结果
            })
    }
    
    /** 根据网络反馈次数判断当前所有网络请求的结果 */
    function requestResult(){
        console.log(`网络请求次数:${requestTimes}`,`失败:${responseFailTimes}`,`成功:${responseSuccessTimes}`);
        if(responseSuccessTimes == requestTimes){  //请求次数和成功次数相等
            initAfterRequest();
            console.log('网络请求全部成功');      
        }else if(responseFailTimes == requestTimes){  //请求次数和失败次数相等
            initAfterRequest();
            console.log('网络请求全部失败');
        }else if(responseSuccessTimes + responseFailTimes == requestTimes){  //请求次数和反馈次数相等
            initAfterRequest();
            console.log('网络请求部分成功');
        }
    }
    
    /** 所有网络请求得到结果后重置次数 */
    function initAfterRequest(){
        requestTimes = 0;
        responseFailTimes = 0;
        responseSuccessTimes = 0;
    }
    
    //手动创建网络请求队列
    for(let i = 0; i < 10; i++){ 
    	//请求参数 
        let params = {
            userId:'abcd123456',  //用户id
            data:i    //当前的数据,这里用i来模拟,实际项目请求的参数更复杂
        }
        currentRequestQueue.push(params);
    }
    
    requestQueue(currentRequestQueue);  //调用网络请求队列
    

    上述网络请求使用的是fetch请求,也可以更换成Ajax请求的方式。

    扩展:

    1. 当网络队列请求次数过多,需要过多时间来处理时,为提升用户体验,可以设置一个按钮让用户手动停止网络请求队列

    实现方法:首先定义一个变量isStopRequest ,然后在requestQueue里判断此变量即可

    let isStopRequest = false;  //是否手动停止网络请求队列
    function requestQueue(){    
           if(isStopRequest){
       	    console.log('手动停止网络请求队列');
               return;
           }
           if(this.currentRequestQueue.length > 0 && this.currentRequestResponse){
               this.updateData(this.currentRequestQueue.shift());
           }
    }
    
    1. 当网络状况不好时,请求失败次数超过3次时,自动停止网络请求队列

    实现方法:在requestQueue里判断变量responseFailTimes即可

    function requestQueue(){    
           //当请求失败超过3次后,根据实际也可以定义其他基准次数,表明当前网络状况不好,这时可以停止网络请求队列
           if(responseFailTimes >= 3){   
       	   console.log('网络请求队列因网络失败次数过多自动停止');
              return;
           }
           if(this.currentRequestQueue.length > 0 && this.currentRequestResponse){
               this.updateData(this.currentRequestQueue.shift());
           }
    }
    
    1. 如果对于数据比较敏感或者对数据的完整性要求较高,当停止网络请求队列时可以将当前还未发送的网络请求队列保存在本地缓存,当用户再次触发时读取缓存并将上次未发送的请求加入到网络请求队列。

    2. 当网络意外断开时,当前的网络请求会立即返回失败的回调,并将当前网络请求队列保存到缓存中。而当前的请求其实已经成功发送到服务器上,只是客户端还没有收到请求成功返回的数据,当下次再次读取缓存并将上次未发送的请求加入到网络请求队列,就会导致比实际上多了一个请求,造成数据错误。

    解决办法:

    • 客户端:在生成网络请求的时候加入当前请求的时间戳,代码如下:
    for(let i = 0; i < 10; i++){  
          let params = {
               userId:'abcd123456', 
               data:i ,
               synchronized_time:String(new Date().getTime())  //时间戳字符串
           }
           currentRequestQueue.push(params);
    }
    
    • 服务器端:判断当前请求的时间戳跟上次请求的时间戳是否一样,如果一样,返回请求成功,但此次请求改变的数据不更新到数据库中。
    展开全文
  • Android网络请求队列

    2015-11-02 17:32:27
    博客 http://blog.csdn.net/chuwe1/article/details/49589305 源码,实现安卓网络请求队列,类似新浪微博效果
  • WeatherApp:请求队列
  • 博客源码,Android平台实现网络请求队列
  • 请求队列request_queue

    千次阅读 2017-06-08 22:58:13
    请求队列request_queue请求队列是由一个大的数据结构request_queue表示的。struct request_queue { struct list_head queue_head; //待处理请求的链表,请求队列中的请求用链表组织在一起 struct request *last_...

    请求队列request_queue

    请求队列是由一个大的数据结构request_queue表示的。

    struct request_queue {
        struct list_head    queue_head; //待处理请求的链表,请求队列中的请求用链表组织在一起
        struct request      *last_merge; //指向队列中首先可能合并的请求描述符
        struct elevator_queue   *elevator;//指向elevator对象的指针(电梯算法)
        struct request_list root_rl;///为分配请求描述符所使用的数据结构
    
        request_fn_proc     *request_fn;//实现驱动程序的策略例程入口点的方法,由他处理队列中请求
        make_request_fn     *make_request_fn;//将一个新请求插入请求队列时调用的方法
        prep_rq_fn      *prep_rq_fn; //该方法把这个处理请求的命令发送给硬件设备
    
        softirq_done_fn     *softirq_done_fn;
        rq_timed_out_fn     *rq_timed_out_fn;
    
        sector_t        end_sector;
        struct request      *boundary_rq;
        struct delayed_work delay_work;
    
        struct backing_dev_info backing_dev_info;
    
        /*
         * The queue owner gets to use this for whatever they like.
         * ll_rw_blk doesn't touch it.
         */
        void            *queuedata;
        spinlock_t      __queue_lock; //请求队列锁
        spinlock_t      *queue_lock; //指向请求队列锁的指针
    
    
        unsigned long       nr_requests;/* 请求队列中允许的最大请求数 */
    
        struct queue_limits limits;//队列的其他限制
    };

    该结构是串联整个块设备驱动的核心,下面列举几个特别重要的元素:

    1. request_fn_proc *request_fn; 它是实现驱动程序的策略例程入口点的方法,由他处理队列中请求,我们所写的块设备驱动程序的核心就是它的请求处理函数(request_fn)。
    2. make_request_fn *make_request_fn;将一个新请求插入请求队列时调用的方法,块设备的io调度程序主要是在该函数内完成。
    3. struct elevator_queue *elevator;指向elevator对象的指针(电梯算法),决定了io调度层使用的io调度算法。

    请求队列是一个双向链表,其元素就是请求描述符(也就是request数据结构)。请求队列描述符中的queue_head字段存放链表的头(第一个伪元素),而请求描述符中queuelist字段的指针把任一请求链接到链表的前一个和后一个元素之间。

    队列链表中元素的排序方式对每个块设备驱动程序是特定的;然而,I/O调度程序提供了几种预先确定好的元素排序方式,牵涉到“I/O调度算法”的概念。

    我们通过一个图把前面的知识串联起来:
    这里写图片描述

    展开全文
  • 基于jquery的Ajax请求队列 用于处理优先ajax与一般ajax请求 用法 var handler = AjaxQueue.setup(), // 实例化队列对象 priority = 0; // 优先级为0是普通请求,1为高级请求。 高级请求会优先发送 // 向队列中添加...
  • 1.请求队列简单介绍: InFlightRequest是client的请求队列。max.in.flight.requests.per.connection配置请求队列大小,默认5,请求队列中存放的是在发送途中的请求,包括:正在发送的请求和已经发送的但还没有接收到...
  • Android网络开发 请求队列

    千次阅读 2015-11-02 17:14:53
    欢迎使用Markdown编辑器写博客本Markdown编辑器使用[StackEdit][6]修改而来,用它写博客,将会带来全新的体验哦:Android网络开发 请求队列文章出处:Android那些事儿的博客因为之前参与的网络开发项目都遇到一些...
  • Volley源码解析<四> RequestQueue请求队列@[Volley, 核心, RequestQueue]声明:转载请注明出处,知识有限,如有错误,请多多交流指正!Volley源码解析四 RequestQueue请求队列RequestQueue结构 RequestQueue类...
  • Scrapy默认爬虫引擎是scrapy.core.engine.ExecutionEngine,其中的_next_request函数负责从调度器的队列中取得下一个Request对象进行处理,处理完后会调用spider_is_idle函数检查爬虫的请求队列是否为空,下面是简略...
  • RequestQueue:表示请求队列,查看源码得知,里面包含一个CacheDispatcher(用于处理走缓存请求的调度线程)、NetworkDispatcher数组(用于处理走网络请求的调度线程),一个ResponseDelivery(返回结果分发接口),通过 ...
  • 请求队列描述符

    千次阅读 2011-01-31 23:59:00
    1.4.4 请求队列描述符 make_request_fn方法属于块设备I/O调度层的内容,要继续往下走,需要介绍一下通用块层的体系架构,这里需要从磁盘和磁盘分区开始说起。磁盘是一个由通用块层处理的逻辑块设备,是块设备...
  • 在iOS网络编程中,我们经常会遇到线程的同步和异步问题,同时为了对异步请求更加精准丰富的控制,我们还常常在iOS中使用请求队列,下面就来谈谈iOS开发中同步、异步以及请求队列的使用方法。 1. 同步意为着线程阻塞...
  • scrapy请求队列

    千次阅读 2018-11-28 10:41:30
    Scrapy 如何获取 request 队列? 最近由于在数据量比较大的抓取遇到了一些问题,就想看看运行时的request队列中的url有什么,可是google的很久也没找到获取request队列的api,我查了这么久得出以下结论:  1....
  • vue项目中使用axios,拦截器实现自动刷新token机制,拦截请求队列 https://blog.csdn.net/sinat_41695090/article/details/96305150
  • 实例:请求队列我们通过一个例子介绍一下请求队列使用,我们设计了一个应用,用户点击GO按钮从服务器同时下载两张图片显示在画面中。 我们直接看看主视图控制器ViewController.h代码如下:#import “ASIHTTPRequest...
  • RequestQueue 请求队列

    千次阅读 2014-09-05 20:02:41
    /*  NSURLRequest *request = [NSURLRequest requestWithURL:[NSURL URLWithString:@"https://api.douban.com/v2/book/search?q=harry&apikey=00862fc9947075ac01928f5cbd516104"]];  //操作队列  NSO
  • 通过请求队列的方式来缓解高并发抢购(初探)   一、背景  在移动互联网高速发展的时代,各种电商平台的抢购业务变得越来越火爆,抢购业务所带来的高并发问题值得我们去探索,主要涉及的方面包括处理和响应速度、...
  • AJAX 请求队列实现

    千次阅读 2016-09-22 15:36:22
    AJAX在使用的过程中会遇到一个问题,当用户短时间内执行了多个异步请求的时候,如果前一个请求没完成,将会被取消执行最新的一个请求,大多数情况下,不会有什么影响,例如请求了一个新的列表,旧的请求也就没什么...
  • 1.将Volley框架请求队列的单例模式封装成SingleVolleyRequestQueue.java:public class SingleVolleyRequestQueue { //私有化属性 private static SingleVolleyRequestQueue singleQueue; private RequestQueue ...
  • 前言最近遇到一个问题,我1个站点链接2个后端服务,但1个后端服务有问题,导致访问超时,但请求接口都是分开的。自认为一个服务站点请求超时,不会影响到另外一个请求的,但不是。全部请求都发不出去。为什么呢?...
  • 在[Android]Volley源码分析(一)概述中,我们看到了Volley通过调研RequestQueue的start()方法启动了一个请求队列,下面我们通过代码看看具体是如何实现的:

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 490,388
精华内容 196,155
关键字:

请求队列