  • 2020-09-14 11:08:54

    Background

    I recently started a new project. To keep up with the times, I used Spring Boot 2.3.3.RELEASE, MyBatis as the ORM framework (with mybatis-spring-boot-starter 2.1.3), and MyBatis Generator 1.4.0.
    Then came the usual churn of writing code (side rant: the code MyBatis Generator produces in MyBatis3DynamicSql mode feels an awful lot like jOOQ, ha).
    With the code written, I started self-testing and immediately ran into problems:

    1. When saving a record, the timestamp column in the MySQL table ended up with the wrong value: the stored value was 13 hours behind the actual time.
    2. When querying the record back, the result was in turn 5 hours ahead of the value stored in the database.
      Printing the SQL from MyBatis showed that the time passed to MySQL on save was correct. Odd. And odder still: 13 hours and 5 hours, what kind of offsets are those? They don't line up with a plain UTC offset at all.

    Troubleshooting

    1. Looking for differences
      1. My first suspicion was that the development database I was using had its time zone configured incorrectly.
        (Broken external image link: a screenshot of the database's time zone variables, which reported the zone as CST.)
        Comparing it with the test database and with the development databases of other projects showed no difference, so the problem was not here. One question remained, though: what exactly is the CST time zone?
        A quick search turned up:
      CST (time zone abbreviation)
      CST can stand for the standard time of the USA, Australia, Cuba, or China:
      Central Standard Time (USA): UTC-6:00
      Central Standard Time (Australia): UTC+9:30
      China Standard Time: UTC+8:00
      Cuba Standard Time: UTC-5:00
      
      Seriously, CST can stand for four different time zones? Suppose, then, that CST here is being taken as US Central Time (why not Cuba Standard Time? The software comes from the US, so rule that one out). We are at UTC+8 and US Central is UTC-6, so the value stored in the database should be China time minus 14 hours. Close to 13, but not 13. Just as I thought I saw the light, I got knocked back down. Still, it was a lead, so on to Google:
      The North American Central Time Zone (CT) is a time zone in parts of Canada, the United States, Mexico, Central America, some Caribbean Islands, and part of the Eastern Pacific Ocean.
      Central Standard Time (CST) is six hours behind Coordinated Universal Time (UTC). During summer most of the zone uses daylight saving time (DST), and changes to Central Daylight Time (CDT) which is five hours behind UTC.[1]
      
      And what is daylight saving time? Forgive my ignorance; back to the search engine:
      Daylight Saving Time (DST), also known as summer time, is the practice of artificially shifting clocks in order to save energy; the unified time used while it is in effect is called "daylight saving time".
      
      I must admit my feelings were mixed when I read this. Ah, daylight saving time: a term buried deep in memory, and one that rather gives away my age. So when MySQL stored my timestamps it used not only the American time zone but American daylight saving time as well: during DST, Central Time becomes CDT at UTC-5, and 8 - (-5) is exactly the 13-hour gap.
      But then why did earlier projects on Spring Boot 1.5.9 never run into this? It occurred to me that mysql-connector-java had been upgraded to 8.0.15, and even spring.datasource.driverClassName had changed to com.mysql.cj.jdbc.Driver, so perhaps something incompatible had also changed in the time zone handling.
    2. Reading the documentation
      Since the mysql-connector-java upgrade was the prime suspect, I went through the MySQL Connector/J 8 documentation looking for anything time-zone related, and found this passage:
        Connector/J 8.0 always performs time offset adjustments on date-time values, and the adjustments require one of the following to be true:
      
        The MySQL server is configured with a canonical time zone that is recognizable by Java (for example, Europe/Paris, Etc/GMT-5, UTC, etc.)
      
        The server's time zone is overridden by setting the Connector/J connection property serverTimezone (for example, serverTimezone=Europe/Paris).
      
      Roughly: for time zones to behave correctly, you either configure the MySQL server with a canonical time zone, or you append the correct serverTimezone to the connection URL. WTF, why? I have no permission to change the server configuration, so the latter it is. But still: why? Nothing for it but to read the code.

    3. Reading the code
    Below is the 5.1.46 code:

         private void configureTimezone() throws SQLException {
            String configuredTimeZoneOnServer = this.serverVariables.get("timezone");
    
            if (configuredTimeZoneOnServer == null) {
                configuredTimeZoneOnServer = this.serverVariables.get("time_zone");
    
                if ("SYSTEM".equalsIgnoreCase(configuredTimeZoneOnServer)) {
                    configuredTimeZoneOnServer = this.serverVariables.get("system_time_zone");
                }
            }
    
            String canonicalTimezone = getServerTimezone();
    
            if ((getUseTimezone() || !getUseLegacyDatetimeCode()) && configuredTimeZoneOnServer != null) {
                // user can override this with driver properties, so don't detect if that's the case
                if (canonicalTimezone == null || StringUtils.isEmptyOrWhitespaceOnly(canonicalTimezone)) {
                    try {
                        canonicalTimezone = TimeUtil.getCanonicalTimezone(configuredTimeZoneOnServer, getExceptionInterceptor());
                    } catch (IllegalArgumentException iae) {
                        throw SQLError.createSQLException(iae.getMessage(), SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
                    }
                }
            }
             if (canonicalTimezone != null && canonicalTimezone.length() > 0) {
                this.serverTimezoneTZ = TimeZone.getTimeZone(canonicalTimezone);
    

    And here is the 8.0.15 version:

     public void configureTimezone() {
            String configuredTimeZoneOnServer = this.serverSession.getServerVariable("time_zone");
    
            if ("SYSTEM".equalsIgnoreCase(configuredTimeZoneOnServer)) {
                configuredTimeZoneOnServer = this.serverSession.getServerVariable("system_time_zone");
            }
    
            String canonicalTimezone = getPropertySet().getStringProperty(PropertyKey.serverTimezone).getValue();
    
            if (configuredTimeZoneOnServer != null) {
                // user can override this with driver properties, so don't detect if that's the case
                if (canonicalTimezone == null || StringUtils.isEmptyOrWhitespaceOnly(canonicalTimezone)) {
                    try {
                        canonicalTimezone = TimeUtil.getCanonicalTimezone(configuredTimeZoneOnServer, getExceptionInterceptor());
                    } catch (IllegalArgumentException iae) {
                        throw ExceptionFactory.createException(WrongArgumentException.class, iae.getMessage(), getExceptionInterceptor());
                    }
                }
            }
            if (canonicalTimezone != null && canonicalTimezone.length() > 0) {
                this.serverSession.setServerTimeZone(TimeZone.getTimeZone(canonicalTimezone));
    

    The time zone registration logic looks much the same in both versions: read the server variable time_zone; if it is SYSTEM, read system_time_zone instead; then, if the URL does not specify serverTimezone, use that value as the connection's time zone. So in both versions the resolved time zone ends up being CST. WTF. On to the code that actually writes the data.
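    If memory serves, the JDK resolves the bare abbreviation "CST" to US Central Time rules (UTC-6 with daylight saving) rather than China Standard Time. A quick side check like the following (mine, not from the original post) simply prints what your JVM makes of that string:

        import java.util.TimeZone;
        import java.util.concurrent.TimeUnit;

        public class CstCheck {
            public static void main(String[] args) {
                // "CST" is the abbreviation reported by the MySQL server,
                // not a canonical zone id such as Asia/Shanghai
                TimeZone tz = TimeZone.getTimeZone("CST");
                System.out.println("id             = " + tz.getID());
                System.out.println("raw offset (h) = " + TimeUnit.MILLISECONDS.toHours(tz.getRawOffset()));
                System.out.println("observes DST   = " + tz.useDaylightTime());
            }
        }
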
    The 5.1.46 version, taken from PreparedStatement, ConnectionImpl and TimeUtil:

     public void setTimestamp(int parameterIndex, Timestamp x) throws java.sql.SQLException {
            synchronized (checkClosed().getConnectionMutex()) {
                setTimestampInternal(parameterIndex, x, null, this.connection.getDefaultTimeZone(), false);
            }
        }
    

    Following this.connection.getDefaultTimeZone() all the way up, we finally reach:

        private static final TimeZone DEFAULT_TIMEZONE = TimeZone.getDefault();
    

    So in 5.1.46, when a timestamp column is saved, the time zone used is simply the JVM's default (system) time zone.
    Now for 8.0.15, from PreparedStatementWrapper and ClientPreparedQueryBindings:

     @Override
        public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
            try {
                if (this.wrappedStmt != null) {
                    ((PreparedStatement) this.wrappedStmt).setTimestamp(parameterIndex, x);
                } else {
                    throw SQLError.createSQLException(Messages.getString("Statement.AlreadyClosed"), MysqlErrorNumbers.SQL_STATE_GENERAL_ERROR,
                            this.exceptionInterceptor);
                }
            } catch (SQLException sqlEx) {
                checkAndFireConnectionError(sqlEx);
            }
        }
    
     @Override
        public void setTimestamp(int parameterIndex, Timestamp x, Calendar targetCalendar, int fractionalLength) {
                this.tsdf = TimeUtil.getSimpleDateFormat(this.tsdf, "''yyyy-MM-dd HH:mm:ss", targetCalendar,
                        targetCalendar != null ? null : this.session.getServerSession().getDefaultTimeZone());
                        
    
        /** c.f. getDefaultTimeZone(). this value may be overridden during connection initialization */
        private TimeZone defaultTimeZone = TimeZone.getDefault();
    

    Eh? This is also TimeZone.getDefault()! Before getting worked up, look at the comment and then back at the time zone registration code: defaultTimeZone is overridden there. So now we know: when 8.0.15 saves a timestamp column, it uses the time zone configured on the MySQL server or supplied by the user.
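    To make the effect concrete, here is a small self-contained sketch (mine, not connector code) of what the write path boils down to: the same Timestamp is rendered into SQL text differently depending on which TimeZone the formatter is given, which is where the 13-hour shift comes from. Asia/Shanghai here is a stand-in for the author's JVM default zone.

        import java.sql.Timestamp;
        import java.text.SimpleDateFormat;
        import java.util.TimeZone;

        public class WriteTzDemo {
            public static void main(String[] args) {
                Timestamp ts = new Timestamp(System.currentTimeMillis());
                SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                // formatting in the local zone -- roughly what 5.1.46 does by default
                fmt.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
                System.out.println("Asia/Shanghai : " + fmt.format(ts));

                // formatting in the zone the JDK derives from "CST" -- roughly what
                // 8.0.15 ends up doing when the server reports system_time_zone = CST
                fmt.setTimeZone(TimeZone.getTimeZone("CST"));
                System.out.println("CST (US)      : " + fmt.format(ts));
            }
        }
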
    But wait: does 5.1.46 really never use the MySQL/user-configured time zone? That would be a serious bug. Back to the code, this time from PreparedStatement and TimeUtil:

    private void setTimestampInternal(int parameterIndex, Timestamp x, Calendar targetCalendar, TimeZone tz, boolean rollForward) throws SQLException {
                if (!this.useLegacyDatetimeCode) {
                    newSetTimestampInternal(parameterIndex, x, targetCalendar);
                } else {
                    Calendar sessionCalendar = this.connection.getUseJDBCCompliantTimezoneShift() ? this.connection.getUtcCalendar()
                            : getCalendarInstanceForSessionOrNew();
    
                    x = TimeUtil.changeTimezone(this.connection, sessionCalendar, targetCalendar, x, tz, this.connection.getServerTimezoneTZ(), rollForward);
    
    

    So setTimestampInternal has more going on. When useLegacyDatetimeCode is true (the default), it adjusts the time zone; note this.connection.getServerTimezoneTZ(), which is exactly the time zone obtained during registration from the MySQL server or from the URL.

    public static Timestamp changeTimezone(MySQLConnection conn, Calendar sessionCalendar, Calendar targetCalendar, Timestamp tstamp, TimeZone fromTz,
                TimeZone toTz, boolean rollForward) {
            if ((conn != null)) {
                if (conn.getUseTimezone()) {
    

    Skipping the details: when useTimeZone is true, the time is adjusted into the target zone toTz. Conclusion: if you put useTimeZone=true in the URL, 5.1.46 will happily store your timestamps in US daylight saving time too.
    With the code spelunking done, let's add serverTimezone=Asia/Shanghai to the URL (see the sketch below). Self-test: the saved time is now correct. Good, very spirited!
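    For reference, a minimal sketch of the fix; the host, database name and credentials are placeholders, and in a Spring Boot project the same URL simply goes into spring.datasource.url:

        import java.sql.Connection;
        import java.sql.DriverManager;

        public class UrlFixDemo {
            public static void main(String[] args) throws Exception {
                // serverTimezone=Asia/Shanghai overrides the ambiguous "CST" reported by the server
                String url = "jdbc:mysql://localhost:3306/mydb"
                        + "?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8";
                try (Connection conn = DriverManager.getConnection(url, "user", "password")) {
                    System.out.println("connected with: " + conn.getMetaData().getURL());
                }
            }
        }
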
    While we're at it, let's also test reading the record back.
    。。。。。。

    。。。。。。

    。。。。。。
    Nani... what... WHAT?! The time I read back is 8 hours behind what is stored in the database. Hahaha, I had almost forgotten that problem #2 was still unsolved.

    To find out how it ends, stay tuned for the next installment.

  • Hudi quick start: basic insert, update, delete and query with Hudi, plus a taste of features such as time travel

    Preface

    This article follows the quick-start-guide section of the official Hudi documentation: https://hudi.apache.org/docs/quick-start-guide (simply following the document did not work for me; I hit a few small pitfalls along the way).

    Strictly observe the jar version constraints:

    Hudi             Supported Spark 3 version
    0.10.0           3.1.x (default build), 3.0.x
    0.7.0 - 0.9.0    3.0.x
    0.6.0 and prior  not supported

    spark shell configuration

    This step follows the official description; the only snag was that I had not downloaded the jars beforehand, which cost a small detour.

    Starting pyspark with the Hudi packages fails

    ERROR INFO:

    (venv) gavin@GavindeMacBook-Pro test % which python3.8
    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python3.8
    (venv) gavin@GavindeMacBook-Pro test % export PYSPARK_PYTHON=$(which python3.8)
    (venv) gavin@GavindeMacBook-Pro test % echo ${PYSPARK_PYTHON}
    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python3.8
    (venv) gavin@GavindeMacBook-Pro test % pyspark --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
    Python 3.8.9 (default, Oct 26 2021, 07:25:54) 
    [Clang 13.0.0 (clang-1300.0.29.30)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    :: loading settings :: url = jar:file:/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /Users/gavin/.ivy2/cache
    The jars for the packages stored in: /Users/gavin/.ivy2/jars
    org.apache.hudi#hudi-spark3.1.2-bundle_2.12 added as a dependency
    org.apache.spark#spark-avro_2.12 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-311b5800-4168-498b-987f-f714233cf50c;1.0
            confs: [default]
    :: resolution report :: resolve 614199ms :: artifacts dl 0ms
            :: modules in use:
            ---------------------------------------------------------------------
            |                  |            modules            ||   artifacts   |
            |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
            ---------------------------------------------------------------------
            |      default     |   2   |   0   |   0   |   0   ||   0   |   0   |
            ---------------------------------------------------------------------
    
    :: problems summary ::
    :::: WARNINGS
                    module not found: org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1
    
            ==== local-m2-cache: tried
    
              file:/Users/gavin/.m2/repository/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.pom
    
              -- artifact org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1!hudi-spark3.1.2-bundle_2.12.jar:
    
              file:/Users/gavin/.m2/repository/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
    
            ==== local-ivy-cache: tried
    
              /Users/gavin/.ivy2/local/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/ivys/ivy.xml
    
              -- artifact org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1!hudi-spark3.1.2-bundle_2.12.jar:
    
              /Users/gavin/.ivy2/local/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/jars/hudi-spark3.1.2-bundle_2.12.jar
    
            ==== central: tried
    
              https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.pom
    
              -- artifact org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1!hudi-spark3.1.2-bundle_2.12.jar:
    
              https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
    
            ==== spark-packages: tried
    
              https://repos.spark-packages.org/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.pom
    
              -- artifact org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1!hudi-spark3.1.2-bundle_2.12.jar:
    
              https://repos.spark-packages.org/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
    
                    module not found: org.apache.spark#spark-avro_2.12;3.1.2
    
            ==== local-m2-cache: tried
    
              file:/Users/gavin/.m2/repository/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.pom
    
              -- artifact org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar:
    
              file:/Users/gavin/.m2/repository/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar
    
            ==== local-ivy-cache: tried
    
              /Users/gavin/.ivy2/local/org.apache.spark/spark-avro_2.12/3.1.2/ivys/ivy.xml
    
              -- artifact org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar:
    
              /Users/gavin/.ivy2/local/org.apache.spark/spark-avro_2.12/3.1.2/jars/spark-avro_2.12.jar
    
            ==== central: tried
    
              https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.pom
    
              -- artifact org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar:
    
              https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar
    
            ==== spark-packages: tried
    
              https://repos.spark-packages.org/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.pom
    
              -- artifact org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar:
    
              https://repos.spark-packages.org/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar
    
                    ::::::::::::::::::::::::::::::::::::::::::::::
    
                    ::          UNRESOLVED DEPENDENCIES         ::
    
                    ::::::::::::::::::::::::::::::::::::::::::::::
    
                    :: org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1: not found
    
                    :: org.apache.spark#spark-avro_2.12;3.1.2: not found
    
                    ::::::::::::::::::::::::::::::::::::::::::::::
    
    
    :::: ERRORS
            Server access error at url https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.pom (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repos.spark-packages.org/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.pom (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repos.spark-packages.org/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.pom (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repos.spark-packages.org/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.pom (java.net.ConnectException: Operation timed out (Connection timed out))
    
            Server access error at url https://repos.spark-packages.org/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar (java.net.ConnectException: Operation timed out (Connection timed out))
    
    
    :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
    Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1: not found, unresolved dependency: org.apache.spark#spark-avro_2.12;3.1.2: not found]
            at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1429)
            at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
            at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
            at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
            at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
            at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
            at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
            at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Traceback (most recent call last):
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/python/pyspark/shell.py", line 35, in <module>
        SparkContext._ensure_initialized()  # type: ignore
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/context.py", line 331, in _ensure_initialized
        SparkContext._gateway = gateway or launch_gateway(conf)
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/java_gateway.py", line 108, in launch_gateway
        raise Exception("Java gateway process exited before sending its port number")
    Exception: Java gateway process exited before sending its port number
    >>> exit()
    
    

    The log shows the resolver falling back to the local Maven repository, so I used Maven to download the two required jars (for Spark 3.1.2):

            <dependency>
                <groupId>org.apache.hudi</groupId>
                <artifactId>hudi-spark3.1.2-bundle_2.12</artifactId>
                <version>0.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-avro_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    

    You can also download them directly from the repository:

    Hudi jar download:

    https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/

    https://repo1.maven.org/maven2/org/apache/hudi/hudi/

    Once the Maven download finishes, the jars can be seen in the local repository: /Users/gavin/.m2/repository/org/apache/hudi/…

    pyspark now starts successfully with Hudi:

    (venv) gavin@GavindeMacBook-Pro test % pyspark --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
    Python 3.8.9 (default, Oct 26 2021, 07:25:54) 
    [Clang 13.0.0 (clang-1300.0.29.30)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    22/03/01 10:20:28 WARN Utils: Your hostname, GavindeMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.24.227 instead (on interface en0)
    22/03/01 10:20:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    :: loading settings :: url = jar:file:/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /Users/gavin/.ivy2/cache
    The jars for the packages stored in: /Users/gavin/.ivy2/jars
    org.apache.hudi#hudi-spark3.1.2-bundle_2.12 added as a dependency
    org.apache.spark#spark-avro_2.12 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-9a87dae7-3c6a-4133-838b-c7050b1d8b89;1.0
            confs: [default]
            found org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1 in local-m2-cache
            found org.apache.spark#spark-avro_2.12;3.1.2 in local-m2-cache
            found org.spark-project.spark#unused;1.0.0 in local-m2-cache
    downloading file:/Users/gavin/.m2/repository/org/apache/hudi/hudi-spark3.1.2-bundle_2.12/0.10.1/hudi-spark3.1.2-bundle_2.12-0.10.1.jar ...
            [SUCCESSFUL ] org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1!hudi-spark3.1.2-bundle_2.12.jar (54ms)
    downloading file:/Users/gavin/.m2/repository/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar ...
            [SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar (2ms)
    downloading file:/Users/gavin/.m2/repository/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
            [SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (2ms)
    :: resolution report :: resolve 6622ms :: artifacts dl 62ms
            :: modules in use:
            org.apache.hudi#hudi-spark3.1.2-bundle_2.12;0.10.1 from local-m2-cache in [default]
            org.apache.spark#spark-avro_2.12;3.1.2 from local-m2-cache in [default]
            org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
            ---------------------------------------------------------------------
            |                  |            modules            ||   artifacts   |
            |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
            ---------------------------------------------------------------------
            |      default     |   3   |   3   |   3   |   0   ||   3   |   3   |
            ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-9a87dae7-3c6a-4133-838b-c7050b1d8b89
            confs: [default]
            3 artifacts copied, 0 already retrieved (38092kB/67ms)
    22/03/01 10:20:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
          /_/
    
    Using Python version 3.8.9 (default, Oct 26 2021 07:25:54)
    Spark context Web UI available at http://192.168.24.227:4040
    Spark context available as 'sc' (master = local[*], app id = local-1646101237379).
    SparkSession available as 'spark'.
    >>> 
    
    

    Running the code from IDEA

    Inserting data (creates the table if it does not exist; upsert)

    This example uses an "upsert" style write, i.e. update the record if a matching one exists, otherwise insert it. Which write type is used is controlled by the option "hoodie.datasource.write.operation"; see https://hudi.apache.org/docs/configurations for the full list of options.

    Following the official documentation (https://hudi.apache.org/docs/quick-start-guide), we arrive at the following code:

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars", "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                                  "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
    
        # pyspark
        tableName = "hudi_trips_cow"
        basePath = "file:///tmp/hudi_trips_cow"
        dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    
        # pyspark
        inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
        df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    
        hudi_options = {
            'hoodie.table.name': tableName,
            'hoodie.datasource.write.recordkey.field': 'uuid',
            'hoodie.datasource.write.partitionpath.field': 'partitionpath',
            'hoodie.datasource.write.table.name': tableName,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.write.precombine.field': 'ts',
            'hoodie.upsert.shuffle.parallelism': 2,
            'hoodie.insert.shuffle.parallelism': 2
        }
    
        df.write.format("hudi"). \
            options(**hudi_options). \
            mode("overwrite"). \
            save(basePath)
    
    

    PS: the inserted rows are sample data produced by org.apache.hudi.QuickstartUtils.DataGenerator() (exactly as in the official example; the actual content is shown in the query section). A quick way to inspect the batch before writing is sketched below.
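    This is plain Spark, nothing Hudi-specific, and reuses the df built in the snippet above:

        # inspect the schema Spark inferred and a few of the generated trip records
        df.printSchema()
        df.select("uuid", "partitionpath", "rider", "driver", "fare", "ts").show(3, truncate=False)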

    Running the code produces the following directory layout (a textbook partitioned-table structure):

    gavin@GavindeMacBook-Pro apache % tree /tmp/hudi_trips_cow 
    /tmp/hudi_trips_cow
    ├── americas
    │   ├── brazil
    │   │   └── sao_paulo
    │   │       └── 6f82f351-9994-459d-a20c-77baa91ad323-0_0-27-31_20220301105108074.parquet
    │   └── united_states
    │       └── san_francisco
    │           └── 52a5ee08-9376-4954-bb8f-f7f519b8b40e-0_1-33-32_20220301105108074.parquet
    └── asia
        └── india
            └── chennai
                └── 2f5b659d-3738-48ca-b590-bbce52e98642-0_2-33-33_20220301105108074.parquet
    
    8 directories, 3 files
    gavin@GavindeMacBook-Pro apache % 
    

    Extra

    Besides the data files, Hudi keeps metadata in a hidden ".hoodie" directory; its contents will be covered in a later post:

    gavin@GavindeMacBook-Pro hudi_trips_cow % ll -a
    total 0
    drwxr-xr-x   5 gavin  wheel  160 Mar  1 10:51 .
    drwxrwxrwt  10 root   wheel  320 Mar  1 11:26 ..
    drwxr-xr-x  13 gavin  wheel  416 Mar  1 10:51 .hoodie
    drwxr-xr-x   4 gavin  wheel  128 Mar  1 10:51 americas
    drwxr-xr-x   3 gavin  wheel   96 Mar  1 10:51 asia
    gavin@GavindeMacBook-Pro hudi_trips_cow % tree .hoodie 
    .hoodie
    ├── 20220301105108074.commit
    ├── 20220301105108074.commit.requested
    ├── 20220301105108074.inflight
    ├── archived
    └── hoodie.properties
    

    Querying data (the current snapshot)

    Query code

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
        basePath = "file:///tmp/hudi_trips_cow"
    
        # pyspark
        tripsSnapshotDF = spark. \
            read. \
            format("hudi"). \
            load(basePath)
        # load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
    
        count = tripsSnapshotDF.count()
        print(f'======== table hudi_trips_snapshot contains [{count}] rows')
        print('Schema:')
        tripsSnapshotDF.printSchema()
    
        tripsSnapshotDF.show()
    
        tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
        spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
        spark.sql(
            "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    

    Query results

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_query.py
    22/03/01 11:18:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 11:18:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    ======== table hudi_trips_snapshot contains [10] rows
    Schema:
    root
     |-- _hoodie_commit_time: string (nullable = true)
     |-- _hoodie_commit_seqno: string (nullable = true)
     |-- _hoodie_record_key: string (nullable = true)
     |-- _hoodie_partition_path: string (nullable = true)
     |-- _hoodie_file_name: string (nullable = true)
     |-- begin_lat: double (nullable = true)
     |-- begin_lon: double (nullable = true)
     |-- driver: string (nullable = true)
     |-- end_lat: double (nullable = true)
     |-- end_lon: double (nullable = true)
     |-- fare: double (nullable = true)
     |-- rider: string (nullable = true)
     |-- ts: long (nullable = true)
     |-- uuid: string (nullable = true)
     |-- partitionpath: string (nullable = true)
    
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301105108074|20220301105108074...|c4340e2c-efd2-4a9...|  americas/united_s...|52a5ee08-9376-495...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1645736908516|c4340e2c-efd2-4a9...|americas/united_s...|
    |  20220301105108074|20220301105108074...|67ee90ec-d7b8-477...|  americas/united_s...|52a5ee08-9376-495...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645565690012|67ee90ec-d7b8-477...|americas/united_s...|
    |  20220301105108074|20220301105108074...|91703076-f580-49f...|  americas/united_s...|52a5ee08-9376-495...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1646031306513|91703076-f580-49f...|americas/united_s...|
    |  20220301105108074|20220301105108074...|96a7571e-1e54-4bc...|  americas/united_s...|52a5ee08-9376-495...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|rider-213|1645796169470|96a7571e-1e54-4bc...|americas/united_s...|
    |  20220301105108074|20220301105108074...|3723b4ac-8841-4cd...|  americas/united_s...|52a5ee08-9376-495...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1646085368961|3723b4ac-8841-4cd...|americas/united_s...|
    |  20220301105108074|20220301105108074...|b3bf0b93-768d-4be...|  americas/brazil/s...|6f82f351-9994-459...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645868768394|b3bf0b93-768d-4be...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|7e195e8d-c6df-4fd...|  americas/brazil/s...|6f82f351-9994-459...| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645602479789|7e195e8d-c6df-4fd...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|3409ecd2-02c2-40c...|  americas/brazil/s...|6f82f351-9994-459...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1645621352954|3409ecd2-02c2-40c...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|60903a00-3fdc-45d...|    asia/india/chennai|2f5b659d-3738-48c...|  0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1645503078006|60903a00-3fdc-45d...|  asia/india/chennai|
    |  20220301105108074|20220301105108074...|22d1507b-7d02-402...|    asia/india/chennai|2f5b659d-3738-48c...|   0.40613510977307| 0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|rider-213|1645948641664|22d1507b-7d02-402...|  asia/india/chennai|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    
    +------------------+-------------------+-------------------+-------------+
    |              fare|          begin_lon|          begin_lat|           ts|
    +------------------+-------------------+-------------------+-------------+
    | 93.56018115236618|0.14285051259466197|0.21624150367601136|1645736908516|
    | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1645565690012|
    | 27.79478688582596| 0.6273212202489661|0.11488393157088261|1646031306513|
    | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1646085368961|
    |  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1645868768394|
    |34.158284716382845|0.46157858450465483| 0.4726905879569653|1645602479789|
    | 66.62084366450246|0.03844104444445928| 0.0750588760043035|1645621352954|
    | 41.06290929046368| 0.8192868687714224|  0.651058505660742|1645503078006|
    +------------------+-------------------+-------------------+-------------+
    
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    |  20220301105108074|c4340e2c-efd2-4a9...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
    |  20220301105108074|67ee90ec-d7b8-477...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
    |  20220301105108074|91703076-f580-49f...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
    |  20220301105108074|96a7571e-1e54-4bc...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
    |  20220301105108074|3723b4ac-8841-4cd...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
    |  20220301105108074|b3bf0b93-768d-4be...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
    |  20220301105108074|7e195e8d-c6df-4fd...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
    |  20220301105108074|3409ecd2-02c2-40c...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
    |  20220301105108074|60903a00-3fdc-45d...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
    |  20220301105108074|22d1507b-7d02-402...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
    +-------------------+--------------------+----------------------+---------+----------+------------------+
    
    
    Process finished with exit code 0
    
    

    Time travel query (querying a historical version)

    Query code

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
        basePath = "file:///tmp/hudi_trips_cow"
    
        # three ways of specifying the time-travel instant are listed below
    
        # pyspark
        spark.read. \
            format("hudi"). \
            option("as.of.instant", "20210728141108"). \
            load(basePath).show()
    
        spark.read. \
            format("hudi"). \
            option("as.of.instant", "2022-02-28 14:11:08.000"). \
            load(basePath).show()
    
        # It is equal to "as.of.instant = 2022-07-28 00:00:00"
        spark.read. \
            format("hudi"). \
            option("as.of.instant", "2022-07-28"). \
            load(basePath).show()
    

    PS: the data was generated on "20220301", which is why the first two queries (whose as.of.instant falls before that commit) come back empty and only the third returns rows.

    Query results

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_query_time_travel.py
    22/03/01 11:30:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 11:30:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|begin_lat|begin_lon|driver|end_lat|end_lon|fare|rider| ts|uuid|partitionpath|
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|begin_lat|begin_lon|driver|end_lat|end_lon|fare|rider| ts|uuid|partitionpath|
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    +-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+------+-------+-------+----+-----+---+----+-------------+
    
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301105108074|20220301105108074...|c4340e2c-efd2-4a9...|  americas/united_s...|52a5ee08-9376-495...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1645736908516|c4340e2c-efd2-4a9...|americas/united_s...|
    |  20220301105108074|20220301105108074...|67ee90ec-d7b8-477...|  americas/united_s...|52a5ee08-9376-495...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645565690012|67ee90ec-d7b8-477...|americas/united_s...|
    |  20220301105108074|20220301105108074...|91703076-f580-49f...|  americas/united_s...|52a5ee08-9376-495...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1646031306513|91703076-f580-49f...|americas/united_s...|
    |  20220301105108074|20220301105108074...|96a7571e-1e54-4bc...|  americas/united_s...|52a5ee08-9376-495...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|rider-213|1645796169470|96a7571e-1e54-4bc...|americas/united_s...|
    |  20220301105108074|20220301105108074...|3723b4ac-8841-4cd...|  americas/united_s...|52a5ee08-9376-495...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1646085368961|3723b4ac-8841-4cd...|americas/united_s...|
    |  20220301105108074|20220301105108074...|b3bf0b93-768d-4be...|  americas/brazil/s...|6f82f351-9994-459...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645868768394|b3bf0b93-768d-4be...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|7e195e8d-c6df-4fd...|  americas/brazil/s...|6f82f351-9994-459...| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645602479789|7e195e8d-c6df-4fd...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|3409ecd2-02c2-40c...|  americas/brazil/s...|6f82f351-9994-459...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1645621352954|3409ecd2-02c2-40c...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|60903a00-3fdc-45d...|    asia/india/chennai|2f5b659d-3738-48c...|  0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1645503078006|60903a00-3fdc-45d...|  asia/india/chennai|
    |  20220301105108074|20220301105108074...|22d1507b-7d02-402...|    asia/india/chennai|2f5b659d-3738-48c...|   0.40613510977307| 0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|rider-213|1645948641664|22d1507b-7d02-402...|  asia/india/chennai|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    
    
    Process finished with exit code 0
    
    

    Update(Append)

    Code

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
        basePath = "file:///tmp/hudi_trips_cow"
    
        dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
        tableName = "hudi_trips_cow"
        hudi_options = {
            'hoodie.table.name': tableName,
            'hoodie.datasource.write.recordkey.field': 'uuid',
            'hoodie.datasource.write.partitionpath.field': 'partitionpath',
            'hoodie.datasource.write.table.name': tableName,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.write.precombine.field': 'ts',
            'hoodie.upsert.shuffle.parallelism': 2,
            'hoodie.insert.shuffle.parallelism': 2
        }
    
        # Because the update step is split out from the insert step, calling dataGen.generateUpdates(10)
        # directly would fail; a generateInserts call has to happen first to seed the generator.
        # For the same reason the update batch will not line up with the rows inserted earlier the way
        # it would if both ran in one script, but that does not matter much here.
        dataGen.generateInserts(10)
    
        # pyspark
        updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
        df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
        df.show()
        df.write.format("hudi"). \
            options(**hudi_options). \
            mode("append"). \
            save(basePath)
    

    Run log

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_update.py
    22/03/01 13:28:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 13:28:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    +--------------------+--------------------+----------+-------------------+-------------------+------------------+--------------------+---------+-------------+--------------------+
    |           begin_lat|           begin_lon|    driver|            end_lat|            end_lon|              fare|       partitionpath|    rider|           ts|                uuid|
    +--------------------+--------------------+----------+-------------------+-------------------+------------------+--------------------+---------+-------------+--------------------+
    |  0.7340133901254792|  0.5142184937933181|driver-284| 0.7814655558162802| 0.6592596683641996|49.527694252432056|americas/united_s...|rider-284|1645635237429|0456f152-9a1b-48f...|
    |  0.1593867607188556|0.010872312870502165|driver-284| 0.9808530350038475| 0.7963756520507014| 29.47661370147079|americas/brazil/s...|rider-284|1645899391613|1e34b971-3dfc-489...|
    |  0.7180196467760873| 0.13755354862499358|driver-284| 0.3037264771699937| 0.2539047155055727| 86.75932789048282|americas/brazil/s...|rider-284|1645890122334|1e34b971-3dfc-489...|
    |  0.6570857443423376|   0.888493603696927|driver-284| 0.9036309069576131|0.37603706507284995| 63.72504913279929|americas/brazil/s...|rider-284|1645547517087|8d784cd0-02d9-429...|
    | 0.08528650347654165|  0.4006983139989222|driver-284| 0.1975324518739051|  0.908216792146506| 90.25710109008239|  asia/india/chennai|rider-284|1646095456906|bc2e551e-4206-4f0...|
    | 0.18294079059016366| 0.19949323322922063|driver-284|0.24749642418050566| 0.1751761658135068|  90.9053809533154|americas/united_s...|rider-284|1645675773158|da50d4f5-94cb-41c...|
    |  0.4777395067707303|  0.3349917833248327|driver-284| 0.9735699951963335| 0.8144901865212508|  98.3428192817987|americas/united_s...|rider-284|1646066699577|a24084ea-4473-459...|
    |0.014159831486388885| 0.42849372303000655|driver-284| 0.9968531966280192| 0.9451993293955782| 2.375516772415698|americas/united_s...|rider-284|1645728852563|da50d4f5-94cb-41c...|
    | 0.16603428449020086|  0.6999655248704163|driver-284| 0.5086437188581894| 0.6242134749327686| 9.384124531808036|  asia/india/chennai|rider-284|1645620049479|9cf010a9-7303-4c5...|
    |  0.2110206104048945|  0.2783086084578943|driver-284|0.12154541219767523| 0.8700506703716298| 91.99515909032544|americas/brazil/s...|rider-284|1645773817699|1e34b971-3dfc-489...|
    +--------------------+--------------------+----------+-------------------+-------------------+------------------+--------------------+---------+-------------+--------------------+
    
    22/03/01 13:28:27 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
    22/03/01 13:28:27 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
    
    Process finished with exit code 0
    
    

    Directory structure after the update

    • Metadata directory:
    # before the update
    gavin@GavindeMacBook-Pro .hoodie % ll
    total 32
    -rw-r--r--  1 gavin  wheel  4374 Mar  1 10:51 20220301105108074.commit
    -rw-r--r--  1 gavin  wheel     0 Mar  1 10:51 20220301105108074.commit.requested
    -rw-r--r--  1 gavin  wheel  2594 Mar  1 10:51 20220301105108074.inflight
    drwxr-xr-x  2 gavin  wheel    64 Mar  1 10:51 archived
    -rw-r--r--  1 gavin  wheel   600 Mar  1 10:51 hoodie.properties
    # after the update
    gavin@GavindeMacBook-Pro .hoodie % ll
    total 56
    -rw-r--r--  1 gavin  wheel  4374 Mar  1 10:51 20220301105108074.commit
    -rw-r--r--  1 gavin  wheel     0 Mar  1 10:51 20220301105108074.commit.requested
    -rw-r--r--  1 gavin  wheel  2594 Mar  1 10:51 20220301105108074.inflight
    -rw-r--r--  1 gavin  wheel  4413 Mar  1 13:28 20220301132827300.commit
    -rw-r--r--  1 gavin  wheel     0 Mar  1 13:28 20220301132827300.commit.requested
    -rw-r--r--  1 gavin  wheel  2594 Mar  1 13:28 20220301132827300.inflight
    drwxr-xr-x  2 gavin  wheel    64 Mar  1 10:51 archived
    -rw-r--r--  1 gavin  wheel   600 Mar  1 10:51 hoodie.properties
    gavin@GavindeMacBook-Pro .hoodie % 
    
    • Data file directory
    # before the update
    gavin@GavindeMacBook-Pro apache % tree /tmp/hudi_trips_cow 
    /tmp/hudi_trips_cow
    ├── americas
    │   ├── brazil
    │   │   └── sao_paulo
    │   │       └── 6f82f351-9994-459d-a20c-77baa91ad323-0_0-27-31_20220301105108074.parquet
    │   └── united_states
    │       └── san_francisco
    │           └── 52a5ee08-9376-4954-bb8f-f7f519b8b40e-0_1-33-32_20220301105108074.parquet
    └── asia
        └── india
            └── chennai
                └── 2f5b659d-3738-48ca-b590-bbce52e98642-0_2-33-33_20220301105108074.parquet
    
    8 directories, 3 files
    gavin@GavindeMacBook-Pro apache % 
    
    # after the update
    gavin@GavindeMacBook-Pro hudi_trips_cow % tree ./*
    ./americas
    ├── brazil
    │   └── sao_paulo
    │       ├── 6f82f351-9994-459d-a20c-77baa91ad323-0_0-27-31_20220301105108074.parquet
    │       └── 6f82f351-9994-459d-a20c-77baa91ad323-0_0-29-39_20220301132827300.parquet
    └── united_states
        └── san_francisco
            ├── 52a5ee08-9376-4954-bb8f-f7f519b8b40e-0_1-33-32_20220301105108074.parquet
            └── 52a5ee08-9376-4954-bb8f-f7f519b8b40e-0_1-35-40_20220301132827300.parquet
    ./asia
    └── india
        └── chennai
            ├── 2f5b659d-3738-48ca-b590-bbce52e98642-0_2-33-33_20220301105108074.parquet
            └── 2f5b659d-3738-48ca-b590-bbce52e98642-0_2-35-41_20220301132827300.parquet
    
    6 directories, 6 files
    gavin@GavindeMacBook-Pro hudi_trips_cow % 
    

    Run one more query to check whether the data has indeed grown:

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_query.py
    22/03/01 13:34:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 13:34:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    ======== table hudi_trips_snapshot contains [17] rows
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|           begin_lat|           begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301105108074|20220301105108074...|c4340e2c-efd2-4a9...|  americas/united_s...|52a5ee08-9376-495...| 0.21624150367601136| 0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1645736908516|c4340e2c-efd2-4a9...|americas/united_s...|
    |  20220301105108074|20220301105108074...|67ee90ec-d7b8-477...|  americas/united_s...|52a5ee08-9376-495...|  0.5731835407930634|  0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645565690012|67ee90ec-d7b8-477...|americas/united_s...|
    |  20220301105108074|20220301105108074...|91703076-f580-49f...|  americas/united_s...|52a5ee08-9376-495...| 0.11488393157088261|  0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1646031306513|91703076-f580-49f...|americas/united_s...|
    |  20220301105108074|20220301105108074...|96a7571e-1e54-4bc...|  americas/united_s...|52a5ee08-9376-495...|  0.8742041526408587|  0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|rider-213|1645796169470|96a7571e-1e54-4bc...|americas/united_s...|
    |  20220301105108074|20220301105108074...|3723b4ac-8841-4cd...|  americas/united_s...|52a5ee08-9376-495...|  0.1856488085068272|  0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1646085368961|3723b4ac-8841-4cd...|americas/united_s...|
    |  20220301132827300|20220301132827300...|a24084ea-4473-459...|  americas/united_s...|52a5ee08-9376-495...|  0.4777395067707303|  0.3349917833248327|driver-284| 0.9735699951963335| 0.8144901865212508|  98.3428192817987|rider-284|1646066699577|a24084ea-4473-459...|americas/united_s...|
    |  20220301132827300|20220301132827300...|0456f152-9a1b-48f...|  americas/united_s...|52a5ee08-9376-495...|  0.7340133901254792|  0.5142184937933181|driver-284| 0.7814655558162802| 0.6592596683641996|49.527694252432056|rider-284|1645635237429|0456f152-9a1b-48f...|americas/united_s...|
    |  20220301132827300|20220301132827300...|da50d4f5-94cb-41c...|  americas/united_s...|52a5ee08-9376-495...|0.014159831486388885| 0.42849372303000655|driver-284| 0.9968531966280192| 0.9451993293955782| 2.375516772415698|rider-284|1645728852563|da50d4f5-94cb-41c...|americas/united_s...|
    |  20220301105108074|20220301105108074...|b3bf0b93-768d-4be...|  americas/brazil/s...|6f82f351-9994-459...|  0.6100070562136587|  0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645868768394|b3bf0b93-768d-4be...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|7e195e8d-c6df-4fd...|  americas/brazil/s...|6f82f351-9994-459...|  0.4726905879569653| 0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645602479789|7e195e8d-c6df-4fd...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|3409ecd2-02c2-40c...|  americas/brazil/s...|6f82f351-9994-459...|  0.0750588760043035| 0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1645621352954|3409ecd2-02c2-40c...|americas/brazil/s...|
    |  20220301132827300|20220301132827300...|8d784cd0-02d9-429...|  americas/brazil/s...|6f82f351-9994-459...|  0.6570857443423376|   0.888493603696927|driver-284| 0.9036309069576131|0.37603706507284995| 63.72504913279929|rider-284|1645547517087|8d784cd0-02d9-429...|americas/brazil/s...|
    |  20220301132827300|20220301132827300...|1e34b971-3dfc-489...|  americas/brazil/s...|6f82f351-9994-459...|  0.1593867607188556|0.010872312870502165|driver-284| 0.9808530350038475| 0.7963756520507014| 29.47661370147079|rider-284|1645899391613|1e34b971-3dfc-489...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|60903a00-3fdc-45d...|    asia/india/chennai|2f5b659d-3738-48c...|   0.651058505660742|  0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1645503078006|60903a00-3fdc-45d...|  asia/india/chennai|
    |  20220301105108074|20220301105108074...|22d1507b-7d02-402...|    asia/india/chennai|2f5b659d-3738-48c...|    0.40613510977307|  0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|rider-213|1645948641664|22d1507b-7d02-402...|  asia/india/chennai|
    |  20220301132827300|20220301132827300...|bc2e551e-4206-4f0...|    asia/india/chennai|2f5b659d-3738-48c...| 0.08528650347654165|  0.4006983139989222|driver-284| 0.1975324518739051|  0.908216792146506| 90.25710109008239|rider-284|1646095456906|bc2e551e-4206-4f0...|  asia/india/chennai|
    |  20220301132827300|20220301132827300...|9cf010a9-7303-4c5...|    asia/india/chennai|2f5b659d-3738-48c...| 0.16603428449020086|  0.6999655248704163|driver-284| 0.5086437188581894| 0.6242134749327686| 9.384124531808036|rider-284|1645620049479|9cf010a9-7303-4c5...|  asia/india/chennai|
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    
    
    Process finished with exit code 0
    
    

    Ps: the generator produced 10 update records, yet the table only grew by 7 rows, from 10 to 17. The write used save mode append, but append here is not a plain append: because the write carries the option 「'hoodie.datasource.write.operation': 'upsert'」, records whose keys already exist in the table are updated in place and only records with new keys are inserted. Judging from the output above, the 10 generated records covered only 7 distinct keys and none of them matched the rows already in the table, hence 7 new rows (see the quick check below). In other words, append assumes the table already exists and upserts the new batch into it. The official docs put it as 『In general, always use append mode unless you are trying to create the table for the first time』, so between append and overwrite, use append for anything other than the initial table creation.
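
    To see this concretely, one can group the snapshot by commit time. This quick check is not part of the original run; it assumes the spark session and basePath defined in the scripts above:

    # hypothetical sanity check: rows contributed by each commit after the upsert
    snapshot = spark.read.format("hudi").load(basePath)
    snapshot.groupBy("_hoodie_commit_time").count().orderBy("_hoodie_commit_time").show()
    # with the data shown above this reports 10 rows for commit 20220301105108074
    # and 7 rows for commit 20220301132827300, i.e. 17 in total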

    扩展·查询历史版本

    根据第二次的提交记录可得到一个准确的时间点为「20220301132827300」:

    -rw-r--r-- 1 gavin wheel 4413 Mar 1 13:28 20220301132827300.commit

    然后查到更新之前的版本数据:

    spark.read.format("hudi").option("as.of.instant", "20220301132827300").load(basePath).show()
    
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
          /_/
    
    Using Python version 3.8.9 (default, Oct 26 2021 07:25:54)
    Spark context Web UI available at http://192.168.24.227:4040
    Spark context available as 'sc' (master = local[*], app id = local-1646101237379).
    SparkSession available as 'spark'.
    >>> basePath = "file:///tmp/hudi_trips_cow"
    >>> df = spark.read.format("hudi").option("as.of.instant", "20220301132827300").load(basePath)
    >>> df.count()
    17
    >>> df.show()                                                                   
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|           begin_lat|           begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301105108074|20220301105108074...|c4340e2c-efd2-4a9...|  americas/united_s...|52a5ee08-9376-495...| 0.21624150367601136| 0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1645736908516|c4340e2c-efd2-4a9...|americas/united_s...|
    |  20220301105108074|20220301105108074...|67ee90ec-d7b8-477...|  americas/united_s...|52a5ee08-9376-495...|  0.5731835407930634|  0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645565690012|67ee90ec-d7b8-477...|americas/united_s...|
    |  20220301105108074|20220301105108074...|91703076-f580-49f...|  americas/united_s...|52a5ee08-9376-495...| 0.11488393157088261|  0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1646031306513|91703076-f580-49f...|americas/united_s...|
    |  20220301105108074|20220301105108074...|96a7571e-1e54-4bc...|  americas/united_s...|52a5ee08-9376-495...|  0.8742041526408587|  0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|rider-213|1645796169470|96a7571e-1e54-4bc...|americas/united_s...|
    |  20220301105108074|20220301105108074...|3723b4ac-8841-4cd...|  americas/united_s...|52a5ee08-9376-495...|  0.1856488085068272|  0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1646085368961|3723b4ac-8841-4cd...|americas/united_s...|
    |  20220301132827300|20220301132827300...|a24084ea-4473-459...|  americas/united_s...|52a5ee08-9376-495...|  0.4777395067707303|  0.3349917833248327|driver-284| 0.9735699951963335| 0.8144901865212508|  98.3428192817987|rider-284|1646066699577|a24084ea-4473-459...|americas/united_s...|
    |  20220301132827300|20220301132827300...|0456f152-9a1b-48f...|  americas/united_s...|52a5ee08-9376-495...|  0.7340133901254792|  0.5142184937933181|driver-284| 0.7814655558162802| 0.6592596683641996|49.527694252432056|rider-284|1645635237429|0456f152-9a1b-48f...|americas/united_s...|
    |  20220301132827300|20220301132827300...|da50d4f5-94cb-41c...|  americas/united_s...|52a5ee08-9376-495...|0.014159831486388885| 0.42849372303000655|driver-284| 0.9968531966280192| 0.9451993293955782| 2.375516772415698|rider-284|1645728852563|da50d4f5-94cb-41c...|americas/united_s...|
    |  20220301105108074|20220301105108074...|b3bf0b93-768d-4be...|  americas/brazil/s...|6f82f351-9994-459...|  0.6100070562136587|  0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645868768394|b3bf0b93-768d-4be...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|7e195e8d-c6df-4fd...|  americas/brazil/s...|6f82f351-9994-459...|  0.4726905879569653| 0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645602479789|7e195e8d-c6df-4fd...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|3409ecd2-02c2-40c...|  americas/brazil/s...|6f82f351-9994-459...|  0.0750588760043035| 0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1645621352954|3409ecd2-02c2-40c...|americas/brazil/s...|
    |  20220301132827300|20220301132827300...|8d784cd0-02d9-429...|  americas/brazil/s...|6f82f351-9994-459...|  0.6570857443423376|   0.888493603696927|driver-284| 0.9036309069576131|0.37603706507284995| 63.72504913279929|rider-284|1645547517087|8d784cd0-02d9-429...|americas/brazil/s...|
    |  20220301132827300|20220301132827300...|1e34b971-3dfc-489...|  americas/brazil/s...|6f82f351-9994-459...|  0.1593867607188556|0.010872312870502165|driver-284| 0.9808530350038475| 0.7963756520507014| 29.47661370147079|rider-284|1645899391613|1e34b971-3dfc-489...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|60903a00-3fdc-45d...|    asia/india/chennai|2f5b659d-3738-48c...|   0.651058505660742|  0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1645503078006|60903a00-3fdc-45d...|  asia/india/chennai|
    |  20220301105108074|20220301105108074...|22d1507b-7d02-402...|    asia/india/chennai|2f5b659d-3738-48c...|    0.40613510977307|  0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|rider-213|1645948641664|22d1507b-7d02-402...|  asia/india/chennai|
    |  20220301132827300|20220301132827300...|bc2e551e-4206-4f0...|    asia/india/chennai|2f5b659d-3738-48c...| 0.08528650347654165|  0.4006983139989222|driver-284| 0.1975324518739051|  0.908216792146506| 90.25710109008239|rider-284|1646095456906|bc2e551e-4206-4f0...|  asia/india/chennai|
    |  20220301132827300|20220301132827300...|9cf010a9-7303-4c5...|    asia/india/chennai|2f5b659d-3738-48c...| 0.16603428449020086|  0.6999655248704163|driver-284| 0.5086437188581894| 0.6242134749327686| 9.384124531808036|rider-284|1645620049479|9cf010a9-7303-4c5...|  asia/india/chennai|
    +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    
    ## 查询更新之前的数据20220301132827300 -> 20220301132527300
    >>> df_before_update = spark.read.format("hudi").option("as.of.instant", "20220301132527300").load(basePath)
    >>> df_before_update.count()
    10
    >>> df_before_update.show()
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301105108074|20220301105108074...|c4340e2c-efd2-4a9...|  americas/united_s...|52a5ee08-9376-495...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1645736908516|c4340e2c-efd2-4a9...|americas/united_s...|
    |  20220301105108074|20220301105108074...|67ee90ec-d7b8-477...|  americas/united_s...|52a5ee08-9376-495...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645565690012|67ee90ec-d7b8-477...|americas/united_s...|
    |  20220301105108074|20220301105108074...|91703076-f580-49f...|  americas/united_s...|52a5ee08-9376-495...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1646031306513|91703076-f580-49f...|americas/united_s...|
    |  20220301105108074|20220301105108074...|96a7571e-1e54-4bc...|  americas/united_s...|52a5ee08-9376-495...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302|  0.362464770874404|19.179139106643607|rider-213|1645796169470|96a7571e-1e54-4bc...|americas/united_s...|
    |  20220301105108074|20220301105108074...|3723b4ac-8841-4cd...|  americas/united_s...|52a5ee08-9376-495...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1646085368961|3723b4ac-8841-4cd...|americas/united_s...|
    |  20220301105108074|20220301105108074...|b3bf0b93-768d-4be...|  americas/brazil/s...|6f82f351-9994-459...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645868768394|b3bf0b93-768d-4be...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|7e195e8d-c6df-4fd...|  americas/brazil/s...|6f82f351-9994-459...| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645602479789|7e195e8d-c6df-4fd...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|3409ecd2-02c2-40c...|  americas/brazil/s...|6f82f351-9994-459...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1645621352954|3409ecd2-02c2-40c...|americas/brazil/s...|
    |  20220301105108074|20220301105108074...|60903a00-3fdc-45d...|    asia/india/chennai|2f5b659d-3738-48c...|  0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1645503078006|60903a00-3fdc-45d...|  asia/india/chennai|
    |  20220301105108074|20220301105108074...|22d1507b-7d02-402...|    asia/india/chennai|2f5b659d-3738-48c...|   0.40613510977307| 0.5644092139040959|driver-213|  0.798706304941517|0.02698359227182834|17.851135255091155|rider-213|1645948641664|22d1507b-7d02-402...|  asia/india/chennai|
    +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
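
    Note (not executed in this run): besides the exact instant string, the Hudi time-travel documentation states that as.of.instant also accepts date-time and date-only strings, for example:

    # the full instant form used above
    spark.read.format("hudi").option("as.of.instant", "20220301132827300").load(basePath)
    # equivalent, per the docs, to the date-time string form
    spark.read.format("hudi").option("as.of.instant", "2022-03-01 13:28:27.300").load(basePath)
    # date-only form, interpreted as "2022-03-01 00:00:00"
    spark.read.format("hudi").option("as.of.instant", "2022-03-01").load(basePath)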
    

    Incremental query

    代码

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
    
        # pyspark
        tableName = "hudi_trips_cow"
        basePath = "file:///tmp/hudi_trips_cow"
    
        # pyspark
        # reload data
        spark. \
            read. \
            format("hudi"). \
            load(basePath). \
            createOrReplaceTempView("hudi_trips_snapshot")
    
        #先获取所有的已经存在的提交时间,再取倒数第二个作为增量查询的开始时间进行查询(不设置增量查询结束时间则表示查询开始时间之后的所有数据)
        commits = list(map(lambda row: row[0], spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(
            50).collect()))
        print(f'get commit info [{commits}]')
        beginTime = commits[len(commits) - 2]  # commit time we are interested in
        print(f'set beginTime as [{beginTime}]')
        # incrementally query data
        incremental_read_options = {
            'hoodie.datasource.query.type': 'incremental',
            'hoodie.datasource.read.begin.instanttime': beginTime,
        }
    
        tripsIncrementalDF = spark.read.format("hudi"). \
            options(**incremental_read_options). \
            load(basePath)
        tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    
        spark.sql(
            "select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
    

    执行结果

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic/basic_incremental_query.py
    22/03/01 15:08:26 WARN Utils: Your hostname, GavindeMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.196.59.24 instead (on interface en0)
    22/03/01 15:08:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    22/03/01 15:08:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 15:08:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    get commit info [['20220301105108074', '20220301132827300']]
    set beginTime as [20220301105108074]
    +-------------------+------------------+--------------------+-------------------+-------------+
    |_hoodie_commit_time|              fare|           begin_lon|          begin_lat|           ts|
    +-------------------+------------------+--------------------+-------------------+-------------+
    |  20220301132827300|  98.3428192817987|  0.3349917833248327| 0.4777395067707303|1646066699577|
    |  20220301132827300|49.527694252432056|  0.5142184937933181| 0.7340133901254792|1645635237429|
    |  20220301132827300| 63.72504913279929|   0.888493603696927| 0.6570857443423376|1645547517087|
    |  20220301132827300| 29.47661370147079|0.010872312870502165| 0.1593867607188556|1645899391613|
    |  20220301132827300| 90.25710109008239|  0.4006983139989222|0.08528650347654165|1646095456906|
    +-------------------+------------------+--------------------+-------------------+-------------+
    
    
    Process finished with exit code 0
    
    

    查询指定时间点的数据(Point in time query)

    代码

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
        basePath = "file:///tmp/hudi_trips_cow"
    
        # reload data
        spark. \
            read. \
            format("hudi"). \
            load(basePath). \
            createOrReplaceTempView("hudi_trips_snapshot")
    
        commits = list(map(lambda row: row[0], spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(
            50).collect()))
        endTime = commits[len(commits) - 2]
    
        beginTime = "000"  # Represents all commits > this time.
    
        # query point in time data
        point_in_time_read_options = {
            'hoodie.datasource.query.type': 'incremental',
            'hoodie.datasource.read.end.instanttime': endTime,
            'hoodie.datasource.read.begin.instanttime': beginTime
        }
    
        tripsPointInTimeDF = spark.read.format("hudi"). \
            options(**point_in_time_read_options). \
            load(basePath)
    
        tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
        spark.sql(
            "select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
    

    运行结果

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic/basic_point_in_time_query.py
    22/03/01 15:18:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 15:18:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    +-------------------+------------------+-------------------+-------------------+-------------+
    |_hoodie_commit_time|              fare|          begin_lon|          begin_lat|           ts|
    +-------------------+------------------+-------------------+-------------------+-------------+
    |  20220301105108074| 93.56018115236618|0.14285051259466197|0.21624150367601136|1645736908516|
    |  20220301105108074| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1645565690012|
    |  20220301105108074| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1646031306513|
    |  20220301105108074| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1646085368961|
    |  20220301105108074|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1645868768394|
    |  20220301105108074|34.158284716382845|0.46157858450465483| 0.4726905879569653|1645602479789|
    |  20220301105108074| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1645621352954|
    |  20220301105108074| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1645503078006|
    +-------------------+------------------+-------------------+-------------------+-------------+
    
    
    Process finished with exit code 0
    
    

    Delete

    This example takes two records from the table (the first two returned by a limit(2) query rather than a truly random sample), builds a delete write from their keys, and removes exactly those two rows from the table.

    Ps: the delete operation has to be issued with save mode append.

    代码

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
    
        # pyspark
        tableName = "hudi_trips_cow"
        basePath = "file:///tmp/hudi_trips_cow"
    
        # pyspark
        # reload data
        spark. \
            read. \
            format("hudi"). \
            load(basePath). \
            createOrReplaceTempView("hudi_trips_snapshot")
    
        before_count = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
        print(f'before delete , there exists [{before_count}] records')
        # fetch two records to be deleted
        ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
        print(f'Records will be deleted :[{ds.collect()}]')
        # issue deletes
        hudi_delete_options = {
            'hoodie.table.name': tableName,
            'hoodie.datasource.write.recordkey.field': 'uuid',
            'hoodie.datasource.write.partitionpath.field': 'partitionpath',
            'hoodie.datasource.write.table.name': tableName,
            'hoodie.datasource.write.operation': 'delete',
            'hoodie.datasource.write.precombine.field': 'ts',
            'hoodie.upsert.shuffle.parallelism': 2,
            'hoodie.insert.shuffle.parallelism': 2
        }
    
        from pyspark.sql.functions import lit
    
        deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
        print(f'deletes data: [{deletes}]')
        #在生成的DFs后面再新增一列「ts」
        df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
        df.write.format("hudi"). \
            options(**hudi_delete_options). \
            mode("append"). \
            save(basePath)
    
        # run the same read query as above.
        roAfterDeleteViewDF = spark. \
            read. \
            format("hudi"). \
            load(basePath)
        roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
        # fetch should return (total - 2) records
        after_count = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
        print(f'after delete , there exists [{after_count}] records')
    

    运行结果

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic/basic_delete.py
    22/03/01 15:59:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 15:59:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    before delete , there exists [17] records
    Records will be deleted :[[Row(uuid='c4340e2c-efd2-4a92-9615-32822599d397', partitionpath='americas/united_states/san_francisco'), Row(uuid='67ee90ec-d7b8-4772-b02e-6a41a7556fa0', partitionpath='americas/united_states/san_francisco')]]
    deletes data: [[('c4340e2c-efd2-4a92-9615-32822599d397', 'americas/united_states/san_francisco'), ('67ee90ec-d7b8-4772-b02e-6a41a7556fa0', 'americas/united_states/san_francisco')]]
    22/03/01 15:59:34 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
    22/03/01 15:59:34 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
    after delete , there exists [15] records
    
    Process finished with exit code 0
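
    As a quick follow-up check (not part of the original run), one could verify that the two deleted keys are really gone; this assumes the spark session and basePath from the delete script and reuses the uuids printed above:

    from pyspark.sql.functions import col

    # hypothetical check: the two record keys deleted above should no longer be present
    deleted_uuids = ['c4340e2c-efd2-4a92-9615-32822599d397',
                     '67ee90ec-d7b8-4772-b02e-6a41a7556fa0']
    remaining = spark.read.format("hudi").load(basePath) \
        .filter(col("uuid").isin(deleted_uuids)).count()
    print(f'rows still present for deleted keys: [{remaining}]')  # expected: 0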
    
    

    (补充)纯覆盖式Insert数据不更新

    The earlier write examples used the upsert operation. As a supplement, this one uses a pure insert ('hoodie.datasource.write.operation': 'insert') together with mode("overwrite"): no update merging is performed, and each write simply replaces the whole table.

    代码

    import pyspark
    
    if __name__ == '__main__':
        builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars",
                    "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                    "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        spark = builder.getOrCreate()
        sc = spark.sparkContext
    
        # pyspark
        tableName = "hudi_trips_cow_insert_overwirte"
        basePath = "file:///tmp/hudi_trips_cow_insert_overwirte"
        dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    
        hudi_options = {
            'hoodie.table.name': tableName,
            'hoodie.datasource.write.recordkey.field': 'uuid',
            'hoodie.datasource.write.partitionpath.field': 'partitionpath',
            'hoodie.datasource.write.table.name': tableName,
            'hoodie.datasource.write.operation': 'insert',
            'hoodie.datasource.write.precombine.field': 'ts',
            'hoodie.upsert.shuffle.parallelism': 2,
            'hoodie.insert.shuffle.parallelism': 2
        }
        # pyspark
        #造3条数据用于演示
        inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(3))
        df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
        print(f'start to insert data into a new table:[{df.collect()}]')
        df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
        print(spark.read.format("hudi").load(basePath).collect())
        spark.read.format("hudi").load(basePath).show()
    
        inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(1))
        df_new = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
        print(f'start to insert data into a new table:[{df_new.collect()}]')
        df_new.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
        print(spark.read.format("hudi").load(basePath).collect())
        spark.read.format("hudi").load(basePath).show()
    
    

    Run result: the single record written second completely replaces the three records inserted first, because mode("overwrite") recreates the table (note the "already exists. Deleting existing data & overwriting with new data" warning in the log below). The final table contains 1 row, not 4.

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/extend/insert_overwrite.py
    22/03/01 16:21:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 16:21:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    start to insert data into a new table:[[Row(begin_lat=0.4726905879569653, begin_lon=0.46157858450465483, driver='driver-213', end_lat=0.754803407008858, end_lon=0.9671159942018241, fare=34.158284716382845, partitionpath='americas/brazil/sao_paulo', rider='rider-213', ts=1645537223581, uuid='a63b8b04-6dcc-4edc-af9f-2b9f6dcfe145'), Row(begin_lat=0.6100070562136587, begin_lon=0.8779402295427752, driver='driver-213', end_lat=0.3407870505929602, end_lon=0.5030798142293655, fare=43.4923811219014, partitionpath='americas/brazil/sao_paulo', rider='rider-213', ts=1645608818472, uuid='172f2894-285a-4b48-97c2-d92bf992697c'), Row(begin_lat=0.5731835407930634, begin_lon=0.4923479652912024, driver='driver-213', end_lat=0.08988581780930216, end_lon=0.42520899698713666, fare=64.27696295884016, partitionpath='americas/united_states/san_francisco', rider='rider-213', ts=1645587087764, uuid='aeea15f6-e5b7-438a-b1c6-c00c19347ca1')]]
    22/03/01 16:21:09 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
    22/03/01 16:21:09 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
    [Row(_hoodie_commit_time='20220301162109926', _hoodie_commit_seqno='20220301162109926_1_3', _hoodie_record_key='aeea15f6-e5b7-438a-b1c6-c00c19347ca1', _hoodie_partition_path='americas/united_states/san_francisco', _hoodie_file_name='2cde0990-e337-4e8e-a6df-802f1b45fd53-0_1-10-17_20220301162109926.parquet', begin_lat=0.5731835407930634, begin_lon=0.4923479652912024, driver='driver-213', end_lat=0.08988581780930216, end_lon=0.42520899698713666, fare=64.27696295884016, rider='rider-213', ts=1645587087764, uuid='aeea15f6-e5b7-438a-b1c6-c00c19347ca1', partitionpath='americas/united_states/san_francisco'), Row(_hoodie_commit_time='20220301162109926', _hoodie_commit_seqno='20220301162109926_0_1', _hoodie_record_key='a63b8b04-6dcc-4edc-af9f-2b9f6dcfe145', _hoodie_partition_path='americas/brazil/sao_paulo', _hoodie_file_name='45b0d7ee-88ec-4a35-ad65-be649efe88be-0_0-8-16_20220301162109926.parquet', begin_lat=0.4726905879569653, begin_lon=0.46157858450465483, driver='driver-213', end_lat=0.754803407008858, end_lon=0.9671159942018241, fare=34.158284716382845, rider='rider-213', ts=1645537223581, uuid='a63b8b04-6dcc-4edc-af9f-2b9f6dcfe145', partitionpath='americas/brazil/sao_paulo'), Row(_hoodie_commit_time='20220301162109926', _hoodie_commit_seqno='20220301162109926_0_2', _hoodie_record_key='172f2894-285a-4b48-97c2-d92bf992697c', _hoodie_partition_path='americas/brazil/sao_paulo', _hoodie_file_name='45b0d7ee-88ec-4a35-ad65-be649efe88be-0_0-8-16_20220301162109926.parquet', begin_lat=0.6100070562136587, begin_lon=0.8779402295427752, driver='driver-213', end_lat=0.3407870505929602, end_lon=0.5030798142293655, fare=43.4923811219014, rider='rider-213', ts=1645608818472, uuid='172f2894-285a-4b48-97c2-d92bf992697c', partitionpath='americas/brazil/sao_paulo')]
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    |  20220301162109926|20220301162109926...|aeea15f6-e5b7-438...|  americas/united_s...|2cde0990-e337-4e8...|0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1645587087764|aeea15f6-e5b7-438...|americas/united_s...|
    |  20220301162109926|20220301162109926...|a63b8b04-6dcc-4ed...|  americas/brazil/s...|45b0d7ee-88ec-4a3...|0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1645537223581|a63b8b04-6dcc-4ed...|americas/brazil/s...|
    |  20220301162109926|20220301162109926...|172f2894-285a-4b4...|  americas/brazil/s...|45b0d7ee-88ec-4a3...|0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|rider-213|1645608818472|172f2894-285a-4b4...|americas/brazil/s...|
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+
    
    start to insert data into a new table:[[Row(begin_lat=0.6220454661413275, begin_lon=0.72024792576853, driver='driver-226', end_lat=0.9048755755365163, end_lon=0.727695054518325, fare=40.613510977307, partitionpath='americas/united_states/san_francisco', rider='rider-226', ts=1645914401180, uuid='db59af46-0fcf-4bb7-ab4a-b9387fb710d3')]]
    22/03/01 16:21:14 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow_insert_overwirte already exists. Deleting existing data & overwriting with new data.
    [Row(_hoodie_commit_time='20220301162114472', _hoodie_commit_seqno='20220301162114472_0_4', _hoodie_record_key='db59af46-0fcf-4bb7-ab4a-b9387fb710d3', _hoodie_partition_path='americas/united_states/san_francisco', _hoodie_file_name='2307cfa5-13d3-481d-8365-d8f8f4e1027a-0_0-35-58_20220301162114472.parquet', begin_lat=0.6220454661413275, begin_lon=0.72024792576853, driver='driver-226', end_lat=0.9048755755365163, end_lon=0.727695054518325, fare=40.613510977307, rider='rider-226', ts=1645914401180, uuid='db59af46-0fcf-4bb7-ab4a-b9387fb710d3', partitionpath='americas/united_states/san_francisco')]
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+----------------+----------+------------------+-----------------+---------------+---------+-------------+--------------------+--------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         begin_lat|       begin_lon|    driver|           end_lat|          end_lon|           fare|    rider|           ts|                uuid|       partitionpath|
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+----------------+----------+------------------+-----------------+---------------+---------+-------------+--------------------+--------------------+
    |  20220301162114472|20220301162114472...|db59af46-0fcf-4bb...|  americas/united_s...|2307cfa5-13d3-481...|0.6220454661413275|0.72024792576853|driver-226|0.9048755755365163|0.727695054518325|40.613510977307|rider-226|1645914401180|db59af46-0fcf-4bb...|americas/united_s...|
    +-------------------+--------------------+--------------------+----------------------+--------------------+------------------+----------------+----------+------------------+-----------------+---------------+---------+-------------+--------------------+--------------------+
    
    
    Process finished with exit code 0
    
    

    FAQ

    ‘JavaPackage’ object is not callable

    ERROR INFO

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic.py
    22/03/01 10:53:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 10:53:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Traceback (most recent call last):
      File "/Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic.py", line 13, in <module>
        dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    TypeError: 'JavaPackage' object is not callable
    
    Process finished with exit code 1
    

    Solution

    The Spark job is missing the required jars at runtime. Point Spark at them with .config("spark.jars", "${YOUR_JAR_PATH}") when building the session, as sketched below.
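
    A minimal sketch of the fix, using the same local bundle paths as the other scripts in this article (replace them with wherever the jars live on your machine):

    import pyspark

    builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
        .config("spark.jars",
                "/Users/gavin/.ivy2/cache/org.apache.hudi/hudi-spark3.1.2-bundle_2.12/jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar,"
                "/Users/gavin/.ivy2/cache/org.apache.spark/spark-avro_2.12/jars/spark-avro_2.12-3.1.2.jar") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    spark = builder.getOrCreate()
    sc = spark.sparkContext
    # with the bundles on the classpath, the JVM-side helper can now be instantiated
    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()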

    java.lang.ClassNotFoundException: hudi.DefaultSource

    ERROR INFO

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_query.py
    22/03/01 11:11:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 11:11:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Traceback (most recent call last):
      File "/Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_query.py", line 12, in <module>
        tripsSnapshotDF = spark. \
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 204, in load
        return self._df(self._jreader.load(path))
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
        return_value = get_return_value(
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
        return f(*a, **kw)
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
    : java.lang.ClassNotFoundException: Failed to find data source: hudi. Please find packages at http://spark.apache.org/third-party-projects.html
    	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
    	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
    	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: hudi.DefaultSource
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
    	at scala.util.Try$.apply(Try.scala:213)
    	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
    	at scala.util.Failure.orElse(Try.scala:224)
    	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
    	... 14 more
    
    
    Process finished with exit code 1
    
    

    Solution

    Same root cause as the previous FAQ: the Hudi Spark bundle, which provides the hudi data source, is missing from the classpath. Add it with .config("spark.jars", "${YOUR_JAR_PATH}"), or let Spark resolve it from Maven as sketched below.
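
    An alternative sketch, assuming the machine can reach Maven Central: let Spark resolve the bundles by coordinates via the standard spark.jars.packages property instead of pointing at local jar files (coordinates chosen to match the versions used in this article):

    import pyspark

    builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
        .config("spark.jars.packages",
                "org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,"
                "org.apache.spark:spark-avro_2.12:3.1.2") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    spark = builder.getOrCreate()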

    Data must have been written before performing the update operation

    This error is raised when calling dataGen.generateUpdates(10); the generator refuses to produce update data.

    ERROR INFO

    /Users/gavin/PycharmProjects/pythonProject/venv/bin/python /Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_update.py
    22/03/01 13:15:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/03/01 13:15:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Traceback (most recent call last):
      File "/Users/gavin/PycharmProjects/pythonProject/venv/spark/hudi/basic_update.py", line 29, in <module>
        updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
        return_value = get_return_value(
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
        return f(*a, **kw)
      File "/Users/gavin/PycharmProjects/pythonProject/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o35.generateUpdates.
    : org.apache.hudi.exception.HoodieException: Data must have been written before performing the update operation
    	at org.apache.hudi.QuickstartUtils$DataGenerator.generateUpdates(QuickstartUtils.java:180)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:748)
    
    
    Process finished with exit code 1
    
    

    Solution

    As the error message says, an insert must happen before the update. For dataGen, "insert" means generating insert data first, presumably so that the update records can be derived from the keys of that insert batch. The fix is therefore to call dataGen.generateInserts(10) once before calling dataGen.generateUpdates(10), as sketched below.
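
    A minimal sketch of the fix, assuming the sc and spark objects from the scripts above:

    dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
    # seed the generator first: generateUpdates derives its update records
    # from the keys produced by generateInserts
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
    updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
    df_updates = spark.read.json(spark.sparkContext.parallelize(updates, 2))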

    参考文档

    [1] Apache Hudi官方文档: https://hudi.apache.org/docs/quick-start-guide

  • 使用C3P0创建数据源DataSource。 使用DBCP创建据源DataSource。 能够使用JDBC简化工具包DBUtils完成单表的增删改查操作。 一,使用连接池重写工具类 1.为什么使用连接池重写工具类 1.因为每次创建和...

     

    目标

    • 使用C3P0创建数据源DataSource。

    • 使用DBCP创建据源DataSource。

    • 能够使用JDBC简化工具包DBUtils完成单表的增删改查操作。

    一,使用连接池重写工具类

    1.为什么使用连接池重写工具类

    • 1.因为每次创建和销毁连接都会消耗较多的系统资源

    • 2.每次创建和销毁连接都要消耗大概0.05~1s的时间。

    • 3.可以防止大量用户并发访问数据库服务器。

    • Connection对象在JDBC中使用时才会被创建,使用结束以后就会被销毁。每次创建和销毁连接都是耗时操作,因此需要使用连接池对其进行优化:程序初始化时创建多个连接,放入池(集合)中;每次需要时直接从连接池中获取,使用结束后将连接归还到池中。

    2.连接池原理

     

    • 目的:解决建立数据库连接耗费资源和时间很多的问题,提高性能。

    3.编写连接池

    3.1步骤

    • 创建一个类,定义LinkedList集合作为连接池,在静态代码块中,向集合里面添加5个连接对象

    • 添加addBack()方法,用作归还连接

    • 代码:

    public class MyDataSource {
    	//连接池
    	static LinkedList<Connection> pool = new LinkedList<>();
    	//初始化连接
    	static{
    		try {
    			for(int i = 0; i < 5; i++){
    				pool.add(JdbcUtils.getConnection());
    			}
    		} catch (SQLException e) {
    			e.printStackTrace();
    		}
    	}
    	//定义一个方法,从连接池中获取connection,从头部获取
    	public Connection getConnectionFromPool() throws SQLException{
    		if (pool.size() > 0) {
    			//池子中有connection
    			return pool.removeFirst();
    		}else {
    			//池子中没有connection时直接新建一个(这种新建的connection不需要放回池子,用完后直接销毁)
    			return JdbcUtils.getConnection();
    		}
    	}
    	//定义一个方法,将connection放回池子中(如果是新创建的connection则直接销毁)
    	public void addBack(Connection connection){
    		//pool.addLast(connection);//免不了会将新建的connection添加进池子;
    		try {
    			//写一个自己的connection,然后重写close()方法,通过close()方法来添加进池子。
    			connection.close();//免不了会将池子中的connection销毁
    		} catch (SQLException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    	//返回pool里面连接的个数
    	public int getCount(){
    		return pool.size();
    	}	
    }

    3.2 编写连接池遇到的问题

    • 如果新建了connection,用完之后怎么判别是原池子中的connection(需要放回去),还是新建的connection(需要销毁)。

    3.3解决办法(自定义一个 Connection,重写close方法)

    • 继承:条件是可以控制父类的构造(而Connection的实现类由数据库驱动提供,我们无法控制其构造,所以这里不采用继承)

    • 装饰者模式

      目的:改写已存在的类的某个方法或某些方法,装饰设计模式(包装模式)

      条件:1.包装类和被包装类实现同一个接口
      	2.包装类里面要拿到被包装类的引用

    步骤:

    1. 编写一个类实现一个接口,为被包装类

    2. 编写一个类,实现与被包装类相同的接口。(具备相同的行为)

    3. 定义一个被包装类类型的变量。

    4. 定义构造方法,把被包装类的对象注入,给被包装类变量赋值。

    5. 对于不需要改写的方法,调用被包装类原有的方法。

    6. 对于需要重写的方法,写自己的代码。

    • 动态代理(类似装饰者模式,此处不学)

    3.4datasource接口概述

    • Java为数据库连接池提供了公共的接口:javax.sql.DataSource,各个厂商(用户)需要让自己的连接池实现这个接口。这样应用程序可以方便的切换不同厂商的连接池!

    • 常见的连接池:DBCP、C3P0。

    4.常用连接池

    4.1 dbcp

    4.1.1 dbcp概念

    "DBCP" redirects here. For the database connection pool implementation, see Apache Commons.

    1,2-Dibromo-3-chloropropane, (dibromochloropropane) better known as DBCP, is the active ingredient in the nematicide Nemagon, also known as Fumazone.

    It is a soil fumigant formerly used in American agriculture. In mammals it causes male sterility at high levels of exposure. After discovery of its deleterious health effects on humans, the compound was banned from use in 1979 by the United States Environmental Protection Agency (EPA).[3] The continuing presence of the chemical as a contaminant in ground water remains a problem for many communities for years after end of use.

    • DBCP:Apache推出的 Database Connection Pool

    核心API:

    • basicDatasource

    • basicDatasourceFactory

    4.1.2使用步骤

    • 添加jar包 commons-dbcp-1.4.jar commons-pool-1.5.6.jar

    • 添加配置文件到src目录

    • 编写数据源工具类

    1. 通过配置文件来编写

    public class DBCPUtils {
    
    	static DataSource ds;
    
    	static {
    		//只需要初始化一次
    		try {
    			Properties p = new Properties();
    
    			InputStream is = DBCPUtils.class.getClassLoader().getResourceAsStream("dbcpconfig.properties");
    			// InputStream is = new FileInputStream("src/dbcpconfig.properties");
    			p.load(is);
    			ds = BasicDataSourceFactory.createDataSource(p);
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    	
    	
    	// 创建连接
    		public static Connection getConnection() throws SQLException {
    			
    			// DriverManager.getConnection(url, user, password);  Connection
    	        
    			Connection connection = ds.getConnection();
    			return connection;
    		}
    
    		// 释放资源
    		public static void release(ResultSet resultSet, Statement statement, Connection connection) {
    			
    			if (resultSet != null) {
    				try {
    					resultSet.close();
    				} catch (SQLException e) {
    					e.printStackTrace();
    				}
    			}
    
    			if (statement != null) {
    				try {
    					statement.close();
    				} catch (SQLException e) {
    					e.printStackTrace();
    				}
    			}
    
    			if (connection != null) {
    				try {
    					connection.close();//不要关注方法名,关注连接来源
    				} catch (SQLException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    }

     

    2. 通过硬编码来编写(不需要配置文件,了解一下)

      BasicDataSource basicDataSource = new BasicDataSource();
      basicDataSource.setDriverClassName("com.mysql.jdbc.Driver");
      basicDataSource.setUrl("jdbc:mysql://localhost:3306/day01_1");
      basicDataSource.setUsername("root");
      basicDataSource.setPassword("123456");

    4.2 c3p0

    4.2.1 c3p0概念

     

    • C3P0开源免费的连接池!目前使用它的开源项目有:Spring、Hibernate等。使用第三方工具需要导入jar包,c3p0使用时还需要添加配置文件c3p0-config.xml

    4.2.2使用步骤

    • 添加jar包

    • 编写配置文件c3p0-config.xml,放在src中(注:文件名一定不要写错)

     

    1. 通过配置文件来编写

    public class C3P0Utils {
    	//创建一个连接池对象
    	static DataSource ds = new ComboPooledDataSource();
    	
    	//获得连接池本身(后面DBUtils的QueryRunner会用到)
    	public static DataSource getDataSource(){
    		return ds;
    	}
    	
    	//从池中获得一个连接
    	public static Connection getConnection() throws SQLException{
    		return ds.getConnection();
    	}
    	
    	//释放资源
    	public static void release(ResultSet rs,Statement stmt,Connection conn){
    		if(rs!=null){
    			try {
    				rs.close();
    			} catch (SQLException e) {
    				throw new RuntimeException(e);
    			}
    			rs = null;
    		}
    		if(stmt!=null){
    			try {
    				stmt.close();
    			} catch (SQLException e) {
    				throw new RuntimeException(e);
    			}
    			stmt = null;
    		}
    		
    		if(conn!=null){
    			try {
    				conn.close();//放心的关。是否关闭取决连接是怎么来的
    			} catch (SQLException e) {
    				throw new RuntimeException(e);
    			}
    			conn = null;
    		}
    	}
    }
    
    2. 通过硬编码来编写(不需要配置文件,了解一下)

    	ComboPooledDataSource cpds = new ComboPooledDataSource();
    	try {
    		cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver,该方法声明了受检异常,需要捕获
    	} catch (PropertyVetoException e) {
    		e.printStackTrace();
    	}
    	cpds.setJdbcUrl("jdbc:mysql://localhost:3306/day10");
    	cpds.setUser("root");
    	cpds.setPassword("123");

    4.3 druid连接池

    Druid首先是一个数据库连接池。Druid是目前最好的数据库连接池,在功能、性能、扩展性方面,都超过其他数据库连接池,包括DBCP、C3P0、BoneCP、Proxool、JBoss DataSource。Druid已经在阿里巴巴部署了超过600个应用,经过一年多生产环境大规模部署的严苛考验。

    druid连接池的使用

    • 使用硬编码方式

    //创建一个DruidDataSource()对象
    		DruidDataSource dataSource = new DruidDataSource();
    		//设置用户名
    		dataSource.setUsername("root");
    		//设置密码
    		dataSource.setPassword("123");
    		//设置驱动类名
    		dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    		//设置初始化连接数
    		dataSource.setInitialSize(10);
    		//获得连接
    		Connection connection = dataSource.getConnection();
    • 使用配置文件

      //创建Properties对象
      Properties properties = new Properties();
      //加载配置文件
      properties.load(new FileInputStream("src/druidconfig.properties"));
      //使用连接池工厂创建连接池
      DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
      //获得连接
      Connection connection = dataSource.getConnection();

    二,使用DBUtils 增删改查的操作(第一个数据库框架)

    1.案例分析

    • 简化JDBC代码开发,本案例我们将采用apache commons组件一个成员:DBUtils。

    • DBUtils就是JDBC的简化开发工具包。需要使用技术:连接池(获得连接)、SQL语句都没有少

    2.案例相关知识

    2.1javaBean组件

    JavaBean就是一个类,在开发中常用于封装数据。具有如下特性

    1.需要实现接口:java.io.Serializable,通常偷懒省略了。
    2.提供私有字段:private 类型 字段名;
    3.提供getter/setter方法;
    4.提供无参构造。

    	public class Category {
    		private String cid;
    		private String cname;
    	
    		public String getCid() {
    			return cid;
    		}
    		public void setCid(String cid) {
    			this.cid = cid;
    		}
    		public String getCname() {
    			return cname;
    		}
    		public void setCname(String cname) {
    			this.cname = cname;
    		}
    	}

    3.DBUtils完成CRUD

     

    3.1概述

    1. DBUtils是java编程中的数据库操作实用工具,小巧简单实用,也是我们接触的第一个操作数据库的框架(jar)。

    2. DBUtils封装了对JDBC的操作,简化了JDBC操作,可以少写代码。

    3. Dbutils三个核心功能介绍

    QueryRunner中提供对sql语句操作的API. update(), query()

    ResultSetHandler接口,用于定义select操作后,怎样封装结果集.

    DbUtils类,它就是一个工具类,定义了关闭资源与事务处理的方法

    3.2QueryRunner核心类

    • QueryRunner(DataSource ds) ,提供数据源(连接池),DBUtils底层自动维护连接connection

    • update(String sql, Object... params) ,执行更新数据 insert update delete 参数就是一个数组,参数个数取决于语句中?的个数

    • query(String sql, ResultSetHandler<T> rsh, Object... params) ,执行查询 select

    3.3ResultSetHandler结果集处理类

    Handler类型:说明
    ArrayHandler:将结果集中的第一条记录封装到一个Object[]数组中,数组中的每一个元素就是这条记录中每一个字段的值。
    ArrayListHandler:将结果集中的每一条记录都封装到一个Object[]数组中,再将这些数组封装到List集合中。
    BeanHandler:将结果集中第一条记录封装到一个指定的javaBean中。
    BeanListHandler:将结果集中每一条记录封装到指定的javaBean中,再将这些javaBean封装到List集合中。
    ColumnListHandler:将结果集中指定列的字段值封装到一个List集合中。
    KeyedHandler:将结果集中每一条记录封装到Map<String,Object>中,再将这个Map作为另一个Map的value,外层Map的key是指定字段的值。
    MapHandler:将结果集中第一条记录封装到Map<String,Object>集合中,key是字段名称,value是字段值。
    MapListHandler:将结果集中每一条记录封装到Map<String,Object>集合中,key是字段名称,value是字段值,再将这些Map封装到List集合中。
    ScalarHandler:用于单个数据,例如select count(*) from 表的操作。

    3.4练习

    3.4.1开发步骤:

    1. 创建项目,并导入jar包

    2. 创建连接池

    3. 编写测试类

    3.4.2增加

    //向user表添加一条数据
    	
    	@Test
    	public void  insert() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		
    		//String sql = "insert into user values(null,'aaa','111','露西')";
    		String sql = "insert into user values(?,?,?,?)";
    		
    		Object[] params ={null,"aaa","111","露西"};
    		queryRunner.update(sql, params);		
    	}

    3.4.3更新

     

    	//把用户名是aaa的user密码改成222
    	
    	@Test
    	public  void update() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		
    		String sql = "update user set password = ? where username = ?";
    		Object[] params={"222","aaa"};
    		queryRunner.update(sql, params);		
    	}

    3.4.4删除

     

    	//把用户名是aaa的user给删除
    	
    	@Test
    	public  void delete() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		
    		String sql = "delete from user where username = ?";
    		Object[] params={"aaa"};
    		queryRunner.update(sql, params);	
    	}

    3.4.5通过id查询

    	//查询id为1的用户信息
    	
    	@Test
    	public  void selectById() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		String sql = "select *from user where id = ?";
    		Object[] params = {1};
    		
    		User user = queryRunner.query(sql, new BeanHandler<>(User.class), params);
    		
    		System.out.println(user.toString());	
    	}

     

    3.4.6查询所有列

    	
    	//查询所有的用户信息
    	@Test
    	public  void selectAll() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		
    		String sql = "select *from user";
    		Object[] params = {};
    		
    		List<User> list = queryRunner.query(sql, new BeanListHandler<>(User.class), params);
    		
    		System.out.println(list.toString());		
    	}

     

    3.4.7 总记录数

     

    	//统计用户的个数
    	
    	@Test
    	public  void getCount() throws SQLException{
    		//创建queryRunner对象
    		QueryRunner queryRunner = new QueryRunner(C3P0Utils.getDataSource());
    		
    		String sql = "select count(*) from user";
    		
    		Long n = (Long) queryRunner.query(sql, new ScalarHandler());
    		
    		System.out.println(n.intValue());
    	}

     

     

     

     

     

     

     

  • # 插入数据,会建立一个china分区 insert into test_hudi_hive select 0.1 as begin_lat, 0.2 as begin_lon, 'driver-1' as driver, 0.3 as end_lat, 0.4 as end_lon, 35 as fare, 'china' as partitionpath, '...

    Hudi源码编译

    第一步:下载Maven并安装且配置Maven镜像

    第二步:下载Hudi源码包(要求对应Hadoop版本、Spark版本、Flink版本、Hive版本)

    第三步:执行编译命令,完成之后运行hudi-cli脚本,如果可以运行,则说明编译成功

    https://github.com/apache/hudi官网查看不同版本的编译命令,要一一对应

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FBqXKEAY-1654954849724)(https://raw.githubusercontent.com/czshh0628/blogs/master/202206112136722.png)]


    Preparing the big data environment for Hudi

    Install HDFS

    Install Spark

    Install Flink

    Hudi basics

    Table data layout

    A Hudi table contains two kinds of files: the .hoodie metadata files and the actual data files


    .hoodie files: because CRUD operations are scattered, every operation produces a file, and as these small files accumulate they seriously degrade HDFS performance. Hudi therefore has a file-merging (compaction) mechanism, and the .hoodie directory stores the log files for those merge operations.

    Hudi calls the series of CRUD operations applied to a table over time the Timeline; a single operation on the Timeline is called an Instant.

    Instant Action: records whether the operation was a data commit, a file compaction, or a file clean (cleans)

    Instant Time: the time of the operation

    Instant State: the state of the operation: requested (initiated), inflight (in progress), or commit (completed)
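    To see the Timeline on disk you can simply list the .hoodie directory of a table; each instant shows up as a file named after its time and action, plus a state suffix while it is still requested or inflight. The following is a minimal Java sketch using the plain Hadoop FileSystem API, assuming the HDFS address and table path used in the spark-shell example later in this article.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListHoodieTimeline {
        public static void main(String[] args) throws Exception {
            // Assumed table base path, matching the spark-shell example below
            String basePath = "hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow";

            FileSystem fs = FileSystem.get(URI.create(basePath), new Configuration());

            // Instants appear as files such as 20220611103000.commit or
            // 20220611103200.commit.requested, next to a few metadata subdirectories
            for (FileStatus status : fs.listStatus(new Path(basePath, ".hoodie"))) {
                System.out.println(status.getPath().getName());
            }
        }
    }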


    Data files: Hudi stores the real data in Parquet format; alongside the Parquet column-oriented data files there is a metadata file.

    To implement CRUD, Hudi needs to identify a record uniquely: it combines a unique field of the dataset (the record key) with the partition the record lives in (partitionPath) as the record's unique primary key.


    Using Hudi from spark-shell

    Goal: run spark-shell in local mode, generate simulated trip transaction data, save it into a Hudi table, then load the data back from the Hudi table for query and analysis; the Hudi table data is ultimately stored on HDFS.

    Start spark-shell

    The following command needs network access: --packages downloads the required jars locally via Ivy and adds them to the CLASSPATH

    spark-shell \
      --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.0 \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    

    Alternatively, point to a local dependency with --jars (the jar must be placed in Spark's lib directory, or you give the jar's full path)

    bin/spark-shell \
      --jars hudi-spark3.2-bundle_2.12-0.11.0.jar \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138684.png)

    Import the required dependencies and build a DataGenerator to simulate trip data

    // import the Spark and Hudi dependencies
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    
    // define the table name and the data storage path
    val tableName = "hudi_trips_cow"
    val basePath = "hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow"
    val dataGen = new DataGenerator
    
    // generate 10 simulated records
    val inserts = convertToStringList(dataGen.generateInserts(10))
    
    // convert the simulated data into a DataFrame
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112137284.png)

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112137842.png)

    Insert data

    Save the simulated data into the Hudi table: specify the hudi data source via format and set the relevant write options

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)
    

    getQuickstartWriteConfigs: sets the shuffle parallelism used when writing/updating data into Hudi; you can see this in the source code

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138547.png)

    PRECOMBINE_FIELD_OPT_KEY: the field used when merging records that share the same key (the record with the larger value wins)

    RECORDKEY_FIELD_OPT_KEY: the unique id of each record; multiple fields are supported

    PARTITIONPATH_FIELD_OPT_KEY: the partition field used to lay out the data
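    For reference, the same write can be expressed in plain Java. This is a minimal sketch, assuming the hudi-spark bundle is on the classpath and using the string configuration keys that the Scala constants above resolve to (hoodie.datasource.write.precombine.field, hoodie.datasource.write.recordkey.field, hoodie.datasource.write.partitionpath.field, hoodie.table.name); the input path /tmp/trips.json is only a placeholder.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class HudiWriteSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("hudi-write-sketch")
                    .master("local[2]")
                    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .getOrCreate();

            // Any DataFrame with uuid, ts and partitionpath columns (placeholder input path)
            Dataset<Row> df = spark.read().json("/tmp/trips.json");

            df.write().format("hudi")
                    .option("hoodie.datasource.write.precombine.field", "ts")
                    .option("hoodie.datasource.write.recordkey.field", "uuid")
                    .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
                    .option("hoodie.table.name", "hudi_trips_cow")
                    .mode(SaveMode.Overwrite)
                    .save("hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow");

            spark.stop();
        }
    }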

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138172.png)

    After the save, the data is visible on HDFS under the configured save path, and you can see that it is stored as Parquet column-oriented files

    Query data

    To read data from a Hudi table, just specify the hudi data source via format plus the relevant options

    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath)
    
    // print the schema of the data read from the Hudi table
    tripsSnapshotDF.printSchema()
    

    There are 5 more fields than in the data originally written to the Hudi table; these are fields Hudi uses internally to manage the data

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138001.png)

    Register the Hudi table data as a temporary view, then analyze it with SQL according to the business requirements

    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    
    // query trips whose fare is greater than 20
    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    
    // select specific columns
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138553.png)

    Spark-Hive integration with Hudi: manually creating the Hive table

    Environment preparation

    To create a Hive table that maps onto the Hudi table, put the integration jar hudi-hadoop-mr-bundle-0.11.0.jar into Hive's lib directory

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139732.png)

    Create the database and table

    Connect to Hive

    # start the metastore
    hive --service metastore
    # start hiveserver2
    hiveserver2
    # start beeline
    bin/beeline -u jdbc:hive2://bigdata:10000 -n root -p root
    

    After data has been written into the Hudi table through Spark, a Hive external table is needed to access it from Hive. Because the Hudi table is partitioned, the external table must also be partitioned to be able to read all of the data; the partition column names can be chosen freely.

    -- create the database
    create database db_hudi;
    
    -- use the database
    use db_hudi;
    
    -- create the Hive external table
    CREATE EXTERNAL TABLE `tbl_hudi_trips`(                                  
       `begin_lat` double,     
       `begin_lon` double, 
       `driver` string, 
       `end_lat` double, 
       `end_lon` double, 
       `fare` double, 
       `partitionpath` string,   
       `rider` string, 
       `uuid` string,  
       `ts` bigint
    )
    PARTITIONED BY (country string,province string,city string) 
    ROW FORMAT SERDE                                   
       'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
     STORED AS INPUTFORMAT                              
       'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
     OUTPUTFORMAT                                       
       'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
     LOCATION                                           
       '/datas/hudi-warehouse/hudi_trips_cow';
    

    Since the Hudi table is partitioned, the Hive table is a partitioned table as well, and the partitions have to be added manually

    -- add the partitions
    alter table tbl_hudi_trips add if not exists partition(`country`='americas',`province`='brazil',`city`='sao_paulo') location '/datas/hudi-warehouse/hudi_trips_cow/americas/brazil/sao_paulo';
    
    alter table tbl_hudi_trips add if not exists partition(`country`='americas',`province`='united_states',`city`='san_francisco') location '/datas/hudi-warehouse/hudi_trips_cow/americas/united_states/san_francisco';
    
    alter table tbl_hudi_trips add if not exists partition(`country`='asia',`province`='india',`city`='chennai') location '/datas/hudi-warehouse/hudi_trips_cow/asia/india/chennai';
    
    -- show the partitions
    SHOW PARTITIONS tbl_hudi_trips;
     
    -- query the table
    select * from tbl_hudi_trips;
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139386.png)

    Spark SQL integration with Hudi: automatically creating the Hive table

    Start spark-sql; the startup command for each version follows the official documentation

    spark-sql \
    --master local[2] \
    --jars /opt/module/spark-3.2.1-bin-hadoop3.2/jars/hudi-spark3.2-bundle_2.12-0.11.0.jar \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    

    Set the parameters; Hudi's default parallelism is 1500

    set hoodie.upsert.shuffle.parallelism=1;
    set hoodie.insert.shuffle.parallelism=1;
    set hoodie.delete.shuffle.parallelism=1;
    

    Disable Hudi table metadata sync (the default is true, which creates partitions automatically from the Spark SQL settings; when set to false the partitions must be created manually)

    set hoodie.datasource.meta.sync.enable=false;
    

    Create the table: table type COW, primary key uuid, partition field partitionpath, and the merge (precombine) field defaults to ts

    -- when a location is specified at creation time, the table is an external table
    CREATE TABLE test_hudi_hive(
       `begin_lat` double,     
       `begin_lon` double, 
       `driver` string, 
       `end_lat` double, 
       `end_lon` double, 
       `fare` double, 
       `partitionpath` string,   
       `rider` string, 
       `uuid` string,  
       `ts` bigint
    ) using hudi
    PARTITIONED BY (partitionpath)
    options (
     primarykey='uuid',
     type='cow'
    )
    location 'hdfs://bigdata:8020/datas/hudi-warehouse/hudi_spark_sql/';
    
    -- show the created table definition
    show create table test_hudi_hive;
    
    -- insert data; this creates a 'china' partition
    insert into test_hudi_hive select 0.1 as begin_lat, 0.2 as begin_lon, 'driver-1' as driver, 0.3 as end_lat, 0.4 as end_lon, 35 as fare, 'china' as partitionpath, 'rider-1' as rider, 'uuid-1' as uuid, 1654447758530 as ts;
    
    -- insert data; this creates an 'america' partition
    insert into test_hudi_hive select 0.1 as begin_lat, 0.2 as begin_lon, 'driver-2' as driver, 0.3 as end_lat, 0.4 as end_lon, 35 as fare, 'america' as partitionpath, 'rider-2' as rider, 'uuid-2' as uuid, 1654447758531 as ts;
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139952.png)

    Flink SQL integration with Hudi

    To integrate Flink with Hudi, put hudi-flink1.13-bundle_2.11-0.11.0.jar into FLINK_HOME/lib


    Following the official documentation, edit the configuration file flink-conf.yaml under conf and give the TaskManager at least 4 slots

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139076.png)

    After configuring, start the Flink cluster with bin/start-cluster.sh. If later steps fail with errors such as Hadoop classes not being found, export the Hadoop classpath:

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    

    Start the Flink SQL CLI

    bin/sql-client.sh --embedded
    

    Create the table

    Store the data in a Hudi table backed by HDFS; the table type is MOR

    -- display the results in a friendlier way
    set execution.result-mode=tableau;
    
    CREATE TABLE t1(
    	uuid VARCHAR(20), 
    	name VARCHAR(10),
    	age INT,
    	ts TIMESTAMP(3),
    	`partition` VARCHAR(20)
    )
    PARTITIONED BY (`partition`)
    WITH (
    	'connector' = 'hudi',
    	'path' = 'hdfs://bigdata:8020/hudi-warehouse/hudi-t1',
    	'write.tasks' = '1',
    	'compaction.tasks' = '1', 
    	'table.type' = 'MERGE_ON_READ'
    );
    

    Insert data

    INSERT INTO t1 VALUES
    ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
    ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
    ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
    ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
    ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
    ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
    ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
    

    On the Flink cluster, verify the job's execution status, the data written to Hadoop, and the SQL query results

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139213.png)

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139257.png)

    Flink SQL-Hive integration with Hudi: manually creating the Hive table

    -- the checkpointing interval must be set, either on the command line or in the configuration file
    set execution.checkpointing.interval=3sec;
    

    Create the sink table. A MERGE_ON_READ table writes log files rather than Parquet files (until compaction runs); a COPY_ON_WRITE table writes Parquet files and no log files.

    CREATE TABLE flink_hudi_sink (
      uuid  STRING PRIMARY KEY NOT ENFORCED,
      name  STRING,
      age  INT,
      ts  STRING,
      `partition` STRING
    )
    PARTITIONED BY (`partition`) 
    WITH (
      'connector' = 'hudi',			
      'path' = 'hdfs://bigdata:8020/hudi-warehouse/flink_hudi_sink',
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'upsert',
      'hoodie.datasource.write.recordkey.field'= 'uuid',
      'write.precombine.field' = 'ts',
      'write.tasks'= '1',
      'compaction.tasks' = '1', 
      'compaction.async.enabled' = 'true', 
      'compaction.trigger.strategy' = 'num_commits', 
      'compaction.delta_commits' = '1'
    );
    
    
    INSERT INTO flink_hudi_sink VALUES
    ('id2','Stephen',33,'1970-01-01 00:00:02','par1'),
    ('id3','Julian',53,'1970-01-01 00:00:03','par2'),
    ('id4','Fabian',31,'1970-01-01 00:00:04','par2'),
    ('id5','Sophia',18,'1970-01-01 00:00:05','par3'),
    ('id6','Emma',20,'1970-01-01 00:00:06','par3'),
    ('id7','Bob',44,'1970-01-01 00:00:07','par4'),
    ('id8','Han',56,'1970-01-01 00:00:08','par4');
    

    Since Hive can only read the generated Parquet files, 'table.type' = 'COPY_ON_WRITE' is required here. If Parquet files appear in Hadoop the write succeeded, and all that remains is to manually create the Hive external table and add partitions pointing at the Hudi table.

    -- create the Hive external table
    CREATE EXTERNAL TABLE `flink_hudi_sink`(                                  
       `uuid` string,     
       `name` string, 
       `age` int, 
       `ts` string, 
       `partition` string
    )
    PARTITIONED BY (part string) 
    ROW FORMAT SERDE                                   
       'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
     STORED AS INPUTFORMAT                              
       'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
     OUTPUTFORMAT                                       
       'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
     LOCATION                                           
       '/hudi-warehouse/flink_hudi_sink';
       
    -- add a partition (the table and partition names follow the external table created above)
    alter table flink_hudi_sink add if not exists partition(`part`='par1') location '/hudi-warehouse/flink_hudi_sink/par1';
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139206.png)

    Flink SQL integration with Hudi: automatically creating the Hive table

    For the Hudi build that bundles Flink and Hive, check the pom.xml under hudi-0.11.0/packaging/hudi-flink-bundle in the source package. Packaging for Hive2 and Hive3 differs, so choose according to your own Hive version. The build command is shown below; adapt the exact versions following the official documentation.

    mvn clean install -DskipTests -Drat.skip=true -Dflink1.13 -Dscala-2.11 -Pflink-bundle-shade-hive3
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139046.png)

    After the build, copy hudi-flink1.13-bundle_2.11-0.11.0.jar into Flink's lib directory and hudi-hadoop-mr-bundle-0.11.0.jar into Hive's lib directory (exact jar names depend on your versions)

    Create a COPY_ON_WRITE table that automatically syncs a partitioned Hive table

    set execution.checkpointing.interval=3sec;
    
    set sql-client.execution.result-mode=tableau;
    
    CREATE TABLE czs_hudi_hive (
      uuid  STRING PRIMARY KEY NOT ENFORCED,
      name  STRING,
      age  INT,
      ts  STRING,
      `partition` STRING
    )
    PARTITIONED BY (`partition`) 
    WITH (
        'connector'='hudi',
        'path'= 'hdfs://bigdata:8020/czs_hudi_hive', 
        'table.type'= 'COPY_ON_WRITE',
        'hoodie.datasource.write.recordkey.field'= 'uuid', 
        'write.precombine.field'= 'ts',
        'write.tasks'= '1',
        'write.rate.limit'= '2000', 
        'compaction.tasks'= '1', 
        'compaction.async.enabled'= 'true',
        'compaction.trigger.strategy'= 'num_commits',
        'compaction.delta_commits'= '1',
        'changelog.enabled'= 'true',
        'read.streaming.enabled'= 'true',
        'read.streaming.check-interval'= '3',
        'hive_sync.enable'= 'true',
        'hive_sync.mode'= 'hms',
        'hive_sync.metastore.uris'= 'thrift://bigdata:9083',
        'hive_sync.jdbc_url'= 'jdbc:hive2://bigdata:10000',
        'hive_sync.table'= 'czs_hive',
        'hive_sync.db'= 'default',
        'hive_sync.username'= 'root',
        'hive_sync.password'= 'root',
        'hive_sync.support_timestamp'= 'true'
    );
    
    
    INSERT INTO czs_hudi_hive VALUES
    ('id2','Stephen',33,'1970-01-01 00:00:02','par1'),
    ('id3','Julian',53,'1970-01-01 00:00:03','par2'),
    ('id4','Fabian',31,'1970-01-01 00:00:04','par2'),
    ('id5','Sophia',18,'1970-01-01 00:00:05','par3'),
    ('id6','Emma',20,'1970-01-01 00:00:06','par3'),
    ('id7','Bob',44,'1970-01-01 00:00:07','par4'),
    ('id8','Han',56,'1970-01-01 00:00:08','par4');
    

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140568.png)

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140315.png)

    Using Flink CDC to capture MySQL changes and write them to Hudi

    Add the Flink CDC dependency

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140492.png)

    Add the Hadoop dependency

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    

    Create a MERGE_ON_READ Hudi table

    set execution.checkpointing.interval=3sec;
    
    set sql-client.execution.result-mode=tableau;
    
    CREATE TABLE users_source_mysql (
      id BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      birthday TIMESTAMP(3),
      ts TIMESTAMP(3)
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'bigdata',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'server-time-zone' = 'Asia/Shanghai',
    'debezium.snapshot.mode' = 'initial',
    'database-name' = 'hudi',
    'table-name' = 'tbl_users'
    );
    
    create view view_users_cdc 
    AS 
    SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM users_source_mysql;
    
    CREATE TABLE users_sink_hudi_hive(
    id bigint ,
    name string,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    part VARCHAR(20),
    primary key(id) not enforced
    )
    PARTITIONED BY (part)
    with(
    'connector'='hudi',
    'path'= 'hdfs://bigdata:8020/czs_hudi_hive_test', 
    'table.type'= 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field'= 'id', 
    'write.precombine.field'= 'ts',
    'write.tasks'= '1',
    'write.rate.limit'= '2000', 
    'compaction.tasks'= '1', 
    'compaction.async.enabled'= 'true',
    'compaction.trigger.strategy'= 'num_commits',
    'compaction.delta_commits'= '1',
    'changelog.enabled'= 'true',
    'read.streaming.enabled'= 'true',
    'read.streaming.check-interval'= '3',
    'hive_sync.enable'= 'true',
    'hive_sync.mode'= 'hms',
    'hive_sync.metastore.uris'= 'thrift://bigdata:9083',
    'hive_sync.jdbc_url'= 'jdbc:hive2://bigdata:10000',
    'hive_sync.table'= 'czs_hive',
    'hive_sync.db'= 'default',
    'hive_sync.username'= 'root',
    'hive_sync.password'= 'root',
    'hive_sync.support_timestamp'= 'true'
    );
    
    INSERT INTO users_sink_hudi_hive SELECT id, name, birthday, ts, part FROM view_users_cdc ;
    

    Two Hive tables are generated automatically for the Hudi MOR table

    xxx_ro: ro stands for read optimized table; the xxx_ro table synced for a MOR table only exposes the compacted Parquet data.

    xxx_rt: the rt table is the real-time (incremental) view, mainly used for incremental queries.

    The ro table can only query Parquet file data; the rt table can query both Parquet file data and log file data
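    As a quick check, both views can be queried through Spark SQL once they are synced into the Hive metastore. The following is a minimal Java sketch, assuming the synced tables end up named czs_hive_ro and czs_hive_rt in the default database (derived from the hive_sync.table setting above) and reusing the id, name and ts columns of the CDC example; the table names are assumptions, not output confirmed by this article.

    import org.apache.spark.sql.SparkSession;

    public class QueryRoRtViews {
        public static void main(String[] args) {
            // Hive support is needed so the synced tables are visible
            SparkSession spark = SparkSession.builder()
                    .appName("query-ro-rt")
                    .master("local[2]")
                    .enableHiveSupport()
                    .getOrCreate();

            // _ro: read-optimized view, compacted Parquet files only
            spark.sql("SELECT id, name, ts FROM default.czs_hive_ro").show();

            // _rt: real-time view, Parquet files plus log files
            spark.sql("SELECT id, name, ts FROM default.czs_hive_rt").show();

            spark.stop();
        }
    }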

    (screenshot: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140612.png)

    Using Flink CDC to capture PostgreSQL changes and write them to Hudi

    Set the PostgreSQL WAL level

    Change wal_level = logical and restart the database so the setting takes effect

    vim /var/lib/pgsql/14/data/postgresql.conf
    


    Add the Flink CDC dependency


    Add the Hadoop dependency

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    

    Create a MERGE_ON_READ Hudi table

    set execution.checkpointing.interval=3sec;
    
    set sql-client.execution.result-mode=tableau;
    
    CREATE TABLE pg_source(
      id INT,
      username STRING,
      PRIMARY KEY (id) NOT ENFORCED 
    ) WITH (
      'connector' = 'postgres-cdc', 
      'hostname' = 'bigdata',    
      'port' = '5432',              
      'username' = 'postgres',          
      'password' = 'postgres',   
      'database-name' = 'mydb', 
      'schema-name' = 'public',     
      'decoding.plugin.name' = 'pgoutput',
      'table-name' = 'test'        
    );
    
    
    CREATE TABLE pg_hudi_hive(
    id bigint ,
    name string,
    primary key(id) not enforced
    )
    PARTITIONED BY (name)
    with(
    'connector'='hudi',
    'path'= 'hdfs://bigdata:8020/pg_hive_test', 
    'table.type'= 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field'= 'id', 
    'write.precombine.field'= 'name',
    'write.tasks'= '1',
    'write.rate.limit'= '2000', 
    'compaction.tasks'= '1', 
    'compaction.async.enabled'= 'true',
    'compaction.trigger.strategy'= 'num_commits',
    'compaction.delta_commits'= '1',
    'changelog.enabled'= 'true',
    'read.streaming.enabled'= 'true',
    'read.streaming.check-interval'= '3',
    'hive_sync.enable'= 'true',
    'hive_sync.mode'= 'hms',
    'hive_sync.metastore.uris'= 'thrift://bigdata:9083',
    'hive_sync.jdbc_url'= 'jdbc:hive2://bigdata:10000',
    'hive_sync.table'= 'hudi_hive',
    'hive_sync.db'= 'default',
    'hive_sync.username'= 'root',
    'hive_sync.password'= 'root',
    'hive_sync.support_timestamp'= 'true'
    );
    
    INSERT INTO pg_hudi_hive SELECT id,username FROM pg_source ;
    
