精华内容
下载资源
问答
  • 本标准规定了移动通信终端与电源适配器 之间实施快速充电的接口及通信协议, 以及终端、 适配器、 线缆、 电池的通用技术要求和测试方法, 以及对快速充电系统中的终端、适配器、线缆和电池在普通充电模式下和快速...
  • YDB 143.2-2014 面向政务的云服务第1部分:云主机服务要求;第2部分:对象存储服务要求;第3部分:云数据库服务要求;第4部分:应用托管容器服务要求;第5部分:网站托管服务要求
  • YDB之大数据时代

    2017-07-06 15:54:25
    YDB之大数据时代是现在大数据发展的前沿科技
  • YDB144-2014云计算服务协议参考框架
  • YDB编程指南

    2018-01-12 11:43:39
    延云YDB安装与使用说明书超千亿规模的数据,数据库根本就运行不了,怎么办? 数据从产生到能够查询,要延迟一天才能看到,如何能做到分钟级延迟?
  • YDB 165-2016 面向物联网的蜂窝窄带接入(NB-IoT) 无线网总体技术要求 通信行业的参考标准,规范了通信物联网的蜂窝窄带接入(NB-IOT)网络的架构,规定了物理层,高层,及PRC层的技术要求及移动管理性,本标准使用...
  • YDB编程指南-预览版

    2017-01-03 15:23:27
    YDB全称延云YDB,是一个基于Hadoop分布式架构下的实时的、多维的、交互式的查询、统计、分析引擎,具有万亿数据规模下的秒级性能表现,并具备企业级的稳定可靠表现。 YDB是一个细粒度的索引,精确粒度的索引。数据...
  • 延云YDB安装与使用说明书 超千亿规模的数据,数据库根本就运行不了,怎么办? 数据从产生到能够查询,要延迟一天才能看到,如何能做到分钟级延迟? 50台规模的hadoop集群,几亿条数据,一个MR任务要运行几小时,...
  • YDB基本使用详解

    千次阅读 2017-02-20 11:02:21
    第七章YDB基本使用详解 一、如何与YDB对接(交互) 目前延云YDB提供如下几种方式 l命令行的方式 lWeb http接口的方式 lJDBC接口的方式 通过Java编程接入 通过可视化SQL分析统计接入 通过...

    第七章YDB基本使用详解

    一、如何与YDB对接(交互)

    目前延云YDB提供如下几种方式

    l命令行的方式

    lWeb http接口的方式

    lJDBC接口的方式

    通过Java编程接入

    通过可视化SQL分析统计接入

    通过报表分析工具接入

    二、命令行接入

    进入ya100的安装目录的bin目录

    1.直接执行 ./conn.sh 即可。


    2.通过./sql.sh -f xxx.sql 直接执行文件中的SQL

     

    三、WEB接入

    WEB接口主要是为了给那些不支持HDBC访问的程序提供接口支持,如PHP

    1.图形SQL 提交地址

    http://xxx.xx.xx.xx:1210/sparkview


    2.Json接口

    http://xxx.xx.xx.xx:1210/sparksql?sql=?

    SQL参数可以GET方式提交,也可以POST方式提交


    四、JDBC接口

    1.JDBC接入方式连接字符串

    Connection conn = DriverManager.getConnection("jdbc:hive2://ydbmaster:10009/default", "hdfs", "");

     

    l10009表示JDBC的端口号,配置的值在ya100_evn.sh里面可以找到


    lhdfs表示连接时所使用的Hadoop账号,大家也要跟配置文件中一致,以免其他未知账号产生垃圾文件没有及时的清理掉,以及造成Hadoop权限问题。

    这个账号的配置目前存在两个位置,请大家配置一致,使用同一个账号。




    2.JAVA编程接口

                      Class.forName("org.apache.Hive.jdbc.HiveDriver");

                      Connection conn = DriverManager.getConnection("jdbc:hive2://ydbmaster:10009/default", "hdfs", "");

                      Statement smst = conn.createStatement();

                      ResultSet rs = smst.executeQuery("/*ydb.pushdown('->')*/ select * from ydb_example_shu where ydbpartion = '3000w' limit 10 /*('<-')pushdown.ydb*/");

                      ResultSetMetaData m = rs.getMetaData();

                      int columns = m.getColumnCount();

                      for (int i = 1; i <= columns; i++) {

                               System.out.print(m.getColumnName(i));

                               System.out.print("\t\t");

                      }

                      while (rs.next()) {

                               for (int i = 1; i <= columns; i++) {

                                        System.out.print(rs.getString(i));

                                        System.out.print("\t\t");

                               }

                               System.out.println();

                      }

                      rs.close();

                      conn.close();

    依赖的JDBC客户端jar包可以从这个地址获取,本质上就是HIVE的thrift接口,依赖的jar包也是Hive的jar包

    http://url.cn/42R4CG8



     

     

    3.通过可视化SQL分析统计接入

    SQL分析工具有很多,只要支持HIVE接口即可,免费的有Squirrel、收费的有DbVisualizer



    4.通过报表分析工具接入

    通过可视化报表分析工具,可以极大的提高程序开发的效率,只要是支持HIVE接口的可视化报表工具,都可以与YDB集成,下面以帆软报表为例。

     





     

    五、YDB分区

    1.关于分区的说明

           随着时间的日积月累,单个索引会越来越大,从而导致系统瓶颈。YDB不会将全部的数据都完整的创建在一个索引中,YDB会对数据进行分区,分区的方式由用户来定义,可以按照日期分区,也可以按照某些固定的HASH方式来分区。

           一条数据的分区,在导入的时候直接指定,具体请参考后面的数据导入用法。

           如果按照日期进行分区,每天就会生成一个分区,如需查询哪天的数据,就去对应的分区中检索,其他的分区则闲置。

           YDB 的SQL需要通过ydbpartion来指定分区; SQL查询必须要设置分区,而且要写在SQL的最外层。

           如果没有指定ydbpartion分区的查询,ydb表默认会去查询 " ydb_default_partion" 这个分区,也就是说,如果我们真的不想进行数据分区,不想在sql上添加ydbpartion的条件来指定分区,那么请将数据都导入到 " ydb_default_partion"这个分区里面。

     

    设置分区例子如下:

     ydbpartion ='20140928'

     ydbpartion in ('20140928','20140927')

    目前不支持大于等于,小于等于的范围指定分区,仅支持等于与in的方式。

     

    2.关于分区的数量与粒度,控制多少比较好?

    l如果我们的数据可以按照时间进行切分,是不是切分的越细越好?

           很遗憾,YDB并不适合特别多的分区,分区越多代表索引文件越多

    1)YDB中打开一个索引是有很大的开销的,打开一个索引加载的列的信息、索引的BlockTree的相关主干节点等,需要消耗较多的内存,而且要持久化到内存里去维护这个索引的状态。这就是为什么大家会发现,对于一个表第一次查询会比较慢,但是我们进行一次count以后,在进行别的查询就会快很多。

    2)YDB在一个进程里能够打开的索引数量是有限的,如果超过了打开的索引文件数量,那么就要关闭一些索引,以保证内存不会OOM。

    3)小文件太多,对HDFS的NameNode的压力较大。

     

    l那么分区粒度控制在多少为好?

    基本原则就是在避免索引频繁的打开与关闭的情况下,索引粒度越小越好。

    1)如果我们的数量不是很大,一年加在一起还不到10亿,那么我就建议采用按年分区。

    2)如果我们的数据处于中等,每月的数据增量为1亿左右,那么我们建议按照季度分区。

    3)如果我们的数据每天写入量特别大,如果按照月份分区,单个索引太大会造成写入瓶颈,那么我们建议按照天进行分区。

    很多时候我们还可以根据不同的查询方式,采用两种粒度的分区

    1)最近一两天的数据经常被查询,我们最近3天的数据按照天进行分区

    2)但是偶尔也会发生查询整年的数据,如果采用按天分区的话,一次打开的索引太多,那么我们可以再加一个按照季度的分区。

    3)按天的数据分区只保存最近7天的数据,超过7天的数据会通过insert的方式归档按照季度的分区里。

          

     

    六、YDB的数据类型

    1.基本类型

    基本类型的存储方式均为 按列存储

    YDB类型

    只索引

    只存储

    Hive类型

    解释

    string

    synn

    ---

    string

    字符串类型,该类型不分词,通常用来存储比较短的字符串,如类目

    tint

    tiynn

    ---

    int

    整形32位-适合大范围的range过滤查询

    tlong

    tlynn

    ---

    bigint

    整形64位-适合大范围的range过滤查询

    tdouble

    tdynn

    ---

    double

    Double类型-适合大范围的range过滤查询

    tfloat

    tfynn

    ---

    float

    Float类型-适合大范围的range过滤查询

    int

    iynn

    ---

    int

    整形32位,占用存储空间少,但是范围查找性能低

    long

    lynn

    ---

    bigint

    整形64位,占用存储空间少,但是范围查找性能低

    double

    dynn

    ---

    double

    Double类型,占用存储空间少,但是范围查找性能低

    float

    fynn

    ---

    float

    Float类型,占用存储空间少,但是范围查找性能低

    geopoint

    ---

    ---

    bigint

    用于地理位置搜索-使用方法详见《26.地理位置感知搜索.txt》

     

    2.分词类型

           分词( Word Segmentation) 指的是将一个词字序列切分成一个一个单独的词。分词就是将连续的词序列按照一定的规范重新组合成词序列的过程.

           分词类型,均为按行存储,在YDB中可以进行模糊检索,但是不能在SQL里面进行group by(YSQL函数以外是可以的)。

     

    YDB类型

    只索引

    只存储

    Hive类型

    解释

    simpletext

    simpletextyn

    simpletextny

    string

     ydb内置的普通文本分词 采用1~3元分词

    haoma

    haomayn

    haomany

    string

    ydb内置的适合号码类型的分词,采用3~5元分词实现,分词粒度为char

    chepai

    chepaiyn

    chepainy

    string

    ydb内置的适合号码类型的分词,采用2~5元分词实现,分词粒度为char

    text

    tyn

    tny

    string

    为lucene默认的standard分词,在(处理手机号,邮箱,IP地址,网址等中英文与字典组合的数据上 不准确,请慎用)

    cjkyy

    cjkyn

    cjkny

    string

    为lucene默认的cjk分词即二元分词 (处理手机号,邮箱,IP地址,网址等中英文与字典组合的数据上 不准确,请慎用)

     

    以下类型除了分词外,还保存了分词后的词的顺序 ,可以进行顺序匹配

     

    YDB类型

    只索引

    只存储

    Hive类型

    解释

    charlike

    ---

    ---

    string

    按照字符char 1~5元分词 (效果较好,term区分了词元,适合车牌,手机号类型的较短文本)

    wordlike

    ---

    ---

    string

    按字与词 1~3元分词 (效果较好,term区分了词元,适合文本类型)

    pchepai

    ---

    ---

    string

    按照字符char 2~5元分词

    phaoma

    ---

    ---

    string

    按照字符char 3~5元分词

    psimpletext

    ---

    ---

    string

    按字与词 1~3元分词

    pyy

    pyn

    pny

    string

    lucene的cjk分词,中文采用二元分词,英文与数字采用 单字分词

     

    3.多值列类型

    有些时候,我们想在一个列里面存储多个值的时候,就可以考虑使用多值列了

    比如说,可以将一个人 的多个标签值 存储在一个记录里面,一个人的每天的行为数据 放在一个记录里面。

    一定要注意,

    1.字符串类型的多值列,返回的值的无序,并且是排重的,故这块有额外注意。

    2.数值型的则是有序的(与导入的顺序一致),并且是没有排重的。

    3.传递的数值是按照空格拆分的,如 11 22 33 44 

    4.如果传递的是空值,会当做null处理

    多值列所有数据类型均为按列存储

     

    YDB类型

    Hive类型

    解释

    mt_syn

    string

     string类型的多值列

    mt_tlyn

    string

    tlong类型的多值列

    mt_lyn

    string

    long类型的多值列

    mt_tdyn

    string

    tdouble类型的多值列

    mt_dyn

    string

    double类型的多值列

    mt_iyn

    string

    int类型的多值列

    mt_tiyn

    string

    tint类型的多值列

    mt_fyn

    string

    float类型的多值列

    mt_tfyn

    string

    tfolat类型的多值列

     


     

     

    七、创建YDB表

    /*ydb.pushdown('->')*/

    create table ydb_example_shu(

    phonenum long,

    usernick string,

    ydb_sex string,

    ydb_province string,

    ydb_grade string,

    ydb_age string,

    ydb_blood string,

    ydb_zhiye string,

    ydb_earn string,

    ydb_prefer string,

    ydb_consume string,

    ydb_day string,

    amtdouble tdouble,

    amtlong int,

    content textcjk

    )

    /*('<-')pushdown.ydb*/


     

     

    八、HIVE表中的数据导入到YDB

    通过ydbpartion表向YDB中导入数据,下面示例中的ydb_example_shu为YDB表的表名,3000w为YDB表的分区名。

    1.直接追加数据

    insert into  table ydbpartion

    select 'ydb_example_shu', '3000w', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_txt;

     

    2.覆盖数据

    insert overwrite table  ydbpartion

     select 'ydb_example_shu', '3000w', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_txt;

     

    3.在追加数据前,先执行按条件删除

    insert into  table ydbpartion

     select 'ydb_example_shu', '3000w', 'ydb_sex='男'  and ydb_blood='A'',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_txt;

     


      

    4.HIVE表数据导入优化-控制并发数

    #######为什么要控制并发数############

    1)启动时候的Map数量不容易控制,如果启动的map数量很多,而Spark又没有容量调度器,会占满所有的资源,影响查询。

    2)所以很多时候我们的业务期望,在进行数据导入的时候,不要启动太多的Map数量,而是希望留出一部分资源,能让给查询,于是控制Map数量就显得特别重要了。

    3)我们导入数据,倾向于数据能更均衡一些,这样查询的时候,不会因为数据倾斜而影响性能。

    4)针对大量小文件,Spark并没有像Hive那样使用了combine inputformat ,合并map查询,这样会导致启动的map数量很多,我们希望依然采用Hive那种能够将一些小的Map进行合并。

     

    YDB提供了combine的方法,用来解决上述问题

    类名为cn.NET.ycloud.ydb.handle.YdbCombineInputFormat (旧版名字为:cn.Net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat)

     

    1)####文本形式的示例####

    drop table ydb_import_txt;

    CREATE external  table ydb_import_txt(

    phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string,multyvalue string

    )

    row format delimited fields terminated by ','

    stored as

        INPUTFORMAT 'cn.net.ycloud.ydb.handle.YdbCombineInputFormat'

        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

    location '/data/example/ydb'

    TBLPROPERTIES (

        'ydb.combine.input.format.raw.format'='org.apache.hadoop.mapred.TextInputFormat'

    );

    select count(*) from ydb_import_txt limit 10;

     

    insert overwrite table  ydbpartion

    select 'ydb_example_shu', 'txt', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_txt;

     

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion = 'txt'

    /*('<-')pushdown.ydb*/

    ;

     

     

    2)####RCFILE格式示例####

    drop table ydb_import_rcfile;

     

    CREATE external  table ydb_import_rcfile(

    phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

    )

    ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'

    STORED AS

        INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

        OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

    TBLPROPERTIES (

        'ydb.combine.input.format.raw.format'='org.apache.hadoop.hive.ql.io.RCFileInputFormat'

    );

     

    insert overwrite  table ydb_import_rcfile select * from ydb_import_txt;

    select count(*) from ydb_import_rcfile limit 10;

     

    insert overwrite table  ydbpartion

    select 'ydb_example_shu', 'rcfile', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_rcfile;

     

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion = 'rcfile'

    /*('<-')pushdown.ydb*/

    ;

     

     

     

    3)####SEQUENCEFILE格式示例####

    drop table ydb_import_sequencefile;

     

    CREATE external  table ydb_import_sequencefile(

    phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

    )

    ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'

    STORED AS

        INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

        OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'

    TBLPROPERTIES (

        'ydb.combine.input.format.raw.format'='org.apache.hadoop.mapred.SequenceFileInputFormat'

    );

     

    SET hive.exec.compress.output=true;

    SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;

    SET mapred.output.compression.type=BLOCK;

    insert overwrite  table ydb_import_sequencefile select * from ydb_import_txt;

    select count(*) from ydb_import_sequencefile limit 10;

     

    insert overwrite table  ydbpartion

    select 'ydb_example_shu', 'sequencefile', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_sequencefile;

     

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion = 'sequencefile'

    /*('<-')pushdown.ydb*/

     

    4)####PARQUET格式示例####

    ###Spark内部对SERDE含有Parquet格式的类名进行了特殊处理,会导致设置的inputformat不生效,所以YDB也特殊处理下,就换成不含有Parquet的名字

    drop table ydb_import_parquet;

    CREATE external  table ydb_import_parquet(

    phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

    )

    ROW FORMAT SERDE  'cn.net.ycloud.ydb.handle.combine.YdbParHiveSerDe'

    STORED AS

        INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

        OUTPUTFORMAT   'cn.net.ycloud.ydb.handle.combine.YdbParMapredParquetOutputFormat'

    TBLPROPERTIES (

        'ydb.combine.input.format.raw.format'='org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'

    );

     

    set parquet.block.size=16777216;

    insert overwrite  table ydb_import_parquet select * from ydb_import_txt;

    select count(*) from ydb_import_parquet limit 10;

    insert overwrite table  ydbpartion

    select 'ydb_example_shu', 'parquet', '',

        YROW(

            'phonenum',phonenum,

            'usernick',usernick,

            'ydb_sex',ydb_sex,

            'ydb_province',ydb_province,

            'ydb_grade',ydb_grade,

            'ydb_age',ydb_age,

            'ydb_blood',ydb_blood,

            'ydb_zhiye',ydb_zhiye,

            'ydb_earn',ydb_earn,

            'ydb_prefer',ydb_prefer,

            'ydb_consume',ydb_consume,

            'ydb_day',ydb_day,

            'amtdouble',amtdouble,

            'amtlong',amtlong,

            'content',content

        )

    from ydb_import_parquet;

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion = 'parquet'

    /*('<-')pushdown.ydb*/

     

     

     

    九、YDB 查询SQL 写法

    注意YDB的表强制必须指定分区

    为了区分YDB表与Hive表,YDB语句需要使用

    /*ydb.pushdown('->')*/ 与 /*('<-')pushdown.ydb*/ 前后包含起来,以方便解析

    1.基本示例

    ----count(*)计数

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion = '2015'

    /*('<-')pushdown.ydb*/ ;

     

    ----数据预览

    /*ydb.pushdown('->')*/

    select * from ydb_example_shu where ydbpartion = '3000w' limit 10

    /*('<-')pushdown.ydb*/;

     

    ----全文检索

    /*ydb.pushdown('->')*/

    select content,usernick from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' limit 10

    /*('<-')pushdown.ydb*/;

     

    ----多个条件组合过滤

    /*ydb.pushdown('->')*/

    select ydb_sex,ydb_grade,ydb_age,ydb_blood,amtlong from ydb_example_shu where ydbpartion = '3000w' and ydb_sex='女' and ydb_grade='本科' and (ydb_age='20到30岁' or ydb_blood='O') and  (amtlong like '([3000 TO 4000] )') limit 10

    /*('<-')pushdown.ydb*/;

     

    ----sum求和

    /*ydb.pushdown('->')*/

    select sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w'

    /*('<-')pushdown.ydb*/;

     

    ----avg求平均数

    /*ydb.pushdown('->')*/

    select avg(amtdouble) as avgamt from ydb_example_shu where ydbpartion = '3000w'

    /*('<-')pushdown.ydb*/;

     

    ----更复杂点的统计

    /*ydb.pushdown('->')*/

    select count(*),count(amtdouble),avg(amtdouble),sum(amtdouble),min(amtdouble),max(amtdouble)

    ,min(ydb_province),max(ydb_province) from ydb_example_shu where ydbpartion = '3000w'

    /*(‘<-’)pushdown.ydb*/;

     

    ----单列group by

    /*ydb.pushdown('->')*/

    select ydb_sex,count(*),count(amtdouble),sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex limit 10

    /*('<-')pushdown.ydb*/;

     

    ----多列group by

    /*ydb.pushdown('->')*/

    select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble),sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province order by cnt desc limit 10

    /*('<-')pushdown.ydb*/;

     

    ----top N 排序

    /*ydb.pushdown('->')*/

    select ydb_sex, phonenum,amtlong,amtdouble

     from ydb_example_shu where ydbpartion='3000w'  order by amtdouble desc ,amtlong limit 10

    /*('<-')pushdown.ydb*/;

     

    2.YDB特有的BlockSort排序(排序大跃进)

       按照时间逆序排序可以说是很多日志系统的硬指标。在延云YDB系统中,我们改变了传统的暴力排序方式,通过索引技术,可以超快对数据进行单列排序,不需要全表暴力扫描,这个技术我们称之为BlockSort,目前支持tlongtdoubletinttfloat四种数据类型。

       由于BlockSort是借助搜索的索引来实现的,所以采用blockSort的排序,不需要暴力扫描,性能有大幅度的提升。

       BlockSort的排序,并非是预计算的方式,可以全表进行排序,也可以基于任意的过滤筛选条件进行过滤排序。

     

     

    正常写法

    blockSort写法

    单列升序

    /*ydb.pushdown('->')*/

    select tradetime, nickname from blocksort_ydb

     order by tradetime limit 10

    /*('<-')pushdown.ydb*/;

     

    /*ydb.pushdown('->')*/

    select tradetime, nickname from blocksort_ydb where

        ydbkv='blocksort.field:tradetime' and 
        ydbkv='blocksort.desc:false' and

        ydbkv='blocksort.limit:10'

         order by tradetime limit 10

    /*('<-')pushdown.ydb*/;

    单列降序

    /*ydb.pushdown('->')*/

    select tradetime, nickname from blocksort_ydb

     order by tradetime desc limit 10

    /*('<-')pushdown.ydb*/;

    /*ydb.pushdown('->')*/

    select tradetime, nickname from blocksort_ydb where

        ydbkv='blocksort.field:tradetime' and

        ydbkv='blocksort.limit:10' and

        ydbkv='blocksort.desc:true'

         order by tradetime desc limit 10

    /*('<-')pushdown.ydb*/;

    3.数据导出

     

    ----导出数据到hive表

    insert overwrite table ydb_import_importtest

    /*ydb.pushdown('->')*/

    select  phonenum,usernick,ydb_sex,ydb_province,

    ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,

    ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content

    from ydb_example_shu where ydbpartion = '3000w'

    /*('<-')pushdown.ydb*/;

     

    #有limit的导出示例 (在Spark的当前版本有BUG,需要采用如下变通方法解决)

    insert overwrite table ydb_import_importtest

    select * from (

    /*ydb.pushdown('->')*/

        select 

         phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content 

    from ydb_example_shu where ydbpartion = '3000w' and ydbkv='export.max.return.docset.size:1000' 

    /*('<-')pushdown.ydb*/

       

    ) tmp  order by rand() limit 1000;

     

    ----数据导出到YDB的其他分区里示例

    insert overwrite table ydbpartion

    select 'ydb_example_shu', 'test3', '',

        YROW(

            'phonenum',tmp.phonenum,

            'usernick',tmp.usernick,

            'ydb_sex',tmp.ydb_sex,

            'ydb_province',tmp.ydb_province,

            'ydb_grade',tmp.ydb_grade,

            'ydb_age',tmp.ydb_age,

            'ydb_blood',tmp.ydb_blood,

            'ydb_zhiye',tmp.ydb_zhiye,

            'ydb_earn',tmp.ydb_earn,

            'ydb_prefer',tmp.ydb_prefer,

            'ydb_consume',tmp.ydb_consume,

            'ydb_day',tmp.ydb_day,

            'amtdouble',tmp.amtdouble,

            'amtlong',tmp.amtlong,

            'content',tmp.content

        )

    from (

    /*ydb.pushdown('->')*/

    select

        phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content 

    from ydb_example_shu where ydbpartion = '3000w'

    /*('<-')pushdown.ydb*/

    ) tmp

    ;

     

    ----导出数据到HDFS

     

    由于Spark当前版本无法通过insert Directory的方式直接导出数据到HDFS,但是可以将数据导出到Hive表,故数据导出到HDFS可以通过导出到Hive表变通的方式来解决

     

    可以通过创建一个导出表来解决

     CREATE external  table ydb_import_importtest(

        phonenum bigint, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong int,content string

    )location '/data/example/ydb_import_importtest';

     

     

    如果我们创建表的时候,没有加location,我们可以通过show create table xxx表名 可以看到location的位置

     

     

    4.多表关联示例

     

    1)---两个卡口left semi join

      select  k1.vehiclePlate as vehiclePlate from (

               /*ydb.pushdown('->')*/

              select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'    

              /*('<-')pushdown.ydb*/

        ) k1

        LEFT SEMI JOIN

       (

           /*ydb.pushdown('->')*/

           select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'   

           /*('<-')pushdown.ydb*/

        ) k2

        on (k1.vehiclePlate=k2.vehiclePlate);

     

    +---------------+--+

    | vehiclePlate  |

    +---------------+--+

    | c22           |

    | c23           |

    | c33           |

    | c34           |

    +---------------+--+

     

    2)---两个卡口left join

      select  k1.vehiclePlate as vehiclePlate,k2.vehiclePlate from (

           /*ydb.pushdown('->')*/

           select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

           /*('<-')pushdown.ydb*/

    ) k1

    LEFT JOIN

     (

        /*ydb.pushdown('->')*/

        select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

        /*('<-')pushdown.ydb*/

    ) k2

    on (k1.vehiclePlate=k2.vehiclePlate);

     

    +---------------+---------------+--+

    | vehiclePlate  | vehiclePlate  |

    +---------------+---------------+--+

    | c11           | NULL          |

    | c22           | c22           |

    | c23           | c23           |

    | c33           | c33           |

    | c34           | c34           |

    +---------------+---------------+--+

     

     

    3)---三个卡口left semi join

    select k21.vehiclePlate from(

        select  k1.vehiclePlate as vehiclePlate from (

                /*ydb.pushdown('->')*/

                 select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

               /*('<-')pushdown.ydb*/

        ) k1

        LEFT SEMI JOIN

       (

           /*ydb.pushdown('->')*/

           select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'

          /*('<-')pushdown.ydb*/ 

        ) k2

        on (k1.vehiclePlate=k2.vehiclePlate)

     ) k21

    LEFT SEMI JOIN

    (

       /*ydb.pushdown('->')*/

       select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

      /*('<-')pushdown.ydb*/

     ) k22 on k21.vehiclePlate=k22.vehiclePlate order by k21.vehiclePlate;

     

    +---------------+--+

    | vehiclePlate  |

    +---------------+--+

    | c33           |

    | c34           |

    +---------------+--+

     

     

     

    4)---三个卡口left join

    select k21.vehiclePlate,k22.vehiclePlate from(

        select  k1.vehiclePlate as vehiclePlate from (

                /*ydb.pushdown('->')*/

                select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1' 

               /*('<-')pushdown.ydb*/

        ) k1

        LEFT JOIN

       (

             /*ydb.pushdown('->')*/

             select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'

             /*('<-')pushdown.ydb*/

        ) k2

        on (k1.vehiclePlate=k2.vehiclePlate)

     ) k21

    LEFT JOIN

    (

         /*ydb.pushdown('->')*/

        select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

        /*('<-')pushdown.ydb*/

     ) k22 on k21.vehiclePlate=k22.vehiclePlate ;

     +---------------+---------------+--+

    | vehiclePlate  | vehiclePlate  |

    +---------------+---------------+--+

    | c11           | NULL          |

    | c22           | NULL          |

    | c23           | NULL          |

    | c33           | c33           |

    | c34           | c34           |

    +---------------+---------------+--+

     

     

    5)----三个卡口 先left SEMI join 之后再 left join

     

    select k21.vehiclePlate,k22.vehiclePlate from(

        select  k1.vehiclePlate as vehiclePlate from (

               /*ydb.pushdown('->')*/

               select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

               /*('<-')pushdown.ydb*/

        ) k1

        LEFT SEMI JOIN

       (

            /*ydb.pushdown('->')*/

           select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2' 

           /*('<-')pushdown.ydb*/

        ) k2

        on (k1.vehiclePlate=k2.vehiclePlate)

     ) k21

    LEFT JOIN

    (

        /*ydb.pushdown('->')*/

        select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

        /*('<-')pushdown.ydb*/

     ) k22 on k21.vehiclePlate=k22.vehiclePlate ;

     

     

     +---------------+---------------+--+

    | vehiclePlate  | vehiclePlate  |

    +---------------+---------------+--+

    | c22           | NULL          |

    | c23           | NULL          |

    | c33           | c33           |

    | c34           | c34           |

    +---------------+---------------+--+

     

     

    5.UNION示例

     

    1)--union--统计的结果

     

    select sum(cnt) as cnt from

    (

     

    /*ydb.pushdown('->')*/

     select count(*) as cnt from ydb_example_shu where ydbpartion = '3000w'

    /*('<-')pushdown.ydb*/

     

    union all

    /*ydb.pushdown('->')*/

    select count(*) as cnt from ydb_example_shu where ydbpartion = '300winsert'

    /*('<-')pushdown.ydb*/

     

    union all

    /*ydb.pushdown('->')*/

    select count(*) as cnt from ydb_example_shu where ydbpartion = '300winsert2'

    /*('<-')pushdown.ydb*/

     

    union all

    /*ydb.pushdown('->')*/

    select count(*) as cnt from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' 

    /*('<-')pushdown.ydb*/

     

    union all

    /*ydb.pushdown('->')*/

    select count(*) as cnt from ydb_example_shu where ydbpartion = '20151011' and content='工商银行'

    /*('<-')pushdown.ydb*/

     

    union all

    /*ydb.pushdown('->')*/

     select count(*) as cnt from ydb_example_shu where ydbpartion = '20151011' 

    /*('<-')pushdown.ydb*/

     

    ) tmp limit 10;

     

     

    2)--union order by的结果,注意,这里有个子查询SQL

    select * from

    (

    /*ydb.pushdown('->')*/ s

    elect amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='旺旺' order by amtlong desc limit 1

    /*('<-')pushdown.ydb*/

    union all

    /*ydb.pushdown('->')*/

    select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' order by amtlong desc limit 1 

    /*('<-')pushdown.ydb*/

    union all

    /*ydb.pushdown('->')*/

    select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='汇源' order by amtlong desc limit 1  

    /*('<-')pushdown.ydb*/

    union all

    /*ydb.pushdown('->')*/

    select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='哇哈哈' order by amtlong desc limit 1 

    /*('<-')pushdown.ydb*/ 

     

    ) tmp  limit 1000;

     

    3)YDB表的多个分区一起查询,通过IN来实现

     

    /*ydb.pushdown('->')*/

    select count(*),count(amtdouble),sum(amtdouble),avg(amtdouble),min(amtdouble),max(amtdouble),min(ydb_province),max(ydb_province) from ydb_example_shu where   ydbpartion in (  '3000w0','3000w1' ,'3000w2','3000w3','3000w4','3000w5','3000w6','3000w7','3000w8','3000w9','3000w10' ,'3000w11','3000w12','3000w13','3000w14','3000w15' ,'3000w16'  ,'3000w17','3000w18','3000w19'

    ,'3000a0','3000a1' ,'3000a2','3000a3','3000a4','3000a5','3000a6','3000a7','3000a8','3000a9','3000a10' ,'3000a11','3000a12','3000a13','3000a14','3000a15' ,'3000a16'  ,'3000a17','3000a18','3000a19'

    ,'3000b0','3000b1' ,'3000b2','3000b3','3000b4','3000b5','3000b6','3000b7','3000b8','3000b9','3000b10' ,'3000b11','3000b12','3000b13','3000b14','3000b15' ,'3000b16'  ,'3000b17','3000b18','3000b19'

    )

    /*('<-')pushdown.ydb*/

    ;

     

    6.DISTINCT示例

    -----#####如果distinct的数据并不多,可以考虑采用collect_set 性能较好#######

     

    1)----####直接count distinct##########

    select

        size(collect_set(tmp.ydb_sex)) as dist_sex,

        size(collect_set(tmp.ydb_province)) as dist_province,

        count(*) as cnt,

        count(tmp.amtlong) as cnt_long,

        count(distinct tmp.amtlong) as dist_long

    from (

        /*ydb.pushdown('->')*/

            select ydb_sex,ydb_province,amtlong from ydb_example_shu where ydbpartion = '3000w' and content='王老吉'

        /*('<-')pushdown.ydb*/

    ) tmp limit 10;

     

     

    2)----group by 加 count distinct####

    select

        tmp.ydb_sex as ydb_sex,

        size(collect_set(tmp.ydb_province)) as dist_province,

        count(*) as cnt,

        count(tmp.amtlong) as cnt_long,

        count(distinct tmp.amtlong) as dist_long

    from

    (

        /*ydb.pushdown('->')*/

            select ydb_sex,ydb_province,amtlong from ydb_example_shu where ydbpartion = '3000w' and content='王老吉'

        /*('<-')pushdown.ydb*/

    ) tmp

    group by tmp.ydb_sex limit 10;

     

    7.行转列示例

     

    select ydb_sex,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_province,cnt,cntamt,sumamt)))) from (

        /*ydb.pushdown('->')*/

            select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt,sum(amtdouble) as sumamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province 

        /*('<-')pushdown.ydb*/

    )tmp  group by ydb_sex limit 10;

     


     

    select ydb_province,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_sex,cnt,cntamt,sumamt)))) from (

        /*ydb.pushdown('->')*/

            select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt,sum(amtdouble) as sumamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province 

        /*('<-')pushdown.ydb*/

    )tmp  group by ydb_province order by scnt desc limit 10;

     


    select ydb_province,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_blood,ydb_sex,cnt,cntamt)))) from (

        /*ydb.pushdown('->')*/

            select ydb_blood,ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_blood,ydb_sex,ydb_province 

        /*('<-')pushdown.ydb*/

    )tmp  group by ydb_province order by scnt desc limit 10;


     

    select ydb_day,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_blood,ydb_sex,cnt,cntamt)))) from (

        /*ydb.pushdown('->')*/

            select ydb_day,ydb_sex,ydb_blood,count(*) as cnt,count(amtdouble) as cntamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_day,ydb_sex,ydb_blood

        /*('<-')pushdown.ydb*/

    )tmp  group by ydb_day order by scnt desc limit 10;

     


     

    8.对于时间的高效处理

    我数据里面时间格式是yyyy-MM-dd hh:mm:ss

    1)ydb没有时间类型,应该怎么处理?

     

    可以用tlong类型代替时间类型

           存储的值 转换成 yyyyMMddhhmmss ,这样是定长的,而且可读性好(比unix时间磋可读性好)

    如果时间精度是 秒 ,毫秒,纳秒的 话 一定要使用 tlong  (范围查找比long快很多),如果是天,小时的话,可以使用long 节省存储空间

     

    2)这些用于时间操作的转换函数我们一定会用到

    cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHHmmss') as bigint),

    cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHHmm')as bigint),

    cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHH')as bigint) ,

     

    select (2017-cast(substring('201831198307123487',7,4) as bigint) ) from spark_txt limit 10;

     

     

     

     

    9.null值与空值的匹配

    1)----匹配空串

    /*ydb.pushdown('->')*/

    select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex='empty'

    /*('<-')pushdown.ydb*/

    ;


    2)--匹配非空值

    /*ydb.pushdown('->')*/

    select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and  ydb_sex<>'empty'

    /*('<-')pushdown.ydb*/

    ;


     

    3)--匹配null值

    ##############null值的匹配非常消耗性能,采用暴力扫描倒排表的方式实现,如果该列的值排重后的值特别多,如sessionId,身份证号码,手机号等,请慎用########

    ####如果是检索明细数据,建议在hive层进行过滤####

    ####TODO 未来可以通过标签里面的live bits改进null值匹配的性能####

     

    /*ydb.pushdown('->')*/

    select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex='null'

    /*('<-')pushdown.ydb*/

     

    4)--匹配非null值

     

     

    /*ydb.pushdown('->')*/

    select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex<>'null'

    /*('<-')pushdown.ydb*/

    ;


     

    10.近似文本匹配

     

    1)近似文本匹配

    有些时候,我们只想找到一篇跟当前指定文章类似的文章。可能中间相差几个字不一样无所谓,或者局部的字顺序前后颠倒也无所谓。

    需要注意

    a)单词会进行排重。

    b)并不考虑单词顺序(虽然伪造的数据是有顺序的,但是匹配是不考虑顺序的)。

    c)30表示排重后,至少有30%的单词会匹配上才算匹配

    d)匹配是按照分词的结果后进行匹配的,并不是按照空格进行拆分的,具体如何检验分词,

     

    1:近似文本匹配示例

     

    /*ydb.pushdown('->')*/

    select  content from ydb_example_shu where ydbpartion = '3000w'  and content='YTermlike@30@100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 ' limit 10

    /*('<-')pushdown.ydb*/;

     


     

    2)近似特征匹配

    有一种搜索是这样的搜索,我指定一系列的特征,如 高矮、胖瘦、年龄段、性别、时间等一系列目击者看到的嫌疑人特征,但是有可能有些目击者描述的不准确,所以不能进行精确匹配,如果能与大部分的匹配条件都相似,一两个条件没匹配上,但已经足以相似了,那么也要返回匹配结果。

     

    --五个特征中必须匹配4个特征

     

    /*ydb.pushdown('->')*/

    select ydb_sex,ydb_province,ydb_blood,amtdouble,content from ydb_example_shu where ydbpartion = '3000w'  and ydb_raw_query_s like 'YQuerylike@hits=4&fq=ydb_sex:女&fq=ydb_province:辽宁&fq=amtdouble:[14 TO 200]&fq=ydb_blood:O&fq=content:王老吉'

    limit 10

    /*('<-')pushdown.ydb*/;

     


    ==通过wt与score参数将content模糊匹配的权重增大

     

       

    /*ydb.pushdown('->')*/

    select ydb_sex,ydb_province,ydb_blood,amtdouble,content from ydb_example_shu where ydbpartion = '3000w'  and ydb_raw_query_s like 'YQuerylike@hits=4&score=8&fq=ydb_sex:女&wt=1&fq=ydb_province:辽宁&wt=1&fq=amtdouble:[14 TO 200]&wt=1&fq=ydb_blood:O&wt=1&fq=content:王老吉&wt=9'

    limit 10

    /*('<-')pushdown.ydb*/;

     


     

     

    11.多值列示例

    有些时候,我们想在一个列里面存储多个值的时候,就可以考虑使用多值列了

    比如说,可以将一个人 的多个标签值 存储在一个记录里面,一个人的每天的行为数据 放在一个记录里面。

    一定要注意,

    1.字符串类型的多值列,返回的值的无序,并且是排重的,故这块有额外注意。

    2.数值型的则是有序的(与导入的顺序一致),并且是没有排重的。

    3.传递的数值是按照空格 拆分的  ,如 11 22 33 44 

    4.如果传递的是空值,会当做null处理

    5.只要数据类型定义为多之列,程序目前会按照空格识别来将传入的数据拆分成多个值分别存储

     

    1)--检索--

     

    /*ydb.pushdown('->')*/

    select multyvalue_string,multyvalue_tlong,multyvalue_long,multyvalue_tdouble,multyvalue_double from ydb_example_shu_multyvalue where ydbpartion='3000w'

    limit 10

    /*('<-')pushdown.ydb*/;

     


     

    2)--多值列group by

     

    /*ydb.pushdown('->')*/

    select multyvalue_long,count(*) as cnt from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long order by cnt

    limit 10

    /*('<-')pushdown.ydb*/;

     


     

    3)普通列与多值列group by

     

    /*ydb.pushdown('->')*/

    select multyvalue_long,ydb_sex,count(*) as cnt from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long,ydb_sex order by cnt

    limit 10

    /*('<-')pushdown.ydb*/;

    4)--两个多值列group by ,(笛卡尔集,要注意内存,以及性能,要慎重)

    /*ydb.pushdown('->')*/

    select multyvalue_long,multyvalue_string,count(*) from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long,multyvalue_string order by  multyvalue_long,multyvalue_string

    limit 10

    /*('<-')pushdown.ydb*/;

     

     

    12.地理位置感知搜索

           现在手机APP满天飞,我想大家都用过这个功能:【搜索我附近的饭店或宾馆】之类的功能,类似这样的地理位置搜索功能非常适用,因为它需要利用到用户当前的地理位置数据,是以用户角度出发,找到符合用户自身需求的信息,应用返回的信息对于用户来说满意度会比较高。可见,地理位置空间搜索在提高用户体验方面有至关重要的作用。在Lucene中,地理位置空间搜索是借助Spatial模块来实现的。

           要实现地理位置空间搜索,我们首先需要对地理位置数据创建索引,比较容易想到的就是把经度和纬度存入索引,可是这样做,有个弊端,因为地理位置数据(经纬度)是非常精细的,一般两个地点相差就0.0几,这样我们需要构建的索引体积会很大,这会显著减慢你的搜索速度。在精确度上采取折衷的方法通常是将纬度和经度封装到层中。您可以将每个层看作是地图的特定部分的缩放级别,比如位于美国中央上方的第 2 层几乎包含了整个北美,而第 19 层可能只是某户人家的后院。尤其是,每个层都将地图分成 2层的箱子或网格。然后给每个箱子分配一个号码并添加到文档索引中。如果希望使用一个字段,那么可以使用 Geohash编码方式将纬度/经度编码到一个 String 中。Geohash 的好处是能够通过切去散列码末尾的字符来实现任意的精度。在许多情况下,相邻的位置通常有相同的前缀。

     

     

    1)测试表的创建,注意使用mortonhash的列的类型是geopoint

    create table lonlattable_test(

    lon tdouble,

    lat tdouble,

    mortonhash geopoint

    )

     

    2)导入数据-注意YMortonHash函数是用于生成Morton数的,将来在索引中用于匹配

    insert overwrite table  ydbpartion

    select 'lonlattable_test', '3000w', '',

        YROW(

            'lon',r[0],

            'lat',r[1],

            'mortonhash',YMortonHash(r[0],r[1])

        )

    from  ydb where YSQL('from','select LAT,LON from ydb_oribit where ydbpartion='20160619' ','segment') ;

     

    3)#数据预览,注意YMortonUnHash用于将数据在还原为经纬度,YMortonHashDistance则用来计算距离,单位是m

    select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

    (

    /*ydb.pushdown('->')*/

    select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'

    /*('<-')pushdown.ydb*/

    )tmp  order by distance limit 10 ;

     


     

     

    4)地理位置检索,给一个坐标,搜寻最近多少米远的所有记录,注意YGeo@的使用

     

     

    select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

    (

    /*ydb.pushdown('->')*/

    select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'  and ydb_raw_query_s like 'YGeo@fl=mortonhash&lon=8.1&lat=9.2&radius=10000'

    /*('<-')pushdown.ydb*/

    )tmp  order by distance limit 10 ;

     


     

    5)####################按照矩形区域搜索isbox=true

     

    select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

    (

    /*ydb.pushdown('->')*/

    select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'  and ydb_raw_query_s like 'YGeo@fl=mortonhash&isbox=true&lon=8.1&lat=9.2&radius=10000'

    /*('<-')pushdown.ydb*/

    )tmp  order by distance limit 10 ;


     

    13.考虑单词顺序的模糊匹配

     

    默认YDB提供了simpletex,haoma等类型进行模糊匹配。

    他们本质上是通过分词进行匹配,并不考虑匹配的词的顺序,如果要进行模糊匹配并且又要保证匹配的先后顺序,那么就需要在进行中文分词的时候保存词的位置。

     

    如果保存了顺序,我们可以通过Ylike@方法 按照单词顺序进行匹配查询

    如:

    phonenum='Ylike@824963'

    phonenum='Ylike@188*63*72*76'

    phonenum='Ylike@188*2*6*3*6*88'

    content='Ylike@可口*可乐*磊'

    content='Ylike@14 15 * 24 28 * 37 41  '

     

    目前保存词的位置的数据类型有如下几种:

    charlike: 按照字符char 1~5元分词 (效果较好,term区分了词元,适合车牌,手机号类型的较短文本)

    wordlike: 按字与词 1~3元分词 (效果较好,term区分了词元,适合文本类型)

    pchepai:按照字符char 2~5元分词

    phaoma :按照字符char 3~5元分词

    psimpletext: 按字与词 1~3元分词

    pyy :lucene的cjk分词,中文采用二元分词,英文与数字采用 单字分词

     

     

     

    注意:目前的这种Ylike还实现不了前缀与后缀匹配,如果要进行前缀与后缀匹配,建议在导入数据前,加入前缀与后缀的特殊符号

    比如说如果ip地址是192.168.3.40,那么我们可以使用charlike类型的字段,并且导入的时候 加上 start192.168.3.40end ,这样前后分别由start与end里两个特殊的字符串

    这样进行前缀匹配的时候,可以通过phonenum='Ylike@start192.168' 来匹配,后缀匹配可以通过 phonenum='Ylike@3.40end' 来进行匹配

     

     

    1.##############号码与车牌类型的示例

     

    /*ydb.pushdown('->')*/

    select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@824963'

     limit 10

    /*('<-')pushdown.ydb*/

    ;

     

    +------------------+--+

    | phonenum  |

    +------------------+--+

    | 18882496377      |

    | 18824963110      |

    | 18824963481      |

    | 17082496383      |

    | 13824963971      |

    | 15928249639      |

    | 18824963904      |

    | 13238249639      |

    +------------------+--+

    8 rows selected (0.272 seconds)

     

    2.#######使用*通配符###

     

     

    /*ydb.pushdown('->')*/

    select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@824*963'

     limit 100

    /*('<-')pushdown.ydb*/

    ;

     

    +------------------+--+

    | phonenum  |

    +------------------+--+

    | 13824096330      |

    | 13824229634      |

    | 18824963481      |

    | 18824096302      |

    | 17082496383      |

    | 18824296372      |

    | 18824963110      |

    | 18824196307      |

    | 13238249639      |

    | 13824769963      |

    | 18824649639      |

    | 18882496377      |

    | 13482479635      |

    | 13824799638      |

    | 13824963971      |

    | 18824396346      |

    | 15928249639      |

    | 18824963904      |

    | 18898248963      |

    +------------------+--+

    19 rows selected (0.26 seconds)

     

     

     

     

    /*ydb.pushdown('->')*/

    select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@188*63*72*76'

     limit 100

    /*('<-')pushdown.ydb*/

    ;

    +------------------+--+

    | phonenum  |

    +------------------+--+

    | 18863872476      |

    | 18863767276      |

    | 18836372076      |

    | 18863726576      |

    +------------------+--+

    4 rows selected (0.241 seconds)

     

     

    3.文本类型顺序匹配检索示例

    /*ydb.pushdown('->')*/

    select  content from ydb_example_shu_positon where ydbpartion = '3000w'  and content='Ylike@1 5 14 15 24 28 37 41 49'

     limit 100

    /*('<-')pushdown.ydb*/

    ;

    4.通过* 允许中间某些词 不连续,但依然保证顺序######

     

    /*ydb.pushdown('->')*/

    select  content from ydb_example_shu_positon where ydbpartion = '3000w'  and content='Ylike@1 5 14 * 24 28 37'

     limit 100

    /*('<-')pushdown.ydb*/

    ;

     

    14.管理员命令

    --查看YDB表

    /*ydb.pushdown('->')*/

    show tables

    /*('<-')pushdown.ydb*/

    ;

     

    --查看表的分区

    /*ydb.pushdown('->')*/

    show partions ydb_example_shu

    /*('<-')pushdown.ydb*/

    ;

     

     

    --按条件删除

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion='3000w' and ydb_sex='男' and ydb_blood='A' and  ydbkv='ydb.delete.query:true'

    /*('<-')pushdown.ydb*/

    ;

     

    --整个分区清理,数据清空,但是分区还在

     

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_shu where ydbpartion='3000w' and ydbkv='ydb.truncate:true'

    /*('<-')pushdown.ydb*/

    ;

     

     

    --物理清理掉整个分区的数据(清理后分区也跟着删掉)

    /*ydb.pushdown('->')*/

     drop table ydb_example_shu partions 3000a4

    /*('<-')pushdown.ydb*/

    ;

     

    /*ydb.pushdown('->')*/

     drop table ydb_example_shu partions 3000a4,3000a5,3000a6

    /*('<-')pushdown.ydb*/

    ;

     

     

    --删除一个表的所有分区-保留表结构

    /*ydb.pushdown('->')*/

     truncate table ydb_example_shu

    /*('<-')pushdown.ydb*/

    ;

     

    --删除一个表,表结构也删除掉

    /*ydb.pushdown('->')*/

     drop table ydb_example_shu

    /*('<-')pushdown.ydb*/

    ;

    --暂停kafka的消费3600秒

    /*ydb.pushdown('->')*/

    select count(*) from y_system_log where ydbkv='ydb.reader.pause:true' and ydbkv='ydb.reader.pause.secs:3600'

    /*('<-')pushdown.ydb*/

    ;

     

    --恢复 暂停的kafka的消费,让kafka继续消费数据

    /*ydb.pushdown('->')*/

    'select count(*) from y_system_log where  ydbkv='ydb.reader.pause:true' and ydbkv='ydb.reader.pause.secs:0'

    /*('<-')pushdown.ydb*/

    ;

     

    --将binlog立即刷到磁盘上

     

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync.binlog:true'

    /*('<-')pushdown.ydb*/

    ;

     

    --刷新缓冲区的数据,让其能被搜索到,(binlog会持久化但数据并不会立即持久化到hdfs)

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync.ram:true'

    /*('<-')pushdown.ydb*/

    ;

     

    --主动触发,将内存中的数据刷盘操作,(数据会被搜索到,并且持久化到磁盘)

     

    /*ydb.pushdown('->')*/

    select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync:true'

    /*('<-')pushdown.ydb*/

    ;

     

    15.变通方式的分页方案

           默认Spark SQL无法进行分页,YDB由于使用了Spark也存在这个问题,故我们采取了变通方式来实现分页。

    以每页pagesize大小为10为例

     

    ######1024条记录以内####

    第一页 直接limit 10,并且将每一行的数据,都按pagekey取个crc32的值 存储在lru的hashmap中

    第二页 直接limit 20,并且根据第一页的crc32与当前的20条记录进行移除,有可能剩余12条或更多,但至少剩余10条,然后取出10条返回,并且将crc32缓存在LRU的hashmap中

    第三页 直接limit 30,同第二页一样,移除掉与crc32匹配的记录,返回10条并且添加第三页的crc32

    这样一直处理到1024条记录 ,如果同时能处理1024个session,我们认为内存是能够放1024*1024个crc32的long类型

     

    ######超过1024条记录######

    我们采用导出成文件的方式,即insert overwrite table的方式,但是考虑到insert 的方式响应可能会很慢,故我们改写了outputformat,也就是后面大家看到的YdbMoreTextOutputFormat

    通过YdbMoreTextOutputFormat我们不需要等待这个insert overwrite执行完毕后才返回,而是在YdbMoreTextOutputFormat中将少量数据发送到缓冲区供立即返回,而大量数据写入到磁盘。

     

    在这种方式下,由于需要与先前生成的crc32值进行排重,目前的实现没有精确控制返回的数据条数,而是返回介于pagesize到pagesize*2的记录数,

     

     

     

    ----这个分页的使用限制大家注意--

    1.不能跳页,只能一页一页的向下翻。

    2.只能向后翻页,不能向前翻页。

    3.每页返回的行数是一个近似值,介于pagesize到pagesize*2的记录数。

    4.SQL本身就不在需要写limit了

    4.sql中的 as pagekey与pagevalue 不能省略,本质是KV返回

     

     

    如果数据表的规模很大,建议配置如下参数控制每个segments导出的记录条数,以免占用太多的HDFS空间

    and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"

     

    ###使用方法##

     

    ---先创建如下的表

    drop table ydb_page_session;

     CREATE external  table ydb_page_session(

    pagekey string,

    pagevalue string

    )

    partitioned by (ydbsession string)

    stored as INPUTFORMAT 'cn.net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat' OUTPUTFORMAT 'cn.net.ycloud.ydb.handle.more.YdbMoreTextOutputFormat'

    location '/data/ycloud/ydb/rawdata/ydb_page_session';

     

    drop table ydb_more_session;

     CREATE external  table ydb_more_session(

    line string

    )

    partitioned by (ydbsession string)

    stored as INPUTFORMAT 'cn.net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat' OUTPUTFORMAT 'cn.net.ycloud.ydb.handle.more.YdbMoreTextOutputFormat'

    location '/data/ycloud/ydb/rawdata/ydb_more_session';

     

     

     

    ---通过如下接口查询数据----

     

    注意生成的pagekey不能省略,用于排重,如果是查询明细可以用y_uuid_s的内置列填充。

     

     http://ydbmaster:1210/ydbpage?reqid=002_page&pagesize=100&sql=select r[0] as pagekey,concat_ws(',',r) as pagevalue from  ydb where YSQL('from','select y_uuid_s,phonenum,usernick,content from ydb_example_shu where ydbpartion="3000w" and content="王老吉" and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

     http://ydbmaster:1210/ydbpage?reqid=00d3_page&pagesize=100&sql=select r[0] as pagekey,concat_ws(',',r) as pagevalue from  ydb where YSQL('from','select y_uuid_s,phonenum,usernick from ydb_example_shu where ydbpartion="3000w"  and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

     http://ydbmaster:1210/ydbpage?reqid=0234_page&pagesize=100&sql=select concat_ws(',',r[0],r[1],r[2]) as pagekey,concat_ws(',',r) as pagevalue from ydb where YSQL('from','select amtdouble,amtlong,y_uuid_s,content,usernick,ydb_sex from ydb_example_shu where ydbpartion="3000w" and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

     

     

    16.分词

     

    分词( Word Segmentation) 指的是将一个词字序列切分成一个一个单独的词。分词就是将连续的词序列按照一定的规范重新组合成词序列的过程。我们知道,在英文的行文中,单词之间是以空格作为自然分界符的,而中文只是字、句和段能通过明显的分界符来简单划界,唯独词没有一个形式上的分界符,虽然英文也同样存在短语的划分问题,不过在词这一层上,中文比之英文要复杂的多、困难的多。

     

     

    默认YDB提供了如下几种分词

     

    simpletext ydb内置的普通文本分词 采用1~3元分词

    haoma ydb内置的适合号码类型的分词,采用3~5元分词实现,分词粒度为char

    chepai ydb内置的适合号码类型的分词,采用2~5元分词实现,分词粒度为char

    text 为lucene默认的standard分词,在(处理手机号,邮箱,IP地址,网址等中英文与字典组合的数据上 不准确,请慎用)

    cjkyy 为lucene默认的cjk分词即二元分词 (处理手机号,邮箱,IP地址,网址等中英文与字典组合的数据上 不准确,请慎用)

    ikyy与textik 为开源的ik分词的实现,采用词库分词,词库我们可以再lib下找到

     

     

    以下类型除了分词外,还保存了分词后的词的顺序,可以进行顺序匹配 更多请参考《27.考虑单词顺序的模糊匹配》

    charlike: 按照字符char 1~5元分词 (效果较好,term区分了词元,适合车牌,手机号类型的较短文本)

    wordlike: 按字与词 1~3元分词 (效果较好,term区分了词元,适合文本类型)

    pchepai:按照字符char 2~5元分词

    phaoma :按照字符char 3~5元分词

    psimpletext: 按字与词 1~3元分词

    pyy :lucene的cjk分词,中文采用二元分词,英文与数字采用 单字分词

     

     

    --我们可以通过如下SQL 了解不同分词的差异

     

     

     

    select

        YAnalyzer('charlike','query','中华人民123456') as charlikequery,

        YAnalyzer('charlike','index','中华人民123456') as charlikeindex

    from (

        /*ydb.pushdown('->')*/ 

        select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

        /*('<-')pushdown.ydb*/

    ) tmp

       

    limit 1;

     

     


     

     

    select

     

        YAnalyzer('wordlike','query','中华人民123456') as wordlikequery,

        YAnalyzer('wordlike','index','中华人民123456') as wordlikeindex

    from

        (

        /*ydb.pushdown('->')*/ 

        select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

        /*('<-')pushdown.ydb*/

    ) tmp

    limit 1;

     

    +-------------------------+---------------------------------------------------------------------------+--+

    |      wordlikequery      |                               wordlikeindex                               |

    +-------------------------+---------------------------------------------------------------------------+--+

    | 3@中华人 3@华人民 3@人民123456  | 1@中 1@华 1@人 1@民 1@123456 2@中华 2@华人 2@人民 2@民123456 3@中华人 3@华人民 3@人民123456  |

    +-------------------------+---------------------------------------------------------------------------+--+

    1 row selected (0.889 seconds)

     

     

    select

     

        YAnalyzer('phaoma','query','中华人民123456') as phaomaquery,

        YAnalyzer('phaoma','index','中华人民123456') as phaomaindex

    from

        (

        /*ydb.pushdown('->')*/ 

        select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

        /*('<-')pushdown.ydb*/

    ) tmp

    limit 1;

     

     

    +--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

    |             phaomaquery              |                                               phaomaindex                                               |

    +--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

    | 中华人民1 华人民12 人民123 民1234 12345 23456  | 中华人 华人民 人民1 民12 123 234 345 456 中华人民 华人民1 人民12 民123 1234 2345 3456 中华人民1 华人民12 人民123 民1234 12345 23456  |

    +--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

     

     

     

    select

        YAnalyzer('psimpletext','query','中华人民123456') as psimpletextquery,

        YAnalyzer('psimpletext','index','中华人民123456') as psimpletextindex

    from

        (

        /*ydb.pushdown('->')*/ 

        select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

        /*('<-')pushdown.ydb*/

    ) tmp

    limit 1;

     

    +-------------------+---------------------------------------------------+--+

    | psimpletextquery  |                 psimpletextindex                  |

    +-------------------+---------------------------------------------------+--+

    | 中华人 华人民 人民123456  | 中 华 人 民 123456 中华 华人 人民 民123456 中华人 华人民 人民123456  |

    +-------------------+---------------------------------------------------+--+

     

     

    ik词库分词

    词库文件位于 ya100/lib/IK_ext.dic

     

     

    17.with as 写法简化SQL

           如果我们的SQL,嵌套层级太深,可以考虑通过with as 方法,将子SQL抽取出来,让整体的SQL看起来逻辑更清晰,大家阅读SQL的时候也便于理解。

     

           with as 遵循HIVE语法,下面为写法示例

           Hive 可以用with as将某个查询命名为一个临时的表名,其他语句可以随时使用该临时表名进行查询。

    with q1 as (select * from src where key= ‘5’),

    q2 as (select * from src s2 where key = ‘4’)

    select * from q1 union all select * from q2;

     

     

    一个简单的例子

     

    with

    y_customer as (

    /*ydb.pushdown('->')*/

    select c_custkey from customer_ydb where c_mktsegment = 'BUILDING'

    /*('<-')pushdown.ydb*/

    ),

    y_lineitem as (

    /*ydb.pushdown('->')*/

    select l_orderkey,l_extendedprice,l_discount from lineitem_ydb where l_shipdate > '1995-03-15'

    /*('<-')pushdown.ydb*/

    )

    ,

    y_orders as (

    /*ydb.pushdown('->')*/

    select o_orderdate, o_shippriority,o_orderkey,o_custkey  from orders_ydb where o_orderdate < '1995-03-15'

    /*('<-')pushdown.ydb*/

    )

    select

      l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority

    from

      y_customer c join y_orders o

        on  c.c_custkey = o.o_custkey

      join y_lineitem l

        on l.l_orderkey = o.o_orderkey

    where

      o_orderdate < '1995-03-15'

    group by l_orderkey, o_orderdate, o_shippriority

    order by revenue desc, o_orderdate ,l_orderkey, o_shippriority

    limit 10;

     

    一个稍微复杂点的例子

     

    with

    y_nation as (

    /*ydb.pushdown('->')*/

    select n_name,n_regionkey,n_nationkey from nation_ydb

    /*('<-')pushdown.ydb*/

    ),

    y_region as (

    /*ydb.pushdown('->')*/

    select r_regionkey,r_name from region_ydb where r_name = 'EUROPE'

    /*('<-')pushdown.ydb*/

    ),

    y_supplier as (

    /*ydb.pushdown('->')*/

    select   s_acctbal, s_name,s_address,s_phone, s_comment ,s_nationkey,s_suppkey

     from supplier_ydb 

    /*('<-')pushdown.ydb*/

    ),

    y_partsupp as (

    /*ydb.pushdown('->')*/

    select   ps_supplycost,ps_suppkey,ps_partkey

     from partsupp_ydb 

    /*('<-')pushdown.ydb*/

    ),

    y_part as (

    /*ydb.pushdown('->')*/

    select   p_partkey,p_mfgr,p_size,p_type

     from part_ydb   where p_size = 15 and p_type like '%BRASS'

    /*('<-')pushdown.ydb*/

    ),

     

    q2_minimum_cost_supplier_tmp1 as (select

      s.s_acctbal, s.s_name, n.n_name, p.p_partkey, ps.ps_supplycost, p.p_mfgr, s.s_address, s.s_phone, s.s_comment

    from

      y_nation n join y_region r

      on 

        n.n_regionkey = r.r_regionkey

      join y_supplier s

      on 

    s.s_nationkey = n.n_nationkey

      join y_partsupp ps

      on 

    s.s_suppkey = ps.ps_suppkey

      join y_part p

      on 

        p.p_partkey = ps.ps_partkey ),

     q2_minimum_cost_supplier_tmp2 as (

    select

      p_partkey, min(ps_supplycost)  as ps_min_supplycost

    from 

      q2_minimum_cost_supplier_tmp1 

    group by p_partkey

    )

    select

      t1.s_acctbal, t1.s_name, t1.n_name, t1.p_partkey, t1.p_mfgr, t1.s_address, t1.s_phone, t1.s_comment

    from

      q2_minimum_cost_supplier_tmp1 t1 join q2_minimum_cost_supplier_tmp2 t2

    on

      t1.p_partkey = t2.p_partkey and t1.ps_supplycost=t2.ps_min_supplycost

    order by s_acctbal desc, n_name, s_name, p_partkey,p_mfgr,s_address,s_phone,s_comment

    limit 100;

     

     

    十、通过Kafka实时导入数据

    默认的Kafka导入数据只支持Json格式,如果需要支持其他格式,需要自己通过java写Parser.。

    1.Kafka配置的注意点

    第一:注意Kafka server 的num.partitions一定要大于YDB启动的进程*线程数量,否则有的进程消费不到数据

    如果发现之前配置错了,要清空下ZK相关路径(现在好像有API接口了),否则修改完了也不生效

    Kafka的启动很简单 ./kafka-server-start.sh ../config/server.properties

    第二:请参考第四章的Kafka配置注意事项,这里不再重复介绍

     

    2.YDB中配置Kafka的消费

    在ydb_site.yaml中添加如下的配置,并更改相关连接参数

     

     ydb.reader.list: "default,filesplit,kafka_json"

     

     #如果您要使用其他的消息中间件,可以自定义Reader,默认为Kafka的实现

     ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

     

     #如果您的数据格式是非标准的JSON格式,可以自定义Parser,默认为按照Json方式解析

     ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

     

     kafka.topic.kafka_json: "kafkaydb"

     kafka.group.kafka_json: "kafkaydb_group"

     bootstrap.servers.kafka_json: "192.168.3.2:6667"

     

    3.重启YDB后,10分钟后会开始导入数据

    4.Kafka导入的数据格式如下

    1)一次只导入一条

    {"tablename":"ydbexample","ydbpartion":"20151005","data":{"indexnum":4,"label":"l_4","userage":14,"clickcount":4,"paymoney":4.132,"price":4.12,"content":"4 4 4 延云 ydb 延云  测试  中文分词 中华人民共和国 沈阳延云云计算技术有限公司","contentcjk":"4 4 4 延云 ydb 延云  测试  中文分词 中华人民共和国 沈阳延云云计算技术有限公司"}}

    2)一次导入多条(通过减少数据条数,提升Kafka性能)

    {"tablename":"ydb_example_trade","ydbpartion":"20151018","list":[{"tradeid":"2016030811044018","nickname":"凌浦泽","province":"澳门特别行政区","tradetype"

    ...... ",\"amt\":8321,\"bank\":\"交通银行\"}"}]}

     

     

     

     

     

    5.Kafka模式实时导入数据,为什么会有重复数据

    YDB能确保从Kafka消费到的数据0丢失,但是由于Kafka的实现机制,以下情况会导致出现重复数据

    具体原理可以参考这篇文章

    http://www.iteblog.com/pdf/1716

    进程异常退出(主动kill或者进程BUG等原因导致)

           Kafka采用commitoffset的方式提交数据,由于此时会存在数据已经消费,但是Kafka的offset没有来得及提交,这样会导致数据重复。

    Kafka的Rebalancing

           由于进程退出,或者首次启动时,会产生一个新的consumer加入一个consumer group时,会有一个rebalance的操作,导致每一个consumer和partition的关系重新分配。也就会发生了rebalancing 。

           如果一个消息已经被消费了,但是还没有提交offset,就开始了Rebalancing,这个时候会造成数据的重复。这个在Kafka里已经积累了一部分数据后的首次启动时最为明显。

    如果仅仅发生了一个进程异常退出,但是没有导致Rebalancing,那么最多重复的数据条数就是这个进程还没有来得及提交的部分。

    如果发生了Rebalancing(进程异常退出也会导致Rebalancing),那么则要按全部没有来得及提交的线程数来计算。

     

    6.Kafka模式对数据可靠性的几种配置

    ydb.realtime.kafka.commit.intervel用来控制Kafka的offset commit频率,每次commit也会导致binglog(wal)同步,所以该值一般要小于等于ydb.realtime.binlog.sync.intervel的频率

     

     

    尽量减少进程重启 导致的数据重复的配置(每个线程32条重复数据)

    ydb.realtime.kafka.commit.intervel: 32

    ydb.realtime.binlog.sync.intervel: 1024

     

     

    尽量增加吞吐量的配置, 可能有重复(每个线程1024条重复数据)

     

    ydb.realtime.kafka.commit.intervel: 1024

    ydb.realtime.binlog.sync.intervel: 2048

    7.多个Kafka Topic一起消费

    ydb.reader.list: "default,filesplit,kafka_json,kafka_json2,kafka_json3"

     

     ##kafka_json##

     ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

     ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

     kafka.topic.kafka_json: "a961"

     kafka.group.kafka_json: "bn961n_groupv1_kafka_json"

     bootstrap.servers.kafka_json: "192.168.3.2:6667"

     

     ##kafka_json2##

     ydb.reader.read.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

     ydb.reader.parser.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.JsonParser"

     kafka.topic.kafka_json2: "b961"

     kafka.group.kafka_json2: "bn961n_groupv1_kafka_json2"

     bootstrap.servers.kafka_json2: "192.168.3.2:6667"

     

     ##kafka_json3##

     ydb.reader.read.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

     ydb.reader.parser.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.JsonParser"

     kafka.topic.kafka_json3: "c961"

     kafka.group.kafka_json3: "bn961n_groupv1_kafka_json3"

     bootstrap.servers.kafka_json3: "192.168.3.2:6667"

     

    十一、通过mdrill提升简单查询的查询速度

           默认配置所有的SQL查询均是通过spark SQL进行相应,但是由于SPARK本身的框架很重,每次任务调度都会有200~300毫秒的调度时间,对于简单查询来说,我们可以将简单查询的SQL通过轻量级的mdrill接口进行访问,而不通过SPARK SQL进行调度,从而在调度上节省时间。

           默认简单SQL通过mdrill来执行这个功能是关闭的,大家可以通过下述方式,将一个简单SQL转换为mdrill查询。

    1.通过统一配置

           可以在ydb_site.yaml里配置 ydb.sql.ismdrill.first的值为true,让ydb系统自动选择是使用mdrill来进行查询还是使用spark调度来执行SQL。

           通过该配置,数据明细查询、排序,以及不含有group by的统计会通过mdrill查询。

           group by由于mdrill的一万个group 的限制,该方式不会启用。

     

    2.通过ydb_force_to_mdrill_mark让SQL以mdrill的方式显示执行

    如果ydb.sql.ismdrill.first为false,不会使用mdrill的调度,但如果我们在SQL加上该标记,就会强制该SQL采用mdrill的调度,而不是采用spark的调度。

     

    示例如下:

    /*ydb.pushdown('->')*/

    select ydb_sex, count(*) as cbt from ydb_example_shu where ydbpartion='20150811' and ydbkv='mdrill.force:ydb_force_to_mdrill_mark' group by ydb_sex limit 10

    /*('<-')pushdown.ydb*/

     

    3.通过ydb_force_to_spark_mark让SQL以spark的方式执行

    /*ydb.pushdown('->')*/

    select ydb_sex, count(*) as cbt from ydb_example_shu where ydbpartion='20150811' and ydbkv='mdrill.force:ydb_force_to_spark_mark' group by ydb_sex limit 10

    /*('<-')pushdown.ydb*/

    展开全文
  • YDB是我们自主研发的一个大型分布式索引系统。旨在为数据总量为万亿级别、每天千亿级别数据增量的项目提供近似实时的数据导入,并提供近似实时响应的多维查询与统计服务。 大索引技术 为什么要使用大索引?使用后会...
  • 延云YDB安装与使用说明书 超千亿规模的数据,数据库根本就运行不了,怎么办? 数据从产生到能够查询,要延迟一天才能看到,如何能做到分钟级延迟? 50台规模的hadoop集群,几亿条数据,一个MR任务要运行几小时,每天...
  • YDB是我们自主研发的一个大型分布式索引系统。旨在为数据总量为万亿级别、每天千亿级别数据增量的项目提供近似实时的数据导入,并提供近似实时响应的多维查询与统计服务。 大索引技术 为什么要使用大索引?使用后会...
  • 延云YDB 大数据 万亿数据秒查
  • YDB技术原理

    2017-02-20 20:54:01
    第十二章YDB技术原理 一、铺一条让Spark跑的更快的路   二、YDB的本质 在Spark之上基于搜索引擎技术,实现索引和搜索功能。 既有搜索引擎的查询速度,又有Spark强大的分析计算...


    第十二章YDB技术原理

    一、铺一条让Spark跑的更快的路

     

    二、YDB的本质

    Spark之上基于搜索引擎技术,实现索引和搜索功能。

    既有搜索引擎的查询速度,又有Spark强大的分析计算能力。

    可对多个字段进行关键字全匹配或模糊匹配检索,并可对检索结果集进行分组、排序、计算等统计分析操作。

     

    三、多种技术组合-万亿数据秒级查询

     

    四、整体架构

     

     

     

     

     

     

    五、通过读写双向BLOCK-BUFFER减少文件IO

     

    读写双向BLOCK-BUFFER的设计注意事项

     

     

    六、倒排索引与跳跃表

    1.倒排索引与跳跃表基本原理

     

     

     

     

    2.与开源的倒排索引系统相比

     

    l数据不要存储在本地

            硬盘容易坏,恢复麻烦,每次数据都的从备份恢复么?20T数据的恢复要多久-7个小时够么? 

    l系统资源问题

            索引不要持久化的打开的,永远不关闭,在万亿规模下-太耗资源,要修改为LRU按需加载,不   

       经常使用的索引要关闭掉,节省资源。 

    lIO与内存问题

            fdx,tvx,fnm,si,tip等文件是常驻内存的,要改为按需读取,提高首次打开索引的速度。 

    lLuceneDocvalues可以优化一下

            三到四次重复IO,略加改动在索引合并是就可以节省2~3倍的IO。 

            多个segments之间要做关系映射,特别耗费CPU与内存,这也是SOLR耗内存的主要原因之一。 

             HASHSET操作太影响性能,要去掉换成数组。 

    lGC问题

            创建Field的时候,有对象可以复用,否则GC问题严重。 

          (在solr里每个field要创建60多个对象,每行要创建600多个对象。) 

    l数据倾斜问题

            如 性别=男 and 手机号=1234567890。多个条件查询的时候要充分利用跳跃表。

     

    3.针对范围查找我们所做的优化(skiplist IO 分析)

           范围查找,尤其是时间范围的查找,在日常检索中会被经常使用,在范围查找中跳跃表的利用与否对性能影响非常大。

           我们对lucene的默认范围查找做了一个小实验,对IO情况做了分析。

    测试结果如下

    1)普通的等值SQL分析-占用IO较小

     筛选条件为:phonenum='13470881895' and amtdouble=50

    2)使用小范围的 term扫描(IO也较小)

     筛选条件为:phonenum='13470881895' and amtdouble like '([50 to 50])'

    3)使用大范围的term扫描(IO非常大,超出想象)

     筛选条件为:phonenum='13470881895' and  (amtdouble>='50' or amtdouble<='50')

     

     

    amtlong采用的数据类型为tlong类型,已经尽量通过tree的层次结构减少了term的个数,但是没想到,doclist本很成为瓶颈。

    doclist用来存储一个term对应的doc id的列表,由于数据量很大,有些term可能达数亿甚至几十亿个。

     

    问题分析

           我们在上述查找中,都限定了手机号码,理论上,只要利用了skiplist的跳跃功能(lucene中对应advance方法),IO会很小,但是明显第三种测试的IO超出了我们的预期。

           对于文档数量较少的范围查找,是否使用了跳跃功能对性能影响不大,但是YDB的场景更偏重大数据场景,倒排表对应的skiplist会特别长,如果没有使用跳跃功能就会出现上面那种一个查询耗费几个GB的IO的情况,严重影响查询性能。

           我们针对每个IO,打印出详细的函数调用关系,验证我们的推测。

    前两种情况均使用了advance。







    第三种情况没有advance,而是采用了暴力遍历的方式,所以IO特别巨大,我们通过源码分析到了具体原因,超过16个term后,lucene默认就不会继续使用skiplist了。







     

    如何解决?

           lucene这样优化是有明显的原因的,即当term数量特别多的时候,跳跃的功能会带来更多的随机读,相反性能会更差。

           但显然对于海量数据的情况下不适用,因IO巨大导致检索性能很慢,YDB针对范围查找做了如下的变更改动

           16个term真的太小太小,我们更改为1024个,针对tlong,tint,tfloat,tdouble类型的数据将会有特别高的扫描性能。

           大多时候term对应的skiplist也是有数据倾斜的,尤其是tlong,ting,tfloat,tdouble类型本身的分层特性。对于有数据倾斜的term我们要区别对待,对于skiplist很长的term采用跳跃功能能显著减少IO,对于skiplist很短的term则采用顺序读取,遍历的方式,减少随机读。

          

     

    七、采用标签代替原始值-进行分组与排序

     

    采用标签标记技术-让大数据化小

    优点

    1.重复值仅存储一份,可以减少存储空间占用。

    2.标签值采用定长存储,可随机读取。

    3.Group  by分组计算的时候,使用标签代替原始值,数值型计算速度比字符串的计算速度快很多。

    4.标签值的大小原始值的大小是对应的,故排序的时候也仅读取标签进行排序。

    5.标签比原始值占的内存少。

    缺点

    1.如果数据重复值很低,存储空间相反比原始数据大。

    2.如果重复值很低,且查询逻辑需要大量的根据标签值获取原始值的操作的时候,性能比原始值慢。

    下图为替换示例,示意图

     

     

     

     

     

    在真实的数据中,数据肯定是有重复的,比如说类目,性别,年龄,成交金额等信息,传统的分析工具存储的是原始的值,比如说我们有1千亿条数据,那么就要存储一千亿条记录,那么进行统计的时候,一条一条的读出这些类目肯定要慢的不得了。

     

           还记得机器人总动员中的伊娃么?当数据规模达到一定程度以后,如果还是直接对原始值进行读取,对大数据的搬运工作将会特别的消耗体力,而且工作效率很低。



    ydb对原始数据做了一些处理,基本思路是:虽然你有1千亿的数据,但是你的类目不会那么多,典型的系统一般是几万个类目,2~3个性别值,故ydb在存储的时候虽然有1千亿条记录,但是只会存储几万个类目,2个性别,这根原始的千亿条记录在数据规模上可是相差千万倍,那么在之后的统计(count,sum,avg等)势必会比传统的分析工具快上千倍万倍。

           ydb的这种方式我们称为标签技术,就是将数据的真实值用一个数值标签来替换数据本身,原始数据每个值我们只存储一份,这样当有大量重复值的数据,可以节省很多IO,即使数据重复值很少,我们也可以一个数字来代表原始值,因为原始值有可能比较大,但数值确可以很好的压缩。

           基本的处理过程如下图所示:



     

    八、按单元格存储

           YDB在列的处理上也采用了列存储的技术,列和列之间的值是分开的,基于YDB的一个项目(face),是一个几万个列的大宽表,用户分析的时候往往只关心几个列(维度),如果像传统的分析软件那样,将几万个列的值都读出来,然后只取其中几个列的值,那么太浪费了,所以列存储技术YDB这种基于检索的分析系统来说也是必备的。

           但是仅仅列存储也是不够的,如果数据行数很多,即使至于一个列的暴力扫描也会很慢,所以YDB结合了索引+标签技术,如下图所示,我们使用了按照单元格存储的方式,每个单元格存储的是一个标签的值,而非原始数据,这样就可以进行定长的跳跃的读取,而且根据标签值的情况,我们可以采取不同的压缩算法

     

          

     

    九、利用倒排索引跳过不需要的行与列-不进行暴力扫描

     

     

     

           YDB跟传统的分析工具最大的不同,是传统的分析工具很多时候采用的非常暴力的全表扫描的方式进行统计,有1千亿条记录,就要扫描1千亿行,你可能只需要在其中找几条记录而已,却要匹配1千亿次。

           如果你没有变种人X教授的最强大脑,那么千万不要学他这么做,一个一个的找人,真的会累死的。




     

           YDB使用索引直接定位到相关的记录,不需要的记录则全部都跳过去,这样无疑会节省很多的IO,从目前的几个案例来看,对几千亿的数据量进行一次检索耗时也就是几秒钟,这要是采用哪种暴力扫描的方式,怎么说不得几个小时才能算完啊。

     

           这里面涉及倒排索引、跳跃表、delta压缩,doclist压缩与跳跃,bitset等相关跟索引有关的技术,看着很高大上,其实并不难理解。其实大家可以回想一下,我们小时候使用的新华字典了,目录其实就是一种索引,只有拼音,和偏旁部首等几个维度,但我们绝不会为了查找某一个字,翻遍每一页,而是借助目录的多级索引进行快速的定位,YDB与之类似,只不过复杂了一些,在细节上进行了很多的优化。

     




     



     

    十、非排序的列最后延迟读取

     

     

    十一、采用blockSort快速排序

    blockSort排序(排序大跃进)

           按照时间逆序排序可以说是很多日志系统的硬指标。在延云YDB系统中,我们改变了传统的暴力排序方式,通过索引技术,可以超快对数据进行单列排序,不需要全表暴力扫描,这个技术我们称之为blockSort,目前支持tlong,tdouble,tint,tfloat四种数据类型。

           由于blockSort是借助搜索的索引来实现的,所以,采用blockSort的排序,不需要暴力扫描,性能有大幅度的提升。

           blockSort的排序,并非是预计算的方式,可以全表进行排序,也可以基于任意的过滤筛选条件进行过滤排序。

    十二、两段式查询

    1.将原先的一次查询化为两次或多次查询。

    2.第一次查询仅读取必备的列,如排序的列,需要group by与统计的列。

    3.第一次查询不会获取数据的真实值,仅仅读取数据标签

    4.所有的计算都完成后,因为数据进行过排序或汇总,剩余的记录数不多

    这个时候在将标签从字典中转换为真实值,其他列的值也跟着读取过来。

    十三、多区域数据实时导入

     

     

    十四、采用PROCESS-LOCAL 更充分的利用cache

    HadoopRdd只有HDFSCacheTaskLocation与HostTaskLocation,并没有ExecutorCacheTaskLocation无法做到PROCESS-LOCAL

    但是注意下这里的源码,可以通过变通的方式实现。

     

    其实进程在发生故障后,重新启动后,executorId是变化的。要注意修正

     

    十五、创建持久化的进程

           YDB与常规的spark和Hive应用的最大的区别是,YDB里面是在YDB启动的时候就先将Container启动好,而不是等一个SQL查询的时候才去动态的启动进程

    1.这边避免来来回回的复制jar包

    2.jvm本身创建进程开销很大

    3.利用process-local的特性,可以更高效的利用Cache

     

    十六、按需加载

           数据表,索引,列信息,列的值,文件,文件cache均采用LRU的方式加载,只有用到才会打开,不经常使用的会释放掉资源。

     

    十七、addIndexesNoOptimize的优化

    该方法了解lucene的人应该知道,是向当前索引中添加一个新的索引,通常来说我们在mapreduce的第一个阶段会通过大并发创建小索引,在第二个阶段会通过addIndexesNoOptimize的方法将这些小的索引合并成一个完整的最终的索引。

     

        目前lucene在这个地方的实现并不是特别好,addIndexesNoOptimize的处理逻辑是先将外部的索引copy到当前索引所在的目录,然后在进行合并,所以这个就多了一个copy的过程

     

    这样做目前有3个缺点

     

    第一、     当数据量特别大的时候,因为有了一次额外的copy,这种copy带来的开销是很大的,而且也是没必要的。

     

    第二、     因为这这种copy将索引都copy到同一个目录上了,也就意味着在同一个磁盘上,那么在合并索引的时候还需要将这些文件重新读取一遍,单个磁盘的读取速度是有限的,不能利用多个磁盘进行合并会影响合并速度。

     

    第三、     很多时候我希望当前索引下的不同的sigments能够分布到不同的硬盘上,这样检索的时候,同一个索引不同的sigments能够使用不同的硬盘进行检索。

    原理:

     

        针对上述问题,我们对lucene进行了一次比较小的改进,大家可以将其理解为Linux下的文件的软连接,实际的addIndexesNoOptimize方法并不会真正的发生copy,而是仅仅在当前的索引中做了一个标记,标记出他们附加的外部索引存储在什么位置,而不是真的去copy他们。

     

    十八、 solr 的FQ Cache的不足以及在TOP N 全文检索上的改进

    举个倒排表的例子

     

     

     

    性别:男 =>1,2,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

     

    手机:1340100xxxx =>11

     

     

     

    可以看出上述两个列的值有很大区别,性别列,因为值得重复程度特别多会有大量的docid对应性别是男的用户

     

    而对于手机这个列为,一般一个手机号只对应一个docid

     

     

     

    第一个场景

     

    那么如果我去查找性别是男的 前10条记录 而不考虑任何的排序的话,我仅仅从头读出10个docid 就可以了,但实际上solr和lucene本身并没有这样干,solr是为了生成一个完整的bitset作为缓存,将全部的值都会读出来,之后作为缓存放在内存里,对于lucene来说它的默认的collect实现也是收集全部的docid,而不是收集到10个就停止了(它这样做的目的是为了全文检索里面的余弦排序,但很多场景并不需要排序),如果对应几千万条记录的话,IO浪费很多,是很亏的,很有必要自己单独写一个collect.

     

     

     

    第二个场景

     

    我们查找性别是男的并且手机号是 1340100xxxx的用户,很明显,结果就是docid=11的这个用户,这个处理的时候如若大家的过滤条件是通过solr的两个不同的fq参数传递进去的时候,就还会存在第一个场景的问题,性别是男的那个列浪费了很多的IO,所以这个地方要注意改为让他们在同一个FQ里面,使用lucene的booleanQuery去查询,这样因为doclist本身具有跳跃的性质,性别的那个列的相当一部分的docid都会跳跃过去,而节省了IO,所以自某些场景要做适当的优化

     

    十九、ThreadLocal引起的内存泄露

           无论是lucene还是spark 均使用了大量的ThreadLocal对象,采用普通线程使用ThreadLocal不会有问题,线程结束资源就释放了,但是如果想solr与ES那样采用线程池就会引起内存泄露的问题,因为线程池中的线程有可能永久都不释放,所以对于spark,solr,es都存在内存泄露的问题。

           threadlocal里面使用了一个存在弱引用的map,当释放掉threadlocal的强引用以后,map里面的value却没有被回收.而这块value永远不会被访问到了. 所以存在着内存泄露. 最好的做法是将调用threadlocal的remove方法.

           每个thread中都存在一个map, map的类型是ThreadLocal.ThreadLocalMap. Map中的key为一个threadlocal实例. 这个Map的确使用了弱引用,不过弱引用只是针对key. 每个key都弱引用指向threadlocal. 当把threadlocal实例置为null以后,没有任何强引用指向threadlocal实例,所以threadlocal将会被gc回收. 但是,我们的value却不能回收,因为存在一条从current thread连接过来的强引用. 只有当前thread结束以后, current thread就不会存在栈中,强引用断开, Current Thread, Map, value将全部被GC回收.

     

           所以得出一个结论就是只要这个线程对象被gc回收,就不会出现内存泄露,但在threadLocal设为null和线程结束这段时间不会被回收的,就发生了我们认为的内存泄露。其实这是一个对概念理解的不一致,也没什么好争论的。最要命的是线程对象不被回收的情况,这就发生了真正意义上的内存泄露。比如使用线程池的时候,线程结束是不会销毁的,会再次使用的。就可能出现内存泄露。  

     

           PS.Java为了最小化减少内存泄露的可能性和影响,在ThreadLocal的get,set的时候都会清除线程Map里所有key为null的value。所以最怕的情况就是,threadLocal对象设null了,开始发生“内存泄露”,然后使用线程池,这个线程结束,线程放回线程池中不销毁,这个线程一直不被使用,或者分配使用了又不再调用get,set方法,那么这个期间就会发生真正的内存泄露。

     

    我们贴下lucene中跟内存泄露有关的关键代码






    二十、spark的Thread.UncaughtExceptionHandler问题

           1.默认spark会捕获所有的线程异常,一旦发现异常,直接报错退出进程

           2.而lucene在索引合并的时候如果发生了异常,lucene merger schedule,会进行rallback,期望对线程抛出的异常不进行处理,而是应该忽略改异常,但是因为spark 的这个异常捕获,或导致改进程直接退出。

    ,这样的实现对于一个持久化的进程来说是不合理的,如果我们在创建索引过程中由于磁盘很繁忙,就很有可能遇到hdfs的异常,但是这个时候我们期望是程序能够进行重试而不是直接退出。故我们更改了这个地方的实现,让lucene索引合并的时候能够顺利进行重试,而不是一个小小的错误造成持久化进程的退出。

     

     

     

    二十一、spark 内存泄露

    1.高并发情况下的内存泄露的具体表现

    很遗憾,spark的设计架构并不是为了高并发请求而设计的,我们尝试在网络条件不好的集群下,进行100并发的查询,在压测3天后发现了内存泄露。

    a)在进行大量小SQL的压测过程中发现,有大量的activejob在spark ui上一直处于pending状态,且永远不结束,如下图所示



    b)并且发现driver内存爆满



    c)用内存分析分析工具分析了下



    2.高并发下AsynchronousListenerBus引起的WEB UI的内存泄露

    短时间内 SPARK 提交大量的SQL ,而且SQL里面存在大量的 union与join的情形,会创建大量的event对象,使得这里的 event数量超过10000个event ,

    一旦超过10000个event就开始丢弃 event,而这个event是用来回收 资源的,丢弃了 资源就无法回收了
    。 针对UI页面的这个问题,我们将这个队列长度的限制给取消了。

     







    3.AsynchronousListenerBus本身引起的内存泄露

    抓包发现





     

    这些event是通过post方法传递的,并写入到队列里


     

    但是也是由一个单线程进行postToAll的



     

    但是在高并发情况下,单线程的postToAll的速度没有post的速度快,会导致队列堆积的event越来越多,如果是持续性的高并发的SQL查询,这里就会导致内存泄露

     

    接下来我们在分析下postToAll的方法里面,那个路径是最慢的,导致事件处理最慢的逻辑是那个?



     



    可能您都不敢相信,通过jstack抓取分析,程序大部分时间都阻塞在记录日志上

     

    可以通过禁用这个地方的log来提升event的速度

     

    log4j.logger.org.apache.spark.scheduler=ERROR

     



     

     

    4.高并发下的Cleaner的内存泄露

           说道这里,Cleaner的设计应该算是spark最糟糕的设计。spark的ContextCleaner是用于回收与清理已经完成了的 广播boradcast,shuffle数据的。但是高并发下,我们发现这个地方积累的数据会越来越多,最终导致driver内存跑满而挂掉。

    l我们先看下,是如何触发内存回收的



           没错,就是通过System.gc() 回收的内存,如果我们在jvm里配置了禁止执行System.gc,这个逻辑就等于废掉(而且有很多jvm的优化参数一般都推荐配置禁止system.gc 参数)

    lclean过程

    这是一个单线程的逻辑,而且每次清理都要协同很多机器一同清理,清理速度相对来说比较慢,但是SQL并发很大的时候,产生速度超过了清理速度,整个driver就会发生内存泄露。而且brocadcast如果占用内存太多,也会使用非常多的本地磁盘小文件,我们在测试中发现,高持续性并发的情况下本地磁盘用于存储blockmanager的目录占据了我们60%的存储空间。

     



    我们再来分析下 clean里面,那个逻辑最慢



    真正的瓶颈在于blockManagerMaster里面的removeBroadcast,因为这部分逻辑是需要跨越多台机器的。

     

    针对这种问题,

    l我们在SQL层加了一个SQLWAITING逻辑,判断了堆积长度,如果堆积长度超过了我们的设定值,我们这里将阻塞新的SQL的执行。堆积长度可以通过更改conf目录下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值来设置。



    l建议集群的带宽要大一些,万兆网络肯定会比千兆网络的清理速度快很多。

    l给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。

    l增大spark的线程池,可以调节conf下的spark-defaults.conf的如下值来改善。



     

     

    5.线程池与threadlocal引起的内存泄露

           发现spark,hive,lucene都非常钟爱使用threadlocal来管理临时的session对象,期待SQL执行完毕后这些对象能够自动释放,但是与此同时spark又使用了线程池,线程池里的线程一直不结束,这些资源一直就不释放,时间久了内存就堆积起来了。

    针对这个问题,延云修改了spark关键线程池的实现,更改为每1个小时,强制更换线程池为新的线程池,旧的线程数能够自动释放。

     

    6.文件泄露

           您会发现,随着请求的session变多,spark会在hdfs和本地磁盘创建海量的磁盘目录,最终会因为本地磁盘与hdfs上的目录过多,而导致文件系统和整个文件系统瘫痪。在YDB里面我们针对这种情况也做了处理。

     

    7.deleteONExit内存泄露





     

    为什么会有这些对象在里面,我们看下源码



     







    8.JDO内存泄露

    多达10万多个JDOPersistenceManager












     

     



     



    9.listerner内存泄露

    通过debug工具监控发现,spark的listerner随着时间的积累,通知(post)速度运来越慢

    发现所有代码都卡在了onpostevent上






     

    jstack的结果如下



     

    研究下了调用逻辑如下,发现是循环调用listerners,而且listerner都是空执行才会产生上面的jstack截图



     

    通过内存发现有30多万个linterner在里面



     

    发现都是大多数都是同一个listener,我们核对下该处源码



     

    最终定位问题

    确系是这个地方的BUG ,每次创建JDBC连接的时候 ,spark就会增加一个listener, 时间久了,listener就会积累越来越多   针对这个问题 我简单的修改了一行代码,开始进入下一轮的压测

     



     

     

    二十二、spark源码调优

           测试发现,即使只有1条记录,使用 spark进行一次SQL查询也会耗时1秒,对很多即席查询来说1秒的等待,对用户体验非常不友好。针对这个问题,我们在spark与hive的细节代码上进行了局部调优,调优后,响应时间由原先的1秒缩减到现在的200~300毫秒。

          

    以下是我们改动过的地方

    1.SessionState 的创建目录 占用较多的时间



     

    另外使用Hadoop namenode HA的同学会注意到,如果第一个namenode是standby状态,这个地方会更慢,就不止一秒,所以除了改动源码外,如果使用namenode ha的同学一定要注意,将active状态的node一定要放在前面。

    2.HiveConf的初始化过程占用太多时间

    频繁的hiveConf初始化,需要读取core-default.xml,hdfs-default.xml,yarn-default.xml

    ,mapreduce-default.xml,hive-default.xml等多个xml文件,而这些xml文件都是内嵌在jar包内的。

    第一,解压这些jar包需要耗费较多的时间,第二每次都对这些xml文件解析也耗费时间。

     











     

     

    3.广播broadcast传递的hadoop configuration序列化很耗时

    lconfiguration的序列化,采用了压缩的方式进行序列化,有全局锁的问题

    lconfiguration每次序列化,传递了太多了没用的配置项了,1000多个配置项,占用60多Kb。我们剔除了不是必须传输的配置项后,缩减到44个配置项,2kb的大小。

     



     



     

    4.对spark广播数据broadcast的Cleaner的改进

     

    由于SPARK-3015 的BUG,spark的cleaner 目前为单线程回收模式。

    大家留意spark源码注释




     

    其中的单线程瓶颈点在于广播数据的cleaner,由于要跨越很多机器,需要通过akka进行网络交互。

    如果回收并发特别大,SPARK-3015 的bug报告会出现网络拥堵,导致大量的 timeout出现。

    为什么回收量特变大呢? 其实是因为cleaner 本质是通过system.gc(),定期执行的,默认积累30分钟或者进行了gc后才触发cleaner,这样就会导致瞬间,大量的akka并发执行,集中释放,网络不瞬间瘫痪才不怪呢。

    但是单线程回收意味着回收速度
    恒定,如果查询并发很大,回收速度跟不上cleaner的速度,会导致cleaner积累很多,会导致进程OOM(YDB做了修改,会限制前台查询的并发)。

    不论是OOM还是限制并发都不是我们希望看到的,所以针对高并发情况下,这种单线程的回收速度是满足不了高并发的需求的。


    对于官方的这样的做法,我们表示并不是一个完美的cleaner方案。并发回收一定要支持,只要解决akka的timeout问题即可。
    所以这个问题要仔细分析一下,akka为什么会timeout,是因为cleaner占据了太多的资源,那么我们是否可以控制下cleaner的并发呢?比如说使用4个并发,而不是默认将全部的并发线程都给占满呢?这样及解决了cleaner的回收速度,也解决了akka的问题不是更好么?

    针对这个问题,我们最终还是选择了修改spark的ContextCleaner对象,将广播数据的回收 改成多线程的方式,但现在了线程的并发数量,从而解决了该问题。

     

     

    第十二章YDB技术原理

    一、铺一条让Spark跑的更快的路

     

    二、YDB的本质

    Spark之上基于搜索引擎技术,实现索引和搜索功能。

    既有搜索引擎的查询速度,又有Spark强大的分析计算能力。

    可对多个字段进行关键字全匹配或模糊匹配检索,并可对检索结果集进行分组、排序、计算等统计分析操作。

     

    三、多种技术组合-万亿数据秒级查询

     

    四、整体架构

     

     

     

     

     

     

    五、通过读写双向BLOCK-BUFFER减少文件IO

     

    读写双向BLOCK-BUFFER的设计注意事项

     

     

    六、倒排索引与跳跃表

    1.倒排索引与跳跃表基本原理

     

     

     

     

    2.与开源的倒排索引系统相比

     

    l数据不要存储在本地

            硬盘容易坏,恢复麻烦,每次数据都的从备份恢复么?20T数据的恢复要多久-7个小时够么? 

    l系统资源问题

            索引不要持久化的打开的,永远不关闭,在万亿规模下-太耗资源,要修改为LRU按需加载,不   

       经常使用的索引要关闭掉,节省资源。 

    lIO与内存问题

            fdx,tvx,fnm,si,tip等文件是常驻内存的,要改为按需读取,提高首次打开索引的速度。 

    lLuceneDocvalues可以优化一下

            三到四次重复IO,略加改动在索引合并是就可以节省2~3倍的IO。 

            多个segments之间要做关系映射,特别耗费CPU与内存,这也是SOLR耗内存的主要原因之一。 

             HASHSET操作太影响性能,要去掉换成数组。 

    lGC问题

            创建Field的时候,有对象可以复用,否则GC问题严重。 

          (在solr里每个field要创建60多个对象,每行要创建600多个对象。) 

    l数据倾斜问题

            如 性别=男 and 手机号=1234567890。多个条件查询的时候要充分利用跳跃表。

     

    3.针对范围查找我们所做的优化(skiplist IO 分析)

           范围查找,尤其是时间范围的查找,在日常检索中会被经常使用,在范围查找中跳跃表的利用与否对性能影响非常大。

           我们对lucene的默认范围查找做了一个小实验,对IO情况做了分析。

    测试结果如下

    1)普通的等值SQL分析-占用IO较小

     筛选条件为:phonenum='13470881895' and amtdouble=50

    2)使用小范围的 term扫描(IO也较小)

     筛选条件为:phonenum='13470881895' and amtdouble like '([50 to 50])'

    3)使用大范围的term扫描(IO非常大,超出想象)

     筛选条件为:phonenum='13470881895' and  (amtdouble>='50' or amtdouble<='50')

     

     

    amtlong采用的数据类型为tlong类型,已经尽量通过tree的层次结构减少了term的个数,但是没想到,doclist本很成为瓶颈。

    doclist用来存储一个term对应的doc id的列表,由于数据量很大,有些term可能达数亿甚至几十亿个。

     

    问题分析

           我们在上述查找中,都限定了手机号码,理论上,只要利用了skiplist的跳跃功能(lucene中对应advance方法),IO会很小,但是明显第三种测试的IO超出了我们的预期。

           对于文档数量较少的范围查找,是否使用了跳跃功能对性能影响不大,但是YDB的场景更偏重大数据场景,倒排表对应的skiplist会特别长,如果没有使用跳跃功能就会出现上面那种一个查询耗费几个GB的IO的情况,严重影响查询性能。

           我们针对每个IO,打印出详细的函数调用关系,验证我们的推测。

    前两种情况均使用了advance。







    第三种情况没有advance,而是采用了暴力遍历的方式,所以IO特别巨大,我们通过源码分析到了具体原因,超过16个term后,lucene默认就不会继续使用skiplist了。







     

    如何解决?

           lucene这样优化是有明显的原因的,即当term数量特别多的时候,跳跃的功能会带来更多的随机读,相反性能会更差。

           但显然对于海量数据的情况下不适用,因IO巨大导致检索性能很慢,YDB针对范围查找做了如下的变更改动

           16个term真的太小太小,我们更改为1024个,针对tlong,tint,tfloat,tdouble类型的数据将会有特别高的扫描性能。

           大多时候term对应的skiplist也是有数据倾斜的,尤其是tlong,ting,tfloat,tdouble类型本身的分层特性。对于有数据倾斜的term我们要区别对待,对于skiplist很长的term采用跳跃功能能显著减少IO,对于skiplist很短的term则采用顺序读取,遍历的方式,减少随机读。

          

     

    七、采用标签代替原始值-进行分组与排序

     

    采用标签标记技术-让大数据化小

    优点

    1.重复值仅存储一份,可以减少存储空间占用。

    2.标签值采用定长存储,可随机读取。

    3.Group  by分组计算的时候,使用标签代替原始值,数值型计算速度比字符串的计算速度快很多。

    4.标签值的大小原始值的大小是对应的,故排序的时候也仅读取标签进行排序。

    5.标签比原始值占的内存少。

    缺点

    1.如果数据重复值很低,存储空间相反比原始数据大。

    2.如果重复值很低,且查询逻辑需要大量的根据标签值获取原始值的操作的时候,性能比原始值慢。

    下图为替换示例,示意图

     

     

     

     

     

    在真实的数据中,数据肯定是有重复的,比如说类目,性别,年龄,成交金额等信息,传统的分析工具存储的是原始的值,比如说我们有1千亿条数据,那么就要存储一千亿条记录,那么进行统计的时候,一条一条的读出这些类目肯定要慢的不得了。

     

           还记得机器人总动员中的伊娃么?当数据规模达到一定程度以后,如果还是直接对原始值进行读取,对大数据的搬运工作将会特别的消耗体力,而且工作效率很低。



    ydb对原始数据做了一些处理,基本思路是:虽然你有1千亿的数据,但是你的类目不会那么多,典型的系统一般是几万个类目,2~3个性别值,故ydb在存储的时候虽然有1千亿条记录,但是只会存储几万个类目,2个性别,这根原始的千亿条记录在数据规模上可是相差千万倍,那么在之后的统计(count,sum,avg等)势必会比传统的分析工具快上千倍万倍。

           ydb的这种方式我们称为标签技术,就是将数据的真实值用一个数值标签来替换数据本身,原始数据每个值我们只存储一份,这样当有大量重复值的数据,可以节省很多IO,即使数据重复值很少,我们也可以一个数字来代表原始值,因为原始值有可能比较大,但数值确可以很好的压缩。

           基本的处理过程如下图所示:



     

    八、按单元格存储

           YDB在列的处理上也采用了列存储的技术,列和列之间的值是分开的,基于YDB的一个项目(face),是一个几万个列的大宽表,用户分析的时候往往只关心几个列(维度),如果像传统的分析软件那样,将几万个列的值都读出来,然后只取其中几个列的值,那么太浪费了,所以列存储技术YDB这种基于检索的分析系统来说也是必备的。

           但是仅仅列存储也是不够的,如果数据行数很多,即使至于一个列的暴力扫描也会很慢,所以YDB结合了索引+标签技术,如下图所示,我们使用了按照单元格存储的方式,每个单元格存储的是一个标签的值,而非原始数据,这样就可以进行定长的跳跃的读取,而且根据标签值的情况,我们可以采取不同的压缩算法

     

          

     

    九、利用倒排索引跳过不需要的行与列-不进行暴力扫描

     

     

     

           YDB跟传统的分析工具最大的不同,是传统的分析工具很多时候采用的非常暴力的全表扫描的方式进行统计,有1千亿条记录,就要扫描1千亿行,你可能只需要在其中找几条记录而已,却要匹配1千亿次。

           如果你没有变种人X教授的最强大脑,那么千万不要学他这么做,一个一个的找人,真的会累死的。




     

           YDB使用索引直接定位到相关的记录,不需要的记录则全部都跳过去,这样无疑会节省很多的IO,从目前的几个案例来看,对几千亿的数据量进行一次检索耗时也就是几秒钟,这要是采用哪种暴力扫描的方式,怎么说不得几个小时才能算完啊。

     

           这里面涉及倒排索引、跳跃表、delta压缩,doclist压缩与跳跃,bitset等相关跟索引有关的技术,看着很高大上,其实并不难理解。其实大家可以回想一下,我们小时候使用的新华字典了,目录其实就是一种索引,只有拼音,和偏旁部首等几个维度,但我们绝不会为了查找某一个字,翻遍每一页,而是借助目录的多级索引进行快速的定位,YDB与之类似,只不过复杂了一些,在细节上进行了很多的优化。

     




     



     

    十、非排序的列最后延迟读取

     

     

    十一、采用blockSort快速排序

    blockSort排序(排序大跃进)

           按照时间逆序排序可以说是很多日志系统的硬指标。在延云YDB系统中,我们改变了传统的暴力排序方式,通过索引技术,可以超快对数据进行单列排序,不需要全表暴力扫描,这个技术我们称之为blockSort,目前支持tlong,tdouble,tint,tfloat四种数据类型。

           由于blockSort是借助搜索的索引来实现的,所以,采用blockSort的排序,不需要暴力扫描,性能有大幅度的提升。

           blockSort的排序,并非是预计算的方式,可以全表进行排序,也可以基于任意的过滤筛选条件进行过滤排序。

    十二、两段式查询

    1.将原先的一次查询化为两次或多次查询。

    2.第一次查询仅读取必备的列,如排序的列,需要group by与统计的列。

    3.第一次查询不会获取数据的真实值,仅仅读取数据标签

    4.所有的计算都完成后,因为数据进行过排序或汇总,剩余的记录数不多

    这个时候在将标签从字典中转换为真实值,其他列的值也跟着读取过来。

    十三、多区域数据实时导入

     

     

    十四、采用PROCESS-LOCAL 更充分的利用cache

    HadoopRdd只有HDFSCacheTaskLocation与HostTaskLocation,并没有ExecutorCacheTaskLocation无法做到PROCESS-LOCAL

    但是注意下这里的源码,可以通过变通的方式实现。

     

    其实进程在发生故障后,重新启动后,executorId是变化的。要注意修正

     

    十五、创建持久化的进程

           YDB与常规的spark和Hive应用的最大的区别是,YDB里面是在YDB启动的时候就先将Container启动好,而不是等一个SQL查询的时候才去动态的启动进程

    1.这边避免来来回回的复制jar包

    2.jvm本身创建进程开销很大

    3.利用process-local的特性,可以更高效的利用Cache

     

    十六、按需加载

           数据表,索引,列信息,列的值,文件,文件cache均采用LRU的方式加载,只有用到才会打开,不经常使用的会释放掉资源。

     

    十七、addIndexesNoOptimize的优化

    该方法了解lucene的人应该知道,是向当前索引中添加一个新的索引,通常来说我们在mapreduce的第一个阶段会通过大并发创建小索引,在第二个阶段会通过addIndexesNoOptimize的方法将这些小的索引合并成一个完整的最终的索引。

     

        目前lucene在这个地方的实现并不是特别好,addIndexesNoOptimize的处理逻辑是先将外部的索引copy到当前索引所在的目录,然后在进行合并,所以这个就多了一个copy的过程

     

    这样做目前有3个缺点

     

    第一、     当数据量特别大的时候,因为有了一次额外的copy,这种copy带来的开销是很大的,而且也是没必要的。

     

    第二、     因为这这种copy将索引都copy到同一个目录上了,也就意味着在同一个磁盘上,那么在合并索引的时候还需要将这些文件重新读取一遍,单个磁盘的读取速度是有限的,不能利用多个磁盘进行合并会影响合并速度。

     

    第三、     很多时候我希望当前索引下的不同的sigments能够分布到不同的硬盘上,这样检索的时候,同一个索引不同的sigments能够使用不同的硬盘进行检索。

    原理:

     

        针对上述问题,我们对lucene进行了一次比较小的改进,大家可以将其理解为Linux下的文件的软连接,实际的addIndexesNoOptimize方法并不会真正的发生copy,而是仅仅在当前的索引中做了一个标记,标记出他们附加的外部索引存储在什么位置,而不是真的去copy他们。

     

    十八、 solr 的FQ Cache的不足以及在TOP N 全文检索上的改进

    举个倒排表的例子

     

     

     

    性别:男 =>1,2,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

     

    手机:1340100xxxx =>11

     

     

     

    可以看出上述两个列的值有很大区别,性别列,因为值得重复程度特别多会有大量的docid对应性别是男的用户

     

    而对于手机这个列为,一般一个手机号只对应一个docid

     

     

     

    第一个场景

     

    那么如果我去查找性别是男的 前10条记录 而不考虑任何的排序的话,我仅仅从头读出10个docid 就可以了,但实际上solr和lucene本身并没有这样干,solr是为了生成一个完整的bitset作为缓存,将全部的值都会读出来,之后作为缓存放在内存里,对于lucene来说它的默认的collect实现也是收集全部的docid,而不是收集到10个就停止了(它这样做的目的是为了全文检索里面的余弦排序,但很多场景并不需要排序),如果对应几千万条记录的话,IO浪费很多,是很亏的,很有必要自己单独写一个collect.

     

     

     

    第二个场景

     

    我们查找性别是男的并且手机号是 1340100xxxx的用户,很明显,结果就是docid=11的这个用户,这个处理的时候如若大家的过滤条件是通过solr的两个不同的fq参数传递进去的时候,就还会存在第一个场景的问题,性别是男的那个列浪费了很多的IO,所以这个地方要注意改为让他们在同一个FQ里面,使用lucene的booleanQuery去查询,这样因为doclist本身具有跳跃的性质,性别的那个列的相当一部分的docid都会跳跃过去,而节省了IO,所以自某些场景要做适当的优化

     

    十九、ThreadLocal引起的内存泄露

           无论是lucene还是spark 均使用了大量的ThreadLocal对象,采用普通线程使用ThreadLocal不会有问题,线程结束资源就释放了,但是如果想solr与ES那样采用线程池就会引起内存泄露的问题,因为线程池中的线程有可能永久都不释放,所以对于spark,solr,es都存在内存泄露的问题。

           threadlocal里面使用了一个存在弱引用的map,当释放掉threadlocal的强引用以后,map里面的value却没有被回收.而这块value永远不会被访问到了. 所以存在着内存泄露. 最好的做法是将调用threadlocal的remove方法.

           每个thread中都存在一个map, map的类型是ThreadLocal.ThreadLocalMap. Map中的key为一个threadlocal实例. 这个Map的确使用了弱引用,不过弱引用只是针对key. 每个key都弱引用指向threadlocal. 当把threadlocal实例置为null以后,没有任何强引用指向threadlocal实例,所以threadlocal将会被gc回收. 但是,我们的value却不能回收,因为存在一条从current thread连接过来的强引用. 只有当前thread结束以后, current thread就不会存在栈中,强引用断开, Current Thread, Map, value将全部被GC回收.

     

           所以得出一个结论就是只要这个线程对象被gc回收,就不会出现内存泄露,但在threadLocal设为null和线程结束这段时间不会被回收的,就发生了我们认为的内存泄露。其实这是一个对概念理解的不一致,也没什么好争论的。最要命的是线程对象不被回收的情况,这就发生了真正意义上的内存泄露。比如使用线程池的时候,线程结束是不会销毁的,会再次使用的。就可能出现内存泄露。  

     

           PS.Java为了最小化减少内存泄露的可能性和影响,在ThreadLocal的get,set的时候都会清除线程Map里所有key为null的value。所以最怕的情况就是,threadLocal对象设null了,开始发生“内存泄露”,然后使用线程池,这个线程结束,线程放回线程池中不销毁,这个线程一直不被使用,或者分配使用了又不再调用get,set方法,那么这个期间就会发生真正的内存泄露。

     

    我们贴下lucene中跟内存泄露有关的关键代码






    二十、spark的Thread.UncaughtExceptionHandler问题

           1.默认spark会捕获所有的线程异常,一旦发现异常,直接报错退出进程

           2.而lucene在索引合并的时候如果发生了异常,lucene merger schedule,会进行rallback,期望对线程抛出的异常不进行处理,而是应该忽略改异常,但是因为spark 的这个异常捕获,或导致改进程直接退出。

    ,这样的实现对于一个持久化的进程来说是不合理的,如果我们在创建索引过程中由于磁盘很繁忙,就很有可能遇到hdfs的异常,但是这个时候我们期望是程序能够进行重试而不是直接退出。故我们更改了这个地方的实现,让lucene索引合并的时候能够顺利进行重试,而不是一个小小的错误造成持久化进程的退出。

     

     

     

    二十一、spark 内存泄露

    1.高并发情况下的内存泄露的具体表现

    很遗憾,spark的设计架构并不是为了高并发请求而设计的,我们尝试在网络条件不好的集群下,进行100并发的查询,在压测3天后发现了内存泄露。

    a)在进行大量小SQL的压测过程中发现,有大量的activejob在spark ui上一直处于pending状态,且永远不结束,如下图所示



    b)并且发现driver内存爆满



    c)用内存分析分析工具分析了下



    2.高并发下AsynchronousListenerBus引起的WEB UI的内存泄露

    短时间内 SPARK 提交大量的SQL ,而且SQL里面存在大量的 union与join的情形,会创建大量的event对象,使得这里的 event数量超过10000个event ,

    一旦超过10000个event就开始丢弃 event,而这个event是用来回收 资源的,丢弃了 资源就无法回收了
    。 针对UI页面的这个问题,我们将这个队列长度的限制给取消了。

     







    3.AsynchronousListenerBus本身引起的内存泄露

    抓包发现





     

    这些event是通过post方法传递的,并写入到队列里


     

    但是也是由一个单线程进行postToAll的



     

    但是在高并发情况下,单线程的postToAll的速度没有post的速度快,会导致队列堆积的event越来越多,如果是持续性的高并发的SQL查询,这里就会导致内存泄露

     

    接下来我们在分析下postToAll的方法里面,那个路径是最慢的,导致事件处理最慢的逻辑是那个?



     



    可能您都不敢相信,通过jstack抓取分析,程序大部分时间都阻塞在记录日志上

     

    可以通过禁用这个地方的log来提升event的速度

     

    log4j.logger.org.apache.spark.scheduler=ERROR

     



     

     

    4.高并发下的Cleaner的内存泄露

           说道这里,Cleaner的设计应该算是spark最糟糕的设计。spark的ContextCleaner是用于回收与清理已经完成了的 广播boradcast,shuffle数据的。但是高并发下,我们发现这个地方积累的数据会越来越多,最终导致driver内存跑满而挂掉。

    l我们先看下,是如何触发内存回收的



           没错,就是通过System.gc() 回收的内存,如果我们在jvm里配置了禁止执行System.gc,这个逻辑就等于废掉(而且有很多jvm的优化参数一般都推荐配置禁止system.gc 参数)

    lclean过程

    这是一个单线程的逻辑,而且每次清理都要协同很多机器一同清理,清理速度相对来说比较慢,但是SQL并发很大的时候,产生速度超过了清理速度,整个driver就会发生内存泄露。而且brocadcast如果占用内存太多,也会使用非常多的本地磁盘小文件,我们在测试中发现,高持续性并发的情况下本地磁盘用于存储blockmanager的目录占据了我们60%的存储空间。

     



    我们再来分析下 clean里面,那个逻辑最慢



    真正的瓶颈在于blockManagerMaster里面的removeBroadcast,因为这部分逻辑是需要跨越多台机器的。

     

    针对这种问题,

    l我们在SQL层加了一个SQLWAITING逻辑,判断了堆积长度,如果堆积长度超过了我们的设定值,我们这里将阻塞新的SQL的执行。堆积长度可以通过更改conf目录下的ya100_env_default.sh中的ydb.sql.waiting.queue.size的值来设置。



    l建议集群的带宽要大一些,万兆网络肯定会比千兆网络的清理速度快很多。

    l给集群休息的机会,不要一直持续性的高并发,让集群有间断的机会。

    l增大spark的线程池,可以调节conf下的spark-defaults.conf的如下值来改善。



     

     

    5.线程池与threadlocal引起的内存泄露

           发现spark,hive,lucene都非常钟爱使用threadlocal来管理临时的session对象,期待SQL执行完毕后这些对象能够自动释放,但是与此同时spark又使用了线程池,线程池里的线程一直不结束,这些资源一直就不释放,时间久了内存就堆积起来了。

    针对这个问题,延云修改了spark关键线程池的实现,更改为每1个小时,强制更换线程池为新的线程池,旧的线程数能够自动释放。

     

    6.文件泄露

           您会发现,随着请求的session变多,spark会在hdfs和本地磁盘创建海量的磁盘目录,最终会因为本地磁盘与hdfs上的目录过多,而导致文件系统和整个文件系统瘫痪。在YDB里面我们针对这种情况也做了处理。

     

    7.deleteONExit内存泄露





     

    为什么会有这些对象在里面,我们看下源码



     







    8.JDO内存泄露

    多达10万多个JDOPersistenceManager












     

     



     



    9.listerner内存泄露

    通过debug工具监控发现,spark的listerner随着时间的积累,通知(post)速度运来越慢

    发现所有代码都卡在了onpostevent上






     

    jstack的结果如下



     

    研究下了调用逻辑如下,发现是循环调用listerners,而且listerner都是空执行才会产生上面的jstack截图



     

    通过内存发现有30多万个linterner在里面



     

    发现都是大多数都是同一个listener,我们核对下该处源码



     

    最终定位问题

    确系是这个地方的BUG ,每次创建JDBC连接的时候 ,spark就会增加一个listener, 时间久了,listener就会积累越来越多   针对这个问题 我简单的修改了一行代码,开始进入下一轮的压测

     



     

     

    二十二、spark源码调优

           测试发现,即使只有1条记录,使用 spark进行一次SQL查询也会耗时1秒,对很多即席查询来说1秒的等待,对用户体验非常不友好。针对这个问题,我们在spark与hive的细节代码上进行了局部调优,调优后,响应时间由原先的1秒缩减到现在的200~300毫秒。

          

    以下是我们改动过的地方

    1.SessionState 的创建目录 占用较多的时间



     

    另外使用Hadoop namenode HA的同学会注意到,如果第一个namenode是standby状态,这个地方会更慢,就不止一秒,所以除了改动源码外,如果使用namenode ha的同学一定要注意,将active状态的node一定要放在前面。

    2.HiveConf的初始化过程占用太多时间

    频繁的hiveConf初始化,需要读取core-default.xml,hdfs-default.xml,yarn-default.xml

    ,mapreduce-default.xml,hive-default.xml等多个xml文件,而这些xml文件都是内嵌在jar包内的。

    第一,解压这些jar包需要耗费较多的时间,第二每次都对这些xml文件解析也耗费时间。

     











     

     

    3.广播broadcast传递的hadoop configuration序列化很耗时

    lconfiguration的序列化,采用了压缩的方式进行序列化,有全局锁的问题

    lconfiguration每次序列化,传递了太多了没用的配置项了,1000多个配置项,占用60多Kb。我们剔除了不是必须传输的配置项后,缩减到44个配置项,2kb的大小。

     



     



     

    4.对spark广播数据broadcast的Cleaner的改进

     

    由于SPARK-3015 的BUG,spark的cleaner 目前为单线程回收模式。

    大家留意spark源码注释




     

    其中的单线程瓶颈点在于广播数据的cleaner,由于要跨越很多机器,需要通过akka进行网络交互。

    如果回收并发特别大,SPARK-3015 的bug报告会出现网络拥堵,导致大量的 timeout出现。

    为什么回收量特变大呢? 其实是因为cleaner 本质是通过system.gc(),定期执行的,默认积累30分钟或者进行了gc后才触发cleaner,这样就会导致瞬间,大量的akka并发执行,集中释放,网络不瞬间瘫痪才不怪呢。

    但是单线程回收意味着回收速度
    恒定,如果查询并发很大,回收速度跟不上cleaner的速度,会导致cleaner积累很多,会导致进程OOM(YDB做了修改,会限制前台查询的并发)。

    不论是OOM还是限制并发都不是我们希望看到的,所以针对高并发情况下,这种单线程的回收速度是满足不了高并发的需求的。


    对于官方的这样的做法,我们表示并不是一个完美的cleaner方案。并发回收一定要支持,只要解决akka的timeout问题即可。
    所以这个问题要仔细分析一下,akka为什么会timeout,是因为cleaner占据了太多的资源,那么我们是否可以控制下cleaner的并发呢?比如说使用4个并发,而不是默认将全部的并发线程都给占满呢?这样及解决了cleaner的回收速度,也解决了akka的问题不是更好么?

    针对这个问题,我们最终还是选择了修改spark的ContextCleaner对象,将广播数据的回收 改成多线程的方式,但现在了线程的并发数量,从而解决了该问题。

     

     

    展开全文
  • ###yDB:基于 RTM 的高性能、可扩展的内存键值存储团队成员:Zhiyuan Yang、Zhizhou Yang ###SUMMARY 我们基于英特尔 RTM(受限事务内存)和乐观并发控制 [2] 实现了一个名为 yDB 的内存键值存储。 yDB 在多核机器...
  • YDB 086.1-2012

    2015-05-25 14:14:44
    LTE数字移动通信系统电磁兼容性要求和测量方法 第1部分:移动台及其辅助设备
  • php调试工具ydb.zip

    2019-07-19 09:07:35
     ydb也可以称为一个在线调试工具,什么叫在线调试?就是在线上生产环境进行调试,假设有一天某个用户报某个页面某个数据怎么不对啊,看来线上出BUG了,于是你要迅速找出原因,首先看日志,可是悲剧的没有足够的...
  • YDB进阶使用详解

    2017-02-20 20:52:35
    第十章YDB进阶使用详解 一、Lucene原生查询语法的使用  YDB的索引本质上是Lucene索引,如果之前有使用Solr Cloud、ElasticSearch的朋友肯定对Lucene不陌生,对于Lucene有自己的强大的过滤筛选方式,YDB里面...

    第十章YDB进阶使用详解

    一、Lucene原生查询语法的使用

           YDB的索引本质上是Lucene索引,如果之前有使用Solr Cloud、ElasticSearch的朋友肯定对Lucene不陌生,对于Lucene有自己的强大的过滤筛选方式,YDB里面依然兼容这种语法方式。

    1.使用ydb_raw_query_s like 进行Lucene语法匹配

    示例:

    ydb_raw_query_s like 'cardnum:9612880118617710'

     

    使用临近搜索的示例

    ydb_raw_query_s like 'content:"建设银行"~0'

     

    2.使用ydb_code_query_s like 进行Lucene语法匹配

    ydb_raw_query_s与ydb_code_query_s的区别是ydb_code_query_s里的值要进行urlencode编码

     

    如:

    ydb_code_query_s like 'content%3A%22%E5%BB%BA%E8%AE%BE%E9%93%B6%E8%A1%8C%22%7E0'

     

     

    二、自定义拓展分词类型

           默认YDB提供了一些分词的数据类型,可以满足大部分的匹配场景,但是如果因为业务的特殊,我们需要自己对Lucene分词进行拓展,YDB也提供了拓展自定义分词的方式。具体使用方法跟Solr的schema.xml非常类似。

     

    1.首先 编辑lib下的fieldType.txt文件,增加新的分词类型


    2.然后将相关依赖的jar包放到ydb的lib目录下,并重启YDB

    3.建表示例

    create table myntest(

    c1 ft_yanniantest_y,

    c2 ft_yanniantest_yy,

    c3 ft_yanniantest_yyy,

    c4 ft_yanniantest_yn,

    c5 ft_yanniantest_yny,

    c6 ft_yanniantest_yyyyyyy,

    c7 ft_yanniantest_ynynyny

    )

    对应关系如下


    三、IK词库分词

           YDB内置了第三方的IK词库分词,IK分词的介绍如下

    1.IKAnalyzer 简介

           IKAnalyzer 是一个开源基于 Java 语言的轻量级的中文分词第三方工具包,从 2006 年推出已经经历了三个较为完整的版本,目前最新版本为3.2.8,它基于Lucene 为应用主体,但是也支持脱离 Lucene,成为一个独立的面向JAVA 的分词工具。

    2.IKAnalyzer 特性

    a. 算法采用“正向迭代最细粒度切分算法”, 支持细粒度和最大词长两种分词方式,速度最大支持 80W 字/秒(1600KB/秒) 。

    b. 支持多子处理器分析模式:中文、数字、字母,并兼容日文、韩文。

    c. 较小的内存占用,优化词库占有空间,用户可自定义扩展词库。 采用歧义分析算法优化查询关键字的搜索排列组

    d. 扩展 Lucene 的扩展实现,

    3.YDB内的IK数据类型

    textikikyy 存储,并且采用IK进行分词

    ikyn,只进行IK分词,但不进行存储

    4.IK分词的词库,怎样拓展

    IK词库文件位于lib目录下的IK_ext.dic

    大家修改后,记得重启YDB,以便让词库生效。

    四、动态列,动态字段

           跟Solr 一样,正常情况下,需要事先把知道的字段定义在创建表中创建好,但是有的时候某个表的字段会不确定,并没同一个表的不同行之间有可能使用的字段也会不同,这时可以使用动态字段。

           这是一段动态字段的配置示例:

    <dynamicField name="*_s" type="string" indexed="true" stored="false" docValues="true" omitNorms="true" omitTermFreqAndPositions="true" />

           这样,送过来的索引数据中,如果有以 _s 结尾的字段的值都都会被按照上述的索引方式进行索引,如字段名称为title_s,content_s。

           在匹配过程,如果动态字段和静态字段都符合,会优先匹配静态字段。另外动态字段的仅支持 * 这一通配符,这个通配符仅能位于开头或结尾。

           动态字段(Dynamic fields)允许 YDB 索引没有在 create table 中明确定义的字段。这个在忘记定义一些字段时很有用。动态字段可以让系统更灵活,通用性更强。

           动态字段和成规字段类似,除了它名字中包含一个通配符外,在索引文档时,一个字段如果在常规字段中没有匹配时,将到动态字段中匹配。

           假设schema中定义了一个叫*_i的动态动态字段,如果要索引一个叫 cost_i 的字段,但是 schema 中不存在 cost_i 的字段,这样 cost_i  将被索引到 *_i 字段中。

    动态字段也是定义在 schema.xml 文件中,和其他字段一样,它也有个名词,字段类型,和属性。

     

    5.动态字段的配置解析

    <dynamicField name="*_s" type="string" indexed="true" stored="false" docValues="true" omitNorms="true" omitTermFreqAndPositions="true" />

    1)name="*_s"  匹配规则

    2)type="string" 该动态字段 匹配的Solr的fieldType(记住,并不是YDB的数据类型,后面我们会列出默认YDB内部支持的Solr的fieldType,当然也可以通过《自定义拓展分词类型》对于分词的类型进行自定义拓展)

    3)indexed="true" 是否启用索引,如果启用该字段可以用于where中的检索

    4)stored="false" 其否启用行存储,一般除了分词的类型使用按行存储外,其他的类型均建议按列存储

    5)docValues="true"其否启用列存储,一般除了分词的类型使用按行存储外,其他的类型均建议按列存储

    6)omitNorms="true" 是否对字段的长度进行规范化,一般我们用不上,建议配置为true直接忽略

    7)omitTermFreqAndPositions="true" 对于保存了位置的分词类型,通过这个参数可以配置该字段是否需要保存词频和位置,一般我们都设置为true,不启用。

     

    6.YDB内部支持的solr的fieldType

    1)string类型

        <fieldType name="string" class="solr.StrField" sortMissingLast="true" omitNorms="true" omitTermFreqAndPositions="true" />

    2)int类型

        <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    3)float类型

        <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    4)long类型

        <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    5)double类型

    <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    6)tint类型

        <fieldType name="tint" class="solr.TrieIntField" precisionStep="8" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true"  />

    7)tfloat类型

        <fieldType name="tfloat" class="solr.TrieFloatField" precisionStep="8" positionIncrementGap="0"  omitNorms="true" omitTermFreqAndPositions="true" />

    8)tlong类型

        <fieldType name="tlong" class="solr.TrieLongField" precisionStep="8" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    9)tdouble类型

        <fieldType name="tdouble" class="solr.TrieDoubleField" precisionStep="8" positionIncrementGap="0" omitNorms="true" omitTermFreqAndPositions="true" />

    10)多值列的string类型

        <fieldType name="mstring" class="solr.StrFi