精华内容
下载资源
问答
  • MaxCompute SQL不支持对数据的Update和Delete操作,但是实际工作中可能确实有一些场景需要这样处理,怎么办呢?实际上,MaxCompute SQL的Insert语法支持Insert Into/Overwrite两种数据导入的方式。分别对应数据导入...
  • MaxCompute insert语句使用 本文介绍使用INSERT OVERWRITE和INSERT INTO两种命令更新表数据,主要内容包括: insert into table … values …语句 insert into/overwrite … select …语句 Insert多路输出(MULTI ...
  • 该文档详细描述了Maxcompute函数的两种UDF函数注册方式,以及多个jar包如何打包成一个jar包的方式。
  • maxcompute 快速入门

    2019-01-31 09:12:08
    maxcompute 快速入门,最全文档,0基础入门maxcompute开发
  • MaxCompute

    2021-06-20 18:49:37
    MaxCompute 大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题...

    MaxCompute

    大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。

    产品优势

    大规模计算存储
    MaxCompute适用于100 GB以上规模的存储及计算需求,最大可达EB级别。

    多种计算模型
    MaxCompute支持SQL、MapReduce、UDF(Java/Python)、Graph、基于DAG的处理、交互式、内存计算、机器学习等计算类型及MPI迭代类算法。简化了企业大数据平台的应用架构。

    强数据安全
    MaxCompute已稳定支撑阿里全部数据仓库业务9年以上,提供多层沙箱防护、细粒度权限管理及监控。
    MaxCompute通过了独立的第三方审计师针对阿里云对AICPA可信服务标准中关于安全性、可用性和机密性原则符合性描述的审计。审计报告请参见SOC 3报告。
    低成本
    与企业自建专有云相比,MaxCompute的计算存储更高效,可以降低30%~50%的采购成本。

    免运维
    基于MaxCompute的Serverless无服务器的设计思路,用户只需关心作业和数据,而无需关心底层分布式架构及运维。

    极致弹性扩展
    MaxCompute提供按量付费模式下的作业级别的资源管理。用户无需受困于资源扩展难题,系统会自动扩展计算、存储、网络等资源,最大程度地节省成本。

    系统架构
    MaxCompute以数据为中心,内建多种计算模型和服务接口,满足广泛的数据分析需求。一切服务开通即用,更好地赋能数据业务。
    在这里插入图片描述

    *功能概述

    数据通道
    批量历史数据通道
    Tunnel是MaxCompute为您提供的数据传输服务,提供高并发的离线数据上传下载服务。支持每天TB/PB级别的数据导入导出,特别适合于全量数据或历史数据的批量导入。Tunnel为您提供Java编程接口,并且在MaxCompute的客户端工具中,提供对应的命令实现本地文件与服务数据的互通。

    实时增量数据通道
    针对实时数据上传的场景,MaxCompute提供了延迟低、使用方便的DataHub服务,特别适用于增量数据的导入。DataHub还支持多种数据传输插件,例如Logstash、Flume、Fluentd、Sqoop等,同时支持日志服务Log Service中的投递日志到MaxCompute,进而使用DataWorks进行日志分析和挖掘。

    计算及分析任务
    MaxCompute支持多种计算模型,详情如下:
    SQL:MaxCompute以表的形式存储数据,支持多种数据类型版本说明,并对外提供SQL查询功能。您可以将MaxCompute作为传统的数据库软件操作,但其却能处理TB、PB级别的海量数据。
    说明
    MaxCompute SQL不支持事务、索引,也不支持Update或Delete操作。
    MaxCompute的SQL语法与Oracle、MySQL有一定差别,您无法将其他数据库中的SQL语句无缝迁移至MaxCompute中。详情请参见与其他SQL语法的差异。
    MaxCompute主要用于100GB以上规模的数据计算,因此MaxCompute SQL最快支持在分钟或秒钟级别完成查询返回结果,但无法在毫秒级别返回结果。
    MaxCompute SQL的优点是学习成本低,您不需要了解复杂的分布式计算概念。如果您具备数据库操作经验,便可快速熟悉MaxCompute SQL的使用。
    UDF:即用户自定义函数。
    MaxCompute提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。

    MapReduce:MaxCompute MapReduce是MaxCompute提供的Java MapReduce编程模型,它可以简化开发流程,更为高效。使用MaxCompute MapReduce,需要对分布式计算概念有基本了解,并有相对应的编程经验。MaxCompute MapReduce为您提供Java编程接口。
    Graph:MaxCompute提供的Graph功能是一套面向迭代的图计算处理框架。图计算作业使用图进行建模,图由点 (Vertex)和边(Edge)组成,点和边包含权值(Value)。通过迭代对图进行编辑、演化,最终求解出结果,典型应用:PageRank、单源最短距离算法 、K-均值聚类算法等。
    Spark on MaxCompute:Spark on MaxCompute是阿里云开发的大数据分析引擎,为您提供大数据处理能力。详情请参见概述。
    SDK
    SDK是MaxCompute提供给开发者的工具包,当前支持Java SDK及Python SDK。

    安全
    MaxCompute提供了功能强大的安全服务,为您的数据安全提供保护

    阿里云官方文档

    展开全文
  • MaxCompute用户指南

    2018-04-29 19:36:35
    MaxCompute用户指南,阿里MaxCompute(原ODPS)用户指南
  • sdk源码MaxCompute Go 驱动程序 ODPS,又称ODPS,是由.net提供的分布式存储服务和SQL引擎。 这个仓库包含一个MaxCompute的Go。 如果你要写一个Go程序,调用标准库database/sql访问MaxCompute数据库,可以使用这个...
  • 阿里云MaxCompute数据收集器 该项目是一组bigdata插件,用于与aliyun maxcompute交换数据。 这些插件包含flume-plugin,kettle-plugin,ogg-plugin和odps-sqoop。 要求 JDK 1.6或更高版本 Apache Maven 3.x 建立...
  • 阿里云MaxCompute ODPS文档
  • maxcompute

    2020-01-09 10:31:00
    1.大数据计算服务(Maxcompute,原名ODPS) MaxCompute is a big data processing platform developed by Alibaba Cloud independently. It is a fast and cloud-based big data solution that supports multiple ...

    1.大数据计算服务(Maxcompute,原名ODPS)  

    MaxCompute is a big data processing platform developed by Alibaba Cloud independently. It is a fast and cloud-based big data solution that supports multiple distributed data storage and processing models, which can provide massive data warehouse and big data modeling service.

    MaxCompute Studio is a plugin for IntelliJ platform allowing data developers works with MaxCompute platform including authoring SQL scripts, UDF extensions, MapReduce programs and other functions like local debugging, data browsing and uploading/downloading, job browsing and analytics, etc.
    Features include:
    MaxCompute SQL language support
    MaxCompute function development
    MaxCompute data management
    MaxCompute job management

    分布式的计算模型对数据分析人员要求较高且不易维护。数据分析人员不仅需要了解业务需求,同时还需要熟悉底层分布式计算模型。MaxCompute为您提供完善的数据导入方案以及多种经典的分布式计算模型,您可以不必关心分布式计算和维护细节,便可轻松完成大数据分析。

    DataWorks和MaxCompute关系紧密:DataWorks为MaxCompute提供一站式的数据同步、业务流程设计、数据开发、管理和运维功能。

    产品优势

    • 大规模计算存储

      MaxCompute适用于100GB以上规模的存储及计算需求,最大可达EB级别。

    • 多种计算模型

      MaxCompute支持SQL、MapReduce、UDF(Java/Python)、Graph、基于DAG的处理、交互式、内存计算、机器学习等计算类型及MPI迭代类算法。简化了企业大数据平台的应用架构。

    • 强数据安全

      MaxCompute已稳定支撑阿里全部数据仓库业务9年以上,提供多层沙箱防护、细粒度权限管理及监控。

    • 低成本

      与企业自建专有云相比,MaxCompute的计算存储更高效,可以降低30%~50%的采购成本。

    • 免运维

      基于MaxCompute的Serverless无服务器的设计思路,用户只需关心作业和数据,而无需关心底层分布式架构及运维。

    • 极致弹性扩展

      MaxCompute提供按量付费模式下的作业级别的资源管理。用户无需受困于资源扩展难题,系统会自动扩展计算、存储、网络等资源,最大程度地节省成本。

    maxcompute系统架构

    功能: 

    数据通道

    • 批量历史数据通道

      TUNNEL是MaxCompute为您提供的数据传输服务,提供高并发的离线数据上传下载服务。支持每天TB/PB级别的数据导入导出,特别适合于全量数据或历史数据的批量导入。Tunnel为您提供Java编程接口,并且在MaxCompute的客户端工具中,提供对应的命令实现本地文件与服务数据的互通。

    • 实时增量数据通道

      针对实时数据上传的场景,MaxCompute提供了延迟低、使用方便的DataHub服务,特别适用于增量数据的导入。DataHub还支持多种数据传输插件,例如Logstash、Flume、Fluentd、Sqoop等,同时支持日志服务Log Service中的投递日志到MaxCompute,进而使用DataWorks进行日志分析和挖掘。

    • 计算及分析任务
      MaxCompute支持多种计算模型,详情如下:
      • SQL:MaxCompute以表的形式存储数据,支持多种数据类型,并对外提供SQL查询功能。您可以将MaxCompute作为传统的数据库软件操作,但其却能处理TB、PB级别的海量数据。
         
        说明
        • MaxCompute SQL不支持事务、索引,也不支持Update或Delete操作。
        • MaxCompute的SQL语法与Oracle、MySQL有一定差别,您无法将其他数据库中的SQL语句无缝迁移至MaxCompute中。详情请参见与其他SQL语法的差异
        • MaxCompute主要用于100GB以上规模的数据计算,因此MaxCompute SQL最快支持在分钟或秒钟级别完成查询返回结果,但无法在毫秒级别返回结果。
        • MaxCompute SQL的优点是学习成本低,您不需要了解复杂的分布式计算概念。如果您具备数据库操作经验,便可快速熟悉MaxCompute SQL的使用。
      • UDF:即用户自定义函数。

        MaxCompute提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。

      • MapReduce:MaxCompute MapReduce是MaxCompute提供的Java MapReduce编程模型,它可以简化开发流程,更为高效。您若使用MaxCompute MapReduce,需要对分布式计算概念有基本了解,并有相对应的编程经验。MaxCompute MapReduce为您提供Java编程接口。
      • Graph:MaxCompute提供的Graph功能是一套面向迭代的图计算处理框架。图计算作业使用图进行建模,图由点 (Vertex)和边(Edge)组成,点和边包含权值(Value)。通过迭代对图进行编辑、演化,最终求解出结果,典型应用:PageRank单源最短距离算法 、K-均值聚类算法等。
    • SDK

      SDK是MaxCompute提供给开发者的工具包,当前支持Java SDKPython SDK

    • 安全

      MaxCompute提供了功能强大的安全服务,为您的数据安全提供保护,详情请参见安全指南

    参考资料:1.https://help.aliyun.com/document_detail/27800.html?spm=a2c4g.11186623.6.547.3ec77a55TkBUgk

     

    MaxCompute与DataWorks

    DataWorks是基于MaxCompute计算和存储,提供工作流可视化开发、调度运维托管的一站式海量数据离线加工分析平台。在数加(一站式大数据平台)中,DataWorks控制台即为MaxCompute控制台。

    MaxCompute和DataWorks一起向用户提供完善的ETL和数仓管理能力,以及SQL、MR、Graph等多种经典的分布式计算模型,能够更快速地解决用户海量数据计算问题,有效降低企业成本,保障数据安全。更多使用说明请参见DataWorks什么是DataWorks

    说明 您可以将DataWorks理解成MaxCompute的一种Web客户端。MaxCompute是DataWorks的一种计算引擎。

    MaxCompute与数据集成

    MaxCompute可以通过数据集成加载不同数据源(例如:MySQL数据库等)数据,同样也可以通过数据集成把MaxCompute的数据导出到各种业务数据库。

    数据集成功能已经集成到DataWorks作为数据同步任务进行配置、运行。您可直接在DataWorks上配置MaxCompute数据源,再配置读取MaxCompute表或者写入MaxCompute表任务,数据的导入和导出整个过程只需在一个平台上进行操作。

    MaxCompute与机器学习PAI

    机器学习PAI是基于MaxCompute的一款机器学习算法平台。它实现了数据无需搬迁,便可进行从数据处理、模型训练、服务部署到预测的一站式机器学习。创建MaxCompute项目,开通机器学习,即可通过机器学习平台的算法组件对MaxCompute数据进行模型训练等操作。详情请参见机器学习PAI操作文档

    MaxCompute与QuickBI

    数据在MaxCompute进行加工处理后,将Project添加为QuickBI数据源,即可在QuickBI页面对MaxCompute表数据进行报表制作,实现数据可视化分析。

    MaxCompute与AnalyticDB for MySQL

    AnalyticDB for MySQL是海量数据实时高并发在线分析(Realtime OLAP)的云计算服务,与MaxCompute结合实现大数据驱动业务系统的场景。通过MaxCompute离线计算挖掘,产出高质量数据后,导入分析型数据库,供业务系统调用分析。

    将MaxCompute数据导入到AnalyticDB for MySQL,有以下两种方式:

    ......

     

    MaxCompute的表格有两种类型:内部表和外部表(MaxCompute2.0版本开始支持外部表)。

    • 对于内部表,所有的数据都被存储在MaxCompute中,表中列的数据类型可以是MaxCompute支持的任意一种数据类型
    • 对于外部表,MaxCompute并不真正持有数据,表格的数据可以存放在OSSOTS中 。MaxCompute仅会记录表格的Meta信息,您可以通过MaxCompute的外部表机制处理OSS或OTS上的非结构化数据,例如视频、音频、基因、气象、地理信息等。

    分区表是指在创建表时指定分区空间,即指定表内的一个或者某几个字段作为分区列。分区表实际就是对应分布式文件系统上的独立的文件夹,该文件夹下是该分区所有数据文件。而分区可以理解为分类,通过分类把不同类型的数据放到不同的目录下。分类的标准就是分区字段,可以是一个,也可以是多个。

     

    分区表的意义在于优化查询。查询表时通过WHERE子句查询指定所需查询的分区,避免全表扫描,提高处理效率,降低计算费用。

    参考资料:1.https://help.aliyun.com/document_detail/27820.html?spm=a2c4g.11186623.6.554.165939a4ucghb8

    --创建一个二级分区表,以日期为一级分区,地域为二级分区
    CREATE TABLE src (key string, value bigint) PARTITIONED BY (pt string,region string);

    --正确使用方式。MaxCompute在生成查询计划时只会将'20170601'分区下region为'hangzhou'二级分区的数据纳入输入中。
    select * from src where pt='20170601'and region='hangzhou'; --错误的使用方式。在这样的使用方式下,MaxCompute并不能保障分区过滤机制的有效性。pt是STRING类型,当STRING类型与BIGINT(20170601)比较时,MaxCompute会将二者转换为DOUBEL类型,此时有可能会有精度损失。 select * from src where pt = 20170601; 


    MaxCompute表的生命周期(Lifecycle),指表(分区)数据从最后一次更新的时间算起,在经过指定的时间后没有变动,则此表(分区)将被MaxCompute自动回收。这个指定的时间就是生命周期。
    • 生命周期单位:Days(天),只接受正整数。
    • 对于非分区表,如果表数据在生命周期Days天内没有被修改,经过Days天后此表将会被MaxCompute自动回收(类似DROP TABLE操作)。生命周期从最后一次表数据被修改的时间(LastDataModifiedTime)起开始计算。
    • 对于分区表,每个分区可以分别被回收。在生命周期Days天内数据未被修改的分区,经过指定的天数后此分区将会被回收,否则会被保留。每个分区的生命周期是从最后一次分区数据被修改的时间LastDataModifiedTime起开始计算。不同于非分区表,分区表的最后一个分区被回收后,该表不会被删除。
    • 生命周期只能设定到表级别,不能在分区级设置生命周期。创建表时即可指定生命周期。
    • 如果您没有为表指定生命周期,则表(分区)不会根据生命周期规则被MaxCompute自动回收。

    资源类型

    MaxCompute支持上传的单个资源大小上限为500MB,资源包括以下几种类型:
    • File类型。
    • Table类型:MaxCompute中的表。
       
      说明 MapReduce引用的table类型资源中,table字段类型目前只支持BIGINT、DOUBLE、STRING、DATETIME、BOOLEAN,其他类型暂未支持。
    • Jar类型:编译好的Java Jar包。
    • Archive类型:通过资源名称中的后缀识别压缩类型,支持的压缩文件类型包括.zip/.tgz/.tar.gz/.tar/jar。

     

    展开全文
  • 分享如何使用阿里云大数据计算服务MaxCompute进行数据分析,掌握如何使用云端相关大数据服务进行开发和测试的能力。
  • 基于MaxCompute的大数据BI分析技术技术创新变革未来场景描述在互联网电商及游戏等行业通常需要对海量数据做快速实时分析本方案阐述如何将业务数据和日志数据快速汇总到ADB后再通过QuickBI等 工具进行可视化分析和...
  • MaxCompute客户端odpscmd操作使用.pdf,圣远
  • MaxCompute SQL

    千次阅读 2020-09-13 09:57:23
    MaxCompute SQL 一、概述 1.1 定义 ​ MaxCompute(原 ODPS) SQL 适用于海量数据(TB 级别),实时性要求不高的场合,比如离线批量计算,它的每个作业的准备,提交等阶段要花费较长时间。 ​ 采用的是类似与SQL的...

    MaxCompute SQL

    • 知识点结构图
      在这里插入图片描述

    • 本文初衷是为了学习归纳,若有错误,请指出。

    修改记录

    时间内容
    2020年9月13日第一次发布

    一、概述

    1.1 定义

    ​ MaxCompute(原 ODPS) SQL 适用于海量数据(TB 级别),实时性要求不高的场合,比如离线批量计算,它的每个作业的准备,提交等阶段要花费较长时间。

    ​ 采用的是类似与SQL的语法,可以看作是标准SQL的子集,但和数据库也有很多不同,比如没有事务,主键约束,索引等。

    1.2 优缺点

    • 优点:
      • MaxCompute SQL的优点是学习成本低,不需要了解复杂的分布式计算概念,只需要会sql便可以操作;
      • MaxCompute主要用于大规模的数据计算,最快支持在分钟或秒钟级别完成查询返回结果,但无法在毫秒级别返回结果。
    • 缺点:
      • 一次作业批量处理海量数据,不适合直接对接需要每秒处理几千至数万笔事务的业务系统。
      • MaxCompute的SQL语法与Oracle、MySQL有一定差别,无法将其他数据库中的SQL语句无缝迁移至MaxCompute中。
      • MaxCompute SQL不支持事务、索引,也不支持Update或Delete操作

    1.3 支持的数据类型

    ​ 支持的数据类型有:Bigint、Double、String、Datetime、Boolean、Decimal,Float。

    ​ 目前MaxCompute SQL中使用到新数据类型(TINYINT、SMALLINT、INT、FLOAT、VARCHAR、TIMESTAMP、BINARY)时,需要用set命令开启:

    • Session级别:要使用新数据类型,需在SQL语句前加上set语句set odps.sql.type.system.odps2=true;,并与SQL语句一起提交执行。

    • Project级别:支持对Project级别进行新类型打开。Project Owner可根据需要对Project进行设置,命令为:

      set odps.sql.type.system.odps2=true;
      CREATE TABLE test_newtype (
          c1 tinyint
          ,c2 smallint
          ,c3 int
          ,c4 BIGINT
          ,c5 float
          ,c6 DOUBLE
          ,c7 decimal
          ,c8 binary
          ,c9 timestamp
          ,c10 ARRAY<map<BIGINT,BIGINT>>
          ,c11 map<STRING,ARRAY<BIGINT>>
          ,c12 STRUCT<s1:STRING,s2:BIGINT>
          ,c13 varchar(20))
      LIFECYCLE 1
      ;
      

    1.4 运算符

    • 关系运算符注意点:

      • 在进行部分关系运算之前,需要首先进行类型转换,否则可能返回NULL。如下举例,'2019-02-16 00:00:01'为DATETIME类型,而'2019-02-16'为STRING类型,在进行比较关系运算前需首先完成显示类型转换:

        select cast('2019-02-16 00:00:01' AS string) > '2019-02-16';
        select cast('2019-02-16 00:00:02' AS datetime) > '2019-02-16 00:00:01';
        
      • 由于DOUBLE值存在一定的精度差,因此建议两个DOUBLE类型相减,取绝对值的方式进行判断。当绝对值足够小时,认为两个DOUBLE数值相等,比如:

      abs(0.9999999999 - 1.0000000000) < 0.000000001
       -- 0.9999999999和1.0000000000为10位精度,而0.000000001为9位精度。
       -- 此时可以认为0.9999999999和1.0000000000相等。
      
      • 在将String类型和Bigint类型数据进行比较时,这两个数据都将转换成Double类型,比较过程中可能丢失精度,所以需要将String用cast显示转换成Bigint类型运算。
    • 算术运算符注意点:

      • A/B中,如果A、B都是Bigint类型,那么结果会是Double类型。
      • 只有参数是STRING、BIGINT或DOUBLE类型才能参与算术运算,日期型和布尔型不允许参与运算。
      • STRING类型在参与运算前会进行隐式类型转换,转换为DOUBLE类型。
    • 位运算符注意点

      • 位运算符不支持隐式转换,只允许BIGINT类型。
    • 逻辑运算符注意点:

      • 逻辑运算符只允许BOOLEAN类型参与运算,不支持隐式类型转换。
      • Null And False = False,Null and True = Null;True or Nnll=True,False or Null = Null;

    1.5 MaxCompute的ACID特性

    • 原子性(Atomicity)
      • 任何时候MaxCompute会保证在冲突时只会一个作业成功,其它冲突作业失败。
      • 对于单个表或分区的CREATE、OVERWRITE、DROP操作,可以保证其原子性。
      • 跨表操作时不支持原子性(例如MULTI-INSERT)。
      • 在极端情况下,以下操作可能不保证原子性:
        • DYNAMIC INSERT OVERWRITE多于一万个分区,不支持原子性。
        • INTO类操作:这类操作失败的原因是事务回滚时数据清理失败,但不会造成原始数据丢失。
    • 一致性(Consistency)
      • OVERWRITE类作业可保证一致性。
      • INTO类作业在冲突失败后可能存在失败作业的数据残留。
    • 隔离性(Isolation)
      • 非INTO类操作保证读已提交。
      • INTO类操作存在读未提交的场景。
    • 持久性(Durability)
      • MaxCompute保证数据的持久性。

    二、类型转换

    ​ MaxCompute SQL允许数据类型之间的转换,类型转换方式包括显式类型转换隐式类型转换

    2.1 显式转换

    ​ 显式类型转换是通过cast()函数将一种数据类型的值转换为另一种类型的值,在MaxCompute SQL中支持的显式类型转换,如下表所示。

    在这里插入图片描述

    • 注意点:
      • 将DOUBLE类型转为BIGINT类型时,小数部分会被截断,例如cast(1.6 as BIGINT) = 1
      • 同样,满足DOUBLE格式的STRING类型转换为BIGINT时,会先将STRING转换为DOUBLE,再将DOUBLE转换为BIGINT,因此,小数部分会被截断。
      • 满足BIGINT格式的STRING类型可以被转换为DOUBLE类型,小数点后保留一位,例如cast(“1” as DOUBLE) = 1.0
      • 日期类型转换时采用默认格式yyyy-mm-dd hh:mi:ss。
      • MaxCompute支持复杂类型的类型转换功能,规则同上。

    2.2 隐式转换

    ​ 隐式类型转换是指在运行时,由MaxCompute依据上下文使用环境及类型转换规则自动进行的类型转换。MaxCompute支持的隐式类型转换规则,如下表所示。

    在这里插入图片描述

    在这里插入图片描述

    • 注意点:
      • 这里的Y是指两者之间支持隐式转换,而没有说明转换成谁。

    2.2.1 关系运算符作用下的隐式转换

    ​ 关系运算符包括=、<>、<、<=、>、>=、IS NULL、IS NOT NULL、LIKE、RLIKE、IN。由于LIKE、RLIKE、IN的隐式类型转换规则不同于其他关系运算符,将单独对其进行说明。此处的说明不包含这三种特殊的关系运算符。

    ​ 当不同类型的数据共同参与关系运算时,按照下述原则进行隐式类型转换。

    在这里插入图片描述

    • 总结:
      • Bigint、String、Double,三者Double最大,Bigint和String互转也会变成Double。
      • 有Decimal的情况,Decimal最大。

    2.2.2 特殊的关系运算符作用下的隐式转换

    ​ 特殊的关系运算符包括LIKE、RLIKE、IN

    • LIKE和RLIKE的source和pattern参数均仅接受STRING类型。
    • 其他类型不允许参与运算,也不能进行到STRING类型的隐式类型转换。
    • IN右侧的VALUE值列表中的数据类型必须一致。
    • key in (value1, value2, …)中,当KEY与VALUES之间比较时,如果数据类型包含BIGINT、DOUBLE、STRING,建议统一转为DOUBLE类型;如果数据类型包含DATETIME、STRING,建议统一转为DATETIME类型。除此之外不允许其它类型之间的转换。

    2.2.3 算术运算符作用下的隐式转换

    • 只有STRING、BIGINT、DOUBLE和DECIMAL才能参与算术运算。
    • STRING在参与运算前会进行隐式类型转换到DOUBLE。
    • BIGINT和DOUBLE共同参与计算时,会将BIGINT隐式转换为DOUBLE。
    • 日期型和布尔型不允许参与算数运算。

    2.2.4 逻辑运算符作用下的隐式转换

    ​ 只有BOOLEAN才能参与逻辑运算。

    2.3 内建函数涉及到的隐式转换

    ​ 在调用函数时,如果输入参数的数据类型与函数定义的参数数据类型不一致,把输入参数的数据类型转换为函数定义的数据类型。

    2.4 CASE WHEN作用下的隐式转换

    ​ 针对case when中有不同结果类型的情况。

    • 如果返回类型只有BIGINT、DOUBLE,统一转换为DOUBLE。
    • 如果返回类型中有STRING类型,统一转换为STRING,如果不能转换(如BOOLEAN类型)则报错。
    • 除此之外不允许其它类型之间的转换。

    2.5 String和Datetime之间的转换

    ​ 这两者之间转换要遵循下面的格式:

    • 注意

      • 以下将转换失败,没有严格按照格式来。可以先用TO_DATE函数调整格式再转
        cast("2013/12/31 02/34/34" as datetime)  
        cast("20131231023434" as datetime)  
        cast("2013-12-31 2:34:34" as datetime)
      
      • 如果超出对应月份实际拥有的天数,将会导致异常退出。

    三、分区表

    ​ MaxCompute 用户在创建表时,允许指定表的某些列为分区列,从而决定数据的存储流向。指定分区列(在 select 语句的 where 条件过滤中使用分区列作为过滤条件)会给用户带来诸多便利,例如:提高 SQL 运行效率,减少计费等。

    ​ 一张表最多允许60000个分区,单表的分区层次不能超过6级。。

    ​ 但在使用 输出到动态分区(DYNAMIC PARTITION),SQL 的运行效率较低,并且会带来较高的计费。

    四、SQL操作

    4.1 表操作

    4.1.1 创建表

    ​ 创建表的语法格式,如下所示,但一般有很多用不到,用不到的加“–”号注释了:

    #完整版
    CREATE [EXTERNAL] TABLE IF NOT EXISTS table_name
    [(col_name 数据类型 [DEFAULT value] [COMMENT 字段注释], ...)]
    [COMMENT 表中文注释]
    [PARTITIONED BY (col_name 数据类型 [COMMENT col_comment], ...)]
    --[CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name --[ASC | DESC] ...])] INTO number_of_buckets BUCKETS] -- 用于创建Hash Clustering表时设置表的----Shuffle和Sort属性。
    --[STORED BY StorageHandler] -- 仅限外部表。
    --[WITH SERDEPROPERTIES (Options)] -- 仅限外部表。
    --[LOCATION OSSLocation]; -- 仅限外部表。
    [LIFECYCLE days]
    
    
    # 第二种建表:
     CREATE TABLE [IF NOT EXISTS] table_name
     LIKE existing_table_name
     
    # 第三种建表
    create table if not exists table_name 
    AS select_statement;
    
    • 注意点:
      • 1.partitioned by的字段必须不是建表里面的字段。
      • 2.不加EXTERNAL代表内部表,加EXTERNAL代表外部表,外部表、内部表和Hive的差不多,如下:
      • 3.partitioned by指定表的分区字段,目前支持TINYINT、SMALLINT、INT、BIGINT、VARCHAR和STRING类型。
      • 4.LIFECYCLE是表的生命周期,单位:天。注意,create table like语句不会复制源表的生命周期属性。
      • 5.如果是用create table… as select …的方式来建表,那么在建表的同时将数据复制到新表中,但是不会复制分区属性,只会把源表的分区列作为目标表的一般列处理,也就是新表会多一列。
      • 6.如果是用第二种建表create table …like …的方式来建表,那么源表和目标表具有相同的表结构,即列名、列注释以及表注释等均相同(生命周期不会)。但源表中的数据不会被复制到目标表

    hive外部表和内部表的区别:

    • 1.创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;创建外部表时需要加上external关键字,它仅记录数据所在的路径,不对数据的位置做任何改变。
    • 2.删除表时:删除表后,内部表的元数据和真实数据会被一起删除,而外部表仅删除元数据,不删除真实数据,这样外部表相对来说更加安全些,数据组织也比较灵活,方便共享原始数据。(直接重建原来的表后,数据就自动导入到原来的表去了,location直接指向原来存储的位置)
    • 外部表保障底层数据的安全性,内部表适用于管理中间表和结果表。

    clustered by指定Hash Key。MaxCompute将对指定列进行Hash运算,按照Hash值分散到各个Bucket中。

    • 为避免数据倾斜和热点,取得较好的并行执行效果,clustered by列适宜选择取值范围大,重复键值少的列。此外,为了达到join优化的目的,也应该考虑选取常用的Join/Aggregation Key,即类似于传统数据库中的主键。

    • sorted by用于指定在Bucket内字段的排序方式。建议sorted byclustered by一致,以取得较好的性能。此外,当sorted by子句指定之后,MaxCompute将自动生成索引,并且在查询的时候利用索引来加快执行。

    • INTO number_of_buckets BUCKETS
      

      指定了哈希桶的数目。这个数字必须填写,且由数据量大小来决定。此外,缺省条件下MaxCompute只能支持最多1111个Reducer,所以此处最多也只支持1111个哈希桶。您可以使用

      set odps.sql.reducer.instances=xxx;
      

      来提升这个限制,但最大不得超过4000,否则会影响性能。

      选择哈希桶数目时,请您遵循以下两个原则:

      • 哈希桶大小适中:经验值是每个Bucket的大小在500M左右比较合理。例如,分区大小估计为500G,初略估算Bucket数目应该设为1000,这样平均每个Bucket大小约为500M。对于特别大的表,500M的限制可以突破,每个Bucket在2-3G左右比较合适。同时,可以结合set odps.sql.reducer.instances=xxx;来突破1111个桶的限制。
      • 对于需要经常join的两个表,哈希桶数目应设为一样,这样才能够优化join,省略掉Shuffle和Sort步骤。如果按照上述原则计算两个表的哈希桶数不一致,建议统一使用数字大的Bucket Number,保证合理的并发度和执行效率。
    • Hash Clustering表的优点:

      • 优化Bucket Pruning。
      • 优化Aggregation。
      • 优化存储。
    • Hash Clustering表的限制:

      • 不支持insert into,只能通过insert overwrite来添加数据。
      • 不支持Tunnel直接Upload到Range Cluster表,因为Tunnel上传数据是无序的。
    • 案例:

    CREATE TABLE T1 (a string, b string, c bigint) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS; --创建Hash Clustering非分区表。
    CREATE TABLE T1 (a string, b string, c bigint) PARTITIONED BY (dt string) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS; --创建Hash Clustering分区表。
    

    4.1.2 表其他操作

    -- 查看表
    DESC <table_name>;
    DESC extended <table_name>; --查看外部表信息。
    
    -- 查看建表语句
    SHOW CREATE TABLE <table_name>;
    
    -- 删除表
    -- 删除外部表时,OSS上的数据不会被删除。跟Hive一样意思。
    DROP TABLE [IF EXISTS] table_name;
    
    -- 重命名表
    ALTER TABLE table_name RENAME TO new_table_name;
    
    -- 修改表Owner
    ALTER TABLE table_name CHANGEOWNER TO 'ALIYUN$xxx@aliyun.com';
    
    -- 修改表的注释
    ALTER TABLE table_name SET COMMENT 'new coments for table sale_detail';
    
    -- 修改表的修改时间
    -- 此时,MaxCompute会认为表的数据有变动,生命周期的计算会重新开始。
    ALTER TABLE table_name TOUCH;
    
    -- 修改表的Hash Clustering属性
    ALTER TABLE table_name     
    [CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]
    -- 去除表的Hash Clustering属性的语法格式如下
    ALTER TABLE table_name NOT CLUSTERED;
    
    -- 清空非分区表里的数据
    TRUNCATE TABLE table_name;
    
    -- 清空分区表里的数据
    ALTER TABLE table_name DROP PARTITION(dt = '??????');
    
    -- 强制删除表数据(分区数据),可以加purge
    DROP TABLE table_name PURGE;
    ALTER TABLE table_name DROP PARTITION(dt = '??????') PURGE;
    
    -- 备份表
    

    4.1.3 备份表的数据

    ​ 如果project 里的空间比较紧张,在想办法进行删除数据或者压缩数据,那么可以考虑 MaxCompute 里对表 的 archive 功能,效果是可以将存储空间压缩 50%左右,而且在这个过程中会将多个小文件自动的合并掉

    ​ 代价是如果某个数据块 损坏或某台机器损坏,恢复数据块的时间要比原来的方式长了,读的性能会有一定损失。

    ​ 所以现在这种功能可以用在一些冷数据的压缩存储上,比如一些非常大的日志数据,超过一定时间期限后使用 的频率非常低,但是又需要长期保存,则可以考虑用 raid file 来存储。

    ​ 语法:

    ALTER TABLE [table_name] <PARTITION(partition_name='partition_value')> ARCHIVE;
    
    -- 案例
    alter table my_log partition(ds='20140101') archive;
    

    4.2 生命周期操作

    ​ MaxCompute提供了数据生命周期管理功能,方便释放存储空间,简化回收数据的流程

    4.2.1 注意事项

    • 在MaxCompute中,每当表的数据被修改后,表的LastDataModifiedTime将会被更新,非分区表和分区表的数据是否要回收都是根据每张表的LastDataModifiedTime和设置的生命周期天数来判断的,从最后一次数据被修改开始计算,如果经过days后仍没有被改动,就会被系统自动回收(类似drop table操作)
    • 不同的是非分区表不支持取消生命周期的设置,只能修改生命周期;而分区表可以取消某个具体分区的生命周期设置,并且就算最后一个分区被回收了,这张分区表也不会被删除
    • 生命周期只能在表级别设置,不能在分区级别设置,但分区表设置生命周期后,生命周期是在分区级别生效。
    • create table …like …的方式来建表,会复制列名、列注释以及表注释等,但生命周期不会

    4.2 2 禁止生命周期

    ​ 在某些情况下有些特定的分区可能不希望被生命周期功能自动回收掉,比如一个月的月初,或者双 11 期间的数 据,在这种情况下可以禁止该分区被生命周期功能回收。

    ​ 语法:

    -- 禁止生命周期
    ALTER TABLE table_name [partition_spec] ENABLE|DISABLE LIFECYCLE;
    
    -- 示例
    ALTER TABLE trans PARTITION(dt='20141111') DISABLE LIFECYCLE;
    

    4.2.3 修改表的生命周期属性

    ​ MaxCompute 提供数据生命周期管理功能,方便用户释放存储空间。

    ​ 语法:

    -- 修改表的生命周期属性
    ALTER TABLE table_name SET LIFECYCLE DAYS;
    
    -- 案例
    ALTER TABLE test_lifecycle SET lifecycle 50;
    
    • 第一种情况,如果表是非分区表,自最后一次数据被修改开始计算,经过 days 天后数据仍未被改动,则此表无需 用户干预, 将会被 MaxCompute 自动回收(类似 drop table 操作)。在 MaxCompute 中,每当表的数据被修改后,表的LastDataModifiedTime 将会被更新,因此, MaxCompute 会根据每张表的LastDataModifiedTime 以及 lifecycle 的设 置来判断是否要回收此表。
    • 第二中情况,如果是分区表,则根据各分区的 LastDataModifiedTime 判断该分区是否该被回收。不同于非分区表,分区表的最后一个分区被回收后,该表不会被删除。

    4.3 分区和列操作

    • 注意:
      • MaxCompute SQL仅支持新增分区,不支持新增分区字段;
      • 不能删除列;
      • 添加的新列不支持指定顺序,默认在最后一列。

    4.3.1 分区操作

    -- 添加分区
    -- 对于多级分区的表,如果需要添加新的分区,必须指明全部的分区值。
    ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION(par1 = '',par2 = ''...);
    
    -- 删除分区操作
    ALTER TABLE table_name DROP [IF EXISTS] PARTITION(par1 = '',par2 = ''...);
    
    -- 修改分区值
    ALTER TABLE table_name PARTITION (par1 = '旧值',par2 = '旧值', ...) RENAME TO PARTITION (par1 = '新值',par2 = '新值', ...);
    
    -- 修改表、分区的更新时间
    ALTER TABLE table_name TOUCH PARTITION(par1 = '',...);
    

    4.3.2 列操作

    -- 添加列
    ALTER TABLE table_name ADD COLUMNS (col_name1 数据类型,col_name2 数据类型...);
    
    -- 修改列名操作
    ALTER TABLE table_name CHANGE COLUMN old_col_name RENAME TO new_col_name;
    
    -- 修改列的注释
    ALTER TABLE table_name CHANGE COLUMN col_name COMMENT comment_string;
    
    -- 同时修改列名及列注释
    ALTER TABLE table_name CHANGE COLUMN old_col_name new_col_name column_type COMMENT column_comment;
    

    4.3.3 合并分区

    ​ MaxCompute SQL提供MERGE PARTITION对分区进行合并,即同一个表下多个分区数据合并成一个分区,同时删除被合并的分区维度的信息,把数据移动到指定分区。

    • 注意:
      • 如果运行过程中出现源数据被并发修改(包括INSERTRENAMEDROP)时,即使指定IF EXISTS也会报错。
      • 不支持外部表和SHARD表,对于CLUSTERED表合并后的分区文件会消除CLUSTERED属性。
      • 一次性合并分区数量限制为4000个。

    ​ 语法:

    ALTER TABLE <tableName> MERGE [IF EXISTS] PARTITION(<predicate>) [, PARTITION(<predicate2>) ...] OVERWRITE PARTITION(<fullPartitionSpec>) [PURGE];
    
    -- 示例:
    +------------+------------+------------+------------+
    | value      | ds         | hh         | mm         |
    +------------+------------+------------+------------+
    | 1          | 20181101   | 00         | 00         |
    | 1          | 20181101   | 00         | 10         |
    | 1          | 20181101   | 10         | 00         |
    | 1          | 20181101   | 10         | 10         |
    +------------+------------+------------+------------+
    -- 合并所有满足hh='00' 的分区到hh='00',mm='00'中。
    ALTER TABLE intpstringstringstring MERGE PARTITION(hh='00') OVERWRITE PARTITION(ds='20181101', hh='00', mm='00');
    -- 合并后:
    ds=20181101/hh=00/mm=00
    ds=20181101/hh=10/mm=00
    ds=20181101/hh=10/mm=10
    
    -- MERGE PARTITIONS允许指定多个谓词条件,示例如下,指定到具体分区下,合并剩余分区。
    ALTER TABLE intpstringstringstring MERGE IF EXISTS 
    PARTITION(ds='20181101', hh='00', mm='00'), partition(ds='20181101', hh='10', mm='00'),  PARTITION(ds='20181101', hh='10', mm='10') 
    OVERWRITE PARTITION(ds='20181101', hh='00', mm='00') PURGE;
    

    4.4 视图操作

    4.4.1 创建视图

    • 注意:
      • 不允许向视图写入数据,例如使用insert into或者insert overwrite操作视图。
      • 创建视图时,您必须有对视图所引用表的读权限。

    ​ 语法:

    CREATE [OR REPLACE] VIEW [IF NOT EXISTS] view_name
        [(col_name [COMMENT col_comment], ...)]		-- 没有数据类型
        [COMMENT view_comment]
        [AS select_statement]
        
    -- 创建视图sale_detail_view。
    create view if not exists sale_detail_view
    (store_name, customer_id, price, sale_date, region)
    comment 'a view for table sale_detail'
    as select * from sale_detail;
    

    4.4.2 视图其他操作

    -- 重命名视图
    ALTER VIEW view_name RENAME TO new_view_name;
    
    -- 删除视图
    DROP VIEW [IF EXISTS] view_name;
    

    4.5 更新表数据(Insert OverWrite 和 Insert Into)

    • 语法:

      INSERT OVERWRITE|INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)] [(col1,col2 ...)]
      select_statement
      FROM from_statement;
      
    • 注意

      • MaxCompute的INSERT语法与通常使用的MySQL或Oracle的INSERT语法有差别,在INSERT OVERWRITE/INTO后需要加入TABLE关键字,而非直接使用TABLENAME
      • 当遇到并发写入时,MaxCompute会保证在冲突时只会一个作业成功,其它冲突作业失败。
      • OVERWRITE类作业可保证一致性,但INTO类作业在冲突失败后可能存在失败作业的数据残留。
      • Insert后面的partition参数不允许使用函数等表达式,只能是常量;
      • 源表与目标表的对应关系依赖于在select子句中列的顺序,而不是表与表之间列名的对应关系。
      • 向某个分区插入数据时,分区列只能出现在select的条件中。

    4.5.1 功能说明

    • insert into:直接向表或表的分区中追加数据。不支持INSERT INTO到Hash Clustering表。如果您需要插入少量测试数据,可以配合VALUES使用。
    • insert overwrite:先清空表中的原有数据,再向表或分区中插入数据。目前INSERT OVERWRITE不支持指定插入列的功能,暂时只能用INSERT INTO

    4.5.2 Insert 动态分区注意事项

    • insert into partition时,如果分区不存在,会自动创建分区。
    • 多个insert into partition作业并发时,如果分区不存在,会自动创建分区,但只会成功创建一个分区。(这点和并发写入保证只有一个作业成功一样)
    • 如果不能控制insert into partition作业并发,则只能通过预创建分区避免问题。

    4.6 多路输出-Multi Insert

    ​ MaxCompute SQL支持在一个语句中将数据插入不同的目标表或者分区中实现多路输出。

    ​ 语法:就是一个From开头,后面多个insert语句。

    FROM from_statement
    INSERT OVERWRITE | INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)]
    select_statement1 [FROM from_statement]
    [INSERT OVERWRITE | INTO TABLE tablename2 [PARTITION (partcol1=val3, partcol2=val4 ...)]
    select_statement2 [FROM from_statement]]
    
    • 注意:
      • 通常,单个SQL中最多可以写255路输出。超过255路,则报语法错误。
      • 在一个multi insert中,对于分区表,同一个目标分区不允许出现多次。
      • 在一个multi insert中,对于未分区表,该表不能出现多次。
      • 对于同一张分区表的不同分区,不能同时有insert overwriteinsert into操作,否则报错返回
    • 示例:
    --创建表sale_detail_multi。
    create table sale_detail_multi like sale_detail;
    
    --将表sale_detail中的数据插入到表sale_detail_multi。
    set odps.sql.allow.fullscan=true; //开启全表扫描,仅此session有效。
    from sale_detail
    insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' ) 
    select shop_name, customer_id, total_price 
    insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
    select shop_name, customer_id, total_price ;
    

    4.7 输出到动态分区

    4.7.1 说明

    ​ 在使用INSERT OVERWRITE语句将数据插入到分区表时,MaxCompute提供了如下两种方式:

    • 输出到静态分区:在INSERT语句中直接指定分区值,将数据插入指定的分区。
    • 输出到动态分区:在INSERT语句中不直接指定分区值,只指定分区列名。分区列的值在SELECT子句中提供,系统自动根据分区字段的值将数据插入到相应分区。(在SQL运行之前,您无法得知会产生哪些分区。只有在语句运行结束后,才能通过分区字段产生的值确定产生的分区)

    4.7.2 动态分区语法

    INSERT OVERWRITE|INTO TABLE tablename PARTITION (partcol1, partcol2 ...) --也就是这里不指定具体值
    select_statement FROM from_statement;
    

    4.7.3 动态分区注意事项

    • 使用INSERT INTO最多可以生成10000个动态分区,使用INSERT OVERWRITE最多可以生成60000个动态分区。
    • 分布式环境下,使用动态分区功能的SQL中,单个进程最多只能输出512个动态分区,否则会引发运行时异常。
    • 动态生成的分区值不允许为NULL,也不支持含特殊字符和中文,否则会引发运行时异常FAILED: ODPS-0123031:Partition exception - invalid dynamic partition value: province=xxx
    • 如果目标表有多级分区,在运行INSERT语句时允许指定部分分区为静态,但是静态分区必须是高级分区。
    • 如果目标表为Hash Clustering Table,则不支持动态分区。
    • 动态分区中,select_statement字段和目标表动态分区的对应是按字段顺序决定,并不是按照列名称决定的。
      • 动态分区插入时, 。这点和普通插入分区表不一样。
    • 在动态分区进行时,如果分区列的数据类型和select列表中类型不一致,MaxCompute能支持隐式类型转换。

    五、SELECT语句

    5.1 Select语法注意事项

    • SELECT语句查询分区表时禁止全表扫描,屏显目前最多只能显示10000行结果。

      • 如果想要执行全表扫描,需要执行以下开关:set odps.sql.allow.fullscan=true;
    • select_expr支持这样的正则表达式:

      SELECT `abc.*` FROM t;选出t表中所有列名以abc开头的列。
      SELECT `(ds)?+.+` FROM t;选出t表中列名不为ds的所有列。
      SELECT `(ds|pt)?+.+` FROM t;选出t表中排除ds和pt两列的其它列。
      SELECT `(d.*)?+.+` FROM t;选出t表中排除列名以d开头的其它列。
      
    • 在SQL解析中,GROUP BY操作先于SELECT操作,因此GROUP BY的取值是SELECT输入表的列名或者由输入表的列构成的表达式,不允许是SELECT语句的输出列的别名。

    • OFFSET 和 ORDER BY LIMIT语句配合,可以指定跳过OFFSET数目的行。

      --将src按照key从小到大排序后,输出第11到第30行(OFFSET 10指定跳过前10行,LIMIT 20指定最多输出20行)。
      SELECT * FROM src ORDER BY key LIMIT 20 OFFSET 10;
      
    • 子查询必须要有别名

    5.2 select语序

    案例一:

    SELECT  key
            ,MAX(value)
    FROM    src t
    WHERE   value > 0
    GROUP BY key
    HAVING  SUM(value) > 100
    ORDER BY key
    LIMIT   100
    ;
    
    -- 以上语句的逻辑执行顺序是FROM->WHERE->GROUY BY->HAVING->SELECT->ORDER BY->LIMIT
    
    -- 案例二
    SELECT  shop_name
            ,total_price
            ,region
    FROM    sale_detail
    WHERE   total_price > 150
    DISTRIBUTE BY region
    SORT BY region
    ;
    
    -- 以上语句的逻辑执行顺序是FROM->WHERE->SELECT->DISTRIBUTE BY->SORT BY。
    
    • ORDER BY中只能引用SELECT列表中生成的列,而非访问FROM的源表中的列。
    • HAVING可以访问GROUP BY key和聚合函数。
    • SELECT时,如果语句中有GROUP BY,便只能访问GROUP BY key和聚合函数,而不是FROM中源表中的列。

    5.3 交集、并集、补集

    交集:Intersect & Intersect distinct(交集后去重)

    并集:Union All & Union

    补集:Except All & Except distinct (即第一个数据集减去第二个数据集的结果,默认是EXCEPT DISTINCT)

    ​ 这里EXCEPT等同于MINUS

    -- EXCEPT ALL 如果第一个数据集有重复的,有多少个减多少个
    SELECT * FROM VALUES (1, 2), (1, 2), (3, 4), (3, 4), (5, 6), (7, 8) t(a, b) 
    EXCEPT ALL 
    SELECT * FROM VALUES (3, 4), (5, 6), (5, 6), (9, 10) t(a, b);
    
    -- EXCEPT DISTINCT 如果第一个数据集有重复的,那么只当做去重后的一个来减
    SELECT * FROM VALUES (1, 2), (1, 2), (3, 4), (3, 4), (5, 6), (7, 8) t(a, b) 
    EXCEPT
    SELECT * FROM VALUES (3, 4), (5, 6), (5, 6), (9, 10) t(a, b);
    

    ​ 注意,集合操作左右两个分支要求列个数必须一致。如果数据类型不一致,可能会进行隐式类型转换。

    ​ MaxCompute最多允许256个分支的集合操作,超出256个将报错。

    5.4 Join操作

    • LEFT OUTER JOIN:左连接,可简写为LEFT JOIN。返回左表中的所有记录,右表中没有与之匹配的记录显示为空。
      • 如果右表值不唯一,建议不要连续使用过多LEFT JOIN,会造成数据重复,数据膨胀导致作业停止。
    • RIGHT OUTER JOIN:右连接,可简写为RIGHT JOIN。返回右表中的所有记录,左表中没有与之匹配的记录显示为空
    • FULL OUTER JOIN:全连接,可简写为FULL JOIN。返回左右表中的所有记录。
    • INNER JOIN:内连接。关键字INNER可以省略。返回表之间相互都匹配的数据行
    • 隐式连接,即不指定JOIN关键字执行连接。就普通的写法
    • 自然连接,natural join

    5.5 Semi Join

    参考:https://help.aliyun.com/document_detail/73784.html?spm=a2c4g.11186623.6.700.2f0d3dd6JPv77w

    5.6 MapJoin Hint

    ​ 当一个大表和一个或多个小表JOIN时,可以在SELECT语句中显式指定MAPJOIN以提升查询性能。

    5.6.1 原理

    ​ 在大表和一个或多个小表JOIN的场景下,MAPJOIN会将指定的小表全部先加载到的内存中,然后在Map阶段完成大表和小表的连接从而加快JOIN的执行速度

    ​ MAPJOIN在Map阶段执行表连接,而不用等到Reduce阶段才执行表连接。这样就节省了大量数据传输的时间以及系统资源,从而起到了优化作业的作用。

    ​ 指定的表仅能为小表,且表被加载到内存后占用的总内存不得超过512 MB。

    通常情况下,JOIN操作在Reduce阶段执行表连接。整个JOIN过程包含Map、Shuffle和Reduce三个阶段。

    5.6.2 使用方法

    ​ 在SELECT语句中使用Hint提示/*+ MAPJOIN(table) */,如下:

    SELECT /* + MAPJOIN(a) */
            a.shop_name,
            b.customer_id,
            b.total_price
    FROM shop a JOIN sale_detail b
    ON a.shop_name = b.shop_name;
    

    5.6.3 使用(限制)条件

    • MAPJOIN支持小表为子查询,在引用小表或子查询时,需要引用别名。
    • LEFT OUTER JOIN的左表必须是大表,RIGHT OUTER JOIN的右表必须是大表,INNER JOIN的左表或右表均可以作为大表,FULL OUTER JOIN不能使用MAPJOIN。
    • MaxCompute在MAPJOIN中最多支持指定128张小表,否则报语法错误。MAPJOIN中多个小表用逗号隔开,例如/*+MAPJOIN(a,b,c)*/。(实际开发一般是8张)
    • 如果使用MAPJOIN,则小表占用的总内存不得超过512 MB。由于MaxCompute是压缩存储,因此小表在被加载到内存后,数据大小会急剧膨胀。此处的512 MB是指加载到内存后的空间大小。
    SELECT /* + MAPJOIN(a) */
            a.total_price,
            b.total_price
    FROM shop a JOIN sale_detail b
    ON a.total_price < b.total_price OR a.total_price + b.total_price < 500;
    

    5.7 Laterval View

    5.8 Grouping Sets

    -待补充。

    参考:https://help.aliyun.com/document_detail/92807.html?spm=a2c4g.11186623.6.704.625e8030Qazc5p

    5.9 SELECT TRANSFORM语法

    -待补充。

    参考:https://help.aliyun.com/document_detail/73719.html?spm=a2c4g.11186623.6.705.4ba67830goviwc

    5.10 With AS语法(CTE)

    ​ MaxCompute支持SQL标准的CTE,提高SQL语句的可读性与执行效率。

    ​ 语法格式:

    WITH 
         cte_name AS
        (
            cte_query
        )
        [,cte_name2  AS 
         (
         cte_query2
         )
        ,……]
    

    示例:

    -- 这样写看起来复杂
    INSERT OVERWRITE TABLE srcp PARTITION (p='abc')
    SELECT * FROM (
        SELECT a.key, b.value
        FROM (
            SELECT * FROM src WHERE key IS NOT NULL    ) a
        JOIN (
            SELECT * FROM src2 WHERE value > 0    ) b
        ON a.key = b.key
    ) c
    UNION ALL
    SELECT * FROM (
        SELECT a.key, b.value
        FROM (
            SELECT * FROM src WHERE key IS NOT NULL    ) a
        LEFT OUTER JOIN (
            SELECT * FROM src3 WHERE value > 0    ) b
        ON a.key = b.key AND b.key IS NOT NULL
    )d;
    
    -- 可以改成with as的形式比较直观。也不用反复嵌套
    with 
      a as (select * from src where key is not null),
      b as (select  * from src2 where value>0),
      c as (select * from src3 where value>0),
      d as (select a.key,b.value from a join b on a.key=b.key),
      e as (select a.key,c.value from a left outer join c on a.key=c.key and c.key is not null)
    insert overwrite table srcp partition (p='abc')
    select * from d union all select * from e;
    

    5.11 Clone Table 复制表

    -待补充。

    5.12 Explain执行计划

    -待补充。

    六、 MaxCompute和其他数据库的区别

      1. MaxCompute适合海量存储和大数据分析,不适合在线服务;
      1. MaxCompute表不支持主键、索引和字段约束,不支持事务操作。
      1. MaxCompute表不⽀持UPDATE操作和DELECT操作,只能DROP 整个表或者某一分区数据,然后用INSERT OVERWRITE/INSERT INTO写入数据。
      1. MaxCompute可以在select中使用mapjoin Hint,将指定的小表全部先加载到的内存中,然后在Map阶段完成大表和小表的连接从而加快JOIN的执行速度,提高查询速度;
      1. SELECT操作输出屏显的数据行数受限制,最大为10000条。不支持通过SQL下载数据。
      1. MaxCompute表可以指定生命周期,生命周期结束后自动清除数据;
      1. 插入语句有insert into | overwrite into,后面需加Table关键字,overwrite into写入前会清空表数据;
      1. 子查询必须要有别名,select输入表的数量不能超过16张。
      1. 数据插入表的字段映射不是根据Select的别名做的,而是根据Select的字段的顺序和表里的字段的顺序。
      1. 不能修改分区列列名,只能修改分区列对应的值,支持增加列,但是不支持删除列以及修改列的数据类型(数据类型应该可以吧)。
      1. 在MaxCompute中需要通过Tunnel、Dship工具或MaxCompute Tunnel SDK导出数据。
      1. select语句的查询结果默认使用了limit,如果希望数据量很大时返回所有记录,需要设置

    其他区别参考:https://help.aliyun.com/document_detail/51823.html?spm=a2c4g.11186623.2.19.68516f416InQIc#concept-yjp-crl-vdb --SQL使用限制

    七、函数

    7.1 内建函数

    -待补充。

    7.2 自定义函数

    -待补充。

    八、MaxCompute SQL优化

    ​ 首先优化SQL的过程,实际上就是要尽可能减少IO读取,尽可能减少计算资源的使用,尽可能减少SQL复杂度,尽可能提升运行速度。

    • (1) 建分区表:但建议分区层数不超过3层,后续查询时为了避免全表扫描需要分区裁剪,分区值尽量常量化,避免不可确定值;插入数据时尽量采用写入静态分区的方式,优化数据存储,提高运行效率;如果用动态分区,底层来说会生成较多的小文件和多个mapreduce任务,增加系统负担。
    • (2) 只select有效列,并用limit限制返回的条数。考虑这两种情况:(1)有些事实表的字段很多,多到有四五十个字段,这种表实际是可能存在的,比如企业报税的申报表,那么select * from整张表的所有字段时,数据量一大就会消耗更多查询时间;(2)第二种情况是假设事实表中存有某个复杂格式的字段,比如json格式,那么在select * from时也会耗时严重,因此select有效列是较好的习惯。
    • (3) mapjoin优化:mapjoin的原理简单说就是先将小表加载到内存中,后续大表与内存中的小表直接join连接计算(底层是大表与内存中的小表的计算在map阶段直接完成,避免了reduce阶段的计算耗时)
    • (4) 避免一些消耗资源的操作,比如:
      • 少用distinct,容易触发数据倾斜
      • count(distinct)处理的时间会很长可以转换成count()+ group by。因为如果原先只用count(distinct)来处理表的数据,只会开启一个reduce任务来完成所有的计算,而改成count/sum + group by后,除了count/sum会开启一个reduce计算之外,group by也会单独开启另一个reduce来计算,因此数据量大时可以明显提升查询速度。
      • 尽量避免Order by,order by会触发全局排序,只能单点运行,效率低,如果业务允许,可以改成distribute by + sort by
      • 将full outer join 改为left outer join + union all 并对小表使用上mapjoin。
      • 多个表join时,join顺序很重要,优先选择join结果输出小的表先关联,能有效减少中间数据量,节省IO和计算资源。
      • 读取相同源表时可以合并成一条sql,系统会优化只读取一次。
    • (5) 尽量使用内置的UDF函数和窗口函数:内置UDF在实现时做了很多优化,运行快,省资源,窗口函数本身能处理很多复杂问题。
    • (6) 尽可能保证表达式两边的数据类型一致:如果发生隐式转换容易造成精度问题,比如string和bigint都转成double来相等比较,悲观情况下,可能触发数据倾斜。这时要cast显式转换一下
    工作中还用到的:
    - 数据量达到千万级别的,尽量固化分区,不在查询sql里面进行动态分区,减少资源使用
    - sql行数较长的,尽量拆分成工作流任务,可以优化性能和排查问题的难易度。
    - 维度表要放到mapjoin里,加大资源利用率
    - 数据倾斜较高的表,先落地成临时表
    - 每日全量推送任务的,如果表级数据量较高的,分析是否可以改成增量模式。
    

    九、问题案例收集

    ​ 待补充

    参考:https://help.aliyun.com/knowledge_detail/150534.html#section-iok-hsp-b2z

    展开全文
  • MaxCompute重磅发布.pdf

    2019-08-29 06:43:35
    阿里云产品专家郭坤在2017云栖大会·北京峰会中做了题为《MaxCompute重磅发布》的分享,就Python UDF的特点,如何进入官网申请公测等方面的内容做了深入的分析。
  • 9月4日MaxCompute直播课件下载。 了解更多MaxCompute产品和技术相关内容,可扫描二维码加入“MaxCompute开发者交流”钉钉群。
  • 简介:作为业界少有的EB级数据分布式平台,MaxCompute每天支撑上千万个分布式作业的运行。这些作业特点各异,既有包含数十万计算节点的超大型作业,也有中小规模的分布式作业。不同用户对于不同规模/特点的作业,在...

    简介: 作为业界少有的EB级数据分布式平台,MaxCompute每天支撑上千万个分布式作业的运行。这些作业特点各异,既有包含数十万计算节点的超大型作业,也有中小规模的分布式作业。不同用户对于不同规模/特点的作业,在运行时间,资源使用效率,数据吞吐率等方面,也有着不同的期待。DAG作为MaxCompute执行引擎的核心技术之一,在提供了底层统一的动态执行框架的同时,实现了一个在离线混合的执行模式(Bubble Execution),达到了平衡极致性能以及高效的资源利用率的目的。

    作为业界少有的EB级别数据分布式平台,MaxCompute系统每天支撑上千万个分布式作业的运行。在这个量级的作业数目上,毫无疑问平台需要支撑的作业特点也多种多样:既有在"阿里体量"的大数据生态中独有的包含数十万计算节点的超大型作业,也有中小规模的分布式作业。同时不同用户对于不同规模/特点的作业,在运行时间,资源使用效率,数据吞吐率等方面,也有着不同的期待。

    image.png

    Fig.1 MaxCompute线上数据分析

    基于作业的不同规模,当前MaxCompute平台提供了两种不同的运行模式,下表对于这两种模式做了总结对比:

    image.png

    Fig.2 离线(batch)模式 vs 一体化调度准实时(smode)模式

    从上图可以看到,离线作业和一体化调度的准实时作业,在调度方式,数据传输,使用资源来源等多个方面,都有非常显著的区别。可以说,这两种运行方式分别代表了在海量数据场景上按需申请资源来优化吞吐量和资源利用率,以及在处理中等(少量)数据时通过计算节点的全量预拉起来(以及数据直传等手段加速)降低执行时延的两个极端。而这些区别,最终会通过执行时间和作业资源利用率等方面体现出来。很显然,以高Throughput为主要优化目标的离线模式,和以追求低Latency的准实时模式,在各方面的性能指标会有很大的区别。比如以1TB-TPCH标准benchmark为例,此报告执行时间(性能)和资源消耗两个维度来做比较。可以看到,准实时的(SMODE)在性能上有着非常明显的优势(2.3X),但是这样的性能提升也并不是没有代价的。在TPCH这个特定的场景上,一体化执行的SMODE模式,在获取了2.3X性能提升的同时,也消耗了3.2X的系统资源(cpu * time)。

    1588374237965-533f7ae9-ca0a-4b07-8bf5-3e526385cf65.png

    Fig.3 性能/资源消耗比较:离线(batch)模式 vs 一体化调度准实时(smode)模式

    这个观察结论其实并不意外,或者从某种程度上是by design的。拿下图一个典型SQL产生的DAG来看,所有计算节点都在作业提交伊始就被拉起,虽然这样的调度方式允许数据得以(在需要的时候)pipeline起来,从而可能加速数据的处理。但并不是所有的执行计划里的所有上下游计算节点都可以有理想化的pipelined dataflow。事实上对于许多作业而言,除了DAG的根节点(下图中的M节点)以外,下游的计算节点在某种程度上都存在着一定程度的浪费。

    1588374772549-e3812c1a-f26b-45cf-b9e9-9937aefc6747.png

    Fig.4 一体化调度准实时(smode)模式下,可能的资源使用低效

    这种空转造成的资源使用的低效,在数据的处理流程上存在barrier算子而无法pipeline,以及在DAG图比较深的情况下会尤为明显。当然对于希望极致优化作业运行时间的场景而言,通过更多的资源消耗,来获取极致的性能优化,在一些场景上是有其合理性的。 事实上,在一些business-critical的在线服务系统中,为了保证服务总是能迅速响应并处理峰值数据,平均个位数的CPU利用率也并非少见。但是对于计算平台这种量级的分布式系统,能否在极致性能以及高效的资源利用率之间,获取一个更好的平衡呢

    答案是肯定的。这就是我们在这里要介绍的混合计算模式:Bubble Execution

    1. Bubble Execution 概述

    DAG框架的核心架构思想,在于对执行计划的逻辑层与物理层的清晰分层设计。物理执行图是通过对逻辑图中的节点、边等的物理特性(如数据传输介质,调度时机,资源特性等)的物化来实现的。对比在Fig.2中描述的batch模式和smode模式,DAG提供了在一套灵活的调度执行框架之上,统一离线模式和准实时一体化执行模式的实现。如同下图所示,通过调整计算节点和数据连接边的不同物理特性,不仅能对现有的两种计算模式做清晰的表述,在对其进行更通用化的扩展后,还可以探索一种全新的混合运行模式,也就是Bubble Execution。

    1.png

    Fig.5 DAG框架上的多种计算模式

    直观上来理解,如果我们把一个Bubble当作一个大的调度单位,Bubble内部的资源一起申请运行,并且内部上下游节点的数据均通过网络/内存直连传输。与之相对的,Bubbles之间连接边上的数据传输,则通过落盘方式来传输。那么离线和准实时作业执行,其实可以认为是Bubble执行的两个极端场景:离线模式可以认为是每个stage都单独作为single-bubble的特例,而准实时框架则是将作业所有计算节点都规划到一个大Bubble内部,来做一体化调度执行的另一个极端。DAG AM已经将两种计算模式统一到一套调度执行infra之上。使得在两种模式上进行优点互补成为可能,为引入Bubble Execution奠定了基础。

    Bubble Execution通过灵活自适应的子图(Bubble)切割,在现有的两个极端之间,提供了一种选取更细粒度,更通用的调度执行方法,达到作业性能和资源利用率之间获取优化的tradeoff的方法。在根据输入数据量、算子特性、作业规模等信息进行分析后,DAG的Bubble执行模式可以将一个离线作业切分出多个Bubbles,在Bubble内部充分利用网络/内存直连和计算节点预热等方式提升性能。这种切分方式下,一个DAG运行图中的计算节点,可以都被切入某个Bubble,根据所在DAG中的位置被切入不同Bubbles,还可以完全不被切入任何Bubble(依然以传统离线作业模式运行)。这种高度灵活的混合运行模式,使整个作业的运行能更加灵活的自适应线上多种多样作业的特点,在实际生产中具有重要的意义:

    • Bubble模式使更多作业的加速成为可能:一体化调度的准实时作业具有基于整体规模(线上默认2000)的"一刀切"式的准入条件。这一方面是出于有限资源的公平使用,另一方面也是为了控制节点failure带来的cost。但对于中大型作业,虽然整体规模可能超过准入门限,但是其内部的不同子图,有可能是规模合适,且可以通过数据pipeline等方法来加速的。此外线上部分计算节点由于其本身的特性(比如包含UDF等用户逻辑需要安全沙箱),无法使用预热的准实时资源池执行,而当前非黑即白的模式,会使得一个作业中,只要包含一个这种计算节点,整个作业都无法使用加速模式执行。Bubble模式能较好的解决这些问题。
    • Bubble模式将enable线上两个资源池的打通:当前离线资源(cold)和准实时资源池(warm)作为两种特性不同的线上资源,完全隔离,各自管理。这种分离的现状,可能导致资源的浪费。比如对于大规模作业,因为完全无法利用准实时资源池,排队等待离线资源,而同时准实时资源池可能正处于空闲状态,反之亦然。Bubble模式能通过在作业内部拉通不同资源的混合使用,使得两者各自补充,削峰填谷。
    • Bubble模式可以整体上提高资源的利用率:从资源利用的角度来看,对于可以满足准实时模式准入的中型作业,由于准实时模式一体式调度拉起的运行模式,虽然运行速度能有所提升,但客观上会造成一定程度资源的空转与浪费(尤其是DAG图较深以及计算逻辑有barrier时)。这种情况下,按照节点数目,计算barrier等条件,将一体化模式拆解成多个Bubble。这能够有效的减少节点大量的空转消耗,而且在拆分条件合理的情况下,性能方面的损失也可以做到较低。
    • Bubble模式能有效降低单个计算节点failure带来的代价:一体化的准实时模式执行,由于其数据pipeline的特性,作业的容错粒度和其调度粒度是紧密挂钩的:都是all-in-one。也就是说,只要有一个节点运行失败,整个作业都要重新运行。因为作业规模越大,运行过程中可能有节点失败的概率也就越大,这样的failover粒度无疑也限制了其能支持的最大作业规模。而Bubble模式则提供了一个更好的平衡点:单个计算节点的失败,最多只影响同处于一个Bubble的节点。此外Bubble模式对于各种failover做了细粒度的各种处理,我们将在下文描述。

    我们可以通过标准的TPCH-1TB测试benchmark来直观评测Bubble执行模式的效果。在上层计算引擎(MaxCompute优化器以及runtime等)保持不变,并且Bubble的大小维持在500(具体Bubble切分规则下文介绍)时,做一下Bubble执行模式与标准离线模式,以及准实时模式,在性能(Latency) 以及资源消耗(cpu * time)两个方面的比较:

    1.png

    Fig.6.a 性能(Latency)比较:Bubble模式 vs 离线(batch)模式 vs 一体化调度准实时(smode)模式

    从运行时间来看,Bubble模式显然要远优于离线模式(整体2X的性能提升),而较准实时的一体化调度模式而言,Bubble的执行性能也并没有太明显的下降。当然在一些数据可以非常有效利用pipeline处理的query(比如Q5, Q8等),准实时作业还是有一定的优势。但SMODE作业在执行时间上的优势并不是没有代价的,如果同时考虑资源消耗,在下图中,我们可以看到,准实时作业的性能提升是建立在资源消耗远远大于Bubble模式的前提之上的。而Bubble在性能远优于离线模式的同时,其资源消耗,则整体上是相近的。

    1.png

    Fig.6.b 资源消耗(cpu * time)比较:

    Bubble模式 vs 离线(batch)模式 vs 一体化调度准实时(smode)模式

    综合起来看,Bubble Execution可以很好的结合batch模式和准实时模式的优点:

    • 在执行时间层面,对于TPCH测试集中的任意query,bubble模式的执行时间都比batch模式要短,整体上22个Queries总耗时缩减将近2X,接近service mode模式的耗时;
    • 在资源消耗层面,bubble模式基本上和batch模式相当,相比于service mode模式有大幅度的减少,整体缩减2.6X。

    1.png

    Fig.6.c Bubble模式与离线/准实时模式的整体比较

    值得说明的是,在上面的TPCH Benchmark比较中,我们把Bubble切分条件简单化了,也就是整体上之限制bubble的大小在500,而没有充分考虑barrier等条件,如果在切分bubble的时候进一步调优,比如对于数据可以有效pipeline起来的节点,尽量保证切分在bubble内部,那作业的执行性能和资源利用率等方面都还可以进一步得到的提升,这是我们在实际生产系统上线过程中会注重考虑的。具体上线的效果见Section 3。

    在了解了Bubble执行模式的整体设计思想与架构后,接下来展开来讲一下具体Bubble模式的实现细节,以及将这种全新的混合执行模式推上线所需要的具体工作。

    2. Bubble的切分与执行

    采用Bubble Execution的作业(以下简称Bubble作业)和传统的离线作业一样,会通过一个DAG master(aka. Application Master)来管理整个DAG的生命周期。AM负责对DAG进行合理的bubble切分,以及对应的资源申请和调度运行。整体而言,Bubble内部的计算节点,将按照计算加速度原则,包括同时使用预拉起的计算节点以及数据传输通过内存/网络直传进行pipeline加速。而不切在bubble内部的计算节点则通过经典离线模式执行,不在bubble内部的连接边(包括横跨bubble boundary的边)上的数据,均通过落盘方式进行传输。

    1.png

    Fig.7 混合Bubble执行模式

    Bubble切分方法,决定了作业的执行时间和资源利用率。需要根据计算节点的并发规模,节点内部算子属性等信息综合考虑。而在切分出bubble之后,Bubble的执行则涉及到节点的执行,与数据pipeline/barrier的shuffle方式怎么做到有机的结合,这里分开做一下描述。

    2.1 Bubble 切分原理

    Bubble Execution的核心思想在于将一个离线作业拆分成多个Bubble来执行。为了切分出有利于作业整体高效运行的bubble,有几个因素需要综合考虑:

    • 计算节点内部算子特性:对于同时拉起bubble所有计算节点的调度模式而言,数据在bubble内部的上下游节点之间能否有效的进行pipeline处理,很大程度上决定了在bubble内部,下游节点是否会因处于空转状态带来资源浪费。所以在切分bubble的逻辑中,当节点包含barrier特性的算子而可能阻塞数据的pipeline时,将考虑不将该节点与其下游切入同一个bubble。
    • 单个Bubble内部计算节点数目的多少:如同之前讨论的,一体化的资源申请/运行,当包含的计算节点过多时,可能无法申请到资源,或者即使能申请到其failure代价也可能无法控制。限定Bubble的大小,可以避免过大的一体化运行带来的负面作用。
    • 聚合计算节点,切割Bubble的迭代方向:考虑到bubble大小的限制,从上而下切分bubble与从下而上切分bubble两种方式,可能导致切分的结果的不同。对于线上大部分作业而言,处理的数据往往呈倒三角型,对应的DAG也大多数是倒三角形态,所以默认采用自底向上的算法来切割bubble,也就是从距离root vertex最远的节点开始迭代。

    在上述的几个因素中,算子的barrier属性由上层计算引擎(e.g., MaxCompute的optimizer)给出。一般而言,依赖global sort操作的算子(比如MergeJoin, SorteAggregate等),会被认为会造成数据阻塞(barrier),而基于hash特性操作的算子则对于pipeline更加友好。对于单个Bubble内部允许的计算节点数目,根据我们对线上准实时作业特点的分析和Bubble作业的实际灰度实验,选定的默认上限在500。这是一个在大多数场景下比较合理的值,既能保证比较快速的拿到全量资源,同时由于处理数据量和DoP基本成正相关关系,这个规模的bubble一般也不会出现内存超限的问题。当然这些参数和配置,均允许作业级别通过配置进行微调,同时Bubble执行框架也会后继提供作业运行期间动态实时调整的能力。

    在DAG的体系中,边连接的物理属性之一,就是边连接的上下游节点,是否有运行上的前后依赖关系。对于传统的离线模式,上下游先后运行,对应的是sequential的属性,我们称之为sequential edge。而对于bubble内部的上下游节点,是同时调度同时运行的,我们称连接这样的上下游节点的边,为concurrent edge。可以注意到,这种concurrent/sequential的物理属性,在bubble应用场景上,实际与数据的传送方式(网络/内存直传 vs 数据落盘)的物理属性是重合的(Note: 但这两种依然是分开的物理属性,比如在必要的时候concurrent edge上也可以通过数据落盘方式传送数据)。

    基于这样的分层抽象,Bubble切分算法,本质上就是尝试聚合DAG图的节点,将不满足bubble准入条件的concurrent edge还原成sequential edge的过程。最终,由concurrent edge联通的子图即为bubble。在这里我们通过一个实际的例子来展示Bubble切分算法的工作原理。假设存在下图所示的DAG图,图中的圆圈表示计算顶点(vertex),每个圆圈中的数字表示该vertex对应的实际计算节点并发度。其中V1和V3因为在作业提交初始,就因为其内部包含barrier算子,而被标注成barrier vertex。圆圈之间的连接线表示上下游的连接边(edge)。橙色线代表(初始)concurrent edge,黑色线代表sequential edge,初始状态图中的sequential edge根据barrier vertex的输出边均为sequential edge的原则确定,其他边默认均初始化为concurrent edge。

    1.png

    Fig.8 示例DAG图(初始状态)

    在这个初始DAG基础上,按照上面介绍过的整体原则,以及本章节最后描述的一些实现细节,上图描述的初始状态,可以经过多轮算法迭代,最终产生如下的Bubble切分结果。在这个结果中产生了两个Bubbles: Bubble#0 [V2, V4, V7, V8],Bubble#1 [V6, V10], 而其他的节点则被判断将使用离线模式运行。

    1.png

    Fig.9 示例DAG图Bubble切分结果

    在上图的切分过程中,自底向上的遍历vertex,并秉承如下原则:

    若当前vertex不能加入bubble,将其输入edge均还原为sequential edge(比如DAG图中的V9);

    若当前vertex能够加入bubble,执行广度优先遍历算法聚合生成bubble,先检索输入edge连接的vertex,再检索输出edge连接的,对于不能联通的vertex,将edge还原为sequential edge(比如DAG图中遍历V2的输出vertex V5时会因为total task count超过500触发edge还原)。

    而对任意一个vertex,只有当满足以下条件才能被添加到bubble中:

    • vertex和当前bubble之间不存在sequential edge连接;
    • vertex和当前bubble不存在循环依赖,即:
    • Case#1:该vertex的所有下游vertex中不存在某个vertex是当前bubble的上游
    • Case#2:该vertex的所有上游vertex中不存在某个vertex是当前bubble的下游
    • Case#3:该vertex的所有下游bubble中不存在某个vertex是当前bubble的上游
    • Case#4:该vertex的所有上游bubble中不存在某个vertex是当前bubble的下游

    注:这里的上游/下游不仅仅代表当前vertex的直接后继/前驱,也包含间接后继/前驱

    1.png

    Fig.10 切分Bubble过程可能存在循环依赖的几种场景

    而实际线上bubble的切分还会考虑到实际资源和预期运行时间等信息,比如计算节点的plan memory 是否超过一定数值,计算节点中是否包含UDF算子,生产作业中计算节点基于历史信息(HBO)的预估执行时间是否超长,等等,这里不再赘述。

    2.2 Bubble的调度与执行

    2.2.1 Bubble调度

    为了实现计算的加速,Bubble内部的计算节点的来源默认均来自常驻的预热资源池,这一点与准实时执行框架相同。与此同时我们提供了灵活的可插拔性,在必要的情况下,允许Bubble计算节点从Resource Manager当场申请(可通过配置切换)。

    从调度时机上来看,一个Bubble内部的节点调度策略与其对应的输入边特性相关,可以分成下面几种情况:

    • 不存在任何input edge的bubble root vertext(比如 Fig.9中的V2):作业一运行就被调度拉起。
    • 只有sequential edge输入bubble root vertex(比如 Fig.9中的V6):等待上游节点完成度达到配置的min fraction比例(默认为100%,即所有上游节点完成)才被调度。
    • Bubble内部的vertex(即所有输入边都是concurrent edge,比如 Fig.9中的V4, V8, V10),因为其完全是通过concurrent edge进行连接的,会自然的被与上游同时触发调度。
    • Bubble边界上存在mixed-inputs的bubble root vertex(比如 Fig.9中的V7)。这种情况需要一些特殊处理,虽然V7与V4是通过concurrent edge链接,但是由于V7的调度同时被V3通过sequential edge控制,所以事实上需要等待V3完成min-fraction后才能调度V7。对于这种场景,可以将V3的min-fraction配置为较小(甚至0)来提前触发;此外Bubble内部我们也提供了progressive调度的能力,对这种场景也会有帮助。

    比如图7中的Bubble#1,只有一条SequentialEdge外部依赖边,当V2完成后,就会触发V6 + V10(通过concurrent edge)的整体调度,从而将整个Bubble#1运行起来。

    在Bubble被触发调度后,会直接向SMODE Admin申请资源,默认使用的是一体化Gang-Scheduling(GS)的资源申请模式,在这种模式下,整个Bubble会构建一个request,发送给Admin。当Admin有足够的资源来满足这个申请时,会将,再包含预拉起worker信息的调度结果发送给bubble作业的AM。

    1.png

    Fig.11 Bubble与Admin之间的资源交互

    为了同时支持紧张资源上以及Bubble内部动态调整的场景,Bubble同时还支持Progressive的资源申请模式。这种模式允许Bubble内的每个Vertex独立申请资源和调度。对于这种申请,Admin只要有增量的资源调度即会将结果发送给AM,直到对应Vertex的request完全满足。对于这种场景上的独特应用这里暂时不做展开。

    在准实时执行框架升级后,SMODE服务中的资源管理(Admin)和多DAG作业管理逻辑(MultiJobManager)已经解耦,因此bubble模式中的资源申请逻辑,只需要和Admin进行交互,而不会对于正常准实时作业的DAG执行管理逻辑带来任何影响。另外,为了支持线上灰度热升级能力,Admin管理的资源池中的每个常驻计算节点均通过Agent+多Labor模式运行,在调度具体资源时,还会根据AM版本,进行worker版本的匹配,并调度满足条件的labor给Bubble作业。

    2.2.2 Bubble数据Shuffle

    对于穿越Bubble bourndary上的sequential edge,其上传输的数据和普通离线作业相同,都是通过落盘的方式来进行数据传输。这里我们主要讨论在Bubble内部的数据传输方式。根据之前描述的作业bubble切分原则,bubble内部的通常具备充分的数据pipeline特性,且数据量不大。因此对于bubble内部concurrent edge上的数据,均采用执行速度最快的网络/内存直传方式来进行shuffle。

    这其中网络shuffle的方式和经典的准实时作业相同,通过上游节点和下游节点之间建立TCP链接,进行网络直连发送数据。这种push-based的网络传送数据方式,要求上下游必须同时拉起,根据链式的依赖传递,这种网络push模式强依赖于Gang-Scheduling,此外在容错,长尾规避等问题上也限制了bubble的灵活性。

    为了更好的解决以上问题,在Bubble模式上,探索了内存shuffle模式。在这一模式下,上游节点将数据直接写到集群ShuffleAgent(SA)的内存中,而下游节点则从SA中读取数据。内存shuffle模式的容错,扩展,包括在内存不够的时候将部分数据异步落盘保证更高的可用性等能力,由ShuffleService独立提供。这种模式可以同时支持Gang-Scheduling/Progressive两种调度模式,也使其具备了较强的可扩展性,比如可以通过SA Locality调度实现更多的Local数据读取,通过基于血缘的instance level retry实现粒度更精细的容错机制等等。

    1.png

    Fig.12 Network Shuffle VS Memory Shuffle

    鉴于内存shuffle提供的诸多可扩展优势,这也是线上Bubble作业选用的默认shuffle方式,而网络直传则作为备选方案,允许在容错代价很小的超小规模作业上,通过配置使用。

    2.3 Fault-Tolerance

    作为一种全新的混合执行模式,Bubble执行探索了在离线作业和一体化调度的准实时作业间的各种细粒度平衡。在线上复杂的集群中,运行过程中各种各样的失败在所难免。而bubble这种全新模式下,为了保证失败的影响最小,并在可靠性和作业性能之间取得最佳的平衡,其对于失败处理的策略也更加的多样化。

    针对不同的异常问题,我们设计了各种针对性容错策略,通过各种从细到粗的力度,处理执行过程中可能涉及的各种异常场景处理,比如:向admin申请资源失败、bubble中的task执行失败(bubble-rerun)、bubble多次执行失败的回退(bubble-renew),执行过程中AM发生failover等等。

    2.3.1 Bubble Rerun

    目前Bubble在内部计算节点失败时,默认采用的retry策略是rerun bubble。即当bubble内的某个节点的本次执行(attempt)失败,会立即rerun整个bubble,取消正在执行的同一版本的attempt。在归还资源的同时,触发bubble重新执行。通过这种方式,保证bubble内所有计算节点对应的(retry) attempt版本一致。

    触发bubble rerun的场景有很多,比较常见的有以下几种:

    • Instance Failed:计算节点执行失败,通常由上层引擎的runtime错误触发(比如抛出retryable-exception)。
    • Resource Revoked:在线上生产环境,有很多种场景会导致资源节点重启。比如所在的机器整机oom、机器被加黑等。在worker被杀之后,重启之后的worker会依照最初的启动参数重新连回admin。此时,admin会将这个worker重启的消息封装成Resource Revoked发送给对应的AM,触发bubble rerun。
    • Admin Failover: 由于Bubble作业所使用的计算资源来自于SMODE的admin资源池,当admin由于某些原因Failover,或者SMODE整体服务被重启时,分配给AM的计算节点会被停止。Admin在Failover之后不感知当前各个节点被分配的AM信息,无法将这些重启的消息发送给AM。目前的处理方法是,每个AM订阅了admin对应的nuwa,在admin重启之后会更新这个文件. AM感知到信息更新后,会触发对应的taskAttempt Failed,从而rerun bubble。
    • Input Read Error:在计算节点执行时,读不到上游数据是一个很常见的错误,对于bubble来说,这个错误实际上有三种不同的类型:
    • Bubble内的InputReadError:由于shuffle数据源也在bubble内,在rerun bubble时,对应上游task也会重跑。不需要再做针对性的处理。
    • Bubble边界处的InputReadError: shuffle数据源是上游离线vertex(或也可能是另一个bubble)中的task产生,InputReadError会触发上游的task重跑,当前bubble rerun之后会被delay住,直到上游血缘(lineage)的新版本数据全部ready之后再触发调度。
    • Bubble下游的InputReadError: 如果bubble下游的task出现了InputReadError,这个事件会触发bubble内的某个task重跑,此时由于该task依赖的内存shuffle数据已经被释放,会触发整个bubble rerun。

    2.3.2 Bubble Renew

    在Admin资源紧张时, Bubble从Admin的资源申请可能等因为等待而超时。在一些异常情况下,比如bubble申请资源时刚好onlinejob服务处于重启的间隔,也会出现申请资源失败的情况。在这种情况下,bubble内所有vertex都将回退成纯离线vertex状态执行。此外对于rerun次数超过上限的bubble,也会触发bubble renew。在bubble renew发生后,其内部所有边都还原成sequential edge,并在所有vertex重新初始化之后,通过回放内部所有调度状态机触发事件,重新以纯离线的方式触发这些vertex的内部状态转换。确保当前bubble内的所有vertex在回退后,均会以经典离线的模式执行,从而有效的保障了作业能够正常terminated。

    1.png

    Fig. 13 Bubble Renew

    2.3.3 Bubble AM Failover

    对于正常的离线作业,在DAG框架中,每个计算节点相关的内部调度事件都会被持久化存储,方便做计算节点级别的增量failover。但是对于bubble作业来说,如果在bubble执行过程发生了AM failover重启,通过存储事件的replay来恢复出的bubble,有可能恢复到running的中间状态。然而由于内部shuffle数据可能存储在内存而丢失,恢复成中间running状态的bubble内未完成的计算节点,会因读取不到上游shuffle数据而立刻失败。

    这本质上是因为在Gang-Scheduled Bubble的场景上,bubble整体是作为failover的最小粒度存在的,所以一旦发生AM的failover,恢复粒度也应该在bubble这个层面上。所以对于bubble相关的所有调度事件,在运行中都会被当作一个整体,同时当bubble开始和结束的时候分别刷出bubbleStartedEvent和bubbleFInishedEvent。一个bubble所有相关的events在failover后恢复时会被作为一个整体,只有结尾的bubbleFInishedEvent才表示这个bubble可以被认为完全结束,否则将重跑整个bubble。

    比如在下图这个例子中,DAG中包含两个Bubble(Bubble#0: {V1, V2}, Bubble#1: {V3, V4}),在发生AM重启时,Bubble#0已经TERMINATED,并且写出BubbleFinishedEvent。而Bubble#1中的V3也已经Terminated,但是V4处于Running状态,整个Bubble #1并没有到达终态。AM recover之后,V1,V2会恢复为Terminated状态,而Bubble#1会重头开始执行。

    2.png

    Fig 14. AM Failover with Bubbles

    3. 上线效果

    当前Bubble模式已经在公共云全量上线,SQL作业中34%执行Bubble,日均执行包含176K个Bubble。

    我们针对signature相同的query在bubble execution关闭和打开时进行对比,我们发现在整体的资源消耗基本不变的基础上,作业的执行性能提升了34%,每秒处理的数据量提升了54%。

    image.png

    image.png

    Fig 15. 执行性能/资源消耗对比

    除了整体的对比之外,我们针对VIP用户也进行了针对性的分析,用户Project在打开了Bubble开关之后(下图中红色标记的点为打开Bubble的时间点),作业的平均执行性能有非常明显的提升。

    image.png

    Fig 16. VIP用户开启Bubble后平均执行时间对比

    原文链接

    本文为阿里云原创内容,未经允许不得转载。 

    展开全文
  • 本篇主要通过五个部分介绍MaxCompute Tunnel MaxCompute Tunnel技术原理 MaxCompute Tunnel丰富的生态 Tunnel功能简介 SDK的使用方式 最佳实践 一、MaxCompute Tunnel技术原理 上图是架构图,可以看到对外...
  • MaxCompute Java SDK介绍

    2020-04-27 20:04:56
    MaxCompute Java SDK介绍 MaxCompute提供了Java SDK,可以对实例、资源、表、函数等几个方面进行操作。使用SDK调用MaxCompute产生的计算、存储等费用与直接使用MaxCompute产生的费用一致。较为常用的MaxCompute核心...
  • MaxCompute技术公开课第四季之如何将Kafka数据同步至MaxCompute.pdf
  • MaxCompute常用命令
  • MaxCompute数据开发快速入门 该快速入门演示一个使用MaxCompute对银行贷款购房人员进行分析的完整过程。包含如下步骤: 创建并查看表 导入数据 运行SQL并导出数据 编写MapReduce 在进行该部分内容前,需要完成...
  • maxcompute的时间日期函数 DATEADD 按照指定的单位和幅度修改日期值 datetime dateadd(date|datetime|timestamp <date>, bigint <delta>, string <datepart>) 参数说明 date 必填。待...
  • 简介: MaxCompute 是面向分析的企业级 SaaS 模式云数据仓库,以 Serverless 架构提供快速、全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您可以经济并高效的...
  • MaxCompute基本概念和数据类型

    千次阅读 2020-04-17 11:17:36
    MaxCompute基本概念 MaxCompute的核心概念主要包括:项目、表、分区、生命周期、资源、函数、任务、任务实例(实例)、ACID语义等。MaxComopute常用术语表参见:MaxCompute术语表。 项目(Project) 项目...
  • MaxCompute赋能人工智能
  • 本资料为阿里巴巴计算平台事业部高级技术专家少杰在云栖大讲堂数据智能技术论坛上的演讲PPT。
  • MaxCompute高级专家 艺卓在2017杭州云栖大会中做了题为《MaxCompute对开源系统的支持与融合》的分享,就MaxCompute 和开源,开源的协议及工具,开源的编程接口做了深入的分析。
  • MaxCompute Spark开发指南

    千次阅读 2019-03-11 15:54:22
    MaxCompute Spark是MaxCompute提供的兼容开源的Spark计算服务,它在统一的计算资源和数据集权限体系之上,提供Spark计算框架,支持用户以熟悉的开发使用方式提交运行Spark作业,以满足更丰富的数据处理分析场景。...
  • 简介:本文主要介绍如何进行MaxCompute存储资源和计算资源的评估及规划管理。 一、MaxCompute资源规划背景介绍 MaxCompute资源主要有两类:存储资源、计算资源(包含cpu和内存)。存储资源用于存储MaxCompute的库表...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,963
精华内容 3,185
关键字:

Maxcompute