精华内容
下载资源
问答
  • SparkES 多维分析引擎设计

    热门讨论 2016-03-03 15:19:00
    设计动机 ElasticSearch 毫秒级的...其列式存储可以有效的支持高效的聚合类查询,譬如groupBy等操作,分布式存储则提升了处理的数据规模。 相应的也存在一些缺点: 缺乏优秀的SQL支持 缺乏水平扩展的Reduce(Merg...
        

    设计动机

    ElasticSearch 毫秒级的查询响应时间还是很惊艳的。其优点有:

    1. 优秀的全文检索能力
    2. 高效的列式存储与查询能力
    3. 数据分布式存储(Shard 分片)

    其列式存储可以有效的支持高效的聚合类查询,譬如groupBy等操作,分布式存储则提升了处理的数据规模。

    相应的也存在一些缺点:

    1. 缺乏优秀的SQL支持
    2. 缺乏水平扩展的Reduce(Merge)能力,现阶段的实现局限在单机
    3. JSON格式的查询语言,缺乏编程能力,难以实现非常复杂的数据加工,自定义函数(类似Hive的UDF等)

    Spark 作为一个计算引擎,可以克服ES存在的这些缺点:

    1. 良好的SQL支持
    2. 强大的计算引擎,可以进行分布式Reduce
    3. 支持自定义编程(采用原生API或者编写UDF等函数对SQL做增强)

    所以在构建即席多维查询系统时,Spark 可以和ES取得良好的互补效果。通过ES的列式存储特性,我们可以非常快的过滤出数据,
    并且支持全文检索,之后这些过滤后的数据从各个Shard 进入Spark,Spark分布式的进行Reduce/Merge操作,并且做一些更高层的工作,最后输出给用户。

    通常而言,结构化的数据结构可以有效提升数据的查询速度,但是会对数据的构建产生一定的吞吐影响。ES强大的Query能力取决于数据结构化的存储(索引文件),为了解决这个问题,我们可以通过Spark Streaming
    有效的对接各个数据源(Kafka/文件系统)等,将数据规范化后批量导入到ES的各个Shard。Spark Streaming 基于以下两点可以实现为ES快速导入数据。

    1. Spark RDD 的Partition 能够良好的契合ES的Shard的概念。能够实现一一对应。避免经过ES的二次分发
    2. Spark Streaming 批处理的模式 和 Lucene(ES的底层存储引擎)的Segment对应的非常好。一次批处理意味着新生成一个文件,
      我们可以有效的控制生成文件的大小,频度等。

    架构设计

    下面是架构设计图:

    1063603-8b7c006fb3422d8e.png
    spark-es-4.png

    整个系统大概分成四个部分。分别是:

    1. API层
    2. Spark 计算引擎层
    3. ES 存储层
    4. ES 索引构建层

    API 层

    API 层主要是做多查询协议的支持,比如可以支持SQL,JSON等形态的查询语句。并且可是做一些启发式查询优化。从而决定将查询请求是直接转发给后端的ES来完成,还是走Spark 计算引擎。也就是上图提到的 Query Optimize,根据条件决定是否需要短路掉 Spark Compute。

    Spark 计算引擎层

    前面我们提到了ES的三个缺陷,而Spark 可以有效的解决这个问题。对于一个普通的SQL语句,我们可以把 where 条件的语句,部分group 等相关的语句下沉到ES引擎进行执行,之后可能汇总了较多的数据,然后放到Spark中进行合并和加工,最后转发给用户。相对应的,Spark 的初始的RDD 类似和Kafka的对接,每个Kafka 的partition对应RDD的一个partiton,每个ES的Shard 也对应RDD的一个partition。

    ES 存储层

    ES的Shard 数量在索引构建时就需要确定,确定后无法进行更改。这样单个索引里的Shard 会越来越大从而影响单Shard的查询速度。但因为上层有了 Spark Compute层,所以我们可以通过添加Index的方式来扩大Shard的数目,然后查询时查询所有分片数据,由Spark完成数据的合并工作。

    ES 索引构建层

    数据的结构化必然带来了构建的困难。所以有了Spark Streaming层作为数据的构建层。这里你有两种选择:

    1. 通过ES原生的bulk API 完成索引的构建
    2. 然Spark 直接对接到 ES的每个Shard,直接针对该Shard 进行索引,可有效替身索引的吞吐量。
    展开全文
  • c和head命令查看gzip压缩的JSON文件: 现在我们可以直接查看准点情况记录了,这样我们可以更容易地理解这些数据: 使用PySpark读取gzip压缩的JSON格式数据很简单,只需使用SparkSession类型的变量spark: 读取...
  • 文中的Spark为阿里云EMR产品的Spark,博主之前也考虑过类似的问题,受到了一些启发,所以转载分享一下。 背景 Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最...

    本文转自云栖社区,作者:李呈祥(司麟)
    文中的Spark为阿里云EMR产品的Spark,博主之前也考虑过类似的问题,受到了一些启发,所以转载分享一下。

    背景

    Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。

    除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。

    在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:

    1. Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark
      Context无法共享,且当Spark Context退出后,cache数据也会被删除。
    2. Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached
      dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
    3. Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project
      filter pushdown等SQL优化处理。

    Relational Cache的设计

    基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:

    1. 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational
      Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational
      Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
    2. cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
    3. cache数据可用于优化后续任意可优化的用户查询。

    Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:

    1. Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。
    2. Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。
    3. 扩展Spark Catalyst,支持Cache Based
      Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。
    4. 基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。

    Relational Cache的使用

    创建Relational Cache

    CACHE [LAZY] TABLE table_name
      [REFRESH ON (DEMAND | COMMIT)]
      [(ENABLE | DISABLE) REWRITE]
      [USING datasource
      [OPTIONS (key1=val1, key2=val2, ...)]
      [PARTITIONED BY (col_name1, col_name2, ...)]
      [ZORDER BY (col_name3, col_name4, ...)]
      [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
      [COMMENT table_comment]
      [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
      [AS select_statement]
    

    创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。

    REFRESH ON (DEMAND || COMMIT) 指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。

    (ENABLE | DISABLE) REWRITE 指定是否允许该cache被用于后续的执行计划优化。

    此外,Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查

    UNCACHE TABLE [IF EXISTS] table_name
    ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
    ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
    REFRESH TABLE cache_name
    SHOW CACHES
    (DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name
    

    EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite参数开启或者关闭执行计划优化。

    使用Relational Cache优化查询

    下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:

    SELECT n_name, sum(o_totalprice)
    FROM orders, customer, nation
    WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
    GROUP BY n_name
    

    对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:

    == Physical Plan ==
    *(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
    +- Exchange hashpartitioning(n_name#36, 200)
       +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
          +- *(6) Project [o_totalprice#10, n_name#36]
             +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
                :- *(6) Project [o_totalprice#10, c_nationkey#30L]
                :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
                :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
                :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
                :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
                :     :        +- *(1) Filter isnotnull(o_custkey#8L)
                :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
                :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
                :        +- Exchange hashpartitioning(c_custkey#27L, 200)
                :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
                :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
                :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
                +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
                   +- *(5) Project [n_nationkey#35L, n_name#36]
                      +- *(5) Filter isnotnull(n_nationkey#35L)
                         +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>
    

    在这里插入图片描述
    创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:

    CREATE VIEW nation_cust_cache AS
    SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
    FROM orders, customer, nation
    WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
    
    CACHE TABLE nation_cust_cache
    ENABLE REWRITE
    USING parquet;
    

    或者也可以直接创建视图并cache数据。

    CACHE TABLE nation_cust_cache
    ENABLE REWRITE
    USING parquet
    AS
    SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
    FROM orders, customer, nation
    WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
    

    Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:

    == Physical Plan ==
    *(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
    +- Exchange hashpartitioning(n_name#35, 200)
       +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
          +- *(1) Project [o_totalprice#20, n_name#35]
             +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
                +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
    

    在这里插入图片描述
    可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。

    总结

    Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。

    展开全文
  • 腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践 当业务发展到一定规模,实时数据仓库是一个必要的基础服务。从数据驱动方面考虑,多维实时数据分析系统的重要性也不言而喻。但是当数据量巨大的情况下,拿腾讯...

    腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践

    当业务发展到一定规模,实时数据仓库是一个必要的基础服务。从数据驱动方面考虑,多维实时数据分析系统的重要性也不言而喻。但是当数据量巨大的情况下,拿腾讯看点来说,一天上报的数据量达到万亿级的规模,要实现极低延迟的实时计算和亚秒级的多维实时查询是有技术挑战的。
    本文将介绍信息流场景下,腾讯看点的实时数据仓库和多维实时数据分析系统的技术架构。
    1、可解决的痛点
    可以先看一下,多维实时数据分析系统可以解决哪些痛点。比如:
    • 推荐同学 10 分钟前上了一个推荐策略,想知道在不同人群的推荐效果怎么样?
    • 运营同学想知道,在广东省的用户中,最火的广东地域内容是哪些,方便做地域 Push。
    • 审核同学想知道,过去 5 分钟,游戏类被举报最多的内容和账号是哪些?
    • 老板可能想了解,过去 10 分钟有多少用户在看点消费了内容,对消费人群有一个宏观了解。
    2、调研
    在这里插入图片描述

    在进行开发之前,我们做了这些调研。

    1. 离线数据分析平台能否满足这些需求,结论是不能满足。离线数据分析平台不行的原因如下。
      • C 侧数据上报过来,需要经过 Spark 的多层离线计算,最终结果出库到 MySQL 或者 ES 提供给离线分析平台查询。这个过程的延时最少 3-6 个小时,目前比较常见的都是提供隔天的查询,所以很多实时性要求高的业务场景都是不能满足的。
      • 另一个问题是,腾讯看点的数据量太大,带来的不稳定性也比较大,经常会有预料不到的延迟。所以,离线分析平台是无法满足很多需求的。
    2. 实时数据分析平台的话,事业群内部提供了准实时数据查询的功能,底层技术用的是 Kudu+Impala,Impala 虽然是 MPP 架构的大数据计算引擎,并且访问以列式存储数据的 Kudu。但是对于实时数据分析场景来说,查询响应的速度和数据的延迟都还是比较高,查询一次实时 DAU,返回结果耗时至少几分钟,无法提供良好的交互式用户体验。所以(Kudu+Impala)这种通用大数据处理框架的速度优势更多的是相比(Spark+Hdfs)这种离线分析框架来说的,对于我们这个实时性要求更高的场景,是无法满足的。
      3、项目背景在这里插入图片描述
      在这里插入图片描述

    经过刚才的介绍,再来看下我们这个项目的背景。作者发文的内容被内容中心引入,经过内容审核链路,启用或者下架。启用的内容给到推荐系统和运营系统,然后推荐系统和运营系统将内容进行 C 侧分发。内容分发给 C 侧用户之后,用户会产生各种行为,曝光、点击、举报等,通过埋点上报实时接入到消息队列中。接下来我们做了两部分工作,就是图中有颜色的这两部分。
    • 第一部分构建了一个腾讯看点的实时数据仓库。
    • 第二部分就是基于 OLAP 存储引擎,开发了多维实时数据分析系统。
    我们为什么要构建实时数仓,因为原始的上报数据量非常大,一天上报峰值就有上万亿条。而且上报格式混乱。缺乏内容维度信息、用户画像信息,下游没办法直接使用。而我们提供的实时数仓,是根据腾讯看点信息流的业务场景,进行了内容维度的关联,用户画像的关联,各种粒度的聚合,下游可以非常方便的使用实时数据。
    4、方案选型
    在这里插入图片描述

    那就看下我们多维实时数据分析系统的方案选型,选型我们对比了行业内的领先方案,选择了最符合我们业务场景的方案。
    • 第一块是实时数仓的选型,我们选择的是业界比较成熟的 Lambda 架构,他的优点是灵活性高、容错性高、成熟度高和迁移成本低;缺点是实时、离线数据用两套代码,可能会存在一个口径修改了,另一个没改的问题,我们每天都有做数据对账的工作,如果有异常会进行告警。
    • 第二块是实时计算引擎选型,因为 Flink 设计之初就是为了流处理,SparkStreaming 严格来说还是微批处理,Strom 用的已经不多了。再看 Flink 具有 Exactly-once 的准确性、轻量级 Checkpoint 容错机制、低延时高吞吐和易用性高的特点,我们选择了 Flink 作为实时计算引擎。
    • 第三块是实时存储引擎,我们的要求就是需要有维度索引、支持高并发、预聚合、高性能实时多维 OLAP 查询。可以看到,Hbase、Tdsql 和 ES 都不能满足要求,Druid 有一个缺陷,它是按照时序划分 Segment,无法将同一个内容,存放在同一个 Segment上,计算全局 TopN 只能是近似值,所以我们选择了最近两年大火的 MPP 数据库引擎 ClickHouse。
    5、设计目标与设计难点
    在这里插入图片描述

    我们多维实时数据分析系统分为三大模块

    1. 实时计算引擎
    2. 实时存储引擎
    3. App层
      难点主要在前两个模块:实时计算引擎和实时存储引擎。
    4. 千万级/s 的海量数据如何实时接入,并且进行极低延迟维表关联。
    5. 实时存储引擎如何支持高并发写入、高可用分布式和高性能索引查询,是比较难的。
      这几个模块的具体实现,看一下我们系统的架构设计。
      6、架构设计
      在这里插入图片描述

    前端采用的是开源组件 Ant Design,利用了 Nginx 服务器,部署静态页面,并反向代理了浏览器的请求到后台服务器上。
    后台服务是基于腾讯自研的 RPC 后台服务框架写的,并且会进行一些二级缓存。
    实时数仓部分,分为了接入层、实时计算层和实时数仓存储层。
    • 接入层主要是从千万级/s 的原始消息队列中,拆分出不同行为数据的微队列,拿看点的视频来说,拆分过后,数据就只有百万级/s 了;
    • 实时计算层主要负责,多行行为流水数据进行行转列,实时关联用户画像数据和内容维度数据;
    • 实时数仓存储层主要是设计出符合看点业务的,下游好用的实时消息队列。我们暂时提供了两个消息队列,作为实时数仓的两层。一层 DWM 层是内容 ID-用户ID 粒度聚合的,就是一条数据包含内容 ID-用户ID 还有 B 侧内容数据、C 侧用户数据和用户画像数据;另一层是 DWS 层,是内容ID粒度聚合的,一条数据包含内容 ID,B 侧数据和 C 侧数据。可以看到内容 ID-用户ID 粒度的消息队列流量进一步减小到十万级/s,内容 ID 粒度的更是万级/s,并且格式更加清晰,维度信息更加丰富。
    实时存储部分分为实时写入层、OLAP 存储层和后台接口层。
    • 实时写入层主要是负责 Hash 路由将数据写入;
    • OLAP 存储层利用 MPP 存储引擎,设计符合业务的索引和物化视图,高效存储海量数据;
    • 后台接口层提供高效的多维实时查询接口。
    7、实时计算
    在这里插入图片描述

    这个系统最复杂的两块,实时计算和实时存储。
    先介绍实时计算部分:分为实时关联和实时数仓。
    7.1 实时高性能维表关联
    在这里插入图片描述

    实时维表关联这一块难度在于。百万级/s的实时数据流,如果直接去关联 HBase,1 分钟的数据,关联完 HBase 耗时是小时级的,会导致数据延迟严重。
    我们提出了几个解决方案:
    • 第一个是,在 Flink 实时计算环节,先按照 1 分钟进行了窗口聚合,将窗口内多行行为数据转一行多列的数据格式,经过这一步操作,原本小时级的关联耗时下降到了十几分钟,但是还是不够的。
    • 第二个是,在访问 HBase 内容之前设置一层 Redis 缓存,因为 1000 条数据访问 HBase 是秒级的,而访问 Redis 是毫秒级的,访问 Redis 的速度基本是访问 HBase 的 1000 倍。为了防止过期的数据浪费缓存,缓存过期时间设置成 24 小时,同时通过监听写 HBase Proxy 来保证缓存的一致性。这样将访问时间从十几分钟变成了秒级。
    • 第三个是,上报过程中会上报不少非常规内容 ID,这些内容 ID 在内容 HBase中是不存储的,会造成缓存穿透的问题。所以在实时计算的时候,我们直接过滤掉这些内容 ID,防止缓存穿透,又减少一些时间。
    • 第四个是,因为设置了定时缓存,会引入一个缓存雪崩的问题。为了防止雪崩,我们在实时计算中,进行了削峰填谷的操作,错开设置缓存的时间。
    可以看到,优化前后,数据量从百亿级减少到了十亿级,耗时从小时级减少到了数十秒,减少 99%。
    7.2 下游提供服务
    在这里插入图片描述

    实时数仓的难度在于:它处于比较新的领域,并且各个公司各个业务差距比较大,怎么能设计出方便,好用,符合看点业务场景的实时数仓是有难度的。
    先看一下实时数仓做了什么,实时数仓对外就是几个消息队列,不同的消息队列里面存放的就是不同聚合粒度的实时数据,包括内容 ID、用户ID、C 侧行为数据、B 侧内容维度数据和用户画像数据等。
    我们是怎么搭建实时数仓的,就是上面介绍的实时计算引擎的输出,放到消息队列中保存,可以提供给下游多用户复用。
    我们可以看下,在我们建设实时数据仓库前后,开发一个实时应用的区别。没有数仓的时候,我们需要消费千万级/s 的原始队列,进行复杂的数据清洗,然后再进行用户画像关联、内容维度关联,才能拿到符合要求格式的实时数据,开发和扩展的成本都会比较高,如果想开发一个新的应用,又要走一遍这个流程。有了数仓之后,如果想开发内容 ID 粒度的实时应用,就直接申请 TPS 万级/s 的 DWS 层的消息队列。开发成本变低很多,资源消耗小很多,可扩展性也强很多。
    看个实际例子,开发我们系统的实时数据大屏,原本需要进行如上所有操作,才能拿到数据。现在只需要消费 DWS 层消息队列,写一条 Flink SQL 即可,仅消耗 2 个 CPU 核心,1G 内存。
    可以看到,以 50 个消费者为例,建立实时数仓前后,下游开发一个实时应用,可以减少 98%的资源消耗。包括计算资源,存储资源,人力成本和开发人员学习接入成本等等。并且消费者越多,节省越多。就拿 Redis 存储这一部分来说,一个月就能省下上百万人民币。
    8、实时存储
    在这里插入图片描述

    介绍完实时计算,再来介绍实时存储。
    这块分为三个部分来介绍
    • 第一是 分布式-高可用
    • 第二是 海量数据-写入
    • 第三是 高性能-查询
    8.1 分布式-高可用
    在这里插入图片描述

    我们这里听取的是 Clickhouse 官方的建议,借助 ZK 实现高可用的方案。数据写入一个分片,仅写入一个副本,然后再写 ZK,通过 ZK 告诉同一个分片的其他副本,其他副本再过来拉取数据,保证数据一致性。
    这里没有选用消息队列进行数据同步,是因为 ZK 更加轻量级。而且写的时候,任意写一个副本,其它副本都能够通过 ZK 获得一致的数据。而且就算其它节点第一次来获取数据失败了,后面只要发现它跟 ZK 上记录的数据不一致,就会再次尝试获取数据,保证一致性。
    8.2 海量数据-写入
    在这里插入图片描述

    数据写入遇到的第一个问题是,海量数据直接写入 Clickhouse 的话,会导致 ZK 的 QPS 太高,解决方案是改用 Batch 方式写入。Batch 设置多大呢,Batch 太小的话缓解不了 ZK 的压力,Batch 也不能太大,不然上游内存压力太大,通过实验,最终我们选用了大小几十万的 Batch。
    第二个问题是,随着数据量的增长,单 QQ 看点的视频内容每天可能写入百亿级的数据,默认方案是写一张分布式表,这就会造成单台机器出现磁盘的瓶颈,尤其是 Clickhouse 底层运用的是 Mergetree,原理类似于 HBase、RocketsDB 的底层 LSM-Tree。在合并的过程中会存在写放大的问题,加重磁盘压力。峰值每分钟几千万条数据,写完耗时几十秒,如果正在做 Merge,就会阻塞写入请求,查询也会非常慢。我们做的两个优化方案:一是对磁盘做 Raid,提升磁盘的 IO;二是在写入之前进行分表,直接分开写入到不同的分片上,磁盘压力直接变为 1/N。
    第三个问题是,虽然我们写入按照分片进行了划分,但是这里引入了一个分布式系统常见的问题,就是局部的 Top 并非全局 Top 的问题。比如同一个内容 ID 的数据落在了不同的分片上,计算全局 Top100 阅读的内容 ID,有一个内容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,导致汇总的时候,会丢失一部分数据,影响最终结果。我们做的优化是在写入之前加上一层路由,将同一个内容 ID 的记录,全部路由到同一个分片上,解决了该问题。
    介绍完写入,下一步介绍 Clickhouse 的高性能存储和查询。
    8.3 高性能-存储-查询
    Clickhouse 高性能查询的一个关键点是稀疏索引。稀疏索引这个设计就很有讲究,设计得好可以加速查询,设计不好反而会影响查询效率。我根据我们的业务场景,因为我们的查询大部分都是时间和内容 ID 相关的,比如说,某个内容,过去 N 分钟在各个人群表现如何?我按照日期,分钟粒度时间和内容 ID 建立了稀疏索引。针对某个内容的查询,建立稀疏索引之后,可以减少 99%的文件扫描。
    还有一个问题就是,我们现在数据量太大,维度太多。拿 QQ 看点的视频内容来说,一天流水有上百亿条,有些维度有几百个类别。如果一次性把所有维度进行预聚合,数据量会指数膨胀,查询反而变慢,并且会占用大量内存空间。我们的优化,针对不同的维度,建立对应的预聚合物化视图,用空间换时间,这样可以缩短查询的时间。

    在这里插入图片描述

    分布式表查询还会有一个问题,查询单个内容 ID 的信息,分布式表会将查询下发到所有的分片上,然后再返回查询结果进行汇总。实际上,因为做过路由,一个内容 ID 只存在于一个分片上,剩下的分片都在空跑。针对这类查询,我们的优化是后台按照同样的规则先进行路由,直接查询目标分片,这样减少了 N-1/N 的负载,可以大量缩短查询时间。而且由于我们是提供的 OLAP 查询,数据满足最终一致性即可,通过主从副本读写分离,可以进一步提升性能。
    我们在后台还做了一个 1 分钟的数据缓存,针对相同条件查询,后台就直接返回了。
    8.4 扩容
    这里再介绍一下我们的扩容的方案,调研了业内的一些常见方案。
    比如 HBase,原始数据都存放在 HDFS 上,扩容只是 Region Server 扩容,不涉及原始数据的迁移。但是 Clickhouse 的每个分片数据都是在本地,是一个比较底层存储引擎,不能像 HBase 那样方便扩容。
    Redis 是哈希槽这种类似一致性哈希的方式,是比较经典分布式缓存的方案。Redis slot 在 Rehash 的过程中虽然存在短暂的 ask 读不可用,但是总体来说迁移是比较方便的,从原 h[0]迁移到 h[1],最后再删除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查询,不是点查,而且由于列式存储,不支持删除的特性,一致性哈希的方案不是很适合。
    目前扩容的方案是,另外消费一份数据,写入新 Clickhouse 集群,两个集群一起跑一段时间,因为实时数据就保存 3 天,等 3 天之后,后台服务直接访问新集群。
    9、成果
    腾讯看点实时数据仓库:DWM 层和 DWS 层,数据延迟 1 分钟。
    远见多维实时数据分析系统:亚秒级响应多维条件查询请求,在未命中缓存情况下,过去 30 分钟的查询,99%的请求耗时在 1 秒内;过去 24 小时的查询,90%的请求耗时在 5 秒内,99%的请求耗时在 10 秒内。

    展开全文
  • 两个问题分别为多维数据存储和多维数据操作,是数据分析和机器学习的科研工作中最常遇见的问题。 常见的多维度数据有:真彩图、遥感影像图、医学影像图,最近比较火的深度图等等。 多维数据的特点: 数据量大,单个...

    多维数据存储多维数据操作,是数据分析和机器学习的科研工作中最常遇见的问题。
    常见的多维度数据有:真彩图、遥感影像图、医学影像图,最近比较火的深度图等等。

    多维数据的特点:
    1. 数据量大,单个数据最少也是(1000,1000,10)这个量级的。
    2. 矩阵形式
    在存储的时候,需要考虑多个方面的问题:
    1. 要对数据进行压缩。如果存储为常见的文本格式,单个数据动不动就是10多个G,绝对不行
    2. 存储以后,可以分片读取。否则想读取数据中的一小块数据,也要把所有的数据读取到内存中进行检索,这也是不行的
    3. 储存格式要通用。否则业务上游做好了数据,业务下游在使用的时候,会有很大的障碍
    4. 数据可以通过工具可视化查看。
    5. 库的安装使用方便。不要动不动就让科研工作者玩Hadoop,Spark好吧。。。
    最优解决方案

    针对上述问题,最优的解决方案是保存为HDF5格式。对应的Python库为h5py
    HDF5官方地址:https://www.hdfgroup.org/solutions/hdf5/
    h5py官方地址:https://www.h5py.org/

    HDF5的优点
    1. Python库安装方便。Anaconda中已经集成了,装好Anaconda,你就有了h5py这个库
    2. 格式通用。也是Pytorch中推荐的多维数据保存格式,使用h5py读取的数据,直接就是numpy.ndarray
    3. 可视化。可以在官网下载HDF5的HDFViewer软件,直接打开文件进行查看。
    4. 数据压缩。按经验,一个保存为TXT格式需要2G空间的数据,使用HDF5只需要几十MB,节省了几十倍的空间
    5. 分片读取。不需要将数据全部读取到内存进行检索,直接可以使用切片操作读取,其他不需要的数据就不会被读取到内存

    HDF5的常用操作,基于Python的h5py和numpy

    数据拼接

    首先回答第二个问题,如何拼接多维数据。

    import numpy as np
    
    # 创建(3, 1, 2)维度的矩阵
    shape1 = (3, 1, 2)
    a = np.zeros(shape1, dtype=np.int)
    print(a.shape)  # (3, 1, 2)
    
    # 创建(4, 1, 2)维度的矩阵
    shape2 = (4, 1, 2)
    b = np.ones(shape2, dtype=np.int)
    print(b.shape)  # (4, 1, 2)
    
    # 在第一个维度上拼接a和c
    axis = 0  # axis指的是你想在第几个维度上进行拼接,因为numpy是0-base,所以第一个维度其实是0
    c = np.concatenate((a, b), axis=axis)
    print(c.shape)  # (7, 1, 2)
    
    储存为HDF5格式
    import h5py
    with h5py.File("c.hdf5", 'w') as f:  # 写入的时候是‘w’
        f.create_dataset("a_data", data=a, compression="gzip", compression_opts=5)
        f.create_dataset("b_data", data=b, compression="gzip", compression_opts=5)
        f.create_dataset("c_data", data=c, compression="gzip", compression_opts=5)
    # c_data:是HDF5文件中c数据储存的名字
    # compression="gzip":是对数据进行压缩,压缩的方式一个gzip
    # compression_opts=5:是压缩等级,压缩等级越高,压缩的越好,但是压缩消耗的CPU时间会增加
    
    读取HDF5格式文件中的数据
    with h5py.File("c.hdf5", 'r') as f:  # 读取的时候是‘r’
        print(f.keys())
        a_new = f.get("a_data")[:]
        b_new = f.get("b_data")[:]
        c_new = f.get("c_data")[:]
    

    验证一下写入的数据和读取的数据是不是一样

    print(a == a_new)
    print(b == b_new)
    print(c == c_new)
    

    对比一下存储一个(5000, 5000)的数据,省多少空间,数据INT

    import os
    import numpy as np
    data_5000_5000 = np.arange(0, 5000*5000, dtype=np.int).reshape(5000, 5000)
    print(data_5000_5000)
    

    [[ 0 1 2 … 4997 4998 4999]
    [ 5000 5001 5002 … 9997 9998 9999]
    [ 10000 10001 10002 … 14997 14998 14999]

    [24985000 24985001 24985002 … 24989997 24989998 24989999]
    [24990000 24990001 24990002 … 24994997 24994998 24994999]
    [24995000 24995001 24995002 … 24999997 24999998 24999999]]

    使用TXT
    np.savetxt("data.txt", data_5000_5000)
    print(os.path.getsize("data.txt") / 1024 / 1024)
    

    596.0512161254883 MB

    使用HDF5
    with h5py.File("data.hdf5", 'w') as f:
        f.create_dataset("data", data=data_5000_5000, compression="gzip", compression_opts=5)
    

    33.43095302581787 MB

    结论

    可以看到,使用TXT格式保存使用了596.05MB,使用HDF5格式保存使用了33.43MB

    596.05 / 33.43
    

    17.829793598564162
    节省了大概17倍的空间

    展开全文
  • 使用Apache Spark进行预测性数据分析系列文章的开篇,http://www.data-automaton.com/2019/01/03/predictive-da...
  • YDB全称延云YDB,是一个基于Hadoop分布式架构下的实时的、多维的、交互式的查询、统计、分析引擎,具有万亿数据规模下的秒级性能表现,并具备企业级的稳定可靠表现。  YDB是一个细粒度的索引,精确粒度的索引。...
  • 1.集成Spark、Flink: 查看官网http://kylin.apache.org/docs/tutorial/cube_spark.html
  • YDB,一种Spark快速数据分析替代方案

    千次阅读 2017-03-12 15:23:22
    排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个“刚需”,无论大数据采用的是hadoop,还是spark,还是...
  • scala的数组、变长数组、多维数组等 scala的映射、元祖等操作 scala的类,包括bean属性、辅助构造器、主构造器等 scala的对象、单例对象、伴生对象、扩展类、apply方法等 scala的包、引入、继承等概念 scala的特质 ...
  • SparkCube是一个开源项目,用于极快速的OLAP数据分析。 SparkCube是的扩展。 从源代码构建 mvn -DskipTests package 使用的默认Spark版本是2.4.4。 运行测试 mvn test 与Apache Spark一起使用 您应该将几个配置添加...
  • 1.性能量化: 如何做性能量化? 方法一 根据用户自己划分性能量化。 ...Hive查询性能量化:例如使用TPC-DS来测试集群某些...还是80%,剩余20%不可使用),每S、M、H能处理多少数据,处理速度的标准是多少?; ...
  • Apache Kylin™是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc. 开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。 所谓的...
  • 导语 | 在产品精细化运营时代,经常会遇到产品增长问题:比如指标涨跌原因分析、版本迭代效果分析、运营活动效果分析等。这一类分析问题高频且具有较高时效性要求,然而在人力资源紧张情况,传统的...
  • 本节书摘来自华章出版社《Spark数据分析:核心概念、技术及实践》一书中的第1章,第1节,作者穆罕默德·古勒(Mohammed Guller)更多章节内容可以访问云栖社区“华章计算机”公众号查看。 大数据技术一览 我们正...
  • 本节书摘来自华章出版社《Spark数据分析:核心概念、技术及实践》一书中的第1章,第1.5节,作者[美] 穆罕默德·古勒(MohammedGuller),更多章节内容可以访问云栖社区“华章计算机”公众号查看。 1.5 NoSQL ...
  • 第一课 Python入门知识点1:Python安装知识点2:常用数据分析库NumPy、Scipy、Pandas、matplotlib安装知识点3:常用高级数据分析库scikit-learn、NLTK安装知识点4:IPython的安装与使用知识点5:Python2与Python3...
  • OLAP(On-Line Analytical Processing)联机分析处理,1993 年由关系型数据库之父埃德加·科德(Edgar Frank Codd)提出,其中主要为多维分析多维分析常见操作 下钻 从高层次想低层次明细数据穿透。例如“省”...
  • 这一类分析问题高频且具有较高时效性要求,然而在人力资源紧张情况,传统的数据分析模式难以满足。本文尝试从0到1实现一款轻量级大数据分析系统——MVP,以解决上述痛点问题。文章作者:数据熊,腾讯云大数据分析...
  • 系列一:《python数据分析基础与实践》 章节1Python概况 课时2Python简介 章节2Python安装 课时3安装Anaconda 课时4使用Anaconda 章节3数据准备 课时5数据类型 – 布尔型 课时6数据类型 – 数值型 课时7数据类型 – ...
  • 作者 |云祁封图| CSDN下载于视觉中国一、前言作者最近看了《Hadoop构建数据仓库实践》这本书,收获很多,把一些关于数仓实践的心得整理出来,方便大家共同学习。二、数据仓库的定义数...
  • Kylin 之对大数据量的多维分析

    千次阅读 2017-10-12 13:46:07
    Apache Kylin(http://kylin.apache.org/cn/)是一个开源的分布式分析引擎,提供Hadoop之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay 开发并贡献至开源社区。它能在亚秒内查询巨大的Hive...
  • 2018 年线上线下融合已成大势,苏宁易购提出并践行双线融合模式,提出了智慧零售的大战略,其本质是数据驱动,为消费者提供更好的服务, 苏宁日志分析系统作为数据分析的第一环节,为数据运营打下了坚实基础。...
  • 版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用...1 Python Spark SQL 基本数据处理 Python Spark DataFrame 基础 df = spark.read.p...
  • 正文开始分享嘉宾:杨军蚂蚁金服高级技术专家编辑整理:兴金朝内容来源:DataFun Talk《数据分析平台:平台演进及数据分析方法应用》出品社区:DataFun大家好,今天主要分享数...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,454
精华内容 2,181
关键字:

多维数据分析spark