精华内容
参与话题
问答
  • 初识pipeline

    万次阅读 2016-08-04 19:56:05
    1、pipeline的产生  从一个现象说起,有一家咖啡吧生意特别好,每天来的客人络绎不绝,客人A来到柜台,客人B紧随其后,客人C排在客人B后面,客人D排在客人C后面,客人E排在客人D后面,一直排到店面门外。老板和三...

    1、pipeline的产生

        从一个现象说起,有一家咖啡吧生意特别好,每天来的客人络绎不绝,客人A来到柜台,客人B紧随其后,客人C排在客人B后面,客人D排在客人C后面,客人E排在客人D后面,一直排到店面门外。老板和三个员工首先为客人A准备食物:员工甲拿了一个干净的盘子,然后员工乙在盘子里装上薯条,员工丙再在盘子里放上豌豆,老板最后配上一杯饮料,完成对客人A的服务,送走客人A,下一位客人B开始被服务。然后员工甲又拿了一个干净的盘子,员工乙又装薯条,员工丙又放豌豆,老板又配上了一杯饮料,送走客人B,客人C开始被服务。一直重复下去。

     

        从效率方面观察这个现象,当服务客人A时,在员工甲拿了一个盘子后,员工甲一直处于空闲状态,直到送走客人A,客人B被服务。老板自然而然的就会想到如果每个人都不停的干活,就可以服务更多的客人,赚到更多的钱。老板通过不停的尝试想出了一个办法。以客户A,B为例阐述这个方法:员工甲为客户A准备好了盘子后,在员工乙开始为客户A装薯条的同时,员工甲开始为客户B准备托盘。这样员工甲就可以不停的进行生产。整个过程如下图,客户们围着咖啡吧台排队,因为有四个生产者,一个老板加三个员工,所以可以同时服务四个客户。我们将目光转向老板,单位时间从他那里出去的客户数提高了将近四倍,也就是说效率提高将近四倍。

        pipeline的概念可以从这里抽象出来:将一件需要重复做的事情(这里指为客户准备一份精美的食物)切割成各个不同的阶段(这里是四个阶段:盘子,薯条,豌豆,饮料),每一个阶段由独立的单元负责(四个生产者分别负责不同的环节)。所有待执行的对象依次进入作业队列(这里是所有的客户排好队依次进入服务,除了开始和结尾的一段时间,任意时刻,四个客户被同时服务)。对应到CPU中,每一条指令的执行过程可以切割成:fetch instruction、decode it、find operand、perform action、store result 5个阶段。

    2、将pipeline应用到CPU的计算单元中

        在未将pipeline应用到CPU之前,假如一个计算单元耗时300ps,将结果写入到寄存器耗时20ps,那么一条指令的执行时间为320ps。吞吐量定义为单位时间内执行的指令的条数,一般其单位为GIPS(giga-instruction per second),那么其吞吐量为3.12 GIPS,也就是说每秒执行3.12 giga条指令,1 giga 个= 10^9 个。

        下面将pipeline应用到CPU,看计算单元的吞吐量会提高多少。我们将上图的组合逻辑单元切割成三个小的组合逻辑单元,每个组合逻辑单元耗时100ps,另外为了使前后组合逻辑单元的执行不相互影响,需要在每一对的小单元中间插入一个寄存器(对于这一点的理解,看完下面关于使用pipeline的CPU的运行过程就可以理解)如下图所示:

        运行原理:首先这里非常值得指出的是,这里对寄存器的模型表示有些不细腻,因为从上图中并不能看出每个寄存器由输入,状态,和输出三个小单元组成。对于I1,I2,I3三条指令,当时钟迎来第一个上升沿时,I1首先进入组合逻辑A(如果这里不理解时钟,暂且忽略,下面会讲解),经过100ps后将结果花20ps写入到第一个寄存器的输入;当时钟迎来第二个上升沿时,更新第一个寄存器的状态和输出,即把I1指令经过组合逻辑A 后的结果更新到第一个寄存器以作为组合逻辑单元B的输入。与此同时,I2进入组合逻辑单元A,并在100ps后将结果花20ps写入到第一个寄存器的输入,这里注意,第一个寄存器的状态和输出并没有发生变化。这种机制保证了前后指令的互不干扰性。当时钟第三个上升沿来到时,I1进入逻辑单元C,I2进入逻辑单元B,I3开始进入逻辑单元A。

    下面我们来计算使用pipeline的计算单元的吞吐量,由于每个阶段都需要100ps+20ps=120ps的时间,我们可以选用使得系统吞吐量最大的周期为120ps的时钟1/120*1000=8.3 GIPS,即每秒钟执行8.3 giga条指令相比于未使用pipeline的3.12 GIPS,提高了2.67倍,大家可能有疑问为什么不是3倍,因为我们为了让前后指令互不影响插入了两个寄存器,所以达不到最大极限3。

    上面两幅图中的两幅b图是专门用来表示pipeline中各个时刻各个指令所处状态的pipeline diagram。

    3、决定计算单元速度的是pipeline而不是系统时钟的频率

        我们以第2部分为背景来阐述这个问题,三个阶段,每一阶段耗时120 ps,如果时钟周期高于120ps,那么将会出现寄存器值由于没有来得及更新导致的指令执行混乱的情况。对于更一般的情况,比如从左向右,三个计算单元的执行时间是(120+20)+(80+20)+(100+20)=360,那么时钟周期必须大于最大的单个组合逻辑单元的执行时间,否则就会出现阶段执行不完整的情况,即140ps,所以说决定计算单元速度的是pipeline,更精确的说是pipeline中的最大的组合逻辑单元的执行时间。对于如何将计算单元切割成更小的执行时间几乎相同的阶段,对硬件设计者来说,是一个挑战。

    4、delay slot

        在上面的讨论中我们都假设连续的指令间并没有依赖关系,现在引入指令间的依赖关系。依赖关系可以分为两种:data dependency, control dependency。

    对于data dependency,我们用下面的指令序列作为例子

        图中的小圆圈加箭头表示了这种依赖关系,比如第二条指令的执行需要用到第一条指令的结果,所以第二条指令必须推迟进入pipeline的时间,称为load/store delay slot,以获得eax更新后的值,2条与第3条的数据依赖关系同理。

        对于control dependency,我们用下面的指令序列作为例子

        第3条指令为跳转指令,第4条指令是否执行依赖于第三条指令的结果,即是否跳转,所以第四条指令必须延迟进入pipeline的时间,称为branch delay slot。

    5、 参考资料

        《see mips run》

        《computer system: a programmer's perspective》p391-p400

    展开全文
  • 到底什么是Pipeline

    万次阅读 2020-05-05 17:13:10
    在各个领域,有一个词眼出现得越来越频繁,即Pipeline。 开始接触的时候,百思不得其解,要么觉得作者在 用个 洋名字 在装 高大上, 其实,鲁迅先生说过一句话,太阳底下没有新鲜事 一切的一切,都是纸老虎, ...

    在各个领域,有一个词眼出现得越来越频繁,即 Pipeline。

    开始接触的时候,百思不得其解,觉得作者在 用个 洋名字 在装 高大上,

    其实,鲁迅先生说过一句话,太阳底下没有新鲜事

     

    一切的一切,都是纸老虎,

    Pipeline,你 土味一点 你把它 翻译成  一条龙服务

    专业一点,叫 它  综合解决方案,就行。

    算法或者大数据分析里的

    可重复使用,针对新的数据,直接输入数据,可以得到结果。

     

    一个典型的机器学习构建包含若干个过程

     1、源数据ETL
    2、数据预处理
    3、特征选取
    4、模型训练与验证
    以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果。因此,对以上多个步骤、进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效、易用。

    管道机制在机器学习算法中得以应用的根源在于,参数集在新数据集(比如测试集)上的重复使用。
     

    展开全文
  • 浅谈管道模型(Pipeline)

    万次阅读 多人点赞 2012-05-12 21:17:34
    本篇和大家谈谈一种通用的设计与处理模型——Pipeline(管道)。Pipeline简介Pipeline模型最早被使用在Unix操作系统中。据称,如果说Unix是计算机文明中最伟大的发明,那么,Unix下的Pipe管道就是跟随Unix所带来的另...

    本篇和大家谈谈一种通用的设计与处理模型——Pipeline(管道)。

    Pipeline简介

    Pipeline模型最早被使用在Unix操作系统中。据称,如果说Unix是计算机文明中最伟大的发明,那么,Unix下的Pipe管道就是跟随Unix所带来的另一个伟大的发明【1】。我认为管道的出现,所要解决的问题,还是软件设计中老生常谈的设计目标——高内聚,低耦合。它以一种“链式模型”来串接不同的程序或者不同的组件,让它们组成一条直线的工作流。这样给定一个完整的输入,经过各个组件的先后协同处理,得到唯一的最终输出。

    Pipeline模型的应用

    以下列举了,我熟悉或者有所了解的典型pipeline模型的应用。

    公司.net web程序员很多,那么首先就谈谈asp.net。一个http请求到达http服务器IIS之后,就是经过pipeline模型被处理的。参见下图:


    说明一下,这幅图,并没有下面的图形来得直观。它的侧重点在于展示管道中各个组件处理事件触发的时序图,而不是pipeline模型。但如果你思考以下,也能体会到其中“管道”的概念(注意右面循环图标,如果需要比划一下的话,就是一个U逆时针旋转90度的形状)。

    最后,我还是决定上个清晰点的图:


    上图可以请求到达IIS,经过HttpApplication工厂得到一个HttpApplication,创建一个HttpContext上下文,然后就会进入Http Pipeline。好了,这篇的目标并不是谈论http处理行为以及asp.net底层架构,所以到此为止。

     

    又一个大家熟悉的web container,特别是java web人员——Tomcat

    Tomcat接受请求之后,请求从被接受,被分发,被处理,到最后转变成http响应,会走如下的管道【2】:


    在《Tomcat系统架构与设计模式,第2部分: 设计模式分析》【3】中,你可以清晰地发现一个最为显而易见的设计模式——责任链模式(这是实现管道模型比较常用的一种设计模式)


    可见,pipeline模型几乎是大部分主流http server处理请求的通用模型。这种设计并不意外,因为pipeline模型特定的理念会让你感觉到它似乎就是为了处理请求而生的。事实上,它的应用原不止这些web 服务器架构。

    下面,给大家带来的另一个典型示例也是在web架构里广为人知的MVC模型的良好实践——Struts:


    我认为用这幅图来阐述Pipeline最为清晰,简洁。从上面这幅图中你能够看到对于pipeline模型的多处使用(单向、双向都有)。它也很好地展示了高内聚,低耦合的设计目标,展示了各个组件以类似“搭建积木”的形式来组合功能(见图中Interceptor),我最近有空也在读struts2的源码,如果你也有兴趣,可以看看这个专题

    最后一个示例了,公司Java服务器的开发人员,相信都会对Mina框架有所了解。下面是Mina的处理模型图:


    不再废话了,同样是pipeline的优秀实践。

    上面介绍了很多pipeline的优秀实践,他们并非来自同一个领域,有web端,有处理socket的等。但对于他们的一个归纳,可以是——优秀的服务端数据处理模型,我觉得公司在数据处理上比较频繁,这也是选择介绍pipeline模型的原因。

    Pipeline模型带来的启示

    其实,关于它的好处已经在上面各种优秀的实践中得以体现。但你还是应该能够从中去发现一些能够为我们所用的设计思想。我总结了我得一些观点,欢迎各位拍砖:

    (1)    工作流的参考模型

    上面的各个模型图很难让我不把pipeline模型与工作流模型联想到一块儿去。他们都是链式的(或者说流程式的),就像一条生产线一样。各个组件的前后协同,会让你联想到生产流水线上得工人处理流过自己的产品环节。事实上,我在去年年末的时候在云方圆徐工基础任务流程里面曾经尝试使用了该模型,作为工作流模型

    (2)    服务framework的参考构建模型

    Pipeline模型的一个特点是,在其内部是以组建的模式来实现组合的(虽然这些组建有先后顺序之分),它假如你把侧重点放在它的链式组合上,而不是将侧重点放在上面的工作流上(工作流的关注点是流程的先后顺序),那么完全可以用这种方式来搭建一个复杂的服务,当然这样做的前提是,单个组件的粒度必须尽可能小并且耦合性极低。

    在这里我冒昧吐槽几句:

    在我的印象中,公司很多服务都喜欢采用WebService,即使不是Web Service也是Http GET请求。当然,这其中的很多情况是不得不采用它来和别的系统或者业务平台交互。但我一直坚持认为,只有在理论上你根本没有可能拿到那些数据时,你才会采用别人提供的服务,比如:股票行情、天气预报、各大开放平台(新浪、支付宝)的API的等。本公司之内的,原则上其实可以访问的某些数据,有时我们反而退而求其次选择采用Web Service这种模式。批量数据走http或者之上的协议(SOAP)在网络上传输,有时web系统还有可能发布在远端。想要性能从哪儿来?我了解你担心安全性,希望保持本业务平台数据库的独立性。告诉我,其实你也明白有些担心是没有意义的。我直接连你的库,只做一些查询会有什么问题?如果你真的比较谨慎的话,你也应该担心一下你的系统有被攻击的可能性,为什么你没有呢。甚至有人希望,某些相似的业务逻辑也把他抽象出来在dll外面包装成web service。如果真得是这样的话,我觉得“可复用的组件”这个词就没有必要存在了。

    Pipeline模型应用

    刚才谈到,我认为Pipeline模型带来的启示,我个人更看好第二点。我认为在NGP构建API的时候,这种模型也能够派上大用场。

    就拿Redis举个例子(在一些场景下):

    读取数据流程

    (1)    客户程序从Redis读取数据,如果读取到则返回

    (2)    如果没有读取到,则从数据库抓取数据

    (3)    从数据库抓取到的数据存储到Redis

    写入数据流程

    (1)    客户程序将数据写入Redis

    (2)    将数据写入数据库

    假如有一天,你不打算采用Redis,或者Redis服务全部不可用。你怎么让客户端自己能够“智能感知”,让这些巨大的后端变动对于客户端透明,并且不会产生调用异常?那么Pipeline模型,就可以派上用场。因为上面这些流程都是可配置的,而开放的API是唯一的。

    你是否会觉得普通的封装也能够实现上面的读取数据流程?没错,也可以。但我认为Pipeline模型带来的:流程式(有序)+可拆卸(配置),比普通的封装机动性更好。

    当然,这里只是选择了一个简单的场景来举个例子。

    Pipeline模型实现

    其实在上面那个Tomcat的设计模式截图中已经看到,实现该模型最通常的设计模式就是责任链模式,在上面工作流那篇文章中,也是采用责任链模式来实现,但我当时忽略了一个非常重要的东西——Context,这是串联整个Pipeline的重要前提。

    你找到一篇任何介绍责任链模式的文章,然后搭配淘宝的《基于管道模式的容器设计》【4】就基本能够完全了解Pipeline。

    Pipeline模式的缺点

    没有那种模式是完美的。Pipeline模式的缺点是,每次它对于一个输入(或者一次请求)都必须从链头开始遍历(参考Http Server处理请求就能明白),这确实存在一定的性能损耗。

    引用

    【1】:Unix Pipes 管道原稿

    【2】:Servlet工作原理解析

    【3】:Tomcat系统架构与设计模式,第 2 部分: 设计模式分析

    【4】:基于管道模式的容器设计


    展开全文
  • Pipeline并行处理模型

    千次阅读 2019-09-03 23:21:07
    文章目录前言Pipeline并行处理模式概要车厢模拟式的Pipeline并行处理模式 前言 在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是...

    前言


    在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是尽管说并行处理效率确实会比较高,但是它在处理拥有数据结果依赖关系的逻辑时,需要额外的同步管控。例如我的输出怎么临时被存放,然后被下游程序收到处理等等。倘若我们设计的并行处理程序能很好地解决,逻辑依赖关系,那么无疑并行处理的方式将会大大提速我们实际系统中的执行效率。本文笔者来聊聊其中一种被称为Pipeline(流水线)模式的并行处理作业模式,相信此并行处理模式在实际工作中还是有所应用场景的。

    Pipeline并行处理模式概要


    首先一个问题,什么叫做Pipeline并行处理模式呢?Pipeline并行处理模式,首先它具有Pipeline属性,它有一条完整的依赖关系处理流程。比如一道工序总共分3个流程,A,B和C, 并且有严格的先后顺序执行要求。B流程的执行必须依赖A流程的执行结果,同样C流程需要依赖B的。其实这么来看,串行处理模式是天然适用于Pipeline处理模式的。

    不过本文我们要讨论的是并行处理Pipeline模式作业。如果按照惯常使用的多线程依赖同步的处理方法,主要有以下两种:

    • 将程序中间结果写出到第三方存储介质内,然后并行处理线程与此第三方存储进行数据交互,同步。
    • 程序中间结果保存到本地变量内,通过线程安全的做法将中间结果进行赋值输出,灵活一点的,还可以进行同步的控制。

    接下来笔者将要讨论的方法是第二种方法。

    车厢模拟式的Pipeline并行处理模式


    这里我们将用模拟车厢的方式来进行Pipeline work的执行,每个车厢代表一个执行单元,车厢具有Pipeline work的独有特征:

    • 车厢具有连接关系,每个车厢有它的前车厢和后车厢。
    • 头车厢无前车厢,它的处理无须依赖前车厢执行结果。
    • 尾车厢无后车厢,它的处理完毕即意味着总执行过程的结束。

    另外在每个车厢内,它还具有以下变量:

    • 多执行线程
    • 中间结果置换列表

    因此,基于车厢模型的Pipeline处理模式如下图所示:
    在这里插入图片描述
    上图中Middle Carriage(中部车厢)可能会有很多节,每个线程都有对应的中间结果输出列表。

    Pipeline并行处理模式代码实现


    此部分笔者来分享一段Pipeline并行处理模式的多线实现代码,引用自Hadoop社区JIRA的一个patch。

    在这个patch代码中,还是沿用了上节车厢的概念,另外它的整个过程可拆分为以下几个步骤:

    1. Pipeline work的定义以及Pipeline task的构建
    2. 根据Pipeline task,来构建车厢
    3. 车厢内部逻辑处理
      3.1) 头车厢(起始线程)处理过程
      3.2) 中部车(中部线程)厢处理过程
      3.3) 尾部车厢(末尾线程)处理过程

    针对上述子步骤,我们逐一来阐述。首先是Pipeline work的定义,此work的是每个车厢的线程的具体执行逻辑。

        PipelineWork header = new PipelineWork<Object, TestItem>() {
          int idx = 0;
    
          @Override
          public TestItem doWork(Object obj) throws IOException {
            if (idx < items.size()) {
              TestItem item = items.get(idx);
              item.setVal(item.getVal() * 2);
              if (exp && (idx == items.size() - 1)) {
                throw ioe;
              }
              idx++;
              
              LOG.info(Thread.currentThread().getName() +
                  ": Head worker produce item: " + item.getVal());
              return item;
            } else {
              LOG.info("Head worker finsihed produce item.");
              return null;
            }
          }
        };
        PipelineWork middle = new PipelineWork<TestItem, TestItem>() {
          @Override
          public TestItem doWork(TestItem item) {
            item.setVal(item.getVal() * 2);
            LOG.info(Thread.currentThread().getName() +
                ": Middle worker set value: " + item.getVal());
    
            return item;
          }
        };
        PipelineWork trailer = new PipelineWork<TestItem, Object>() {
          @Override
          public Object doWork(TestItem item) {
            item.setVal(item.getVal() * 2);
            LOG.info(Thread.currentThread().getName() +
                ": Trailer woker set value: " + item.getVal());
    
            return EMPTY;
          }
        };
    

    然后根据上面的work,构建Pipeline task,

        PipelineTask task = new PipelineTask(conf, "pipeline.testcarriage");
        task.appendWork(header, "header");
        task.appendWork(middle, "middle" + i);
        task.appendWork(trailer, "trailer");
        task.kickOff();
        task.join();
    

    在Pipeline的task的构建过程中,会进行车厢的组织,

      /**
       * Pipeline work的构建
       */
      public void kickOff() {
        for (int i = 0; i < pipeLine.size(); i++) {
          // 从pipeline中获取当前车厢
          Carriage ca = pipeLine.get(i);
          if (i == 0) {
        	// 第一个车厢为头车厢,没有前车厢,标记Header字段
            ca.setPrev(null);
            ca.setHeader();
          }
          if (i == pipeLine.size() - 1) {
        	// 如果是最后一节末尾车厢,则标记Trailer字段
            ca.setTrailer();
          }
          if (i > 0) {
        	// 中部车厢进行前车厢的引用赋值
            ca.setPrev(pipeLine.get(i - 1));
          }
          // 车厢衔接处理设置完毕,然后进行启动执行
          ca.kickOff();
        }
      }
    

    每节车厢在组织完毕后,就可以开始run起来了。

    对于每个车厢,它有如下的定义:

      private class Carriage<K, T> {
        private PipelineWork<K, T> work;
        private ArrayList<T>[] transferList;
        private ReentrantLock[] lock;
        private Throwable[] exception;
        // 每个线程的是否执行完毕的标识
        private boolean[] done;
        // 执行Carriage任务的线程数
        private int numThreads;
        private int blockingThreshold, transferThreshold, logThreshold;
        // 此车厢依赖的前车厢
        private Carriage prev;
        private volatile boolean shouldStop = false;
        final private List<T> EMPTY_LIST = new ArrayList<T>(0);
        // 是否是头车厢
        private boolean header = false;
        // 是否是末尾车厢
        private boolean trailer = false;
        // 执行当前车厢的线程组
        private Thread[] threads = null;
        private String name = null;
        ...
        
        public Carriage(PipelineWork<K, T> inWork, String caName) {
          name = caName;
          blockingThreshold = conf.getInt(getBlockingThresholdKey(name), 10000);
          transferThreshold = conf.getInt(getTransferThresholdKey(name), 100);
          logThreshold = conf.getInt(getLoggingThresholdKey(name), 10000000);
          numThreads = conf.getInt(getNumThreadsKey(name), 3);
    
          if (blockingThreshold < 0 || transferThreshold < 0
              || blockingThreshold < transferThreshold || logThreshold < 0
              || numThreads < 0) {
            throw new IllegalArgumentException("Illegal argument for PipelineTask "
                + name);
          }
          work = inWork;
          transferList = new ArrayList[numThreads];
          for (int i = 0; i < numThreads; i++) {
            transferList[i] = null;
          }
          lock = new ReentrantLock[numThreads];
          for (int i = 0; i < numThreads; i++) {
            lock[i] = new ReentrantLock();
          }
          exception = new Throwable[numThreads];
          for (int i = 0; i < numThreads; i++) {
            exception[i] = null;
          }
          done = new boolean[numThreads];
          for (int i = 0; i < numThreads; i++) {
            done[i] = false;
          }
        }
    
        public void setPrev(Carriage ca) {
          prev = ca;
          Preconditions.checkState(header == false);
        }
    
        public void stopCarriager() {
          shouldStop = true;
        }
    
        public void setHeader() {
          header = true;
          Preconditions.checkState(prev == null);
        }
    
        public void setTrailer() {
          trailer = true;
        }
    
        /**
         * 获取车厢的本地处理结果.
         */
        public List<T> getList() {
          int doneThread = 0;
          for (int i = 0; i < transferList.length; i++) {
            lock[i].lock();
            try {
              if (exception[i] != null) {
                return null;
              }
              // 如果发现i下标线程的中间结果列表不为空,则进行返回
              if (transferList[i] != null) {
                ArrayList<T> res = transferList[i];
                // 然后置空此下标值为空
                transferList[i] = null;
                return res;
              } else {
            	// 如果此下标置换列表为空,则判断是否为当前对应的线程已经处理完毕
                if (done[i]) {
                  // 如果是,则进行技术加一
                  doneThread++;
                }
              }
            } finally {
              lock[i].unlock();
            }
          }
    
          // 如果线程结束数量达到置换结果列表原始长度,则返回空,意为当前车厢数据已经完全处理完毕
          if (doneThread == transferList.length) {
            return null;
          } else {
        	// 否则,返回空list,意为暂时没有处理数据
            return EMPTY_LIST;
          }
        }
    
    

    然后就是各个车厢内的并行处理的核心执行逻辑,

       /**
         * 执行车厢内容的线程类
         */
        private class CarriageThread extends Thread {
          // 线程标识
          private int index;
    
          public CarriageThread(int i) {
            index = i;
            this.setName("PipelineTask-" + name + "-" + index);
          }
    
        @Override
        public void run() {
          try {
              Preconditions.checkState((prev != null) ^ header);
              ArrayList<T> localList = null;
              if (!trailer) {
                localList =
                    new ArrayList<T>((blockingThreshold > 0) ? blockingThreshold
                        : 0);
              }
              LOG.info("Carriage " + name + " thread " + index + " is scheduled");
              int itemHandled = 0;
              while (!shouldStop) {
                if (header) {
                  /** 1.头车厢的处理过程 **/
                  // 如果是头车厢,则没有前车厢依赖输入,直接执行
                  T t = work.doWork(null);
                  itemHandled++;
                  if (itemHandled == logThreshold) {
                    LOG.info("Handled " + itemHandled + " in " + name + " thread "
                        + index);
                    itemHandled = 0;
                  }
                  if (!trailer && t != null) {
                	// 将处理结果加入到当前本地结果列表中
                    localList.add(t);
                  } else if (t == null) {
                	// 如果处理结果为空,意为原始输入数据已经完全处理完毕,头车厢任务宣告全部完成
                    break;
                  }
                } else {
                   /** 2.中部车厢/尾车厢的处理过程 **/
                  // 非头车厢,意为当前为中间车厢或者尾车厢,先取出前车厢的处理结果
                  List<K> inputList = prev.getList();
                  if (inputList == null) {
                	// 如果前车厢的处理结果为空,意为前车厢数据已处理完毕,则当前车厢跳出当前循环
                	LOG.info("Break out.");
                    break;
                  } else if (inputList.isEmpty()) {
                	// 如果前车厢的处理结果为Empty,意为前车厢还有未处理的数据,不过还未将结果赋值到置换列表内
                	// 让出当前CPU给其它线程执行
                    Thread.yield();
                    continue;
                  }
    
                  for (K k : inputList) {
                	// 遍历前车厢的数据处理结果,得到新的处理结果
                    T t = work.doWork(k);
                    itemHandled++;
                    if (itemHandled == logThreshold) {
                      LOG.info("Handled " + itemHandled + " in " + name
                          + " thread " + index);
                      itemHandled = 0;
                    }
                    if (!trailer && t != null) {
                      // 将处理结果加入到当前车厢的本地结果列表内,将作为此车厢下节的依赖输入
                      // 后续同样是中间置换结果的赋值过程
                      localList.add(t);
                    }
                  }
                }
                if (trailer) {
                  // 如果是尾车厢,跳过后续本地输出结果的交换
                  continue;
                }
                
                /** 3.执行线程中间结果交换 **/
                // 如果当前本地结果超出阈值设定大小
                if (localList.size() >= transferThreshold) {
                  if (lock[index].tryLock()) {
                	// 而且之前对应位置交换列表为空,则进行赋值
                    if (transferList[index] == null) {
                      transferList[index] = localList;
                      lock[index].unlock();
                      // 交换完毕,重新置空本地结果列表
                      localList =
                          new ArrayList<T>(
                              (blockingThreshold > 0) ? blockingThreshold : 0);
                    } else {
                      lock[index].unlock();
                    }
                  }
                }
    
                // 本地结果列表达到阻塞阈值设定大小,阻塞意为当前本地输出的结果必须要赋值到置换列表内
                if (blockingThreshold > 0 && localList.size() >= blockingThreshold) {
                  lock[index].lock();
                  // 如果当前置换列表数据不为空,意为还没被下游车厢处理,则进行循环等待
                  while (transferList[index] != null) {
                    lock[index].unlock();
                    // 让出CPU给其它线程执行,然后再循环执行判断
                    Thread.yield();
                    lock[index].lock();
                  }
                  // 置换列表被下游车厢取走,则进行新的本地结果赋值
                  transferList[index] = localList;
                  lock[index].unlock();
                  // 交换完毕,重新置空本地结果列表
                  localList = new ArrayList<T>(blockingThreshold);
                }
              }
    
              /** 4.车厢末端中间结果赋值输出 **/
              // 循环执行结果后,将最后一批处理得到的本地输出结果进行输出赋值
              lock[index].lock();
              while (!trailer) {
                if (localList.size() > 0) {
                  // 本地输出结果有值,并且置换列表为空,则进行辅助
                  if (transferList[index] == null) {
                    transferList[index] = localList;
                    break;
                  } else {
                	// 不为空,让出当前CPU给其它线程处理,然后再等待下次的循环处理
                    lock[index].unlock();
                    Thread.yield();
                    lock[index].lock();
                    continue;
                  }
                } else {
                  break;
                }
              }
              
              // 本地输入结果处理完毕,标记当前线程处理完毕
              done[index] = true;
              if (prev != null) {
                exception[index] = prev.getException();
              }
              lock[index].unlock();
            } catch (Throwable t) {
              // 更新对应下标线程信息的变量
              LOG.warn("Exception in pipeline task ", t);
              lock[index].lock();
              exception[index] = t;
              lock[index].unlock();
              shouldStop = true;
            }
            LOG.info("Carriage " + name + " thread " + index + " is done");
          }
        }
    

    大家可以反复阅读上述的执行逻辑,设计还是比较巧妙的。

    以下是其中的测试结果输出:每节车厢内的执行逻辑可以并行地run起来,无须强同步的控制。

    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2884
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2888
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-2: Middle worker set value: 1704
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(56)) - PipelineTask-header-0: Head worker produce item: 1976
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 176
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 192
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 208
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 232
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(56)) - PipelineTask-header-0: Head worker produce item: 1992
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-0: Trailer woker set value: 16
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-2: Middle worker set value: 1720
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2892
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2896

    引用


    [1].https://issues.apache.org/jira/browse/HDFS-13700 . The process of loading image can be done in a pipeline model

    展开全文
  • pipeline

    千次阅读 2018-06-16 08:56:35
    一、回忆通信模型 二、流水线1.什么是流水线2.pipeline-Jedis实现 3.与原生M(mget,mset等)操作对比M操作是原子操作pipeline命令是非原子的,Redis服务器会对其命令集进行拆分。 三、使用建议...
  • Redis系列十:Pipeline详解

    万次阅读 2018-11-25 01:14:09
    1、pipeline出现的背景: redis客户端执行一条命令分4个过程: 发送命令-〉命令排队-〉命令执行-〉返回结果 这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如...
  • Pipeline(一)

    千次阅读 2019-03-05 17:20:02
    Pipeline 介绍 什么是Pipeline? Jenkins Pipeline是一套插件,支持将连续输送Pipeline实施和整合到Jenkins。Pipeline提供了一组可扩展的工具,用于通过PipelineDSL为代码创建简单到复杂的传送Pipeline。 通常,此...
  • pipeline 指令

    2019-06-07 23:51:43
    pipeline: 代表整条流水线,包含整条流水线的逻辑 stage部分: 阶段,代表流水线的阶段。每个阶段都必须有名称。比如build就是某阶段的名称 stages部分: 流水线多个stage的容器。stages部分至少包含一个stage steps...
  • pipeline离线安装

    千次阅读 2019-07-05 17:54:26
    1、本来在线安装分分钟的事情,但是没办法,公司服务器不能联网,只能离线安装。...3、有一些插件名字和Jenkins上面显示的不正确,比如Jenkins上面是pipeline,在插件里面叫:workflow-aggregator。这个...
  • 分布式缓存Redis之Pipeline(管道)

    万次阅读 多人点赞 2017-12-11 09:21:56
    写在前面  本学习教程所有示例代码见GitHub:https://github.com/selfconzrr/Redis_Learning  Redis的pipeline(管道)功能在命令行中没有,但redis是支持pipeline的,而且在各个语言版的client中都有相应的实现。...
  • 玩转Jenkins Pipeline

    万次阅读 多人点赞 2018-07-17 15:28:47
    Jenkins Pipeline的总体介绍 1.Jenkins Pipeline 的核心概念 Pipeline,简而言之,就是一套运行于Jenkins上的工作流框架,将原本独立运行于单个或者多个节点的任务连接起来,实现单个任务难以完成的复杂流程编排与...
  • redispipeline详解

    千次阅读 2019-07-13 15:29:15
    redis客户端执行一条命令分4个过程: 发送命令-〉命令排队-〉命令执行-〉返回结果 1 这个过程称为Round trip time(简称RTT, 往返时间),mget mset有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不...
  • Redis是基于TCP连接进行通信 Redis是使用客户端 - 服务器模型的TCP服务器,称为请求/响应协议。 这意味着通常一个请求是通过以下步骤完成的: 客户端向服务器发送查询,并通常以阻塞的方式从套接字读取服务器...
  • Redis Pipeline讲解

    千次阅读 2019-05-29 19:24:36
    一、pipeline出现的背景 Redis是一种基于客户端-服务端(CS)模型以及请求/响应协议的TCP服务,这意味着通常情况下一个请求会遵循以下步骤: 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式...
  • [redis]redis中的pipeline

    千次阅读 2018-11-01 20:18:45
    redis中的pipeline second60 20181101 1 单操作命令分析 单操作命令操作时间 = 1次网络往返 + 1次命令执行 假如一次get key,那么是单次操作   2 批量操作命令分析 如果我们要得到n个key, 如果循环调用get,...
  • 业务场景 最近项目中场景需要get一批...所以想到了redispipeline命令。 pipeline简介 非pipeline:client一个请求,redis server一个响应,期间client阻塞 Pipelineredis的管道命令,允许client将多个请求依次发...
  • Redis Pipeline(管道)

    2020-07-22 18:40:10
    Redispipeline(管道)功能在命令行中没有,但 redis 是支持 pipeline 的,而且在各个语言版的 client 中都有相应的实现。 由于网络开销延迟,就算 redis server 端有很强的处理能力,也会由于收到的 client 消息...
  • Java 使用PipelineRedis批量读写

    千次阅读 2018-04-25 16:56:54
    Redis是一种基于客户端-服务端模型以及请求/响应协议的TCP服务。 这意味着通常情况下一个请求会遵循以下步骤: 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。 服务端...
  • Spring集成redispipeline方式)

    千次阅读 2016-12-26 23:23:41
    1.maven添加spring、redis、log4j依赖 properties设置 &lt;properties&gt; &lt;project.build.sourceEncoding&gt;UTF-8&lt;/project.build.sourceEncoding&gt; &lt;spring....
  • 本文中的代码来自我正在写的分布式缓存框架(主要解决缓存使用中的各种痛点:缓存穿透\redis-cluster pipeline\注解使用等等)。 什么是pipeLine 为什么使用pipeLine ? 管道(pipeline)将客户端 client 与服务器...
  • 上一篇文章《redis pipeline批量处理提高性能》中我们讲到redis pipeline模式在批量数据处理上带来了很大的性能提升,我们先来回顾一下pipeline的原理,redis client与server之间采用的是请求应答的模式,如下所示:...
  • redis cluster使用pipeline

    2018-12-13 09:48:42
    redis cluster使用pipeline为什么cluster无法使用pipeline基于redisCluster整合pipeline设计思路代码实现 为什么cluster无法使用pipeline 主要是因为redis-cluster的hash分片。具体的redis命令,会根据key计算出一个...
  • Redis 管道pipeline

    千次阅读 2015-04-06 01:03:11
    管道技术最显著的优势是提高了redis服务...需要注意到是用pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并是不是打包的命令越多越好。
  • Redis编程实践【pipeline和事务】

    千次阅读 2016-11-06 16:04:53
     Redis或许已经在很多企业开始推广并试水,本文也根据个人的实践,简单描述一下Redis在实际开发过程中的使用(部署与架构,稍后介绍),程序执行环境为java + jedis,关于spring下如何集成redis-api,稍后介绍吧。...
  • 4.Redis Pipeline管道命令的使用 5.总结 本文源码地址:https://github.com/online-demo/redis-pipeline.git 第1节 Redis单条命令使用场景 Redis客户端连接到Redis服务端执行一条命令需要经历的步骤如下: 上...
  • Redis 事务和Pipeline--JAVA(系列文章三)

    千次阅读 2017-06-13 16:53:57
    来到Redis当中这里有事务吗,Redis是支持事务的。但是这个事务跟关系型数据库的传统事务不一样,在关系型数据库当中我们可以对出现错误的sql进行回滚,但是在redis是没有这一说的。 在Redis事务当中,所有操作都是...
  • redis性能优化(pipeline/lua)

    千次阅读 2018-10-10 12:37:00
    2019独角兽企业重金招聘Python工程师标准>>> ...
  • 今天小编就为大家分享一篇python使用pipeline批量读写redis的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • redis集群客户端JedisCluster优化 - 管道(pipeline)模式支持Redis在3.0版正式引入了集群这个特性,扩展变得非常简单。然而当你开心的升级到3.0后,却发现有些很好用的功能现在工作不了了, 比如我们今天要聊的...
  • Redis Pipeline 性能测试

    2020-08-07 17:58:54
    那么,框架组大佬丢来了一个解决方案,通过Redis Pipeline管道进行操作的话,会提升处理性能,说白了,就是节省请求-响应往返的时间(简称RTT) 揣着对Pipeline的好奇,以及在多大数据量的情况下的使用场景,于是,...

空空如也

1 2 3 4 5 ... 20
收藏数 784,740
精华内容 313,896
关键字:

pipeline