精华内容
下载资源
问答
  • 批处理与流处理详解

    千次阅读 2019-09-18 16:34:50
    大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了...

    大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了大规模扩展。

    本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据。数据的计算则是指从大量单一数据点中提取信息和见解的过程。

    下文将介绍这些框架:

    仅批处理框架:

    Apache Hadoop

    仅流处理框架:

    Apache Storm

    Apache Samza

    混合框架:

    Apache Spark

    Apache Flink

    批处理系统

    image

    批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。

    批处理模式中使用的数据集通常符合下列特征...

    • 有界:批处理数据集代表数据的有限集合

    • 持久:数据通常始终存储在某种类型的持久存储位置中

    • 大量:批处理操作通常是处理极为海量数据集的唯一方法

    批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。

    需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。

    大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。
    Apache Hadoop
    Apache Hadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。

    新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:

    HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。

    YARN:YARN是Yet Another Resource Negotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。

    MapReduce:MapReduce是Hadoop的原生批处理引擎。

    批处理模式
    Hadoop的处理功能来自MapReduce引擎。MapReduce的处理技术符合使用键值对的map、shuffle、reduce算法要求。基本处理过程包括:

    从HDFS文件系统读取数据集

    将数据集拆分成小块并分配给所有可用节点

    针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)

    重新分配中间态结果并按照键进行分组

    通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Reducing”

    将计算而来的最终结果重新写入 HDFS

    优势和局限
    由于这种方法严重依赖持久存储,每个任务需要多次执行读取和写入操作,因此速度相对较慢。但另一方面由于磁盘空间通常是服务器上最丰富的资源,这意味着MapReduce可以处理非常海量的数据集。同时也意味着相比其他类似技术,Hadoop的MapReduce通常可以在廉价硬件上运行,因为该技术并不需要将一切都存储在内存中。MapReduce具备极高的缩放潜力,生产环境中曾经出现过包含数万个节点的应用。

    MapReduce的学习曲线较为陡峭,虽然Hadoop生态系统的其他周边技术可以大幅降低这一问题的影响,但通过Hadoop集群快速实现某些应用时依然需要注意这个问题。

    围绕Hadoop已经形成了辽阔的生态系统,Hadoop集群本身也经常被用作其他软件的组成部件。很多其他处理框架和引擎通过与Hadoop集成也可以使用HDFS和YARN资源管理器。

    总结
    Apache Hadoop及其MapReduce处理引擎提供了一套久经考验的批处理模型,最适合处理对时间要求不高的非常大规模数据集。通过非常低成本的组件即可搭建完整功能的Hadoop集群,使得这一廉价且高效的处理技术可以灵活应用在很多案例中。与其他框架和引擎的兼容与集成能力使得Hadoop可以成为使用不同技术的多种工作负载处理平台的底层基础。

    流处理系统

    image

    流处理系统会对随时进入系统的数据进行计算。相比批处理模式,这是一种截然不同的处理方式。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作。

    流处理中的数据集是“无边界”的,这就产生了几个重要的影响:

    • 完整数据集只能代表截至目前已经进入到系统中的数据总量。

    • 工作数据集也许更相关,在特定时间只能代表某个单一数据项。

    • 处理工作是基于事件的,除非明确停止否则没有“尽头”。处理结果立刻可用,并会随着新数据的抵达继续更新。

    流处理系统可以处理几乎无限量的数据,但同一时间只能处理一条(真正的流处理)或很少量(微批处理,Micro-batch Processing)数据,不同记录间只维持最少量的状态。虽然大部分系统提供了用于维持某些状态的方法,但流处理主要针对副作用更少,更加功能性的处理(Functional processing)进行优化。

    功能性操作主要侧重于状态或副作用有限的离散步骤。针对同一个数据执行同一个操作会或略其他因素产生相同的结果,此类处理非常适合流处理,因为不同项的状态通常是某些困难、限制,以及某些情况下不需要的结果的结合体。因此虽然某些类型的状态管理通常是可行的,但这些框架通常在不具备状态管理机制时更简单也更高效。

    此类处理非常适合某些类型的工作负载。有近实时处理需求的任务很适合使用流处理模式。分析、服务器或应用程序错误日志,以及其他基于时间的衡量指标是最适合的类型,因为对这些领域的数据变化做出响应对于业务职能来说是极为关键的。流处理很适合用来处理必须对变动或峰值做出响应,并且关注一段时间内变化趋势的数据。

    Apache Storm

    Apache Storm是一种侧重于极低延迟的流处理框架,也许是要求近实时处理的工作负载的最佳选择。该技术可处理非常大量的数据,通过比其他解决方案更低的延迟提供结果。

    流处理模式

    Storm的流处理可对框架中名为Topology(拓扑)的DAG(Directed Acyclic Graph,有向无环图)进行编排。这些拓扑描述了当数据片段进入系统后,需要对每个传入的片段执行的不同转换或步骤。

    拓扑包含:

    • Stream:普通的数据流,这是一种会持续抵达系统的无边界数据。

    • Spout:位于拓扑边缘的数据流来源,例如可以是API或查询等,从这里可以产生待处理的数据。

    • Bolt:Bolt代表需要消耗流数据,对其应用操作,并将结果以流的形式进行输出的处理步骤。Bolt需要与每个Spout建立连接,随后相互连接以组成所有必要的处理。在拓扑的尾部,可以使用最终的Bolt输出作为相互连接的其他系统的输入。

    Storm背后的想法是使用上述组件定义大量小型的离散操作,随后将多个组件组成所需拓扑。默认情况下Storm提供了“至少一次”的处理保证,这意味着可以确保每条消息至少可以被处理一次,但某些情况下如果遇到失败可能会处理多次。Storm无法确保可以按照特定顺序处理消息。

    为了实现严格的一次处理,即有状态处理,可以使用一种名为Trident的抽象。严格来说不使用Trident的Storm通常可称之为Core Storm。Trident会对Storm的处理能力产生极大影响,会增加延迟,为处理提供状态,使用微批模式代替逐项处理的纯粹流处理模式。

    为避免这些问题,通常建议Storm用户尽可能使用Core Storm。然而也要注意,Trident对内容严格的一次处理保证在某些情况下也比较有用,例如系统无法智能地处理重复消息时。如果需要在项之间维持状态,例如想要计算一个小时内有多少用户点击了某个链接,此时Trident将是你唯一的选择。尽管不能充分发挥框架与生俱来的优势,但Trident提高了Storm的灵活性。

    Trident拓扑包含:

    • 流批(Stream batch):这是指流数据的微批,可通过分块提供批处理语义。

    • 操作(Operation):是指可以对数据执行的批处理过程。

    优势和局限

    目前来说Storm可能是近实时处理领域的最佳解决方案。该技术可以用极低延迟处理数据,可用于希望获得最低延迟的工作负载。如果处理速度直接影响用户体验,例如需要将处理结果直接提供给访客打开的网站页面,此时Storm将会是一个很好的选择。

    Storm与Trident配合使得用户可以用微批代替纯粹的流处理。虽然借此用户可以获得更大灵活性打造更符合要求的工具,但同时这种做法会削弱该技术相比其他解决方案最大的优势。话虽如此,但多一种流处理方式总是好的。

    Core Storm无法保证消息的处理顺序。Core Storm为消息提供了“至少一次”的处理保证,这意味着可以保证每条消息都能被处理,但也可能发生重复。Trident提供了严格的一次处理保证,可以在不同批之间提供顺序处理,但无法在一个批内部实现顺序处理。

    在互操作性方面,Storm可与Hadoop的YARN资源管理器进行集成,因此可以很方便地融入现有Hadoop部署。除了支持大部分处理框架,Storm还可支持多种语言,为用户的拓扑定义提供了更多选择。

    总结

    对于延迟需求很高的纯粹的流处理工作负载,Storm可能是最适合的技术。该技术可以保证每条消息都被处理,可配合多种编程语言使用。由于Storm无法进行批处理,如果需要这些能力可能还需要使用其他软件。如果对严格的一次处理保证有比较高的要求,此时可考虑使用Trident。不过这种情况下其他流处理框架也许更适合。

    Apache Samza

    Apache Samza是一种与Apache Kafka消息系统紧密绑定的流处理框架。虽然Kafka可用于很多流处理系统,但按照设计,Samza可以更好地发挥Kafka独特的架构优势和保障。该技术可通过Kafka提供容错、缓冲,以及状态存储。

    Samza可使用YARN作为资源管理器。这意味着默认情况下需要具备Hadoop集群(至少具备HDFS和YARN),但同时也意味着Samza可以直接使用YARN丰富的内建功能。

    流处理模式

    Samza依赖Kafka的语义定义流的处理方式。Kafka在处理数据时涉及下列概念:

    • Topic(话题):进入Kafka系统的每个数据流可称之为一个话题。话题基本上是一种可供消耗方订阅的,由相关信息组成的数据流。

    • Partition(分区):为了将一个话题分散至多个节点,Kafka会将传入的消息划分为多个分区。分区的划分将基于键(Key)进行,这样可以保证包含同一个键的每条消息可以划分至同一个分区。分区的顺序可获得保证。

    • Broker(代理):组成Kafka集群的每个节点也叫做代理。

    • Producer(生成方):任何向Kafka话题写入数据的组件可以叫做生成方。生成方可提供将话题划分为分区所需的键。

    • Consumer(消耗方):任何从Kafka读取话题的组件可叫做消耗方。消耗方需要负责维持有关自己分支的信息,这样即可在失败后知道哪些记录已经被处理过了。

    由于Kafka相当于永恒不变的日志,Samza也需要处理永恒不变的数据流。这意味着任何转换创建的新数据流都可被其他组件所使用,而不会对最初的数据流产生影响。

    优势和局限

    乍看之下,Samza对Kafka类查询系统的依赖似乎是一种限制,然而这也可以为系统提供一些独特的保证和功能,这些内容也是其他流处理系统不具备的。

    例如Kafka已经提供了可以通过低延迟方式访问的数据存储副本,此外还可以为每个数据分区提供非常易用且低成本的多订阅者模型。所有输出内容,包括中间态的结果都可写入到Kafka,并可被下游步骤独立使用。

    这种对Kafka的紧密依赖在很多方面类似于MapReduce引擎对HDFS的依赖。虽然在批处理的每个计算之间对HDFS的依赖导致了一些严重的性能问题,但也避免了流处理遇到的很多其他问题。

    Samza与Kafka之间紧密的关系使得处理步骤本身可以非常松散地耦合在一起。无需事先协调,即可在输出的任何步骤中增加任意数量的订阅者,对于有多个团队需要访问类似数据的组织,这一特性非常有用。多个团队可以全部订阅进入系统的数据话题,或任意订阅其他团队对数据进行过某些处理后创建的话题。这一切并不会对数据库等负载密集型基础架构造成额外的压力。

    直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。

    Samza可以使用以本地键值存储方式实现的容错检查点系统存储数据。这样Samza即可获得“至少一次”的交付保障,但面对由于数据可能多次交付造成的失败,该技术无法对汇总后状态(例如计数)提供精确恢复。

    Samza提供的高级抽象使其在很多方面比Storm等系统提供的基元(Primitive)更易于配合使用。目前Samza只支持JVM语言,这意味着它在语言支持方面不如Storm灵活。

    总结

    对于已经具备或易于实现Hadoop和Kafka的环境,Apache Samza是流处理工作负载一个很好的选择。Samza本身很适合有多个团队需要使用(但相互之间并不一定紧密协调)不同处理阶段的多个数据流的组织。Samza可大幅简化很多流处理工作,可实现低延迟的性能。如果部署需求与当前系统不兼容,也许并不适合使用,但如果需要极低延迟的处理,或对严格的一次处理语义有较高需求,此时依然适合考虑。

    混合处理系统:批处理和流处理

    image

    一些处理框架可同时处理批处理和流处理工作负载。这些框架可以用相同或相关的组件和API处理两种类型的数据,借此让不同的处理需求得以简化。

    如你所见,这一特性主要是由Spark和Flink实现的,下文将介绍这两种框架。实现这样的功能重点在于两种不同处理模式如何进行统一,以及要对固定和不固定数据集之间的关系进行何种假设。

    虽然侧重于某一种处理类型的项目会更好地满足具体用例的要求,但混合框架意在提供一种数据处理的通用解决方案。这种框架不仅可以提供处理数据所需的方法,而且提供了自己的集成项、库、工具,可胜任图形分析、机器学习、交互式查询等多种任务。

    Apache Spark

    Apache Spark是一种包含流处理能力的下一代批处理框架。与Hadoop的MapReduce引擎基于各种相同原则开发而来的Spark主要侧重于通过完善的内存计算和处理优化机制加快批处理工作负载的运行速度。

    Spark可作为独立集群部署(需要相应存储层的配合),或可与Hadoop集成并取代MapReduce引擎。

    批处理模式

    与MapReduce不同,Spark的数据处理工作全部在内存中进行,只在一开始将数据读入内存,以及将最终结果持久存储时需要与存储层交互。所有中间态的处理结果均存储在内存中。

    虽然内存中处理方式可大幅改善性能,Spark在处理与磁盘有关的任务时速度也有很大提升,因为通过提前对整个任务集进行分析可以实现更完善的整体式优化。为此Spark可创建代表所需执行的全部操作,需要操作的数据,以及操作和数据之间关系的Directed Acyclic Graph(有向无环图),即DAG,借此处理器可以对任务进行更智能的协调。

    为了实现内存中批计算,Spark会使用一种名为Resilient Distributed Dataset(弹性分布式数据集),即RDD的模型来处理数据。这是一种代表数据集,只位于内存中,永恒不变的结构。针对RDD执行的操作可生成新的RDD。每个RDD可通过世系(Lineage)回溯至父级RDD,并最终回溯至磁盘上的数据。Spark可通过RDD在无需将每个操作的结果写回磁盘的前提下实现容错。

    流处理模式

    流处理能力是由Spark Streaming实现的。Spark本身在设计上主要面向批处理工作负载,为了弥补引擎设计和流处理工作负载特征方面的差异,Spark实现了一种叫做微批(Micro-batch)*的概念。在具体策略方面该技术可以将数据流视作一系列非常小的“批”,借此即可通过批处理引擎的原生语义进行处理。

    Spark Streaming会以亚秒级增量对流进行缓冲,随后这些缓冲会作为小规模的固定数据集进行批处理。这种方式的实际效果非常好,但相比真正的流处理框架在性能方面依然存在不足。

    优势和局限

    使用Spark而非Hadoop MapReduce的主要原因是速度。在内存计算策略和先进的DAG调度等机制的帮助下,Spark可以用更快速度处理相同的数据集。

    Spark的另一个重要优势在于多样性。该产品可作为独立集群部署,或与现有Hadoop集群集成。该产品可运行批处理和流处理,运行一个集群即可处理不同类型的任务。

    除了引擎自身的能力外,围绕Spark还建立了包含各种库的生态系统,可为机器学习、交互式查询等任务提供更好的支持。相比MapReduce,Spark任务更是“众所周知”地易于编写,因此可大幅提高生产力。

    为流处理系统采用批处理的方法,需要对进入系统的数据进行缓冲。缓冲机制使得该技术可以处理非常大量的传入数据,提高整体吞吐率,但等待缓冲区清空也会导致延迟增高。这意味着Spark Streaming可能不适合处理对延迟有较高要求的工作负载。

    由于内存通常比磁盘空间更贵,因此相比基于磁盘的系统,Spark成本更高。然而处理速度的提升意味着可以更快速完成任务,在需要按照小时数为资源付费的环境中,这一特性通常可以抵消增加的成本。

    Spark内存计算这一设计的另一个后果是,如果部署在共享的集群中可能会遇到资源不足的问题。相比Hadoop MapReduce,Spark的资源消耗更大,可能会对需要在同一时间使用集群的其他任务产生影响。从本质来看,Spark更不适合与Hadoop堆栈的其他组件共存一处。

    总结

    Spark是多样化工作负载处理任务的最佳选择。Spark批处理能力以更高内存占用为代价提供了无与伦比的速度优势。对于重视吞吐率而非延迟的工作负载,则比较适合使用Spark Streaming作为流处理解决方案。

    Apache Flink

    Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。

    这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。

    流处理模型

    Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:

    • Stream(流)是指在系统中流转的,永恒不变的无边界数据集

    • Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能

    • Source(源)是指数据流进入系统的入口点

    • Sink(槽)是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器

    为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。

    此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。

    批处理模型

    Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。

    Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。

    另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。

    优势和局限

    Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。

    Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。

    Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。

    在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。

    Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。

    目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署。

    总结

    Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。

    展开全文
  • 大数据的流处理和批处理及其框架

    万次阅读 2019-07-09 19:15:25
    原文链接 ...在之前的文章中,我们曾经介绍过有关大数据系统的常规概念、处理过程,以及各种专门术语,本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,...

    原文链接

    简介

    大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了大规模扩展。

    在之前的文章中,我们曾经介绍过有关大数据系统的常规概念、处理过程,以及各种专门术语,本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据。数据的计算则是指从大量单一数据点中提取信息和见解的过程。
    下文将介绍这些框架:

    仅批处理框架:
        Apache Hadoop
    仅流处理框架:
        Apache Storm
        Apache Samza
    混合框架:
        Apache Spark
        Apache Flink
    

    大数据处理框架是什么?

    处理框架和处理引擎负责对数据系统中的数据进行计算。虽然“引擎”和“框架”之间的区别没有什么权威的定义,但大部分时候可以将前者定义为实际负责处理数据操作的组件,后者则可定义为承担类似作用的一系列组件。

    例如Apache Hadoop可以看作一种以MapReduce作为默认处理引擎的处理框架。引擎和框架通常可以相互替换或同时使用。例如另一个框架Apache Spark可以纳入Hadoop并取代MapReduce。组件之间的这种互操作性是大数据系统灵活性如此之高的原因之一。

    虽然负责处理生命周期内这一阶段数据的系统通常都很复杂,但从广义层面来看它们的目标是非常一致的:通过对数据执行操作提高理解能力,揭示出数据蕴含的模式,并针对复杂互动获得见解。

    为了简化这些组件的讨论,我们会通过不同处理框架的设计意图,按照所处理的数据状态对其进行分类。一些系统可以用批处理方式处理数据,一些系统可以用流方式处理连续不断流入系统的数据。此外还有一些系统可以同时处理这两类数据。

    在深入介绍不同实现的指标和结论之前,首先需要对不同处理类型的概念进行一个简单的介绍。

    一、批处理系统

    批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。

    批处理模式中使用的数据集通常符合下列特征…

    有界:批处理数据集代表数据的有限集合
    持久:数据通常始终存储在某种类型的持久存储位置中
    大量:批处理操作通常是处理极为海量数据集的唯一方法
    

    批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。

    需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。

    大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。

    Apache Hadoop

    Apache Hadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。

    新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:

    HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。
    YARN:YARN是Yet Another Resource Negotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。
    MapReduce:MapReduce是Hadoop的原生批处理引擎。
    

    批处理模式

    Hadoop的处理功能来自MapReduce引擎。MapReduce的处理技术符合使用键值对的map、shuffle、reduce算法要求。基本处理过程包括:

    从HDFS文件系统读取数据集
    将数据集拆分成小块并分配给所有可用节点
    针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)
    重新分配中间态结果并按照键进行分组
    通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Reducing”
    将计算而来的最终结果重新写入 HDFS
    

    优势和局限

    由于这种方法严重依赖持久存储,每个任务需要多次执行读取和写入操作,因此速度相对较慢。但另一方面由于磁盘空间通常是服务器上最丰富的资源,这意味着MapReduce可以处理非常海量的数据集。同时也意味着相比其他类似技术,Hadoop的MapReduce通常可以在廉价硬件上运行,因为该技术并不需要将一切都存储在内存中。MapReduce具备极高的缩放潜力,生产环境中曾经出现过包含数万个节点的应用。

    MapReduce的学习曲线较为陡峭,虽然Hadoop生态系统的其他周边技术可以大幅降低这一问题的影响,但通过Hadoop集群快速实现某些应用时依然需要注意这个问题。

    围绕Hadoop已经形成了辽阔的生态系统,Hadoop集群本身也经常被用作其他软件的组成部件。很多其他处理框架和引擎通过与Hadoop集成也可以使用HDFS和YARN资源管理器。

    总结

    Apache Hadoop及其MapReduce处理引擎提供了一套久经考验的批处理模型,最适合处理对时间要求不高的非常大规模数据集。通过非常低成本的组件即可搭建完整功能的Hadoop集群,使得这一廉价且高效的处理技术可以灵活应用在很多案例中。与其他框架和引擎的兼容与集成能力使得Hadoop可以成为使用不同技术的多种工作负载处理平台的底层基础。

    二、流处理系统

    流处理系统会对随时进入系统的数据进行计算。相比批处理模式,这是一种截然不同的处理方式。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作。

    流处理中的数据集是“无边界”的,这就产生了几个重要的影响:

    完整数据集只能代表截至目前已经进入到系统中的数据总量。
    工作数据集也许更相关,在特定时间只能代表某个单一数据项。
    处理工作是基于事件的,除非明确停止否则没有“尽头”。处理结果立刻可用,并会随着新数据的抵达继续更新。
    

    流处理系统可以处理几乎无限量的数据,但同一时间只能处理一条(真正的流处理)或很少量(微批处理,Micro-batch Processing)数据,不同记录间只维持最少量的状态。虽然大部分系统提供了用于维持某些状态的方法,但流处理主要针对副作用更少,更加功能性的处理(Functional processing)进行优化。

    功能性操作主要侧重于状态或副作用有限的离散步骤。针对同一个数据执行同一个操作会或略其他因素产生相同的结果,此类处理非常适合流处理,因为不同项的状态通常是某些困难、限制,以及某些情况下不需要的结果的结合体。因此虽然某些类型的状态管理通常是可行的,但这些框架通常在不具备状态管理机制时更简单也更高效。

    此类处理非常适合某些类型的工作负载。有近实时处理需求的任务很适合使用流处理模式。分析、服务器或应用程序错误日志,以及其他基于时间的衡量指标是最适合的类型,因为对这些领域的数据变化做出响应对于业务职能来说是极为关键的。流处理很适合用来处理必须对变动或峰值做出响应,并且关注一段时间内变化趋势的数据。

    Apache Storm

    Apache Storm是一种侧重于极低延迟的流处理框架,也许是要求近实时处理的工作负载的最佳选择。该技术可处理非常大量的数据,通过比其他解决方案更低的延迟提供结果。

    流处理模式

    Storm的流处理可对框架中名为Topology(拓扑)的DAG(Directed Acyclic Graph,有向无环图)进行编排。这些拓扑描述了当数据片段进入系统后,需要对每个传入的片段执行的不同转换或步骤。

    拓扑包含:

    Stream:普通的数据流,这是一种会持续抵达系统的无边界数据。
    Spout:位于拓扑边缘的数据流来源,例如可以是API或查询等,从这里可以产生待处理的数据。
    Bolt:Bolt代表需要消耗流数据,对其应用操作,并将结果以流的形式进行输出的处理步骤。Bolt需要与每个Spout建立连接,随后相互连接以组成所有必要的处理。在拓扑的尾部,可以使用最终的Bolt输出作为相互连接的其他系统的输入。
    

    Storm背后的想法是使用上述组件定义大量小型的离散操作,随后将多个组件组成所需拓扑。默认情况下Storm提供了“至少一次”的处理保证,这意味着可以确保每条消息至少可以被处理一次,但某些情况下如果遇到失败可能会处理多次。Storm无法确保可以按照特定顺序处理消息。

    为了实现严格的一次处理,即有状态处理,可以使用一种名为Trident的抽象。严格来说不使用Trident的Storm通常可称之为Core Storm。Trident会对Storm的处理能力产生极大影响,会增加延迟,为处理提供状态,使用微批模式代替逐项处理的纯粹流处理模式。

    为避免这些问题,通常建议Storm用户尽可能使用Core Storm。然而也要注意,Trident对内容严格的一次处理保证在某些情况下也比较有用,例如系统无法智能地处理重复消息时。如果需要在项之间维持状态,例如想要计算一个小时内有多少用户点击了某个链接,此时Trident将是你唯一的选择。尽管不能充分发挥框架与生俱来的优势,但Trident提高了Storm的灵活性。

    Trident拓扑包含:

    流批(Stream batch):这是指流数据的微批,可通过分块提供批处理语义。
    操作(Operation):是指可以对数据执行的批处理过程。
    

    优势和局限

    目前来说Storm可能是近实时处理领域的最佳解决方案。该技术可以用极低延迟处理数据,可用于希望获得最低延迟的工作负载。如果处理速度直接影响用户体验,例如需要将处理结果直接提供给访客打开的网站页面,此时Storm将会是一个很好的选择。

    Storm与Trident配合使得用户可以用微批代替纯粹的流处理。虽然借此用户可以获得更大灵活性打造更符合要求的工具,但同时这种做法会削弱该技术相比其他解决方案最大的优势。话虽如此,但多一种流处理方式总是好的。

    Core Storm无法保证消息的处理顺序。Core Storm为消息提供了“至少一次”的处理保证,这意味着可以保证每条消息都能被处理,但也可能发生重复。Trident提供了严格的一次处理保证,可以在不同批之间提供顺序处理,但无法在一个批内部实现顺序处理。

    在互操作性方面,Storm可与Hadoop的YARN资源管理器进行集成,因此可以很方便地融入现有Hadoop部署。除了支持大部分处理框架,Storm还可支持多种语言,为用户的拓扑定义提供了更多选择。

    总结

    对于延迟需求很高的纯粹的流处理工作负载,Storm可能是最适合的技术。该技术可以保证每条消息都被处理,可配合多种编程语言使用。由于Storm无法进行批处理,如果需要这些能力可能还需要使用其他软件。如果对严格的一次处理保证有比较高的要求,此时可考虑使用Trident。不过这种情况下其他流处理框架也许更适合。

    Apache Samza

    Apache Samza是一种与Apache Kafka消息系统紧密绑定的流处理框架。虽然Kafka可用于很多流处理系统,但按照设计,Samza可以更好地发挥Kafka独特的架构优势和保障。该技术可通过Kafka提供容错、缓冲,以及状态存储。

    Samza可使用YARN作为资源管理器。这意味着默认情况下需要具备Hadoop集群(至少具备HDFS和YARN),但同时也意味着Samza可以直接使用YARN丰富的内建功能。

    流处理模式

    Samza依赖Kafka的语义定义流的处理方式。Kafka在处理数据时涉及下列概念:

    Topic(话题):进入Kafka系统的每个数据流可称之为一个话题。话题基本上是一种可供消耗方订阅的,由相关信息组成的数据流。
    Partition(分区):为了将一个话题分散至多个节点,Kafka会将传入的消息划分为多个分区。分区的划分将基于键(Key)进行,这样可以保证包含同一个键的每条消息可以划分至同一个分区。分区的顺序可获得保证。
    Broker(代理):组成Kafka集群的每个节点也叫做代理。
    Producer(生成方):任何向Kafka话题写入数据的组件可以叫做生成方。生成方可提供将话题划分为分区所需的键。
    Consumer(消耗方):任何从Kafka读取话题的组件可叫做消耗方。消耗方需要负责维持有关自己分支的信息,这样即可在失败后知道哪些记录已经被处理过了。
    

    由于Kafka相当于永恒不变的日志,Samza也需要处理永恒不变的数据流。这意味着任何转换创建的新数据流都可被其他组件所使用,而不会对最初的数据流产生影响。

    优势和局限

    乍看之下,Samza对Kafka类查询系统的依赖似乎是一种限制,然而这也可以为系统提供一些独特的保证和功能,这些内容也是其他流处理系统不具备的。

    例如Kafka已经提供了可以通过低延迟方式访问的数据存储副本,此外还可以为每个数据分区提供非常易用且低成本的多订阅者模型。所有输出内容,包括中间态的结果都可写入到Kafka,并可被下游步骤独立使用。

    这种对Kafka的紧密依赖在很多方面类似于MapReduce引擎对HDFS的依赖。虽然在批处理的每个计算之间对HDFS的依赖导致了一些严重的性能问题,但也避免了流处理遇到的很多其他问题。

    Samza与Kafka之间紧密的关系使得处理步骤本身可以非常松散地耦合在一起。无需事先协调,即可在输出的任何步骤中增加任意数量的订阅者,对于有多个团队需要访问类似数据的组织,这一特性非常有用。多个团队可以全部订阅进入系统的数据话题,或任意订阅其他团队对数据进行过某些处理后创建的话题。这一切并不会对数据库等负载密集型基础架构造成额外的压力。

    直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。

    Samza可以使用以本地键值存储方式实现的容错检查点系统存储数据。这样Samza即可获得“至少一次”的交付保障,但面对由于数据可能多次交付造成的失败,该技术无法对汇总后状态(例如计数)提供精确恢复。

    Samza提供的高级抽象使其在很多方面比Storm等系统提供的基元(Primitive)更易于配合使用。目前Samza只支持JVM语言,这意味着它在语言支持方面不如Storm灵活。

    总结

    对于已经具备或易于实现Hadoop和Kafka的环境,Apache Samza是流处理工作负载一个很好的选择。Samza本身很适合有多个团队需要使用(但相互之间并不一定紧密协调)不同处理阶段的多个数据流的组织。Samza可大幅简化很多流处理工作,可实现低延迟的性能。如果部署需求与当前系统不兼容,也许并不适合使用,但如果需要极低延迟的处理,或对严格的一次处理语义有较高需求,此时依然适合考虑。

    三、混合处理系统:批处理和流处理

    一些处理框架可同时处理批处理和流处理工作负载。这些框架可以用相同或相关的组件和API处理两种类型的数据,借此让不同的处理需求得以简化。

    如你所见,这一特性主要是由Spark和Flink实现的,下文将介绍这两种框架。实现这样的功能重点在于两种不同处理模式如何进行统一,以及要对固定和不固定数据集之间的关系进行何种假设。

    虽然侧重于某一种处理类型的项目会更好地满足具体用例的要求,但混合框架意在提供一种数据处理的通用解决方案。这种框架不仅可以提供处理数据所需的方法,而且提供了自己的集成项、库、工具,可胜任图形分析、机器学习、交互式查询等多种任务。

    Apache Spark

    Apache Spark是一种包含流处理能力的下一代批处理框架。与Hadoop的MapReduce引擎基于各种相同原则开发而来的Spark主要侧重于通过完善的内存计算和处理优化机制加快批处理工作负载的运行速度。

    Spark可作为独立集群部署(需要相应存储层的配合),或可与Hadoop集成并取代MapReduce引擎。

    批处理模式

    与MapReduce不同,Spark的数据处理工作全部在内存中进行,只在一开始将数据读入内存,以及将最终结果持久存储时需要与存储层交互。所有中间态的处理结果均存储在内存中。

    虽然内存中处理方式可大幅改善性能,Spark在处理与磁盘有关的任务时速度也有很大提升,因为通过提前对整个任务集进行分析可以实现更完善的整体式优化。为此Spark可创建代表所需执行的全部操作,需要操作的数据,以及操作和数据之间关系的Directed Acyclic Graph(有向无环图),即DAG,借此处理器可以对任务进行更智能的协调。

    为了实现内存中批计算,Spark会使用一种名为Resilient Distributed Dataset(弹性分布式数据集),即RDD的模型来处理数据。这是一种代表数据集,只位于内存中,永恒不变的结构。针对RDD执行的操作可生成新的RDD。每个RDD可通过世系(Lineage)回溯至父级RDD,并最终回溯至磁盘上的数据。Spark可通过RDD在无需将每个操作的结果写回磁盘的前提下实现容错。

    流处理模式

    流处理能力是由Spark Streaming实现的。Spark本身在设计上主要面向批处理工作负载,为了弥补引擎设计和流处理工作负载特征方面的差异,Spark实现了一种叫做微批(Micro-batch)*的概念。在具体策略方面该技术可以将数据流视作一系列非常小的“批”,借此即可通过批处理引擎的原生语义进行处理。

    Spark Streaming会以亚秒级增量对流进行缓冲,随后这些缓冲会作为小规模的固定数据集进行批处理。这种方式的实际效果非常好,但相比真正的流处理框架在性能方面依然存在不足。

    优势和局限

    使用Spark而非Hadoop MapReduce的主要原因是速度。在内存计算策略和先进的DAG调度等机制的帮助下,Spark可以用更快速度处理相同的数据集。

    Spark的另一个重要优势在于多样性。该产品可作为独立集群部署,或与现有Hadoop集群集成。该产品可运行批处理和流处理,运行一个集群即可处理不同类型的任务。

    除了引擎自身的能力外,围绕Spark还建立了包含各种库的生态系统,可为机器学习、交互式查询等任务提供更好的支持。相比MapReduce,Spark任务更是“众所周知”地易于编写,因此可大幅提高生产力。

    为流处理系统采用批处理的方法,需要对进入系统的数据进行缓冲。缓冲机制使得该技术可以处理非常大量的传入数据,提高整体吞吐率,但等待缓冲区清空也会导致延迟增高。这意味着Spark Streaming可能不适合处理对延迟有较高要求的工作负载。

    由于内存通常比磁盘空间更贵,因此相比基于磁盘的系统,Spark成本更高。然而处理速度的提升意味着可以更快速完成任务,在需要按照小时数为资源付费的环境中,这一特性通常可以抵消增加的成本。

    Spark内存计算这一设计的另一个后果是,如果部署在共享的集群中可能会遇到资源不足的问题。相比Hadoop MapReduce,Spark的资源消耗更大,可能会对需要在同一时间使用集群的其他任务产生影响。从本质来看,Spark更不适合与Hadoop堆栈的其他组件共存一处。

    总结

    Spark是多样化工作负载处理任务的最佳选择。Spark批处理能力以更高内存占用为代价提供了无与伦比的速度优势。对于重视吞吐率而非延迟的工作负载,则比较适合使用Spark Streaming作为流处理解决方案。

    Apache Flink

    Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。

    这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。

    流处理模型

    Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:

    Stream(流):是指在系统中流转的,永恒不变的无边界数据集
    Operator(操作方):是指针对数据流执行操作以产生其他数据流的功能
    Source(源):是指数据流进入系统的入口点
    Sink(槽):是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器
    

    为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。

    此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。

    批处理模型

    Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。

    Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。

    另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。

    优势和局限

    Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。

    Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。

    Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。

    在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。

    Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。

    目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署。

    总结

    Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。

    结论

    大数据系统可使用多种处理技术。

    对于仅需要批处理的工作负载,如果对时间不敏感,比其他解决方案实现成本更低的Hadoop将会是一个好选择。

    对于仅需要流处理的工作负载,Storm可支持更广泛的语言并实现极低延迟的处理,但默认配置可能产生重复结果并且无法保证顺序。Samza与YARN和Kafka紧密集成可提供更大灵活性,更易用的多团队使用,以及更简单的复制和状态管理。

    对于混合型工作负载,Spark可提供高速批处理和微批处理模式的流处理。该技术的支持更完善,具备各种集成库和工具,可实现灵活的集成。Flink提供了真正的流处理并具备批处理能力,通过深度优化可运行针对其他平台编写的任务,提供低延迟的处理,但实际应用方面还为时过早。

    最适合的解决方案主要取决于待处理数据的状态,对处理所需时间的需求,以及希望得到的结果。具体是使用全功能解决方案或主要侧重于某种项目的解决方案,这个问题需要慎重权衡。随着逐渐成熟并被广泛接受,在评估任何新出现的创新型解决方案时都需要考虑类似的问题。

    展开全文
  • 流处理基本介绍

    万次阅读 多人点赞 2017-01-23 10:14:56
    1. 什么是流处理 一种被设计来处理无穷数据集的数据处理系统引擎 2. 流处理的几个概念 1. 无穷数据(Unbounded data):一种持续生成,本质上是无穷尽的数据集。它经常会被称为“流数据”。然而,用流和批次来...


    1.  什么是流处理

    一种被设计来处理无穷数据集的数据处理系统引擎

    2.  流处理的几个概念

    1.     无穷数据(Unbounded data):一种持续生成,本质上是无穷尽的数据集。它经常会被称为“流数据”。然而,用流和批次来定义数据集的时候就有问题了,因为如前所述,这就意味着用处理数据的引擎的类型来定义数据的类型。现实中,这两类数据的本质区别在于是否有限,因此用能体现出这个区别的词汇来定性数据就更好一些。因此我更倾向于用无穷数据来指代无限流数据集,用有穷数据来指代有限的批次数据。

    2.     无穷数据处理(Unbounded dataprocessing):一种发展中的数据处理模式,应用于前面所说的无穷数据类型。尽管我本人也喜欢使用流式计算来代表这种类型的数据处理方式,但是在本文这个环境里,这个说法是误导的。用批处理引擎循环运行来处理无穷数据这个方法在批处理系统刚开始构思的时候就出现了。相反的,设计完善的流计算系统则比批处理系统更能承担处理有穷数据的工作。因此,为了清晰明了,本文里我就只用无穷数据处理。

    3.     低延迟,近似和/或推测性结果(Low-latency,approximate,and/or speculative results):这些结果和流处理引擎经常关联在一起。批处理系统传统上不是设计来处理低延迟或推测性结果这个事实仅仅是一个历史产物,并无它意。当然,如果想,批处理引擎也完全能产生近似结果。因此就如其他的术语,最好是用这些术语是什么来描述这些结果,而不是用历史上它们是用什么东西(通过流计算引擎)产生的来描述。

    3.  流处理的六种方式

    3.1.  Event Sourcing

    Event Sourcing是由老马(Martinfowler)提出来的一种模式,可以解释为事件溯源,该模式从事件源开始保存应用程序状态,保证每次更改都可以被保存为事件序列,也就是说,它不仅保存时间的自身,还保存时间的所有状态变化,在有需要的时候,可以随时根据时间日志重建当时状态,即不仅能做到知道在哪里,也能做到知道如何到那里去的,它会完整的描述对象的整个生命周期中经历的所有事件。

    这方面典型的架构就是LMAX。

    3.2.  Reactive

    反应式编程,也称为响应式编程。顾名思义,就是根据用户的输入来做出响应。这种动作一般是实时的。

    根据响应式编程宣言所述,反应式变成一般具有如下几个特点:

    响应:响应性是系统高可用的基础,但更重要的是,响应性意味着可以快速检测问题并处理问题。响应系统专注于提供快速一致的响应时间,建立可靠的上限,以便提供一致的服务质量,从而简化错误处理,建立用户信息,并鼓励做出更进一步的交互。

    弹性:系统在面对失败时候,仍然保持响应。

    弹性:系统在负载变化的时候仍然保持响应。我们可以通过增加或者减少分配给这些服务的资源来对负载变化做出响应。这意味着整个设计没有竞争和单点瓶颈。

    消息驱动:响应式系统依靠异步消息在组件之间建立一个边界,确保组件之间的松耦合和隔离性。这个边界还提供了失败委托,负载均衡,弹性和流控等手段来保证系统的高可用性,这是响应式系统的一个必备特点。

    3.3.  CEP

    Complex event processing。复杂事件处理。

    事件即事物的状态信息变化,事物之间的作用的动作。复杂事件处理描述的就是系统如何持续地处理这些事件,对系统对变化的持续反应。不论是个体还是系统,都需要从大量的实践中过滤提取,按照既定的处理反应规则做处理。CEP主要依靠规则语言或者持续查询语言来完成事件的过滤、判断和处理。

    这类典型应用比如Esper,TIBCO,IBM Streams等。

    3.4.  Stream Processing

    这个流处理架构,从大数据量领域发展起来的实时数据处理模型,其主要强调分布式,高性能,高可靠性。目前主要有Storm,Flink,Spark Streaming等,这类介绍的比较多,这里就不详细介绍了。

    3.5.  Actors/SEDA

    SEDA(Staged event-driven architecture) 阶段事件驱动架构,也成为阶段是服务器模型,其主要是将复杂的,事件驱动的应用分解为一系列通过队列连接的阶段,从而避免线程的并发模型带来高负载问题,同时还可以达到解耦,负载均衡,分布式等特性。

    Actor模型包装了消息传输和封装机制,用户只需要面对消息和业务逻辑,因此天然就具有分布式、高并发、无状态的特性;对外提供简单编程接口。

    这类应用,典型的有Apache Gearpump(基于Akka),Kafka Streams等。

    3.6.  Change Capture

    变更数据捕获,捕捉数据库插入、更新和删除的动作并作出相应反应。

    目前很多数据库都提供这样的功能,能够将数据操作日志导出并可以由其他工具导入Kafka等系统中来做二次处理。比如Kafka,Mysql等都提供这样的功能。

    4.  流处理的发展目标

    但是Kafka Streams的作者提出了一个观点,个人非常赞同,

     

    So what did we learn?Lots. One of the key misconceptions we had was that stream processing would beused in a way sort of like a real-time MapReduce layer. What we eventually cameto realize, though, was that the most compelling applications for streamprocessing are actually pretty different from what you would typically do witha Hive or Spark job—they are closer to being a kind of asynchronousmicroservice rather than being a faster version of a batch analytics job.

    What do I mean by this? What Imean is that these stream processing apps were most often software thatimplemented core functions in the business rather than computing analyticsabout the business.

     

    翻译过来的核心观点就是:

    我们曾经有过的一个关键的错觉是以为流处理将会被以一种类似于实时的MapReduce层的方式使用。我们最终却发现,大部分对流处理有需求的应用实际上和我们通常使用Hive或者Spark job所做的事情有很大不同,这些应用更接近于一种异步的微服务,而不是批量分析任务的快速版本

    大部分流处理程序是用来实现核心的业务逻辑,而不是用于对业务进行分析

    流处理当前需要在如下几个方面进行进入以保证自己的核心竞争力。

    1.     强一致性:这保证流计算能和批处理平起平坐。

    本质上,准确性取决于存储的一致性。流计算系统需要一些类似于checkpoint的方法来保证长时间的持久化状态。几年前,当Spark刚刚出现在大数据领域的时候,它几乎就是照亮了流计算黑暗面的灯塔(因为Spark支持强一致)。在这之后,情况越来越好。但是还是有不少流计算系统被设计和开发成尽量不去支持强一致性。目前Flink,Kafka Streams也能够支持强一致性,这简直就是流处理的福音。

    再次强调一遍重点:强一致性必须是“只处理一次(exactly-onceprocessing)”,这样才能保证正确性。只有这样的系统才能追平并最终超越批处理系统。除非你对计算的结果是否正确并不介意,否则我还是请你放弃任何不能保证强一致性的流计算系统。现有的批处理系统都保证强一致性,不会让你在使用前去检查计算结果是否正确。所以也不要浪费你的时间在那些达不到这样标准的流计算系统上。

    2. 时间推理的工具:这一点让流计算超越批处理。

    在处理无穷的、无序的、事件—时间分布不均衡的数据时,好的时间推理工具对于流计算系统是极其重要的。现在越来越多的数据已经呈现出上面的这些特征,而现有的批处理系统(也包括几乎所有的流计算系统)都缺少必要的工具来应对这些特性带来的难题。

    3、弹性伸缩功能,即在保证Exactly-once 语义的情况下,流处理应用无需用户的介入也能自动修改并发数,实现应用的自动扩容和缩容。

    4、流上的SQL查询功能以及完整SQL支持,包含窗口,模式匹配等语法支持。


    5.  时间

    流处理系统中的时间分为两种:事件时间和处理时间。

    事件时间(Event Time): 事件发生的时间

    处理时间(Processing Time) 事件处理的时间。

    时间是流处理的基础,绝大部分场景都对时间又严格要求。(PS如果没有的话,那这个世界简直就太简单了。)

    Kafka Streams系统中就要求必须以包含时间,如果没有事件时间,就必须在保存的时候添加系统时间。

    我们能够基于时间来对数据做聚合,实现窗口功能,能够解决乱序问题,总之,时间是流处理最为重要的一个因素。

    6.  Lambda架构

    Lambda架构最初是由Storm的创始人NatanMarz在2011年提出的。在他的文章《How to beat the CAP theorem》中提出了Lambda架构,通过流和批的融合,实现快速的实时数据处理,或者说是让批来为流提供服务。

    一个典型的lambada的架构来自于下面的图。


    这在当时是一个影响力很大的架构,并且有很多产品是基于该架构的,但是随着技术的演进,在现在看来,还是有很大问题的。比如:

    1、  lambda架构要搭建部署和维护两套队列的集群,并且对结果做合并,这是十分麻烦的,并且可靠性也是相当差的。

    2、  数据冗余,数据要同时进入两套系统,存放两份。

    7.  乱序问题

    分布式的流处理系统,不可避免的会遇到数据的乱序问题,数据乱序就是指数据达到某一个节点的时候,已经不是按照原来发生的顺序了,期间可能有丢失,错乱等。

             乱序问题一般的处理方式是使用时间排序窗口,不论是系统时间驱动还是事件时间驱动。其实就是数据在进入节点之后,按照时间进行排序,然后等待一段时间,等待事件都已经到达之后再来进行下一步操作。如果无法确定事件都已经到达,或者是由部分时间一直没有达到,那么就等到窗口超时为止,然后计算结果。

             这个时候计算出来的结果,在有的系统里面已经作为最终结果,直接输出了,即使后续时间过了很长时间已经到达了,也是直接丢弃,不会影响最终结果。在另外一些系统中,比如google的MillWheel中,有水印(WaterMark)机制可以在事件最终到来之后,重新计算并刷新结果。

    8.  数据可靠性问题

    流上的可靠性一直是一个老大难问题,在和业界其他人交流的时候,也纷纷摇头,这个问题无解。目前流处理最广泛的应用还是做一些不怎么关注可靠性的计算。

    不过这个问题在2016年有了很大的突破。

    Intel在2016年终于发布了自己的内存快照技术,通过使用新的存储介质,能够达到内存一半的存取速度,这是一个很了不起的成就,已经基本可以商用了,在流计算领域,为了性能考虑,数据都是保存在内存中的,如果操作系统能够自动完成快照,那就很大程度上保证了数据的可靠性,流处理系统就可以完全不用关注这些什么状态,数据等信息,内存里面已经全部都有了。如果Intel能够做到增量式的内存快照,或者是快照速度和内存读取速度一致,甚至更快的时候,流处理系统的春天就会到来,哪个时候,流处理系统就会变得无比简单。

    在硬件得到突破的同时,软件方面也在不断的取得突破。

    Flink的流处理系统实现了CheckPoint功能,能够将窗口数据保存到内存当中,当流计算发生故障的时候,得到快速恢复,目前功能已经可用,不过性能还是会差一些,在百万TPS级别,快照速度在秒级,还是稍微有些慢,不过已经可用。

    Kafka Streams的推出,利用Kafka消息队列自身Offset的特性,再加上新开发的compact功能,成功实现了流和表的结果,并且也可以通过重放来实现可靠性,状态信息通过数据库保存,这也是流处理在可靠性方面的突破,利用外界第三方组件来实现自身的可靠性。

    流处理系统可靠性处理还有一种趋势就是数据库。数据库系统天然具有可靠性和事务性的特点,能够很好的适应金融等事务性和可靠性比较高的场景,唯一就是数据量和拓展性存在问题,但是随着分布式内存数据库等的发展,也许后面分布式内存数据库替代流处理系统也不是不可能的。但是无论怎么说,我们最求的是实时计算,而不是流处理系统,我们的目标是Fast Data。

    9.        流上的SQL

    以前,CEP和其他流处理平台上面的SQL是五花八门,各有各的特点,包括我们的StreamCQL,都属于类SQL的范畴,但是从现在来看,流上的SQL目前发展趋势已经很明显了,那就是兼容标准SQL,语义不能和标准SQL冲突,这个是第一步;然后就是做尽可能少的拓展,尽可能的利用其他数据库SQL中已有的语法,比如Oracle的Match  recognize等语法来实现模式匹配功能,这样用户最容易接受。窗口等特性,使用函数功能或者是复用SQL的 over语法即可。不支持的场景可以适当做少量拓展。总的来说,流式SQL还在拓展阶段,大家都在拼命抢地盘,想占领制高点,然后推广自己的语法。

     

    9.1.    流和表的关系

    流和表的关系是理解流上SQL的基础,也是最重要的

    流和表实际上是一体的。

    流很容易理解,就是一个管道,当有窗口存在的时候,数据才会发生汇聚。

    表就是我们通常理解的数据库的表。

    流上窗口中的数据,实际上就是一张表。同样的,当表上数据在不断发生变化的时候,这种changelog,就是流。

    这种观点现在已经基本成为流处理领域概念理解的标准。而且绝大部分数据库都支持的data capture,也是传统数据库为了和流处理相结合而做出的改变。Kafka Streams就包含data capture的工具,支持将数据从数据库中导入kafka成为一个流。

    9.2.    流和表的转换

    1)        流和计算结果是表,这一点在分组窗口上体现的特别明显。而且我们的使用习惯也和表一致。

    比如:

    我们统计最近10分钟各区域的用户点击次数,输出每个区域的总点击次数,那么一次性就会输出多行结果,每个地区一行记录,这个就是流上的聚合结果,也是流的中间状态;而为了可靠性,我们一般会将这些计算结果保存到数据库中,以便于故障时候的恢复。

    2)        表中每一次的数据变化,用change log体现就是流。

    3)        表和流之间的Join,实际上是窗口和表之间的Join

    10.         参考

    https://www.oreilly.com.cn/ideas/?p=18&from=timeline

    http://www.cnblogs.com/devos/p/5616086.html

    http://www.oreilly.com/data/free/stream-processing.csp

    http://www.martinfowler.com/eaaDev/EventSourcing.html

    http://www.reactivemanifesto.org/

    http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html

     

    展开全文
  • Kafka基础-流处理

    千次阅读 2018-10-26 17:10:20
    1. 什么是流处理? 首先,让我们说一下什么是数据流(也称为事件流)?它是无边界数据集的抽象说法,无边界意味着无限且不断增长,因为随着时间的推移,新数据会不断地到来。 除了无边界的特性之外,事件流模型...

    1. 什么是流处理?

    首先,让我们说一下什么是数据流(也称为事件流)?它是无边界数据集的抽象说法,无边界意味着无限且不断增长,因为随着时间的推移,新数据会不断地到来。

    除了无边界的特性之外,事件流模型还有其它几个属性:

    1.1 事件流是有序的

    这在交易事件里是最容易理解的,先在账户里存钱然后消费与先消费再还钱是非常不同的,后者将产生透支费用,而前者不能透支。这是事件流和数据库表之间的不同点之一:表中的记录始终被视为无序的(SQL的“order by”子句不是关系模型的一部分)。

    1.2 事件流的数据是不可变的

    事件一旦发生,就永远无法修改。例如取消一个交易事务,这个记录本身是不会被删除的,相反,会向流写入一个附加事件,记录之前事务的取消。这是事件流和数据库表之间的另一个不同点:我们可以删除或更新表中的数据。

    1.3 事件流是可重放的

    对于大多数业务应用程序来说,能够重放几个月前(甚至几年前)发生的原始事件流是至关重要的,这是为了分析修正错误或执行审计所必须的。

    2. 流处理的相关概念

    流处理和任何类型的数据处理非常相似,都是接收数据,对数据执行某些操作,例如转换、聚合等,然后把结果保存在某处。但是,有一些关键概念是流处理所特有的:

    2.1 时间

    时间可能是流处理中最重要的概念,因为大多数流式应用程序都在一定的时间内执行操作。例如,计算连续5分钟的股票价格平均值。流处理系统通常参考以下时间概念:

    2.1.1 消息创建时间

    这是事件被创建的时间。例如,进行测量的时间、出售商品的时间、用户在网站浏览页面的时间等等。在0.10.0及更高版本中,Kafka会在生产者创建消息时自动添加当前的时间。

    2.1.2 消息保存时间

    这是事件被保存在Kafka broker的时间。在0.10.0及更高版本中,Kafka broker可以配置自动为接收到的消息添加当前的时间。

    2.1.3 消息被处理时间

    这是流处理应用程序接收事件的时间,这时间可以是事件发生的几毫秒、几小时或者几天之后。

    2.2 状态

    通常我们是把状态保存在流处理应用程序本地的变量中,例如一个用于保存移动计数的简单哈希表。然而,在流处理应用程序中,这不是一个管理状态的可靠方法,因为当流处理应用程序停止时,状态将丢失。流处理有以下2种类型的状态:

    2.2.1 本地或内部状态

    只能由流处理应用程序的特定实例访问的状态,该状态通常是使用应用程序内运行的内存数据库来维护和管理。使用本地状态的优点是性能非常快,缺点是受限于可用的内存大小。因此,流处理的许多设计模式都是把数据分到多个子流来处理。

    2.2.2 外部状态

    使用外部数据存储维护的状态,例如是类似Cassandra的NoSQL数据库。使用外部状态的优点是几乎没有大小的限制,并且可以从应用程序的多个实例或甚至从不同的应用程序访问它;缺点是由于引入了外部组件导致额外的延时和复杂性。

    2.3 流-表二元性(Stream-Table Duality)

    数据流包含从一开始到现在的完整历史,它表示了过去和当前。它是一系列的事件,其中每个事件都会引起一个变化。而表包含的是某一时刻的状态,是许多变化的结果。从某种意义上可以说流和表是一个硬币的两面,世界总是在变化,有时我们会对引起这些变化的事件感兴趣,而有时我们却对当前的状态感兴趣。我们将这两者之间来回转换的内在关系称为“流-表二元性”。

    2.4 时间窗口

    大多数流的操作都是在一定的时间内执行 - 移动的平均值、本周最畅销商品等等。例如,当计算移动平均值时,我们需要知道:

    • 时间窗口的大小:我们需要计算每5分钟的平均值?还是每15分钟?还是每天?越大的时间窗口数据变化反应会越滞后,例如价格上涨需要更长时间才能注意到。
    • 时间窗口移动的频率(间隔):五分钟的平均值可以每分钟,每秒,或每次有新事件时更新。当间隔等于时间窗口的大小时被称为翻滚窗口(tumbling window);当时间窗口在每条记录上移动时被称为滑动窗口(sliding window)。
    • 时间窗口保持可更新的时间:例如我们五分钟移动平均值是计算00:00-00:05时间窗口的平均值。现在一小时后,我们得到了更多的事件,但它们的事件时间显示为00:02。我们是否应该更新00:00-00:05期间的平均值?还是忽略它们?理想情况下,我们需要定义一个特定的时间段,在此期间接收到的事件将被添加到各自的时间窗口中。

    时间窗口可以和时钟对齐,例如每分钟移动的一个5分钟窗口,第一个时间段是00:00-00:05,第二个是00:01-00:06。或者可以简单地从应用程序启动时开始计时,例如03:17-03:22。滑动窗口是永远不会和时钟对齐,因为只要有新记录就会移动。有关这两种时间窗口之间的区别,请参见下图:

    3. 流处理设计模式

    3.1 单事件处理

    流处理的最基本模式是单独处理每个事件,这也称为map/filter模式,因为它通常用于从流中过滤不必要的事件或转换每个事件。术语“map”是基于map/reduce模式而来,在map阶段转换事件,reduce阶段执行聚合。在此模式中,流处理应用程序从流中消费事件、修改每个事件,然后将事件写入到另一个流中。例如,一个应用程序从流中读取日志消息并将ERROR事件写入一个高优先级的流中,其余事件写入到一个低优先级的流中。另一个例子是把读取到的事件格式由JSON转换为Avro。这种模式可以通过一个简单的生产者和一个消费者处理,如下图:

    3.2 利用本地状态处理

    大多数流处理应用程序都用于聚合操作,特别是时间窗口的聚合。例如,查找出每天股票交易的最低和最高价格并计算其移动的平均值。此类聚合操作需要维护流的状态,例如需要保存当前时刻最低和最高的价格并和流中的每个新价格比较,然后更新它们,所有这些都可以使用本地状态来实现。我们使用Kafka的分区器来保证具有相同股票代码的事件都会被写入到相同的分区,然后,应用程序的每个消费者将从各自负责的分区读取相应的事件,这意味着应用程序的每个消费者都可以维护相应分区的股票子集的状态。如下图所示:

    当应用程序使用本地状态时,流处理应用程序会变得非常复杂。以下几个问题必须要注意:

    • 内存使用情况:本地状态使用的内存大小必须小于应用程序实例可用的内存大小。
    • 持久化:当应用程序实例停止时,我们需要确保状态不会丢失,并且当实例再次启动或由其它实例替换时可以恢复之前的状态。Kafka Streams在这方面处理地非常好-本地状态会被保存在嵌入的RocksDB的内存中,而且还会持久化到磁盘以便在重启后能够快速恢复。但同时也会把本地状态所有的改变都发送到Kafka的一个topic中。如果流的其中一个节点故障,本地状态不会丢失,因为可以容易地从Kafka的topic中重新读取并创建它。
    • 负载再均衡:分区有时会被重新分配给不同的消费者。当发生这种情况时,丢失分区的消费者必须保存最后的状态,而重新分配到的消费者必须知道恢复正确的状态。

    3.3 多阶段处理/重新分区

    如果你需要根据所有事件来计算结果,例如统计每天交易前十的股票,显然使用本地状态不足以实现,因为所有前十的股票可能位于不同的分区中。我们需要两个阶段来实现:首先分别对每个股票代码计算每日收益/损失,我们可以在每个实例中使用本地状态来实现。然后把结果都写到一个只有单分区的新topic里,使用1个消费者来计算前十的股票,如下图所示。但有时候也需要更多的阶段来处理结果。

    3.4 连接外部系统处理:流-表连接(Stream-Table Join)

    有时候流处理需要与外部系统整合,例如根据保存在数据库的一组规则来验证事务的合法性,或者使用用户画像来丰富用户的点击事件,如下图所示:

    但这种模式的问题是与外部系统额外的交互增加了明显的延时,通常在5-15毫秒之间。在许多情况下,这是不可行的。流处理系统通常每秒可处理100K-500K个事件,但数据库一般每秒只能处理10K个事件。为了提高性能,我们需要在流处理应用中缓存数据库的信息。但是管理这些缓存是极具挑战性的,例如怎样防止缓存中的信息变得陈旧?我们可以使用事件流来捕捉数据库表所有的更改,然后实时更新缓存。捕捉数据库更改输出到一个事件流称为CDC(change data capture),Kafka Connect工具有多个connectors可以实现CDC把数据库表转换为事件流。这允许你保存多一份数据库表的副本,并且只要有更改事件就会收到通知,以便相应地更新副本的数据,如下图所示:

    我们称这为流-表连接是因为其中的一个流表示本地缓存表的更改。

    3.5 流连接(Streaming Join)

    有时候你想连接两个事件流而不是一个流和一个表。当使用一个流来表示一个表,你可以忽略流中的大多数历史记录,因为你只关心当前的记录。但是当连接两个流时,你连接的可以是所有记录,例如尝试将一个流中的事件与另一个流中具有相同key并发生在同一时间窗口的事件进行匹配。这就是为什么流连接也被称为窗口连接(windowed-join)的原因。

    例如,假设有一个保存用户查询行为的流和一个对查询结果点击行为的流。我们想把查询和点击结果相匹配,以便知道哪个查询结果是最多人点击的。显然,我们可以根据查询条件来匹配查询结果,但只限定在特定的时间窗口内。我们假设查询结果的点击是在用户输入查询条件后几秒,所以我们在每个流上选取一个几秒钟的移动窗口,并匹配每个窗口的结果,如下图所示:

    查询和点击的流都是基于相同keys来分区的,这些keys同时也是连接keys。这样,来自user_id:42的所有点击事件都会在点击topic的分区5中,而user_id:42的所有查询事件都会在查询topic的分区5中。然后Kafka Streams确保这两个topics的分区5都会分配给同一个任务,因此,该任务可以读取user_id:42的所有相关事件。它是使用其嵌入的RocksDB缓存来实现这两个topics的窗口连接。

    3.6 乱序事件(Out-of-Sequence Events)

    处理在错误时间到达流的事件不仅是流处理的挑战,而且也是传统ETL系统的挑战。在IoT(物联网)场景中,无序事件经常发生并且预期会发生。例如,一个移动设备丢失WiFi信号几个小时,然后在重新连接时发送前几个小时的事件。流应用程序需要能够处理这些场景,这通常意味着应用程序必须执行以下操作:

    • 识别乱序事件,这需要应用程序检查事件的时间并与当前时间比较。
    • 定义一个时间段,在此期间将尝试排解乱序的事件,例如一个延时三小时的事件可以重新被处理,但超过此时间的将会被丢弃。
    • 具有排解乱序事件的功能,这是流应用和批处理任务的主要区别。如果有一个批处理任务,并且在任务完成后还接收到一些事件,我们通常可以重新执行之前的任务。但一个流处理却不能这样做,因为它是一个持续的处理,在任何时刻都需要处理新旧的事件。
    • 能够更新结果,例如把流处理的结果写入数据库。

    包括Google的Dataflow和Kafka Streams在内的多个流处理框架内置了对事件时间概念的支持,能够处理事件时间比当前处理时间旧或新的事件。这通常通过在本地状态下维护多个可用于更新的聚合窗口来完成,而且开发人员能够配置保持这些窗口的时间长短。当然,保持聚合窗口可用于更新的时间越长,维护本地状态所需的内存就越多。

    Kafka Streams的API始终把聚合结果写入到一个结果topic中。那些通常是精炼的topics,这意味着只保存每个key对应的最新值。如果由于一个乱序事件需要更新聚合结果,Kafka Streams将简单地为相应的聚合窗口写入一个新的结果(覆盖之前的)。

    3.7 重新处理

    这个模式一般有两种用例:

    • 我们有一个优化版本的流处理应用程序。我们希望在与旧版本相同的事件流上运行新版本的应用程序,生成新的事件流结果,但不会替换旧版本的结果,而是比较它们,客户端会使用新的结果。
    • 现有的流处理应用程序有bugs。我们修复了bugs之后希望重新处理事件流并重新计算结果。

    第一个用例很容易实现,因为Kafka会长时间把整个事件流存储在一个可伸缩存储中。这意味着拥有分别生成两个结果流的两个版本的流处理应用程序只需要以下步骤:

    • 将新版本的应用程序作为一个新的消费者组来使用。
    • 配置新版本的应用程序从topic的第一个offset开始处理所有事件。
    • 当新版本的应用程序处理的进度赶上时,让它继续处理并将客户端切换到新的结果流。

    第二个用例的实现具有挑战性,它需要“重置”现在应用程序从头开始重新处理,重置本地状态(因此我们不会混淆两个版本的结果),以及很可能清除之前的输出流。虽然Kafka Streams有一个重置流处理应用程序状态的工具,但建议只要有足够的容量来运行两个版本的应用程序并生成两个结果流,就尝试使用第一种方法。因为第一种方法更安全,它允许在多个版本之间来回切换和比较结果,并且在清理过程中不会有丢失数据或引起错误的风险。

    4. Kafka Streams例子

    Kafka有两类流APIs,low-level Processor API和high-level Streams DSL。后续的例子将会使用后者,DSL允许你在流处理应用程序中定义一系列的转换。转换可以是简单的filter或者是复杂的流连接。low-level的API允许你创建自己的转换,但这通常比较少见。使用DSL API的应用程序始终以StreamBuilder来创建处理的拓扑开始-应用于流中事件转换的有向无环图(DAG,Directed Acyclic Graph)。然后从拓扑中创建KafkaStreams的执行对象,启动KafkaStreams对象将启动多个线程,每个线程会把处理的拓扑应用到流中的事件。当关闭KafkaStreams对象时,处理会结束。

    4.1 Word Count

    创建流处理应用程序首先要做的是配置Kafka Streams,它有很多配置属性,但这里不做详细介绍。另外,你也可以通过向Properties对象添加生产者或消费者配置来配置嵌套在Kafka Streams中的生产者和消费者:

    public class WordCountExample {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            // 必须要有一个唯一的ID
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // 使用默认的序列化和反序列化类
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    创建处理的拓扑:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("TextLinesTopic");
    KStream<String, Long> counts = source
        // 转换为多个word
        .flatMapValues(textLines -> Arrays.asList(textLines.toLowerCase().split("\\W+")))
        // 转换为word-word键值对
        .map((key, word) -> new KeyValue<>(word, word))
        // 过滤the
        .filter((key, word) -> !word.equals("the"))
        // 按相同word分组
        .groupByKey()
        // 计算每个word的数量
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
        .toStream();
    // 把计算结果写到另外一个topic中,这里指定序列化和反序列化类
    counts.to("WordsWithCountsTopic", Produced.<String, Long>with(Serdes.String(), Serdes.Long()));

    启动流应用程序:

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    4.2 股市统计

    本例会读取一系列股票市场的交易事件,包括股票代码,要价和要价数量。为了简单起见,本例忽略出价,也不会在数据中包含timestamp,而会依赖于Kafka生产者生成的事件时间。然后,我们将创建以下一些统计信息的输出流,所有统计数据将每秒更新一次。

    • 每五秒钟的最佳(最低)要价
    • 每五秒钟的交易数量
    • 每五秒钟的平均要价

    配置Kafka Streams和上例类似:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());

    不同的是这里使用了自定义的反序列化类TradeSerde,它使用Google的Gson库来生成JSON的序列化与反序列化器,封装在WrapperSerde类里:

    static public final class TradeSerde extends WrapperSerde<Trade> {
        public TradeSerde() {
            super(new JsonSerializer<Trade>(), new JsonDeserializer<Trade>(Trade.class));
        }
    }

    创建TradeStats类用于统计计算:

    public class TradeStats {
    
        String type;
        String ticker;
        // tracking count and sum so we can later calculate avg price
        int countTrades;
        double sumPrice;
        double minPrice;
        double avgPrice;
    
        public TradeStats add(Trade trade) {
    
            if (trade.type == null || trade.ticker == null)
                throw new IllegalArgumentException("Invalid trade to aggregate: " + trade.toString());
    
            if (this.type == null)
                this.type = trade.type;
            if (this.ticker == null)
                this.ticker = trade.ticker;
    
            if (!this.type.equals(trade.type) || !this.ticker.equals(trade.ticker))
                throw new IllegalArgumentException("Aggregating stats for trade type " + this.type + " and ticker "
                        + this.ticker + " but recieved trade of type " + trade.type + " and ticker " + trade.ticker);
    
            if (countTrades == 0)
                this.minPrice = trade.price;
    
            this.countTrades = this.countTrades + 1;
            this.sumPrice = this.sumPrice + trade.price;
            this.minPrice = this.minPrice < trade.price ? this.minPrice : trade.price;
    
            return this;
        }
    
        public TradeStats computeAvgPrice() {
            this.avgPrice = this.sumPrice / this.countTrades;
            return this;
        }
    
    }

    然后就可以创建处理的拓扑:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Trade> source = builder.stream("stocks");
    KStream<Windowed<String>, TradeStats> stats = source
        // 按股票代码分组
        .groupByKey()
        // 创建5秒的时间窗口,每1秒移动一次
        .windowedBy(TimeWindows.of(5000).advanceBy(1000))
        // 对时间窗口内的Trade对象执行聚合,这里是简单的add到TradeStats对象里进行统计数量、最低价
        .<TradeStats>aggregate(() -> new TradeStats(), (k, v, tradestats) -> tradestats.add(v),
            Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
                .withValueSerde(new TradeStatsSerde()))
        .toStream()
        // 计算平均价格并返回包含统计信息的TradeStats对象
        .mapValues((trade) -> trade.computeAvgPrice());
    // 写到另外一个topic中
    stats.to("stockstats-output", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));

    最后启动流应用程序:

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    完整代码可以参见:https://github.com/gwenshap/kafka-streams-stockstats

    4.3 丰富网站的点击事件

    本例会展示如何通过流连接来丰富网站的点击事件。我们将生成一个模拟用户点击的流、一个用于更新用户档案数据库的流和网页搜索的流。然后,我们将连接这三个流来获得每个用户行为的360度视图。例如,用户搜索了什么?点击了什么搜索结果?是否在用户档案中更新了“感兴趣的事物”?通过连接这些类型可以为分析提供丰富的数据,产品的推荐通常是基于这些信息。例如,用户搜索自行车,点击品牌“Trek”的搜索结果链接,并对旅行感兴趣,因此我们可以将Trek、头盔和适合自行车旅行相关的广告推送给该用户。

    以下是创建处理的拓扑:

    StreamsBuilder builder = new StreamsBuilder();
    // 分别创建二个流,一个是点击流,另外一个是搜索流
    KStream<Integer, PageView> views = builder.stream("clicks.pages.views",
        Consumed.with(Serdes.Integer(), new PageViewSerde()));
    KStream<Integer, Search> searches = builder.stream("clicks.search",
        Consumed.with(Serdes.Integer(), new SearchSerde()));
    // 创建缓存的用户档案表(这个是通过一个流来更新)
    KTable<Integer, UserProfile> profiles = builder.table("clicks.user.profile",
        Materialized.<Integer, UserProfile, KeyValueStore<Bytes, byte[]>>as("profile-store"));
    
    // 先连接点击流和用户档案
    KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles, (view, profile) -> {
        if (profile != null)
            return new UserActivity(profile.getUserID(), profile.getUserName(), profile.getZipcode(),
                profile.getInterests(), "", view.getPage());
        else
            return new UserActivity(-1, "", "", null, "", view.getPage());
    });
    
    // 用上面连接的结果再连接最后一个流
    KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches,
        (userActivity, search) -> {
            if (search != null)
                return userActivity.updateSearch(search.getSearchTerms());
            else
                return userActivity.updateSearch("");
        // 仅仅关联点击搜索后1秒的相关点击事件
        }, JoinWindows.of(1000), Joined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde()));

    完整代码可以参见:https://github.com/gwenshap/kafka-clickstream-enrich
        
    5. Kafka Streams架构概述

    上面的例子讲述了如何使用Kafka Streams API实现一些众所周知的流处理设计模式。但是为了更好地理解Kafka Streams实际是怎样工作和扩展,我们需要深入了解API背后的一些设计原则。

    5.1 创建拓扑

    每个流应用程序都实现并执行至少一个拓扑。拓扑(在其它流处理框架中也称为有向无环图DAG,Directed Acyclic Graph)是一系列的操作和转换,每个事件从输入流动到输出。下图是上面Word Count例子的拓扑图:

    即使是简单的处理流程也有拓扑图,它是由多个处理器组成,也就是拓扑图中的椭圆形节点。大多数处理器实现数据过滤、映射、聚合等操作。还有源处理器,它从一个topic读取数据并将其传递,和接收处理器,它从之前的处理器读取数据并发到另外一个topic。拓扑图始终从一个或多个源处理器开始,并以一个或多个接收处理器结束。

    5.2 扩展拓扑

    Kafka Streams通过在一个应用的实例中允许运行多个线程和在应用的分布实例之间支持负载均衡来进行扩展。Streams引擎通过切分任务来并行化一个拓扑的执行,任务的数量由Streams引擎决定,并取决于应用程序处理的topics的分区数。每个任务负责一部分分区,对每个读取的事件,任务将在最终将结果写入接收处理器之前按顺序执行适用于该分区的所有处理步骤。那些任务是Kafka Streams并行处理的基本单元,因为每个任务都可以独立于其它任务执行。如下图所示:

    应用程序的开发人员可以选择每个应用程序实例将执行的线程数。如果有多个线程可用,每个线程会执行应用程序创建任务的子集。如果应用程序的多个实例运行在多个服务器,则每个服务器的每个线程会执行不同的任务。如果想提高处理性能,就启动更多的线程;如果服务器资源不足,就增加服务器,这就是流应用程序扩展的方式。Kafka会自动协调工作,它将为每个任务分配相应的分区,并且每个任务都是独立处理事件和维护自己的本地状态。如下图所示:

    有时候处理步骤可能需要来自多个分区的结果,这会在任务之间创建依赖关系。例如,如果我们连接两个流,就像上述点击事件的例子,我们需要每个流中的数据才能得出结果。Kafka Streams通过分配所有需要的分区给同一个任务,以便该任务可以读取所有相关分区的事件并执行连接操作。这就是为什么Kafka Streams目前要求参与连接操作的所有topics具有相同数量的分区并根据连接key进行分区。

    任务之间依赖关系的另一个例子是当我们的应用程序需要重新分区时。例如在上述点击事件的例子中,所有事件都是使用用户ID作为key值,但如果我们想按每个页面或者按zip code生成统计信息,我们需要按页面或者zip code重新分区然后执行聚合。如果任务1处理分区1的数据,然后到达需要重新分区的处理器(groupBy操作),则需要重新移动数据。与其它流处理器框架不同的是,Kafka Streams通过使用新的keys和分区把事件写入一个新的topic。然后另一组的任务从这个新的topic读取事件并继续处理,这个重新分区的步骤把原来的拓扑结构分为两个有各自任务的子拓扑。第二组的任务依赖于第一组,因为它是处理来自第一个子拓扑的结果。但是,第一组和第二组的任务仍然可以独立和并行地运行,因为第一组的任务以它自己的速率把数据写入topic,而第二组的任务只是从这个topic读取事件并处理,它们之间没有通讯也没有共享资源,不需要运行在相同的线程或服务器上。这是Kafka其中的一个优点,也就是减少管道不同部分的依赖关系。如下图所示:

    5.3 处理故障

    Kafka是高可用的,因此我们保存在Kafka的数据也是高可用的。如果应用程序故障并需要重启,可以从Kafka的流中查找其故障前提交的最后offset并继续处理。但请注意,如果是本地状态存储丢失,例如需要替换服务器,流应用程序总是可以从存储在Kafka的更改日志重新创建本地状态。

    Kafka Streams还利用其消费者协调器来为任务提供高可用性。如果一个任务失败但流应用程序的线程或其它实例还处于活动状态,其中一个可用的线程会重新启动该任务。这与消费者组通过把分区分配给剩余消费者之一来处理组内其中一个消费者的故障类似。

    END O(∩_∩)O

    展开全文
  • 第1章 状态化流处理概述 传统数据处理 绝大多数企业所实现的传统架构都会将数据处理分为两类: 事务型处理 分析型处理 事务型处理 企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件、基于Web的...
  • 流处理和批处理框架的异同

    千次阅读 2019-01-23 19:00:00
    实现流处理系统有两种完全不同的方式:一种是称作原生流处理,意味着所有输入的记录一旦到达即会一个接着一个进行处理。  第二种称为微批处理。把输入的数据按照某种预先定义的时间间隔(典型的是几秒钟)...
  • 大数据篇:如何区分流处理和批处理 今天我们来讲讲大数据的处理模式:批处理(Batching Processing)和流处理(Streaming Processing)。 这几年大规模的物联网(IoT)数据监控系统和视频流系统等的大数据系统出现,...
  • 大数据之实时流处理常用框架

    万次阅读 2018-05-07 10:53:59
    实时流处理简单概述:实时是说整个流处理相应时间较短,流式技算是说数据是源源不断的,没有尽头的。实时流处理一般是将业务系统产生的数据进行实时收集,交由流处理框架进行数据清洗,统计,入库,并可以通过可视化...
  • [Flink基础]--什么是流处理

    千次阅读 2018-09-29 12:56:46
    什么是流处理? Data Artisans由ApacheFlink®的原始创建者创建,我们花了很长时间来解决流处理领域的问题。在这篇介绍性文章中,我们将提供有关流处理和Apache Flink适合的视角。要了解更多信息,您可以下载有关...
  • 如何区分批处理和流处理

    千次阅读 2019-05-12 22:43:58
    数据可以抽象成两种类型:无边界数据 和 有边界数据 无边界数据,是一种不断增长可以说是无限的数据...在处理大数据时,通常还关心时域问题:事件时间 和 处理时间 事件时间,数据实际产生的时间 处理时间,处理数据...
  • 以新的数据仓库平台为基础,结合行内的通用文件传输平台、统一调度平台,规范了源数据系统的数据报送,梳理构建了新的数据模型,大数据平台解决了传统数仓在批量数据处理能力的不足,在相关任务上体验到了从数小时到...
  • flink的批处理和流处理

    千次阅读 2019-04-17 10:40:28
    1.流处理系统 2.批处理系统 3.flink的流处理和批处理 4.flink的流处理和批处理代码的区别 流处理系统与批处理系统最大不同在于节点间的数据传输方式 1.流处理系统 流处理系统,其节点间数据传输的标准模型...
  • 而Flink专注的是无限流处理,那么他是怎么做到批处理的呢? 无限流处理:输入数据没有尽头;数据处理从当前或者过去的某一个时间 点开始,持续不停地进行 另一种处理形式叫作有限流处理,即从某一个时间点开始处理...
  • NiFi流处理引擎

    千次阅读 2017-10-24 19:26:10
    有特点的流处理引擎NiFi 流处理不止有flink、storm、spark streaming,今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi。 前面写了flink的文章,其实流处理不止有flink、storm、spark ...
  • Flink流处理入门和socket发送数据

    千次阅读 2019-08-02 17:41:01
    Flink流处理入门和socket发送数据Flink初步使用Flink流处理程序运行前提Socket发送数据 这块笔记是学习陈世敏老师的大规模数据和大数据系统分析时候做的大作业的一块内容,其中题目是Flink和Spark调研,研究大数据的...
  • Stream流处理list分组方法

    万次阅读 2019-05-25 17:36:06
    工作中经常碰到list去重的问题,现在我简单分享一下我的处理方式。 @Data @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor class Student { /** * 姓名 */ private String name; /** * ...
  • 批处理和流处理

    千次阅读 2018-05-08 13:50:28
    这类用户的行为信息是源源不断的,一个接一个来,比如败家娘们在7点40分32秒浏览了iPhone6plus,在7点40分35秒就看了小米4,这些信息一个个来到,越积越多,我们要求要迅速处理这些信息,没有延迟。就像在溪流的某个...
  • Flink流处理过程的部分原理分析

    万次阅读 2018-12-19 23:20:07
    文章目录前言 前言 ...笔者做为一个研究存储模块出身的人,最近在研读Flink流处理的部分原理,小小作番总结。很多时候,以存储的眼光来看待计算过程中的处理过程,还是有很多不一样的地方的。 ...
  • Flink是一个开源流处理框架,注意它是一个处理计算框架,类似Spark框架,Flink在数据摄取方面非常准确,在保持状态的同时能轻松地从故障中恢复。 Flink内置引擎是一个分布式流数据流引擎,支持 流处理和批处理 ,...
  • java 8 流处理字符串

    千次阅读 2018-09-24 12:11:21
    java 8 流处理字符串 java 8 引入新的Stream api,方便我们使用声明方式处理数据。本文我们将说明如何使用Stream api分割逗号分割字符串至list,以及如何连接字符串数组至逗号分割字符串,也会介绍如何使用Stream api...
  • Flink运行时之流处理程序生成流图

    千次阅读 2017-02-05 22:11:01
    DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。
  • Flink流处理之迭代案例

    万次阅读 2016-11-29 20:14:27
    当前Flink将迭代的重心集中在批处理...但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。
  • 1.Flink是一个针对数据和批数据的分布式处理引擎,主要用Java代码实现。 2.Apache Flink作为Apache的顶级项目,Flink集众多优点于一身,包括快速、可靠可扩展、完全兼容Hadoop、使用简便、表现卓越。 通过以上的...
  • 文章目录前言Flink能做什么选择微批处理还是实时处理计算流程(组件)TODO:容错机制的类比、计算资源调度的类比Flink如何支持批流处理参考 前言 以下都尽量对比Spark(或者大数据生态的其他技术)进行理解 Flink能...
  • 基础 Flink系列:无界和有界数据流定义与区别 批处理 适合有边界数据 使用DataSet 流处理 适合无边界数据 使用DataStream
  • Flink流处理之迭代任务

    千次阅读 2016-12-12 21:21:00
    前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:
  • 本文翻译自DataBricks官方博客,主要描述了Apache Spark 2.0中推出的新功能Structured Streaming(结构化流处理)从Kafka中读取消息,实时处理后再写入不同的下游系统的使用示例。 结构化流处理API使得以一种兼具一.....
  • Flink流处理之窗口算子分析

    万次阅读 2016-10-29 19:40:20
    而对于处理时间的场景,WindowOperator将自身实现为一个基于处理时间的触发器,以触发trigger方法来消费处理时间定时器队列中的定时器满足条件则会调用窗口触发器的onProcessingTime,根据触发结果判断是否对窗口...
  • 本课程以爱奇艺视频实时数据产生和流向的各个环节出发,通过集成主流的分布式日志收集框架Flume、分布式消息队列Kafka、分布式列式数据库HBase、及当前最火爆的Spark Streaming打造实时流处理项目实战,一套代码让你...
  • java中文件和流处理

    千次阅读 2016-03-05 12:28:24
    与C语言只有单一类型FILE*即可工作良好不同,java拥有一个包含...处理文本输入常用的方式是同步哦BufferedReader类,它其中有一个方法readline,使得我们可以读入一行文本。package com.zl.exercise; import java.io

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,539,488
精华内容 615,795
关键字:

流处理