精华内容
下载资源
问答
  • kudu

    2021-03-25 19:50:42
    Apache Kudu是专为Apache Hadoop平台开发的列式存储管理器。https://mirrors.tuna.tsinghua.edu.cn/apache/kudu/1.14.0/
  • Kudu

    千次阅读 2020-10-31 21:54:42
    1. 为什么使用Kudu作为存储介质 2. Kudu入门 2.1 Kudu介绍 2.2 Java代码操作Kudu 2.3 Spark操作Kudu

    1. 为什么使用Kudu作为存储介质

    • 数据库数据上的快速分析
       目前很多业务使用事务型数据库(MySQL、Oracle)做数据分析,把数据写入数据库,然后使用 SQL 进行有效信息提取,当数据规模很小的时候,这种方式确实是立竿见影的,但是当数据量级起来以后,会发现数据库吃不消了或者成本开销太大了,此时就需要把数据从事务型数据库里拷贝出来或者说剥离出来,装入一个分析型的数据库里。发现对于实时性和变更性的需求,目前只有 Kudu 一种组件能够满足需求,所以就产生了这样的一种场景:
      在这里插入图片描述

     MySQL 数据库增、删、改的数据通过 Binlog 实时的被同步到 Kudu 里,同时在 Impala(或者其他计算引擎如 Spark、Hive、Presto、MapReduce)上可以实时的看到。
    这种场景也是目前业界使用最广泛的,认可度最高。

    • 用户行为日志的快速分析
      对于用户行为日志的实时性敏感的业务,比如电商流量、AB 测试、优惠券的点击反馈、广告投放效果以及秒级导入秒级查询等需求,按 Kudu 出现以前的架构基本上都是这张图的模式:
      在这里插入图片描述

     不仅链路长而且实时性得不到有力保障,有些甚至是 T + 1 的,极大的削弱了业务的丰富度。
     引入 Kudu 以后,大家看,数据的导入和查询都是在线实时的:
    在这里插入图片描述

     这种场景目前也是网易考拉和hub在使用的,其中hub甚至把 Kudu 当 HBase 来作点查使用。

    2. Kudu入门

    2.1 Kudu介绍

    2.1.1 背景介绍

    在Kudu之前,大数据主要以两种方式存储;

    • 静态数据:
      • 以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。
      • 这类存储的局限性是数据无法进行随机的读写。
    • 动态数据:
      • 以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。
      • 这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。

     从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。
    在这里插入图片描述

     如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但它有如下缺点:

    • 架构复杂。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
    • 时效性低。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。
    • 难以应对后续的更新。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。

     为了解决上述架构的这些问题,Kudu应运而生。Kudu的定位是Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。
    在这里插入图片描述
     从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁。

    2.1.2 新的硬件设备

     内存(RAM)的技术发展非常快,它变得越来越便宜,容量也越来越大。Cloudera的客户数据显示,他们的客户所部署的服务器,2012年每个节点仅有32GB RAM,现如今增长到每个节点有128GB或256GB RAM。存储设备上更新也非常快,在很多普通服务器中部署SSD也是屡见不鲜。HBase、HDFS、以及其他的Hadoop工具都在不断自我完善,从而适应硬件上的升级换代。然而,从根本上,HDFS基于03年GFS,HBase基于05年BigTable,在当时系统瓶颈主要取决于底层磁盘速度。当磁盘速度较慢时,CPU利用率不足的根本原因是磁盘速度导致的瓶颈,当磁盘速度提高了之后,CPU利用率提高,这时候CPU往往成为系统的瓶颈。HBase、HDFS由于年代久远,已经很难从基本架构上进行修改,而Kudu是基于全新的设计,因此可以更充分地利用RAM、I/O资源,并优化CPU利用率。
     我们可以理解为:Kudu相比与以往的系统,CPU使用降低了,I/O的使用提高了,RAM的利用更充分了。

    2.1.3 Kudu是什么

     Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。
     Kudu支持水平扩展,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密。

    2.1.4 Kudu的应用场景

     Kudu的很多特性跟HBase很像,它支持索引键的查询和修改。Cloudera曾经想过基于Hbase进行修改,然而结论是对HBase的改动非常大,Kudu的数据模型和磁盘存储都与Hbase不同。HBase本身成功的适用于大量的其它场景,因此修改HBase很可能吃力不讨好。最后Cloudera决定开发一个全新的存储系统。

    • Strong performance for both scan and random access to help customers simplify complex hybrid architectures(适用于那些既有随机访问,也有批量数据扫描的复合场景)
    • High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高计算量的场景)
    • High IO efficiency in order to leverage modern persistent storage(使用了高性能的存储设备,包括使用更多的内存)
    • The ability to upDATE data in place, to avoid extraneous processing and data movement(支持数据更新,避免数据反复迁移)
    • The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的实时数据备份和查询)

    2.1.5 Kudu架构

     下图显示了一个具有三个 master 和多个 tablet server 的 Kudu 集群,每个服务器都支持多个 tablet。
     它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader 和 follow。
     此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。
    在这里插入图片描述

    下面是一些基本概念:

    角色 作用
    Master 集群中的老大,负责集群管理、元数据管理等功能
    Tablet Server 集群中的小弟,负责数据存储,并提供数据读写服务
    一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。
    只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。
    Table(表) 一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。
    Tablet 一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。

    2.2 Java代码操作Kudu

    2.2.1 构建maven工程

    2.2.2 导入依赖

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.9.0-cdh6.2.1</version>
        </dependency>
    
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>1.9.0-cdh6.2.1</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.9.0-cdh6.2.1</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    

    2.2.3 创建包结构

    包名 说明
    com.erainm 代码所在的包目录

    2.2.4 初始化方法

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.KuduClient;
    import org.junit.Before;
    
    public class TestKudu {
        //定义KuduClient客户端对象
        private static KuduClient kuduClient;
        //定义表名
        private static String tableName = "person";
    
        /**
         * 初始化方法
         */
        @Before
        public void init() {
            //指定master地址
            String masterAddress = "node2";
            //创建kudu的数据库连接
            kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
        }
    
        //构建表schema的字段信息
        //字段名称   数据类型     是否为主键
        public ColumnSchema newColumn(String name, Type type, boolean isKey) {
            ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
            column.key(isKey);
            return column.build();
        }
    }
    

    2.2.5 创建表

    /**  使用junit进行测试
     *
     * 创建表
     * @throws KuduException
     */
    @Test
    public void createTable() throws KuduException {
        //设置表的schema
        List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
        columns.add(newColumn("CompanyId", Type.INT32, true));
        columns.add(newColumn("WorkId", Type.INT32, false));
        columns.add(newColumn("Name", Type.STRING, false));
        columns.add(newColumn("Gender", Type.STRING, false));
        columns.add(newColumn("Photo", Type.STRING, false));
        Schema schema = new Schema(columns);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置表的副本和分区规则
        LinkedList<String> list = new LinkedList<String>();
        list.add("CompanyId");
        //设置表副本数
        tableOptions.setNumReplicas(1);
        //设置range分区
        //tableOptions.setRangePartitionColumns(list);
        //设置hash分区和分区的数量
        tableOptions.addHashPartitions(list, 3);
        try {
            kuduClient.createTable("person", schema, tableOptions);
        } catch (Exception e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    2.2.6 插入数据

    /**
     * 向表中加载数据
     * @throws KuduException
     */
    @Test
    public void loadData() throws KuduException {
        //打开表
        KuduTable kuduTable = kuduClient.openTable(tableName);
        //创建KuduSession对象 kudu必须通过KuduSession写入数据
        KuduSession kuduSession = kuduClient.newSession();
        //采用flush方式 手动刷新
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        kuduSession.setMutationBufferSpace(3000);
        //准备数据
        for(int i=1; i<=10; i++){
            Insert insert = kuduTable.newInsert();
            //设置字段的内容
            insert.getRow().addInt("CompanyId",i);
            insert.getRow().addInt("WorkId",i);
            insert.getRow().addString("Name","lisi"+i);
            insert.getRow().addString("Gender","male");
            insert.getRow().addString("Photo","person"+i);
            kuduSession.flush();
            kuduSession.apply(insert);
        }
        kuduSession.close();
        kuduClient.close();
    }
    

    2.2.7 查询数据

    /**
     * 查询表数据
     * @throws KuduException
     */
    @Test
    public void queryData() throws KuduException {
        //打开表
        KuduTable kuduTable = kuduClient.openTable(tableName);
        //获取scanner扫描器
        KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
        KuduScanner scanner = scannerBuilder.build();
        //遍历
        while(scanner.hasMoreRows()){
            RowResultIterator rowResults = scanner.nextRows();
            while (rowResults.hasNext()){
                RowResult result = rowResults.next();
                int companyId = result.getInt("CompanyId");
                int workId = result.getInt("WorkId");
                String name = result.getString("Name");
                String gender = result.getString("Gender");
                String photo = result.getString("Photo");
                System.out.print("companyId:"+companyId+" ");
                System.out.print("workId:"+workId+" ");
                System.out.print("name:"+name+" ");
                System.out.print("gender:"+gender+" ");
                System.out.println("photo:"+photo);
            }
        }
        //关闭
        scanner.close();
        kuduClient.close();
    }
    

    2.2.8 修改数据

    /**
     * 修改数据
     * @throws KuduException
     */
    @Test
    public void upDATEData() throws KuduException {
        //打开表
        KuduTable kuduTable = kuduClient.openTable(tableName);
        //构建kuduSession对象
        KuduSession kuduSession = kuduClient.newSession();
        //设置刷新数据模式,自动提交
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
    
        //更新数据需要获取UpDATE对象
        UpDATE upDATE = kuduTable.newUpDATE();
        //获取row对象
        PartialRow row = upDATE.getRow();
        //设置要更新的数据信息
        row.addInt("CompanyId",1);
        row.addString("Name","kobe");
        //操作这个upDATE对象
        kuduSession.apply(upDATE);
        kuduSession.close();
    }
    

    2.2.9 删除数据

    /**
     * 删除表中的数据
     */
    @Test
    public void deleteData() throws KuduException {
        //打开表
        KuduTable kuduTable = kuduClient.openTable(tableName);
        KuduSession kuduSession = kuduClient.newSession();
        //获取Delete对象
        Delete delete = kuduTable.newDelete();
        //构建要删除的行对象
        PartialRow row = delete.getRow();
        //设置删除数据的条件
        row.addInt("CompanyId",2);
        kuduSession.flush();
        kuduSession.apply(delete);
        kuduSession.close();
        kuduClient.close();
    }
    

    2.2.10 删除表

    /**
     * 删除表
     */
    @Test
    public void dropTable() throws KuduException {
        //删除表
        DeleteTableResponse response = kuduClient.deleteTable(tableName);
        //关闭客户端连接
        kuduClient.close();
    }
    

    2.2.11 kudu的分区方式(结合Impala)

     为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。
    kudu提供了3种分区方式。

    2.2.11.1 Hash Partitioning (哈希分区)

     哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。

    /**
     * 测试分区:
     * hash分区
     */
    @Test
    public void testHashPartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));
        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行range分区
        tableOptions.addHashPartitions(parcols,6);
        try {
            kuduClient.createTable("dog",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    2.2.11.2 Range Partitioning (范围分区)

     范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.

    /**
     * 测试分区:
     * RangePartition
     */
    @Test
    public void testRangePartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));
        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行range分区
        tableOptions.setRangePartitionColumns(parcols);
        /**
         * range
         *  0 < value < 10
         * 10 <= value < 20
         * 20 <= value < 30
         * ........
         * 80 <= value < 90
         * */
        int count=0;
        for(int i =0;i<10;i++){
            //范围开始
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId",count);
            //范围结束
            PartialRow upper = schema.newPartialRow();
            count +=10;
            upper.addInt("CompanyId",count);
            //设置每一个分区的范围
            tableOptions.addRangePartition(lower,upper);
        }
        try {
            kuduClient.createTable("student",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    2.2.11.3 Multilevel Partitioning (多级分区)

    Kudu 允许一个表在单个表上组合多级分区。
     当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点 需求.

    /**
     * 测试分区:
     * 多级分区
     * Multilevel Partition
     * 混合使用hash分区和range分区
     *
     * 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
     * hash分区和range分区结合,可以极大的提升kudu的性能
     */
    @Test
    public void testMultilevelPartition() throws KuduException {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
        columnSchemas.add(newColumn("WorkId", Type.INT32,false));
        columnSchemas.add(newColumn("Name", Type.STRING,false));
        columnSchemas.add(newColumn("Gender", Type.STRING,false));
        columnSchemas.add(newColumn("Photo", Type.STRING,false));
        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //hash分区
        tableOptions.addHashPartitions(parcols,5);
        //range分区
        int count=0;
        for(int i=0;i<10;i++){
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId",count);
            count+=10;
            PartialRow upper = schema.newPartialRow();
            upper.addInt("CompanyId",count);
            tableOptions.addRangePartition(lower,upper);
        }
        try {
            kuduClient.createTable("cat",schema,tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    2.2.12 修改表

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.*;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.List;
    
    /**
     * 修改表操作
     */
    public class AlterTable {
        //定义kudu的客户端对象
        private static KuduClient kuduClient;
        //定义一张表名称
        private static String tableName = "person";
    
        /**
         * 初始化操作
         */
        @Before
        public void init() {
            //指定kudu的master地址
            String masterAddress = "node2";
            //创建kudu的数据库连接
            kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
        }
    
        /**
         * 添加列
         */
        @Test
        public void alterTableAddColumn() {
            AlterTableOptions alterTableOptions = new AlterTableOptions();
            alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
            try {
                kuduClient.alterTable(tableName, alterTableOptions);
            } catch (KuduException e) {
                e.printStackTrace();
           }
        }
    
        /**
         * 删除列
         */
        @Test
        public void alterTableDeleteColumn(){
            AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
            try {
                kuduClient.alterTable(tableName, alterTableOptions);
            } catch (KuduException e) {
                e.printStackTrace();
           }
        }
    
        /**
         * 添加分区列
         */
        @Test
        public void alterTableAddRangePartition(){
            int lowerValue = 110;
            int upperValue = 120;
            try {
                KuduTable kuduTable = kuduClient.openTable(tableName);
                List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
                boolean flag = true;
                for (Partition rangePartition : rangePartitions) {
                    int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
                    if(startKey == lowerValue){
                        flag = false;
                    }
                }
                if(flag) {
                    PartialRow lower = kuduTable.getSchema().newPartialRow();
                    lower.addInt("Id", lowerValue);
                    PartialRow upper = kuduTable.getSchema().newPartialRow();
                    upper.addInt("Id", upperValue);
                    kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
                }else{
                    System.out.println("分区已经存在,不能重复创建!");
                }
            } catch (KuduException e) {
                e.printStackTrace();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    
        /**
         * 删除表
         * @throws KuduException
         */
        @Test
        public void dropTable() throws KuduException {
            kuduClient.deleteTable(tableName);
        }
    }
    

    2.3 Spark操作Kudu

    • Spark与KUDU集成支持:
      • DDL操作(创建/删除)
      • 本地Kudu RDD
      • Native Kudu数据源,用于DataFrame集成
      • 从kudu读取数据
      • 从Kudu执行插入/更新/ upsert /删除
      • 谓词下推
      • Kudu和Spark SQL之间的模式映射
      • 到目前为止,我们已经听说过几个上下文,例如SparkContext,SQLContext,HiveContext, SparkSession,现在,我们将使用Kudu引入一个KuduContext。这是可以在Spark应用程序中广播的主要可序列化对象。此类代表在Spark执行程序中与Kudu Java客户端进行交互。
      • KuduContext提供执行DDL操作所需的方法,与本机Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark等。

    2.3.1 创建表

    • 定义kudu的表需要分成5个步骤:
      • 提供表名
      • 提供schema
      • 提供主键
      • 定义重要选项;例如:定义分区的schema
      • 调用create Table api
    • 代码开发
    import java.util
    import com.erainm.SparkKuduDemo.TABLE_NAME
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    object SparkKuduTest {
      def main(args: Array[String]): Unit = {
        //构建sparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
    
        //构建SparkSession对象
        val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
        //获取sparkContext对象
        val sc: SparkContext = sparkSession.sparkContext
        sc.setLogLevel("warn")
    
        //构建KuduContext对象
        val kuduContext = new KuduContext("node2:7051", sc)
    
        //1.创建表操作
        createTable(kuduContext)
    
        /**
         * 创建表
         *
         * @param kuduContext
         * @return
         */
        def createTable(kuduContext: KuduContext) = {
          //如果表不存在就去创建
          if (!kuduContext.tableExists(TABLE_NAME)) {
    
            //构建创建表的表结构信息,就是定义表的字段和类型
            val schema: StructType = StructType(
              StructField("userId", StringType, false) ::
                StructField("name", StringType, false) ::
                StructField("age", IntegerType, false) ::
                StructField("sex", StringType, false) :: Nil)
    
            //指定表的主键字段
            val keys = List("userId")
    
            //指定创建表所需要的相关属性
            val options: CreateTableOptions = new CreateTableOptions
            //定义分区的字段
            val partitionList = new util.ArrayList[String]
            partitionList.add("userId")
            //添加分区方式为hash分区
            options.addHashPartitions(partitionList, 6)
    
            //创建表
            kuduContext.createTable(TABLE_NAME, schema, keys, options)
          }
        }
      }
    }
    

     定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方 法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对 象;(要使“asJava”方法可用,请记住导入JavaConverters库。) 创建表后,通过将浏览器指向http//master主机名:8051/tables

    • 来查看Kudu主UI可以找到创建的表,通过单击表ID,能够看到表模式和分区信息。
      在这里插入图片描述
      点击Table id 可以观察到表的schema等信息:
      在这里插入图片描述

    2.3.2 DML操作

     Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:

    • INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。 使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述 的INSERT_IGNORE。
    • INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。
    • DELETE - 从Kudu表中删除DataFrame中的行
    • UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。
    • UPDATE - 更新dataframe中的行

    2.3.2.1 插入数据insert操作

    先创建一张表,然后把数据插入到表中

    import java.util
    
    import com.erainm.SparkKuduDemo.{TABLE_NAME, erainm}
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    
    object SparkKuduTest {
      //定义样例类
      case class erainm(id:Int, name:String, age:Int, sex:Int)
      
      def main(args: Array[String]): Unit = {
        //构建sparkConf对象
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
    
        //构建SparkSession对象
        val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    
        //获取sparkContext对象
        val sc: SparkContext = sparkSession.sparkContext
        sc.setLogLevel("warn")
    
        //构建KuduContext对象
        val kuduContext = new KuduContext("node2:7051", sc)
    
        //1.创建表操作
        createTable(kuduContext)
    
        /**
         * 创建表
         *
         * @param kuduContext
         * @return
         */
        def createTable(kuduContext: KuduContext) = {
          //如果表不存在就去创建
          if (!kuduContext.tableExists(TABLE_NAME)) {
    
            //构建创建表的表结构信息,就是定义表的字段和类型
            val schema: StructType = StructType(
              StructField("userId", StringType, false) ::
                StructField("name", StringType, false) ::
                StructField("age", IntegerType, false) ::
                StructField("sex", StringType, false) :: Nil)
    
            //指定表的主键字段
            val keys = List("userId")
    
            //指定创建表所需要的相关属性
            val options: CreateTableOptions = new CreateTableOptions
            //定义分区的字段
            val partitionList = new util.ArrayList[String]
            partitionList.add("userId")
            //添加分区方式为hash分区
            options.addHashPartitions(partitionList, 6)
    
            //创建表
            kuduContext.createTable(TABLE_NAME, schema, keys, options)
          }
        }
    
        /**
         * 2)加载数据
         * @param session
         * @param sc
         * @param kuduContext
         */
        def inserData(session: SparkSession, sc: SparkContext, kuduContext: KuduContext): Unit = {
          //定义数据
          val data = List(erainm(1, "tom", 30, 1), erainm(2, "mark", 26, 0))
          val erainmRDD = sc.makeRDD(data)
          import session.implicits._
          val dataFrame: DataFrame = erainmRDD.toDF
    
          kuduContext.insertRows(dataFrame, TABLE_NAME)
        }
      }
    }
    

    2.3.2.2 删除数据delete操作

    /**
     * 4)删除数据
     * @param session
     * @param kuduContext
     */
    def deleteData(session: SparkSession, kuduContext: KuduContext): Unit = {
      //定义数据
      val data = List(erainm(1, "tom", 50, 1), erainm(2, "mark", 30, 0))
    
      import session.implicits._
      val dataFrame: DataFrame = data.toDF().select("id")
    
      kuduContext.deleteRows(dataFrame, TABLE_NAME)
    }
    

    2.3.2.3 更新数据upsert操作

    /**
     * 3)修改数据
     * @param session
     * @param kuduContext
     */
    def upDATEData(session: SparkSession, kuduContext: KuduContext): Unit = {
      //定义数据
      val data = List(erainm(1, "tom", 50, 1), erainm(2, "mark", 30, 0))
    
      import session.implicits._
      val dataFrame: DataFrame = data.toDF()
    
      kuduContext.upDATERows(dataFrame, TABLE_NAME)
    }
    

    2.3.3 dataFrame操作kudu

    2.3.3.1 DataFrameApi读取kudu表中的数据

     虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。

    • 代码示例
    /**
     * 使用DataFrameApi读取kudu表中的数据
     * @param sparkSession
     * @param kuduMaster
     * @param tableName
     */
    def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
      //定义map集合,封装kudu的master地址和要读取的表名
      val options = Map(
        "kudu.master" -> kuduMaster,
        "kudu.table" -> tableName
      )
      sparkSession.read.options(options).kudu.show()
    }
    

    2.3.3.2 DataFrameApi写数据到kudu表中

    在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。

    • 代码示例
    /**
     * 6)DataFrameApi写数据到kudu表中
     */
    def dataFrame2Kudu(session: SparkSession, kuduContext: KuduContext): Unit ={
      val data = List(erainm(3, "canglaoshi", 14, 0), erainm(4, "xiaowang", 18, 1))
      import  session.implicits._
      val dataFrame = data.toDF
    
      //目前,在kudu中,数据的写入只支持append追加
      dataFrame.write.mode("append").options(kuduOptions).kudu
    
      //查看结果
      //导包
      import org.apache.kudu.spark.kudu._
      //加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果
      sparkSession.read.options(kuduOptions).kudu.show()
    }
    

    2.3.3.3 使用sparksql操作kudu表

     可以选择使用Spark SQL直接使用INSERT语句写入Kudu表;与’append’类似,INSERT语句实际上将默认使用 UPSERT语义处理;

    • 代码示例
    /**
     * 使用sparksql操作kudu表
     * @param sparkSession
     * @param sc
     * @param kuduMaster
     * @param tableName
     */
    def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
      //定义map集合,封装kudu的master地址和表名
      val options = Map(
        "kudu.master" -> kuduMaster,
        "kudu.table" -> tableName
      )
      val data = List(erainm(10, "小张", 30, 0), erainm(11, "小王", 40, 0))
      import sparkSession.implicits._
      val dataFrame: DataFrame = sc.parallelize(data).toDF
      //把dataFrame注册成一张表
      dataFrame.createTempView("temp1")
    
      //获取kudu表中的数据,然后注册成一张表
      sparkSession.read.options(options).kudu.createTempView("temp2")
      //使用sparkSQL的insert操作插入数据
      sparkSession.sql("insert into table temp2 select * from temp1")
      sparkSession.sql("select * from temp2 where age >30").show()
    }
    

    2.3.4 Kudu Native RDD

    Spark与Kudu的集成同时提供了kudu RDD.

    • 代码示例
    val columnsList = List("id", "name", "age", "sex")
    val rowRDD: RDD[Row] = kuduContext.kuduRDD(sc, TABLE_NAME, columnsList)
    rowRDD.foreach(println(_))
    sc.stop()
    //session.read.options(kuduOptions).kudu.show()
    

    2.3.5 修改表

    /**
     * 添加列
     * @param kuduContext
     */
    def addColumn(kuduContext: KuduContext): Unit ={
      val alterTableOptions: AlterTableOptions = new AlterTableOptions
      alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build)
      try {
        kuduContext.syncClient.alterTable(tableName, alterTableOptions)
      } catch {
        case ex:Exception => ex.printStackTrace()
      }
    }
    
    展开全文
  • kudu安装包

    2019-03-20 18:22:35
    kudu 安装包 分享给需要的人,kudu 分布式内存数据库。
  • Kudu分布式存储引擎 Kudu & Spark&Flink API操作_Kudu_文档
  • Kudu开发人员文档 编译安装Kudu 按照的步骤从源代码构建和安装Kudu 用树来构建Kudu 单个Kudu源代码树可用于多个构建,每个构建都有自己的构建目录。 除了源树的根目录,构建目录可以放置在文件系统中的任何位置。 ...
  • kudu table

    2021-06-05 16:45:10
    kudu kudu介绍 Kudu背景 #在 KUDU 之前,大数据主要以两种方式存储:##可以更快地访问批量数据集(高吞吐量),而不是该数据集中的特定记录(低延迟)静态数据:以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大...

    kudu

    kudu介绍

    Kudu背景

    在这里插入图片描述

    #在 KUDU 之前,大数据主要以两种方式存储:##可以更快地访问批量数据集(高吞吐量),而不是该数据集中的特定记录(低延迟)静态数据:以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。这类存储的局限性是数据无法进行随机的读写。动态数据:以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。#从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求,一个常见的方案是:
    
    #如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但他有如下缺点:1.架构复杂。从架构上看,数据在 HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。2.时效性低。数据从 HBase 导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。3.难以应对后续的更新。真实场景中,总会有数据是「延迟」到达的。如果这些数据之前已经从 HBase 导出到 HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。#为了解决上述架构的这些问题,KUDU 应运而生。KUDU 的定位是 「Fast Analytics on Fast Data」,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。#从上图可以看出,kudu是一个折中的产品,在hdfs和hbase这两个偏科生中平衡了随机读写和批量分析的性能。从kudu的诞生可以说明一个观点:底层技术发展很多时候都是上层业务推动的,脱离业务的技术很可能是空中楼阁。apache kudu是由cloudera开源的存储引擎,可以同时提供低延时的随机读写和高效的数据分析能力。他是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。kudu支持水平扩展,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密
    

    kudu是什么

    #Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。Kudu支持水平扩展,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密。
    

    kudu应用场景

    1.适用于那些既有随机访问,也有批量数据扫描的复合场景2.高计算量的场景3.使用了高性能的存储设备,包括使用更多的内存4.支持数据更新,避免数据反复迁移5.支持跨地域的实时数据备份和查询6.国内使用的kudu一些案例可以查看《构建近实时分析系统.pdf》文档。
    

    kudu架构

    #与HDFS和HBase相似,Kudu使用单个的Master节点,用来管理集群的元数据,并且使用任意数量的Tablet Server(类似HBase中的RegionServer角色)节点用来存储实际数据。可以部署多个Master节点来提高容错性。上图显示了一个具有三个 master 和多个tablet server的Kudu集群,每个服务器都支持多个tablet。它说明了如何使用 Raft 共识来允许master和tablet server的leader和follow。此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet follower。leader以金色显示,而 follower 则显示为蓝色。
    

    Table

    #表(Table)是数据库中用来存储数据的对象,是有结构的数据集合。kudu中的表具有schema(纲要)和全局有序的primary key(主键)。kudu中一个table会被水平分成多个被称之为tablet的片段。#table是数据存储在 Kudu 的位置。表具有schema和全局有序的primary key(主键)。table被分成很多段,也就是称为tablets。
    

    Tablet

    #一个tablet是一张table连续的片段,tablet是kudu表的水平分区,类似于HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。#tablet 会冗余存储。放置到多个tablet server上,并且在任何给定的时间点,其中一个副本被认为是leader tablet,其余的被认之为follower tablet。每个tablet都可以进行数据的读请求,但只有Leadertablet负责写数据请求。#一个tablet是一张table连续的segment,与其它数据存储引擎或关系型数据库的partition(分区)相似。给定的tablet冗余到多个tablet服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,但是写入时需要在为tablet服务的一组tablet server之间达成一致性。
    

    Tablet Server

    #tablet server集群中的小弟,负责数据存储,并提供数据读写服务#一个tablet server 存储了table表的tablet,向kuduclient 提供读取数据服务。对于给定的tablet,一个tablet server 充当leader,其他tablet server 充当该tablet 的follower 副本。#只有leader服务写请求,然而leader 或followers 为每个服务提供读请求。一个tablet server 可以服务多个tablets ,并且一个tablet 可以被多个tablet servers 服务着。#一个tablet server存储tablet和为tablet向client提供服务。对于给定的tablet,一个tablet server充当 leader,其他tablet server充当该 tablet 的follower副本。只有leader服务写请求,然而leader或followers为每个服务提供读请求。leader使用Raft Consensus Algorithm来进行选举 。一个tablet server可以服务多个tablet,并且一个 tablet 可以被多个tablet servers服务着。
    

    Master Server

    #集群中的老大,负责集群管理、元数据管理等功能。
    

    kudu Java API

    引入依赖

    <dependency>    <groupId>org.apache.kudu</groupId>    <artifactId>kudu-client</artifactId>    <version>1.8.0</version></dependency><dependency>    <groupId>junit</groupId>    <artifactId>junit</artifactId>    <version>4.12</version></dependency>
    

    原生操作

    package com.gaotu.kudu.client;
    
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.*;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.ArrayList;
    
    /**
     * @author xubin03
     * @date 2021/3/4 7:08 下午
     */
    public class TestKudu {
        //声明全局变量 KuduClient 后期通过它来操作 kudu 表
        private KuduClient kuduClient;
        //指定 kuduMaster 地址
        private String kuduMaster;
        //指定表名
        private String tableName;
    
        @Before
        public void init() {
            //初始化操作
            kuduMaster = "al-bj-bigdata-inf-test01";
            //指定表名
            tableName = "student";
            KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster);
            kuduClientBuilder.defaultSocketReadTimeoutMs(10000);
            kuduClient = kuduClientBuilder.build();
        }
    
        //创建表
        @Test
        public void createTable() throws KuduException {
            //判断表是否存在,不存在就构建
            if (!kuduClient.tableExists(tableName)) {
                //构建创建表的 schema 信息 就是表的字段和类型
                ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
                columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id",
                                                                       Type.INT32).key(true).build());
                columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name",
                                                                       Type.STRING).build());
                columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age",
                                                                       Type.INT32).build());
                columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex",
                                                                       Type.INT32).build());
                Schema schema = new Schema(columnSchemas);
                //指定创建表的相关属性
                CreateTableOptions options = new CreateTableOptions();
                ArrayList<String> partitionList = new ArrayList<String>();
                //指定 kudu 表的分区字段是什么
                partitionList.add("id"); // 按照 id.hashcode % 分区 数 = 分区号
                options.addHashPartitions(partitionList, 6);
                kuduClient.createTable(tableName, schema, options);
            }
        }
    
        //向表加载数据
        @Test
        public void insertTable() throws KuduException {
            //向表加 载数据需要一个 kuduSession 对象
            KuduSession kuduSession = kuduClient.newSession();
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
            //需要使用 kuduTable 来构建 Operation 的子类实例对象
            KuduTable kuduTable = kuduClient.openTable(tableName);
            for (int i = 1; i <= 10; i++) {
                Insert insert = kuduTable.newInsert();
                PartialRow row = insert.getRow();
                row.addInt("id", i);
                row.addString("name", "zhangsan -" + i);
                row.addInt("age", 20 + i);
                row.addInt("sex", i % 2);
                kuduSession.apply(insert);// 最后实现执行数据的加载操作
            }
        }
    
        //查询表的数据结果
        @Test
        public void queryData() throws KuduException {
            //构建一个查询的扫描器
            KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
            ArrayList<String> columnsList = new ArrayList<String>();
            columnsList.add("id");
            columnsList.add("name");
            columnsList.add("age");
            columnsList.add("sex");
            kuduScannerBuilder.setProjectedColumnNames(columnsList);
            //返回结果集
            KuduScanner kuduScanner = kuduScannerBuilder.build();
            //遍历
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResults = kuduScanner.nextRows();
                while (rowResults.hasNext()) {
                    RowResult row = rowResults.next();
                    int id = row.getInt("id");
                    String name = row.getString("name");
                    int age = row.getInt("age");
                    int sex = row.getInt("sex");
                    System.out.println("id=" + id + " name=" + name + " age=" + age + " sex = " + sex);
                }
            }
        }
        //id=4 name=zhangsan -4 age=24 sex = 0
        // id=1 name=zhangsan -1 age=21 sex = 1
        // id=5 name=zhangsan -5 age=25 sex = 1
        // id=6 name=zhangsan -6 age=26 sex = 0
        // id=7 name=zhangsan -7 age=27 sex = 1
        // id=3 name=zhangsan -3 age=23 sex = 1
        // id=10 name=zhangsan -10 age=30 sex = 0
        // id=2 name=zhangsan -2 age=22 sex = 0
        // id=8 name=zhangsan -8 age=28 sex = 0
        // id=9 name=zhangsan -9 age=29 sex = 1
    
        //修改表的数据
        @Test
        public void updateData() throws KuduException {
            //修改表的数据需要一个 kuduSession 对象
            KuduSession kuduSession = kuduClient.newSession();
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
            //需要使用 kuduTable 来构建 Operation 的子类实例对象
            KuduTable kuduTable = kuduClient.openTable(tableName);
            //Update update = kuduTable.newUpdate();
            Upsert upsert = kuduTable.newUpsert(); // 如果 id 存在就表示修改,不存在就新增
            PartialRow row = upsert.getRow();
            row.addInt("id", 100);
            row.addString("name", "zhangsan 100");
            row.addInt("age", 100);
            row.addInt("sex", 0);
            kuduSession.apply(upsert);// 最后实现执行数据的修改操作
        }
        //id=4 name=zhangsan -4 age=24 sex = 0
        // id=100 name=zhangsan 100 age=100 sex = 0
        // id=1 name=zhangsan -1 age=21 sex = 1
        // id=5 name=zhangsan -5 age=25 sex = 1
        // id=6 name=zhangsan -6 age=26 sex = 0
        // id=7 name=zhangsan -7 age=27 sex = 1
        // id=3 name=zhangsan -3 age=23 sex = 1
        // id=10 name=zhangsan -10 age=30 sex = 0
        // id=2 name=zhangsan -2 age=22 sex = 0
        // id=8 name=zhangsan -8 age=28 sex = 0
        // id=9 name=zhangsan -9 age=29 sex = 1
    
        //删除数据
        @Test
        public void deleteData() throws KuduException {
            //删除表的数据需要一个 kuduSession 对象
            KuduSession kuduSession = kuduClient.newSession();
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
            //需要使用 kuduTable 来构建 O peration 的子类实例对象
            KuduTable kuduTable = kuduClient.openTable(tableName);
            Delete delete = kuduTable.newDelete();
            PartialRow row = delete.getRow();
            row.addInt("id", 100);
            kuduSession.apply(delete);// 最后实现执行数据的删除操作
        }
    
        //删除表
        @Test
        public void dropTable() throws KuduException {
            if (kuduClient.tableExists(tableName)) {
                kuduClient.deleteTable(tableName);
            }
        }
        
        //最后关闭client连接
        @After
    	public void close() throws KuduException {
    		if(kuduClient != null){
    			kuduClient.close();
    		}
    	}
    }
    

    kudu分区方式

    #为了提供可扩展性,Kudu 表被划分为称为 tablet 的单元,并分布在许多tablet servers 上。行总是属于单个 t ablet 。 将行分配给 tablet 的方法由在表创建期间设置的表的分区决定 。 kudu 提供了 3 种分区方式。
    

    Range Partitioning(范围分区)

    #范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象
    
    public static ColumnSchema newColumn(String name, Type type, boolean iskey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(iskey);
        return column.build();
    }
    
    //测试分区:RangePartition
    @Test
    public void testRangePartition() throws KuduException {
        //设置表的 schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32, true));
        columnSchemas.add(newColumn("WorkId", Type.INT32, false));
        columnSchemas.add(newColumn("Name", Type.STRING, false));
        columnSchemas.add(newColumn("Gender", Type.STRING, false));
        columnSchemas.add(newColumn("Photo", Type.STRING, false));
        //创建 schema
        Schema schema = new Schema(columnSchemas);
        //创建表时 提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行 range 分区
        tableOptions.setRangePartitionColumns(parcols);
        /**
    		 * range
    		 * 0 < value < 10
    		 * 10 <= value < 20
    		 * 20 <= value < 30
    		 * ......
    		 * 80 <= value < 90
    		 * */
        int count = 0;
        for (int i = 0; i < 10; i++) {
            //范围开始
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId", count);
            //范围结束
            PartialRow upper = schema.newPartialRow();
            count += 10;
            upper.addInt("CompanyId", count);
            //设置每一个分区的范围
            tableOptions.addRangePartition(lower, upper);
        }
        try {
            kuduClient.createTable("student", schema, tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    Hash Partitioning (哈希分区)

    #哈希分区通过哈希值将行分配到许多buckets ( 存储桶 之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在tablet之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。
    
    public static ColumnSchema newColumn(String name, Type type, boolean iskey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(iskey);
        return column.build();
    }
    
    //测试分区:hash 分区
    @Test
    public void testHashPartition() throws KuduException {
        //设置表的 schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32, true));
        columnSchemas.add(newColumn("WorkId", Type.INT32, false));
        columnSchemas.add(newColumn("Name", Type.STRING, false));
        columnSchemas.add(newColumn("Gender", Type.STRING, false));
        columnSchemas.add(newColumn("Photo", Type.STRING, false));
        //创建 schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //设置按照那个字段进行 range 分区
        tableOptions.addHashPartitions(parcols, 6);
        try {
            kuduClient.createTable("dog", schema, tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    Multilevel Partitioning (多级分区)

    #Kudu允许一个表在单个表上组合多级分区。当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点需求
    
    public static ColumnSchema newColumn(String name, Type type, boolean iskey) {
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(iskey);
        return column.build();
    }
    
    //测试分区:
    //多级分区
    //Multilevel Partition
    //混合 使用 hash 分区和 range 分区
    //哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet 无限增长问题,
    //hash 分区和 range 分区结合,可以极大的提升 kudu 的性能
    
    @Test
    public void testMultilevelPartition() throws KuduException {
        //设置表的 schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(newColumn("CompanyId", Type.INT32, true));
        columnSchemas.add(newColumn("WorkId", Type.INT32, false));
        columnSchemas.add(newColumn("Name", Type.STRING, false));
        columnSchemas.add(newColumn("Gender", Type.STRING, false));
        columnSchemas.add(newColumn("Photo", Type.STRING, false));
        //创建 schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("CompanyId");
        //hash 分区
        tableOptions.addHashPartitions(parcols, 5);
        //range 分区
        int count = 0;
        for (int i = 0; i < 10; i++) {
            PartialRow lower = schema.newPartialRow();
            lower.addInt("CompanyId", count);
            count += 10;
            PartialRow upper = schema.newPartialRow();
            upper.addInt("CompanyId", count);
            tableOptions.addRangePartition(lower, upper);
        }
        try {
            kuduClient.createTable("cat", schema, tableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        }
        kuduClient.close();
    }
    

    kudu spark

    #到目前为止,我们已经听说过几个上下文,例如SparkContext SQLContext HiveContext SparkSession ,现在,我们将使用 Kudu 引入一个KuduContext 。这是可在 Spark 应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu Java 客户端进行交互。 
    #KuduContext 提供执行DDL 操作所需的方法,与本机 Kudu RDD 的接口,对数据执行更新 插入删除,将数据类型从 Kudu 转换为 Spark 等。
    

    添加依赖

    	<!-- 如果要下1.6.0-cdh5.14.0版本的包,需要加上 因为这个是来自cdh商业版本,并不是apache版本,cdh的包并没有出现在maven的中央仓库中-->
    	<repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>		
    
    		<!-- spark -->
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client-tools</artifactId>
                <version>1.7.0</version>
            </dependency>
            <!--<dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client</artifactId>
                <version>1.8.0</version>
            </dependency>-->
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-spark2_2.11</artifactId>
                <version>1.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
    

    创建表

    定义kudu 的表需要分成 5 个步骤:
    1:提供表名
    2:提供 schema
    3:提供主键
    4:定义重要选项;例如:定义分区的 schema
    5:调用 create Table api
    
    package com.gaotu.kudu.client
    
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    import scala.collection.JavaConverters.seqAsJavaListConverter
    
    object SparkKuduTest {
        def main(args: Array[String]): Unit = {
            //构建 sparkConf 对象
            val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
            //构建 SparkSession 对象
            val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
            //获取 sparkContext 对象
            val sc: SparkContext = sparkSession.sparkContext
            sc.setLogLevel("warn")
            //构建 KuduContext 对象
            val kuduContext = new KuduContext("al-bj-bigdata-inf-test01", sc)
            // 创建表操作
            createTable(kuduContext)
        }
    
        //创建表
        def createTable(kuduContext: KuduContext) = {
            //1.1 定义表名
            val tableName = "spark_kudu"
            //1.2 定义表的 schema
            val schema = StructType(
                //false表示不能为空
                StructField("userId", StringType, false) ::
                StructField("name", StringType, false) ::
                StructField("age", IntegerType, false) ::
                StructField("sex", StringType, false) :: Nil)
            //1.3 定义表的主键
            val primaryKey = Seq("userId")
            //1.4 定义分区的 schema
            val options = new CreateTableOptions
            //设置分区
            //options.setRangePartitionColumns(List("userId").asJava)
            val list = new util.ArrayList[String]()
            list.add("userId")
            options.addHashPartitions(list,8)
            //设置副本
            options.setNumReplicas(1)
            //1.5 创建表
            if (!kuduContext.tableExists(tableName)) {
                kuduContext.createTable(tableName, schema, primaryKey, options)
            }
        }
    }
    
    #定义表时要注意的是Kudu 表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“ asJava ”方法。这是因为在这里,我们调用了 Kudu  Java 客户端本身,它需要 Java 对象(即 java.util.List )而不是 Scala 的List 对象;(要使“ asJava ”方法可用,请记住导入 JavaConverters 库。)
    创建表后,通过将浏览器指向http// master 主机名 :8051/tables 来查看Kudu 主 UI 可以找到创建的表,通过单击表 ID ,能够看到表模式和分区信息。
    
    点击Table id 可以观察到表的 schema 等信息:
    

    dataFrame操作kudu

    #Kudu支持许多 DML 类型的操作,其中一些操作包含在 Spark on Kudu 集成
    #包括:
    
    #1.INSERT 将 DataFrame 的行插入 Kudu 表。请注意,虽然 API 完全支持 INSERT但不鼓励在 Spark 中使用它。使用 INSERT 是有风险的,因为 Spark 任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为 如果行已经存在, INSERT 将不允许插入行(导致失败) 。相反,我们鼓励使 用下面描述的 INSERT_IGNORE 。
    
    #2.INSERT IGNORE 将 DataFrame 的行插入 Kudu 表。如果表存在,则忽略插入动作。
    
    #3.DELETE 从 Kudu 表中删除 DataFrame 中的行
    
    #4.UPSERT 如果存在,则在 Kudu 表中更新 DataFrame 中的行,否则执行插入操作。
    
    #5.UPDATE 更新 dataframe 中的行
    

    插入数据insert操作

    先创建一张表,然后把数据插入到表中。
    
    package com.gaotu.kudu.client
    
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.kudu.spark.kudu.{KuduContext, KuduDataFrameReader, KuduDataFrameWriter}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    import scala.collection.JavaConverters.seqAsJavaListConverter
    
    //定义样例类 用于封装表的一行数据
    case class People(id: Int, name: String, age: Int)
    
    object DataFrameKudu {
        def main(args: Array[String]): Unit = {
            //构建 SparkConf 对象
            val sparkConf: SparkConf = new SparkConf().setAppName("DataFrameKudu").setMaster("local[2]")
            //构建 SparkSession 对象
            val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
            //获取 SparkContext 对象
            val sc: SparkContext = sparkSession.sparkContext
            sc.setLogLevel("warn")
            //指定 kudu 的 master 地址
            val kuduMaster = "al-bj-bigdata-inf-test01"
            //构建 KuduContext 对象
            val kuduContext = new KuduContext(kuduMaster, sc)
            //定义表名
            val tableName = "people"
            //1 、创建表
            createTable(kuduContext, tableName)
            //2 、插入数据到表中
            insertData2table(sparkSession, sc, kuduContext, tableName)
            //3、删除数据
            deleteData(sparkSession, sc, kuduMaster, kuduContext, tableName)
            //4、更新数据
            UpsertData(sparkSession, sc, kuduMaster, kuduContext, tableName)
            //5、获取数据
            getTableData(sparkSession, kuduMaster, tableName)
    
            //6、dataframe
            dataFrame2kudu(sparkSession, sc, kuduMaster, tableName)
            //7、sparksql
            SparkSql2Kudu(sparkSession, sc, kuduMaster, tableName)
        }
    
        //创建表
        private def createTable(kuduContext: KuduContext, tableName: String): Unit = {
            //定义表的 schema
            val schema = StructType(
                StructField("id", IntegerType, false) ::
                StructField("name", StringType, false) ::
                StructField("age", IntegerType, false) :: Nil)
            //定义表的主键
            val tablePrimaryKey = List("id")
            //定义表的选项配置
            val options = new CreateTableOptions
            options.setRangePartitionColumns(List("id").asJava)
            options.setNumReplicas(1)
            //创建表
            if (!kuduContext.tableExists(tableName)) {
                kuduContext.createTable(tableName, schema, tablePrimaryKey, options)
            }
        }
    
        //插入数据到表中
        private def insertData2table(sparkSession: SparkSession, sc: SparkContext, kuduContext: KuduContext, tableName: String): Unit = {
            //准备数据
            val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))
            val peopleRDD: RDD[People] = sc.parallelize(data)
            import sparkSession.implicits._
            val peopleDF: DataFrame = peopleRDD.toDF
            kuduContext.insertRows(peopleDF, tableName)
        }
    }
    

    查询数据select操作

    //查询数据
    private def queryData(sc: SparkContext,kuduContext:KuduContext,tableName:String):Unit = {
        val column = List("id", "name", "age")
        val value: RDD[Row] = kuduContext.kuduRDD(sc, tableName, column)
        value.foreach(println(_))
    }
    

    删除数据delete操作

    //删除数据
    private def deleteData1(sc: SparkContext,sparkSession: SparkSession,kuduContext: KuduContext,tableName: String): Unit ={
        val data = List(People(1,"zhangsan",50))
        val peopleRdd = sc.parallelize(data);
        import sparkSession.implicits._
        val dataFrame = peopleRdd.toDF().select("id")
        kuduContext.deleteRows(dataFrame,tableName)
    }
    

    删除数据delete操作

    //删除表的数据
    private def deleteData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
    
        //定义一个 map 集合,封装 kudu 的相关信息
        //val options = Map(
        //    "kudu.master" -> kuduMaster,
        //    "kudu.table" -> tableName)
    
        import sparkSession.implicits._
    
        val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))
        val dataFrame: DataFrame = sc.parallelize(data).toDF
        dataFrame.createTempView("temp")
        //获取年龄大于 30 的所有用户 id
        val result: DataFrame = sparkSession.sql("select id from temp where age >30")
        //删除对应的数据,这里必须要是主键字段
        kuduContext.deleteRows(result, tableName)
    }
    

    更新数据upsert操作

    //更新数据 添加数据
    private def UpsertData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
        //更新表中的数据
        //定义一个 map 集合,封装 kudu 的相关信息
        //val options = Map(
        //    "kudu.master" -> kuduMaster,
        //    "kudu.table" -> tableName
        //)
    
        import sparkSession.implicits._
    
        val data = List(People(1, "zhangsan", 50), People(5, "tom", 30))
        val dataFrame: DataFrame = sc.parallelize(data).toDF
        //如果存在就是更新,否则就是插入
        kuduContext.upsertRows(dataFrame, tableName)
    }
    

    更新数据update操作

    //更新数据
    private def updateData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {
        //定义一个 map 集合,封装 kudu 的相关信息
        //val options = Map(
        //    "kudu.master" -> kuduMaster,
        //    "kudu.table" -> tableName
        //)
        import sparkSession.implicits._
        val data = List(People(1, "zhangsan", 60), People(6, "tom", 30))
        val dataFrame: DataFrame = sc.parallelize(data).toDF
        //如果存在就是更新,否则就是报错
        kuduContext.updateRows(dataFrame, tableName)
    }
    

    DataFrameAPI读取kudu数据

    #虽然我们可以通过上面显示的 KuduContext 执行大量操作,但我们还可以直接从默认数据源本身调用读 写 API 。要设置读取,我们需要为 Kudu 表指定选项,命名我们要读取的表以及为表提供服务的 Kudu 集群的 Kudu 主服务器列表。
    
    //使用 DataFrameApi 读取 kudu 表中的数据private def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {    //定义 map 集合,封装 kudu 的 master 地址和要读取的表名    val options = Map(        "kudu.master" -> kuduMaster,        "kudu.table" -> tableName)    sparkSession.read.options(options).kudu.show()}
    

    DataFrameApi写到kudu表

    #在通过 DataFrame API 编写时,目前只支持一种模式 append 。尚未实现的 覆盖 模式。
    
    //DataFrame api 写数据到 kudu 表
    private def dataFrame2kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
        //定义 map 集合,封装 kudu 的 master 地址和要读取的表名
        val options = Map(
            "kudu.master" -> kuduMaster,
            "kudu.table" -> tableName
        )
        val data = List(People(7, "jim", 30), People(8, "xiaoming", 40))
        import sparkSession.implicits._
        val dataFrame: DataFrame = sc.parallelize(data).toDF
        //把 dataFrame 结果写入到 kudu 表中 目前只支持 append 追加
        dataFrame.write.options(options).mode("append").kudu
        //查看结果
        //导包
        import org.apache.kudu.spark.kudu._
        //加载表的数据,导包调用 kudu 方法,转换为 dataFrame ,最后在使用 show 方法显示结果
        sparkSession.read.options(options).kudu.show()
    }
    

    使用sparksql操作kudu

    #可以选择使用 Spark SQL 直接使用 INSERT 语句写入 Kudu 表;与'append'类似, INSERT 语句实际上将默认使用 UPSERT 语义处理
    
    //使用 sparksql 操作 kudu 表
    private def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
        //定义 map 集合,封装 kudu 的 master 地址和表名
        val options = Map(
            "kudu.master" -> kuduMaster,
            "kudu.table" -> tableName)
        val data = List(People(10, "小张", 30), People(11, "小王", 40))
        import sparkSession.implicits._
        val dataFrame: DataFrame = sc.parallelize(data).toDF
        //把 dataFrame 注册成一张表
        dataFrame.createTempView("temp1")
        //获取 kudu 表中的数据,然后注册成一张表
        sparkSession.read.options(options).kudu.createTempView("temp2")
        //使用 sparkSQL 的 insert 操作插入数据
        sparkSession.sql("insert into table temp2 select * from temp1")
        sparkSession.sql("select * from temp2 where age >30").show()
    }
    

    kudu native RDD

    #Spark 与 Kudu 的集成同时提供了 kudu RDD.
    
    //Spark 与 Kudu 的集成同时提供了 kudu RDD.
    //使用 kuduContext 对象调用 kuduRDD 方法,需要 sparkContex t 对象,表名,想要的字段名称
    val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sc, tableName, Seq("name", "age"))
    //操作该 rdd 打印输出
    val result: RDD[(String, Int)] = kuduRDD.map {
        case Row(name: String, age: Int) => (name, age)
    }
    result.foreach(println)
    

    kudu impala

    修改配置

    #在每一个服务器的 impala 的配置文件中添加如下配置 。
    vim /etc/default/impala
    #在 IMPALA_SERVER_ARGS 下添加:
    -kudu_master_hosts=node1:7051,node2:7051,node3:7051
    

    创建kudu表

    #需要先启动hdfs、hive、kudu、impala。使用impala的shell控制台。
    

    内部表

    #内部表由 Impala 管理,当您从 Impala 中删除时,数据和表确实被删除。当您使用 Impala 创建新表时,它通常是内部表。
    
    CREATE TABLE my_first_table
    (
    id BIGINT,
    name STRING,
    PRIMARY KEY(id)
    )
    PARTITION BY HASH PARTITIONS 16
    STORED AS KUDU
    TBLPROPERTIES (
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
    'kudu.table_name' = 'my_first_table');
    
    #在 CREATE TABLE 语句中,必须首先列出构成主键的列。
    

    外部表

    #外部表(创建者 CREATE EXTERNAL TABLE )不受 Impala 管理,并且删除此表不会将表从其源位置(此处为 Kudu )丢弃。相反,它只会去除 Impala 和 Kudu之间的映射。这是 Kudu 提供的用于将现有表映射到 Impala 的语法。
    
    //首先使用 java 创建 kudu 表
    package com.gaotu.kudu.client;
    
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.*;
    import java.util.LinkedList;
    import java.util.List;
    
    /**
     * @author xubin03
     * @date 2021/3/4 8:32 下午
     */
    public class CreateTable {
        private static ColumnSchema newColumn(String name, Type type, boolean iskey) {
            ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
            column.key(iskey);
            return column.build();
        }
    
        public static void main(String[] args) throws KuduException {
            //master 地址
            final String masteraddr = "al-bj-bigdata-inf-test01";
            //创建 kudu 的数据库链接
            KuduClient client = new KuduClient.KuduClientBuilder(masteraddr).defaultSocketReadTimeoutMs(6000).build();
            //设置表的 schema
            List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
            columns.add(newColumn("CompanyId", Type.INT32, true));
            columns.add(newColumn("WorkId", Type.INT32, false));
            columns.add(newColumn("Name", Type.STRING, false));
            columns.add(newColumn("Gender", Type.STRING, false));
            columns.add(newColumn("Photo", Type.STRING, false));
            Schema schema = new Schema(columns);
            //创建表时提供的所有选项
            CreateTableOptions options = new CreateTableOptions();
            // 设置表的 replica 备份和分区规则
            List<String> parcols = new LinkedList<String>();
            parcols.add("CompanyId");
            //设置表的备份数
            options.setNumReplicas(1);
            //设置 range 分区
            options.setRangePartitionColumns(parcols);
            //设置 hash 分区和数量
            options.addHashPartitions(parcols, 3);
            try {
                client.createTable("person", schema, options);
            } catch (KuduException e) {
                e.printStackTrace();
            }
            client.close();
        }
    }
    
    #注意:推荐使用外部表!!!!!!外部表首先得在kudu上有这个表,想映射到impala上,这个sql 可以在kudu的页面上查看到该建表语句
    # 使用 impala 创建外部表 将 kudu 的表映射到 impala 上 。
    CREATE EXTERNAL TABLE `person` STORED AS KUDU
    TBLPROPERTIES(
    'kudu.table_name' = 'person',
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')
    

    impala进行DML

    插入数据

    #impala 允许使用标准 SQL 语句将数据插入 Kudu 。
    #首先建表
    CREATE TABLE my_first_table1
    (
    id BIGINT,
    name STRING,
    PRIMARY KEY(id)
    )
    PARTITION BY HASH PARTITIONS 16
    STORED AS KUDU
    TBLPROPERTIES(
    'kudu.table_name' = 'person1',
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051');
    
    #此示例插入单个行
    INSERT INTO my_first_table VALUES (50, "zhangsan");
    
    #此示例插入 3 行:
    INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"),(3, "jim");
    
    #批量导入数据
    从 Impala 和 Kudu 的角度来看,通常表现最好的方法通常是使用 Impala中的 SELECT FROM 语句导入数据 。
    INSERT INTO my_first_table SELECT * FROM temp1;
    

    更新数据

    UPDATE my_first_table SET name="xiaowang" where id =1 ;
    

    删除数据

    delete from my_first_table where id =2;
    

    更改表属性

    重命名impala映射表

    ALTER TABLE PERSON RENAME TO person_temp;
    

    重命名内部表的基础kudu表

    #创建内部表
    CREATE TABLE kudu_student
    (
    CompanyId INT,
    WorkId INT,
    Name STRING
    Gender STRING,
    Photo STRING,
    PRIMARY KEY(CompanyId)
    )PARTITION BY HASH PARTITIONS 16
    STORED AS KUDU
    TBLPROPERTIES (
    'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
    'kudu.table_name' = 'student'
    );
    
    #如果表是内部表,则可以通过更改kudu.table_name 属性重命名底层的Kudu 表 。
    ALTER TABLE kudu_student SET TBLPROPERTIES('kudu.table_name'= 'new_student');
    

    将外部表重新映射kudu表

    #如果用户在使用过程中发现其他应用程序重新命名了kudu 表,那么此时的外部表需要重新映射到 kudu 上 。
    #首先创建一个外部表:
    
    CREATE EXTERNAL TABLE external_table
    STORED AS KUDU
    TBLPROPERTIES (
    'kudu. addresses' = 'node1:7051,node2:7051,node3:7051',
    'kudu.table_name' = 'person'
    );
    
    #重新映射外部表,指向不同的kudu 表:
    ALTER TABLE external_table SET TBLPROPERTIES('kudu.table_name' = 'hashTable')
    #上面的操作是:将external_table 映射的 PERSON 表重新指向 hashTable 表 。
    

    更改kudu master地址

    ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051');
    

    将内部表改为外部表

    ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');
    

    impala API操作kudu

    #对于 impala 而言,开发人员是可以通过 JDBC 连接 impala 的,有了 JDBC开发人员可以通过 impala 来间接操作 kudu 。
    

    引入依赖

    //mvn install:install-file -DgroupId=com.cloudera -DartifactId=ImpalaJDBC41 -Dversion=2.6.3 -Dpackaging=jar -Dfile=/Users/xubin03/Downloads/ImpalaJDBC41-2.6.3 .jar
        
    		<dependency>
                <groupId>com.cloudera</groupId>
                <artifactId>ImpalaJDBC41</artifactId>
                <version>2.6.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-service</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hive</groupId>
                        <artifactId>hive-service-rpc</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hive</groupId>
                        <artifactId>hive-service</artifactId>
                    </exclusion>
                </exclusions>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libthrift</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libfb303</artifactId>
                <version>0.9.3</version>
            </dependency>
    

    jdbc连接impala操作kudu

    //使用 JDBC 连接 impala 操作 kudu ,与 JDBC 连接 mysql 做更重增删改查基本一样。
    
    //创建实体类
    package com.gaotu.kudu.impala;
    
    public class Person {
        private int companyId;
        private int workId;
        private String name;
        private String gender;
        private String photo;
    
        public Person(int companyId, int workId, String name, String gender, String photo) {
            this.companyId = companyId;
            this.workId = workId;
            this.name = name;
            this.gender = gender;
            this.photo = photo;
        }
    
        public int getCompanyId() {
            return companyId;
        }
    
        public void setCompanyId(int companyId) {
            this.companyId = companyId;
        }
    
        public int getWorkId() {
            return workId;
        }
    
        public void setWorkId(int workId) {
            this.workId = workId;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getGender() {
            return gender;
        }
    
        public void setGender(String gender) {
            this.gender = gender;
        }
    
        public String getPhoto() {
            return photo;
        }
    
        public void setPhoto(String photo) {
            this.photo = photo;
        }
    }
    
    package com.gaotu.kudu.impala;
    
    import java.sql.*;
    
    /**
     * @author xubin03
     * @date 2021/3/4 9:26 下午
     */
    //1.注册驱动  2.获得连接  3.根据连接获得会话  4.发送执行的sql  5.获得结果  6.解析结果  7.关闭连接
    public class Contants {
        private static String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
        //REQUEST_POOL={queue_name}  "al-bj-bigdata-inf-test01"
        private static String CONNECTION_URL = "jdbc:impala://172.16.18.230:21050/default;AuthMech=3;UID=hive;PWD=123456";
        //定义数据库连接
        static Connection conn = null;
        //定义 PreparedStatement 对象
        static PreparedStatement ps = null;
        //定义查询的结果集
        static ResultSet rs = null;
    
        //数据库连接
        public static Connection getConn() {
            try {
                Class.forName(JDBC_DRIVER);
                conn = DriverManager.getConnection(CONNECTION_URL);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }
    
        //创建一个表
        public static void createTable() {
            conn = getConn();
            // String sql = "CREATE TABLE impala_kudu_test" +
            // 		"(" +
            // 		"companyId BIGINT," +
            // 		"workId BIGINT," +
            // 		"name STRING," +
            // 		"gender STRING," +
            // 		"photo STRING," +
            // 		"PRIMARY KEY(companyId)" +
            // 		")" +
            // 		"PARTITION BY HASH PARTITIONS 16 " +
            // 		"STORED AS KUDU " +
            // 		"TBLPROPERTIES (" +
            // 		"'kudu.master_addresses' = 'al-bj-bigdata-inf-test01:7051'," +
            // 		"'kudu.table_name' = 'impala_kudu_test'" +
            // 		");";
            String sql = "CREATE TABLE impala_kudu_test" +
                "(" +
                "companyId BIGINT," +
                "workId BIGINT," +
                "name STRING," +
                "gender STRING," +
                "photo STRING," +
                "PRIMARY KEY(companyId)" +
                ")" +
                "PARTITION BY HASH PARTITIONS 16 " +
                "STORED AS KUDU " +
                "TBLPROPERTIES (" +
                "'kudu.master_addresses' = 'al-bj-bigdata-inf-test01:7051'" +
                ");";
            try {
                ps = conn.prepareStatement(sql);
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
        //查询数据
        public static ResultSet queryRows() {
            try {
                //定义执行的 sql 语句
                String sql = "select * from impala_kudu_test";
                ps = getConn().prepareStatement(sql);
                rs = ps.executeQuery();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return rs;
        }
    
        //打印结果
        public static void printRows(ResultSet rs) {
            /**
    		 private int companyId;
    		 private int workId;
    		 private String name;
    		 private String gender;
    		 private String photo;
    		 **/
            try {
                while (rs.next()) {
                    //获取表的每一行字段信息
                    int companyId = rs.getInt("companyId");
                    int workId = rs.getInt("workId");
                    String name = rs.getString("name");
                    String gender = rs.getString("gender");
                    String photo = rs.getString("photo");
                    System.out.print("companyId:" + companyId + " ");
                    System.out.print("workId:" + workId + " ");
                    System.out.print("name:" + name + " ");
                    System.out.print("gender:" + gender + " ");
                    System.out.println("photo:" + photo);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //插入数据
        public static void insertRows(Person person) {
            conn = getConn();
            String sql = "insert into table impala_kudu_test(companyId,workId,name,gender,photo) values(?,?,?,?,?)";
            try {
                ps = conn.prepareStatement(sql);
                //给占位符?赋值
                ps.setInt(1, person.getCompanyId());
                ps.setInt(2, person.getWorkId());
                ps.setString(3, person.getName());
                ps.setString(4, person.getGender());
                ps.setString(5, person.getPhoto());
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        //关闭
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //更新数据
        public static void updateRows(Person person) {
            //定义执行的 sql 语句
            String sql = "update impala_kudu_test set workId=" + person.getWorkId() +
                ",name='" + person.getName() + "' ," + "gender='" + person.getGender() + "'," +
                "photo='" + person.getPhoto() + "' where companyId=" + person.getCompanyId();
            try {
                ps = getConn().prepareStatement(sql);
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        //关闭
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
    
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //删除数据
        public static void deleteRows(int companyId) {
            //定义 sql 语句
            String sql = "delete from impala_kudu_test where companyId=" + companyId;
            try {
                ps = getConn().prepareStatement(sql);
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
        //删除表
        public static void dropTable() {
            String sql = "drop table if exists impala_kudu_test";
            try {
                ps = getConn().prepareStatement(sql);
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    
    //测试package com.gaotu.kudu.impala;import java.sql.Connection;import java.sql.ResultSet;/** * @author xubin03 * @date 2021/3/4 11:34 下午 */public class ImpalaJdbcClient {    public static void main(String[] args) {        Connection conn = Contants.getConn();        //创建一个表        Contants.createTable();        //插 入数据        Contants.insertRows(new Person(1, 100, "lisi", "male", "lisi photo"));        //查询表的数据        ResultSet rs = Contants.queryRows();        Contants.printRows(rs);        //更新数据        Contants.updateRows(new Person(1, 200, "zhangsan", "male", "zhangsan photo"));        //删除数据        Contants.deleteRows(1);        //删除表        Contants.dropTable();    }}
    

    kudu 原理

    table与schema

    Kudu 设计是面向结构化存储的,因此, Kudu 的表需要用户在建表时定义它的 Schema 信息 ,这些 Schema 信息包含: 列定义(含类型) Primary Key 定义(用户指定的若干个列的有序组合)。数据的唯一性,依赖于用户所提供的Primary Key 中的 Column 组合的值的唯一性。 Kudu 提供了 Alter 命令来增删列,但位于 Primary Key 中的列是不允许删除 的。从用户角度来看,Kudu 是一种存储结构化数据表的存储系统。在一个 Kudu集群中可以定义任意数量的 table ,每个 table 都需要预先定义好 schema 。每个table 的列数是确定的,每一列都需要有名字和类型 ,每个表中可以把其中一列或多列定义为主键。这么看来, Kudu 更像关系型数据库,而不是像 HBase 、Cassandra 和 MongoDB 这些 NoSQL 数据库。不过 Kudu 目前还不能像关系型数据一样支持二级索引。Kudu 使用确定的列类型 ,而不是类似于 NoSQL 的“ everything is byte ”。带来好处:确定的列类型使 Kudu 可以进行类型特有的编码 可以提供 元数据给其他上层查询工具 。
    

    kudu底层数据模型

    Kudu的底层数据文件的存储,未采用 HDFS 这样的较高抽象层次的分布式文件系统,而是自行开发了一套可 基于 Table/Tablet/Replica 视图级别的底层存储系统 。这套实现基于如下的几个设计目标:1.可提供快速的列式查询2.可支持快速的随机更新3.可提供更为稳定的查询性能保障
    
    一张table 会分成若干个 tablet ,每个 tablet 包括 MetaData 元信息及若干个 RowSet 。RowSet包含一个 MemRowSet 及若干个 DiskRowSet DiskRowSet 中包含一个BloomFile 、 Ad_hoc Index 、 BaseData 、 DeltaMem 及若干个 RedoFile 和 UndoFile 。MemRowSet :用于 新数据 insert 及已在 MemRowSet 中的数据的更新 ,一个MemRowSet 写满后会将数据刷到磁盘 形成若干个 DiskRowSet 。 默认是 1G 或者或者 120S 。DiskRowSet :用于 老数据的变更 ,后台定期对 DiskRowSet 做 compaction以删除没用的数据及合并历史数据,减少查询过程中的 IO 开销。BloomFile :根据一个 DiskRowSet 中的 key 生成一个 bloom filter ,用于快速模糊定位某个 key 是否在 DiskRowSet 中 。Ad_hocIndex :是主键的索引,用于 定位 key 在 DiskRowSet 中的具体哪个偏移位置 。UndoFile 是基于 BaseData 之前时间的历史数据,通过在 BaseData 上 apply UndoFile 中的记录,可以获得历史数据。RedoFile 是基于 BaseData 之后时间的变更记录,通过在 BaseData 上 apply RedoFile 中的记录,可获得较新的数据。BaseData 是 MemRowSet flush 下来的数据,按列存储,按主键有序。DeltaMem 用于 DiskRowSet 中数据的变更,先写到内存中,写满后 flush 到磁盘形成 RedoFile 。
    
    #REDO与 UNDO 与关系型数据库中的 REDO 与 UNDO 日志类似(在关系型数据库中, REDO 日志记录了更新后的数据 ,可以用来恢复尚未写入 Data File 的已成功事务更新的数据。而 UNDO 日志用来记录事务更新之前的数据 ,可以用来在事务失败时进行回滚)
    
    #MemRowSets可以对比理解成 HBase 中的 MemStore, 而 DiskRowSets 可理解成 HBase 中的 HFile 。#MemRowSets中的数据被 Flush 到磁盘之后,形成 DiskRowSets 。 DisRowSets中的数据,按照 32MB 大小为单位,按序划分为一个个的 DiskRowSet 。 DiskRowSet中的数据按照 Column 进行组织,与 Parquet 类似。这是Kudu 可支持一些分析性查询的基础。每一个 Column 的数据被存储在一个相邻的数据区域,而这个数据区域进一步被细分成一个个的小的 Page 单元,与 HBase File 中的 Block 类似,对每一个 Column Page 可采用一些 Encoding 算法,以及一些通用的 Compression 算法。 既然可对 Column Page 可采用 Encoding以及 Compression 算法,那么,对单条记录的更改就会比较困难了。#前面提到了Kudu 可支持单条记录级别的更新 删除, 是如何做到的?与HBase 类似,也是通过增加一条新的记录来描述这次更新 删除操作的。DiskRowSet 是不可修改了,那么 KUDU 要如何应对数据的更新呢?在 KUDU 中,把 DiskRowSet 分为了两部分: base data 、 delta stores 。 base data 负责存储基础数据, delta stores 负责存储 base data 中的变更数据
    
    如上图所示,数据从MemRowSet 刷到磁盘后就形成了一份 DiskRowSet (只包含 base data ),每份 DiskRowSet 在内存中都会有一个对应的DeltaMemStore ,负责记录此 DiskRowSet 后续的数据变更(更新、删除)。DeltaMemStore 内部维护一个 B 树索引,映射到每个 row_offset 对应的数据变更。 DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile ,随着 base data 对应数据的不断变更, DeltaFile 逐渐增长。
    

    tablet发现过程

    #当创建 Kudu 客户端时,其会 从主服务器上获取 tablet 位置信息 ,然后直接与服务 于该 tablet 的服务器进行交谈。#为了优化读取和写入路径, 客户端 将 保留 该信息的 本地缓存 ,以防止他们在每个请求时需要查询主机的 tablet 位置信息。随着时间的推移,客户端的缓存可能会变得过时,并且当写入被发送到不再是 tablet 领导者的 tablet 服务器时,则将被拒绝。然后客户端将通过 查询主服务器发现新领导者的位置来更新其缓存 。
    

    kudu写流程

    #当 Client 请求写数据时, 先根据主键从 Master Server 中获取要访问的目标 Tablets ,然后到依次对应的 Tablet 获取数据 。#因为KUDU 表存在主键约束,所以需要进行主键是否已经存在的判断,这里就涉及到之前说的索引结构对读写的优化了。一个 Tablet 中存在很多个 RowSets为了提升性能,我们要尽可能地减少要扫描的 RowSets 数量。#首先,我们先通过每个RowSet 中记录的主键的(最大最小)范围,过滤掉一批不存在目标主键的 RowSets ,然后在根据 RowSet 中的布隆过滤器,过滤掉确定不存在目标主键的 RowSets ,最后再通过 RowSets 中的 B 树索引,精确定位目标主键是否存在。#如果主键已经存在,则报错(主键重复),否则就进行写数据(写 MemRowSet )。
    

    kudu读流程

    #数据读取过程大致如下: 先根据要扫描数据的主键范围,定位到目标的Tablets ,然后读取 Tablets 中的 RowSets 。#在读取每个RowSet 时,先根据主键过滤要 scan 范围,然后加载范围内的base data ,再找到对应的 delta stores ,应用所有变更,最后 union 上 MemRowSet中的内容,返回数据给 Client 。
    

    kudu更新流程

    #数据更新的核心是定位到待更新数据的位置,这块与写入的时候类似,就不展开了,等定位到具体位置 后,然后将变更写到对应的 delta store 中 。
    
    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,719
精华内容 2,287
关键字:

kudu