精华内容
下载资源
问答
  • 添加数据(查询构造器二)添加数据(查询构造器二)添加一条数据可以使用save方法统一写入数据,自动判断是新增还是更新数据(以写入数据中是否存在主键数据为依据)。$data=['foo'=>'bar','bar'=>'foo'];Db::name('...

    添加数据(查询构造器二)

    添加数据(查询构造器二)

    添加一条数据

    可以使用save方法统一写入数据,自动判断是新增还是更新数据(以写入数据中是否存在主键数据为依据)。$data = ['foo' => 'bar', 'bar' => 'foo'];

    Db::name('user')->save($data);

    或者使用 insert 方法向数据库提交数据$data = ['foo' => 'bar', 'bar' => 'foo'];

    Db::name('user')->insert($data);

    insert 方法添加数据成功返回添加成功的条数,通常情况返回 1

    如果你的数据表里面没有foo或者bar字段,那么就会抛出异常。

    如果不希望抛出异常,可以使用下面的方法:$data = ['foo' => 'bar', 'bar' => 'foo'];

    Db::name('user')->strict(false)->insert($data);

    不存在字段的值将会直接抛弃。

    如果是mysql数据库,支持replace写入,例如:$data = ['foo' => 'bar', 'bar' => 'foo'];

    Db::name('user')->replace()->insert($data);

    添加数据后如果需要返回新增数据的自增主键,可以使用insertGetId方法新增数据并返回主键值:$userId = Db::name('user')->insertGetId($data);

    insertGetId 方法添加数据成功返回添加数据的自增主键

    添加多条数据

    添加多条数据直接向 Db 类的 insertAll 方法传入需要添加的数据(通常是二维数组)即可。$data = [

    ['foo' => 'bar', 'bar' => 'foo'],

    ['foo' => 'bar1', 'bar' => 'foo1'],

    ['foo' => 'bar2', 'bar' => 'foo2']

    ];

    Db::name('user')->insertAll($data);

    insertAll方法添加数据成功返回添加成功的条数

    如果是mysql数据库,支持replace写入,例如:$data = [

    ['foo' => 'bar', 'bar' => 'foo'],

    ['foo' => 'bar1', 'bar' => 'foo1'],

    ['foo' => 'bar2', 'bar' => 'foo2']

    ];

    Db::name('user')->replace()->insertAll($data);

    确保要批量添加的数据字段是一致的

    如果批量插入的数据比较多,可以指定分批插入,使用limit方法指定每次插入的数量限制。$data = [

    ['foo' => 'bar', 'bar' => 'foo'],

    ['foo' => 'bar1', 'bar' => 'foo1'],

    ['foo' => 'bar2', 'bar' => 'foo2']

    ...

    ];

    // 分批写入 每次最多100条数据

    Db::name('user')

    ->limit(100)

    ->insertAll($data);

    展开全文
  • 模型更新数据通常系统会自动判断需要新增还是更新数据。查找并更新在取出数据后,更改字段内容后使用save方法更新数据。这种方式是最佳的更新方式。$user=User::find(1);$user->name='thinkphp';$user->email=...

    模型更新数据

    通常系统会自动判断需要新增还是更新数据。

    查找并更新

    在取出数据后,更改字段内容后使用save方法更新数据。这种方式是最佳的更新方式。$user = User::find(1);

    $user->name     = 'thinkphp';

    $user->email    = 'thinkphp@qq.com';

    $user->save();

    save方法成功返回true,并只有当before_update事件返回false的时候返回false,有错误则会抛出异常。

    对于复杂的查询条件,也可以使用查询构造器来查询数据并更新$user = User::where('status',1)

    ->where('name','liuchen')

    ->find();

    $user->name     = 'thinkphp';

    $user->email    = 'thinkphp@qq.com';

    $user->save();

    save方法更新数据,只会更新变化的数据,对于没有变化的数据是不会进行重新更新的。如果你需要强制更新数据,可以使用下面的方法:$user = User::find(1);

    $user->name     = 'thinkphp';

    $user->email    = 'thinkphp@qq.com';

    $user->force()->save();

    这样无论你的修改后的数据是否和之前一样都会强制更新该字段的值。

    如果要执行SQL函数更新,可以使用下面的方法$user = User::find(1);

    $user->name     = 'thinkphp';

    $user->email    = 'thinkphp@qq.com';

    $user->score=  Db::raw('score+1');

    $user->save();

    字段过滤

    默认情况下会过滤非数据表字段的数据,如果你通过外部提交赋值给模型,并且希望指定某些字段写入,可以使用:$user = User::find(1);

    // post数组中只有name和email字段会写入

    $user->allowField(['name', 'email'])->save($_POST);

    最佳用法是在传入模型数据之前就进行过滤,例如:$user = User::find(1);

    // post数组中只有name和email字段会写入

    $data = Request::only(['name','email']);

    $user->save($data);

    批量更新数据

    可以使用saveAll方法批量更新数据,只需要在批量更新的数据中包含主键即可,例如:$user = new User;

    $list = [

    ['id'=>1, 'name'=>'thinkphp', 'email'=>'thinkphp@qq.com'],

    ['id'=>2, 'name'=>'onethink', 'email'=>'onethink@qq.com']

    ];

    $user->saveAll($list);

    批量更新方法返回的是一个数据集对象。

    批量更新仅能根据主键值进行更新,其它情况请自行处理。

    直接更新(静态方法)

    使用模型的静态update方法更新:User::update(['name' => 'thinkphp'], ['id' => 1]);

    模型的update方法返回模型的对象实例

    如果你的第一个参数中包含主键数据,可以无需传入第二个参数(更新条件)User::update(['name' => 'thinkphp', 'id' => 1]);

    如果你需要只允许更新指定字段,可以使用User::update(['name' => 'thinkphp', 'email' => 'thinkphp@qq.com'], ['id' => 1], ['name']);

    上面的代码只会更新name字段的数据。

    自动识别

    我们已经看到,模型的新增和更新方法都是save方法,系统有一套默认的规则来识别当前的数据需要更新还是新增。

    1.实例化模型后调用save方法表示新增

    2.查询数据后调用save方法表示更新

    不要在一个模型实例里面做多次更新,会导致部分重复数据不再更新,正确的方式应该是先查询后更新或者使用模型类的update方法更新。

    不要调用save方法进行多次数据写入。

    展开全文
  • 随着计算机的速度越来越,对于能够处理大量输入数据的程序的需求变得日益迫切。具有讽刺意味的是,由于在输入量很大时程序的效率明显降低,因此这又要求更加关注效率问题。通过在实际编程之前对算法进行分析,学生...
  • 功能简介:1、admin文件夹下更新了admin_index.asp文件,在系统后台首页的“用户等级及设置”中增加了“用户批量删除”项,通过选择查询条件,可进行自动批量删除。2、admin文件夹下新增admin_user_delete1.asp文件...
  • Apache+Hudi入门指南(含代码示例)

    千次阅读 热门讨论 2020-02-24 21:09:26
    使用Bloomfilter机制+二次查找,可快速确定记录是更新还是新增 更新范围小,是文件级别,不是表级别 文件大小与hdfs的Blocksize保持一致 数据文件使用parquet格式,充分利用列存的优势(drema...

    1. 什么是Apache Hudi

    一个spark 库
    大数据更新解决方案,大数据中没有传统意义的更新,只有append和重写(Hudi就是采用重写方式)

    使用Hudi的优点

    • 使用Bloomfilter机制+二次查找,可快速确定记录是更新还是新增
    • 更新范围小,是文件级别,不是表级别
    • 文件大小与hdfs的Blocksize保持一致
    • 数据文件使用parquet格式,充分利用列存的优势(dremal论文实现
    • 提供了可扩展的大数据更新框架
    • 并发度由spark控制

    hudi详细介绍见hudi官网 http://hudi.apache.org/cn/docs/0.5.0-quick-start-guide.html

    2. Hudi编译

    git clone https://github.com/apache/incubator-hudi.git && cd incubator-hudi
    mvn clean package -DskipTests -DskipITs
    

    注意: 本文编译hudi使用的linux环境,window环境一定要加上-DskipITs,不然会编译docker文件启动服务运行linux命令导致报错,如果是linux环境且需要用docker进行测试可以考虑去掉其参数。

    3. 前置环境安装准备

    所有版本选择均是查看当前master分支pom 中所依赖的 spark,hive ,hadoop,presto版本。(hudi-0.5.2-SNAPSHOT)

    版本 链接地址
    hadoop 2.7.3 https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3-src.tar.gz
    spark 2.4.4 https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
    hive 2.3.1 http://archive.apache.org/dist/hive/hive-2.3.1/apache-hive-2.3.1-bin.tar.gz
    presto 0.217 https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.217/presto-server-0.217.tar.gz
    presto-cli-0.217-executable.jar https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.217/presto-cli-0.217-executable.jar

    **注意:**小版本不一样不影响使用,如果运行spark任务报错不兼容排下依赖包就好。

    4. Hive和Presto集成

    4.1 hive

    hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。
    hive 外表数据结构如下:

    CREATE EXTERNAL TABLE `test_partition`(
      `_hoodie_commit_time` string, 
      `_hoodie_commit_seqno` string, 
      `_hoodie_record_key` string, 
      `_hoodie_file_name` string, 
      `id` string, 
      `oid` string, 
      `name` string, 
      `dt` string, 
      `isdeleted` string, 
      `lastupdatedttm` string, 
      `rowkey` string)
    PARTITIONED BY ( 
      `_hoodie_partition_path` string)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      'hdfs://hj:9000/tmp/hudi'
    TBLPROPERTIES (
      'transient_lastDdlTime'='1582111004')
    
    

    hive集成hudi方法:将hudi jar复制到hive lib下

    cp ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $HIVE_HOME/lib
    

    4.2 Presto

    presto 集成hudi 是基于hive catalog 同样是访问hive 外表进行查询,如果要集成需要把hudi 包copy 到presto hive-hadoop2插件下面。

    presto集成hudi方法: 将hudi jar复制到 presto hive-hadoop2下

    cp  ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $PRESTO_HOME/plugin/hive-hadoop2/ 
    

    5. Hudi代码实战

    5.1 Copy_on_Write 模式操作(默认模式)

    5.1.1 insert操作(初始化插入数据)

    // 不带分区写入
      @Test
      def insert(): Unit = {
        val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val insertData = spark.read.parquet("/tmp/1563959377698.parquet")
        insertData.write.format("org.apache.hudi")
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // table name 设置
          .option(HoodieWriteConfig.TABLE_NAME, "test")
          .mode(SaveMode.Overwrite)
          // 写入路径设置
          .save("/tmp/hudi")
      }
    
    // 带分区写入
      @Test
      def insertPartition(): Unit = {
        val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        // 读取文本文件转换为df
        val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
        insertData.write.format("org.apache.hudi")
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 设置分区列
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
          // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
          .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
          // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引
          .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
          .mode(SaveMode.Overwrite)
          .save("/tmp/hudi")
      }
    

    5.1.2 upsert操作(数据存在时修改,不存在时新增)

    // 不带分区upsert
      @Test
      def upsert(): Unit = {
    
        val spark = SparkSession.builder.appName("hudi upsert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val insertData = spark.read.parquet("/tmp/1563959377699.parquet")
    
        insertData.write.format("org.apache.hudi")
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 表名称设置
          .option(HoodieWriteConfig.TABLE_NAME, "test")
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          // 写入路径设置
          .save("/tmp/hudi");
      }
    
    // 带分区upsert
      @Test
      def upsertPartition(): Unit = {
    
        val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_update_data.txt")
    
        upsertData.write.format("org.apache.hudi").option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
           // 分区列设置
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
          .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
          .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          .save("/tmp/hudi");
      }
    

    5.1.3 delete操作(删除数据)

      @Test
      def delete(): Unit = {
        val spark = SparkSession.builder.appName("delta insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val deleteData = spark.read.parquet("/tmp/1563959377698.parquet")
        deleteData.write.format("com.uber.hoodie")
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 表名称设置
          .option(HoodieWriteConfig.TABLE_NAME, "test")
          // 硬删除配置
          .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
      }
    

    删除操作分为软删除和硬删除配置在这里查看:http://hudi.apache.org/cn/docs/0.5.0-writing_data.html#%E5%88%A0%E9%99%A4%E6%95%B0%E6%8D%AE

    5.1.4 query操作(查询数据)

      @Test
      def query(): Unit = {
        val basePath = "/tmp/hudi"
        val spark = SparkSession.builder.appName("query insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val tripsSnapshotDF = spark.
          read.
          format("org.apache.hudi").
          load(basePath + "/*/*")
    
        tripsSnapshotDF.show()
      }
    

    5.1.5 同步至Hive

      @Test
      def hiveSync(): Unit = {
        val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")
    
        upsertData.write.format("org.apache.hudi")
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 分区列设置
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
          // 设置要同步的hive库名
          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
          // 设置要同步的hive表名
          .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
          // 设置数据集注册并同步到hive
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
          // 设置当分区变更时,当前数据的分区目录是否变更
          .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
          // 设置要同步的分区列名
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
          // 设置jdbc 连接同步
          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
          // hudi表名称设置
          .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
          // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
          .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
          // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
          .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          .save("/tmp/hudi");
      }
    
      @Test
      def hiveSyncMergeOnReadByUtil(): Unit = {
        val args: Array[String] = Array("--jdbc-url", "jdbc:hive2://hj:10000", "--partition-value-extractor", "org.apache.hudi.hive.MultiPartKeysValueExtractor", "--user", "hive", "--pass", "hive", "--partitioned-by", "dt", "--base-path", "/tmp/hudi_merge_on_read", "--database", "hj_repl", "--table", "test_partition_merge_on_read")
        HiveSyncTool.main(args)
      }
    

    这里可以选择使用spark 或者hudi-hive包中的hiveSynTool进行同步,hiveSynTool类其实就是run_sync_tool.sh运行时调用的。hudi 和hive同步时保证hive目标表不存在,同步其实就是建立外表的过程。

    5.1.6 Hive查询读优化视图和增量视图

      @Test
      def hiveViewRead(): Unit = {
        // 目标表
        val sourceTable = "test_partition"
        // 增量视图开始时间点
        val fromCommitTime = "20200220094506"
        // 获取当前增量视图后几个提交批次
        val maxCommits = "2"
    
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val prop = new Properties()
        prop.put("user", "hive")
        prop.put("password", "hive")
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
        val stmt = conn.createStatement
        // 这里设置增量视图参数
        stmt.execute("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat")
        // Allow queries without partition predicate
        stmt.execute("set hive.strict.checks.large.query=false")
        // Dont gather stats for the table created
        stmt.execute("set hive.stats.autogather=false")
        // Set the hoodie modie
        stmt.execute("set hoodie." + sourceTable + ".consume.mode=INCREMENTAL")
        // Set the from commit time
        stmt.execute("set hoodie." + sourceTable + ".consume.start.timestamp=" + fromCommitTime)
        // Set number of commits to pull
        stmt.execute("set hoodie." + sourceTable + ".consume.max.commits=" + maxCommits)
    
        val rs = stmt.executeQuery("select * from " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
    
      }
    

    读优化视图即去掉增量视图参数即可。

    5.1.7 Presto查询读优化视图(暂不支持增量视图)

      @Test
      def prestoViewRead(): Unit = {
        // 目标表
        val sourceTable = "test_partition"
        Class.forName("com.facebook.presto.jdbc.PrestoDriver")
        val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
        val stmt = conn.createStatement
        val rs = stmt.executeQuery("select * from  " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
      }
    

    5.2 Merge_On_Read 模式操作

    5.2.1 insert操作(插入数据)

      @Test
      def insertPartitionMergeOnRead(): Unit = {
        val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        // 读取文本文件转换为df
        val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
        insertData.write.format("org.apache.hudi")
          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 设置分区列
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
          // 设置当分区变更时,当前数据的分区目录是否变更
          .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
          // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
          .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
          .mode(SaveMode.Overwrite)
          .save("/tmp/hudi_merge_on_read")
      }
    

    merge on read 主要是要是加入option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)参数,其他修改删除操作和copy on write 类似,这里不一一列举。

    5.2.2 同步至Hive

      @Test
      def hiveSyncMergeOnRead(): Unit = {
        val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
        val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")
    
        upsertData.write.format("org.apache.hudi")
          // 配置读时合并
          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
          // 设置主键列名
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
          // 设置数据更新时间的列名
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
          // 分区列设置
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
          // 设置要同步的hive库名
          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
          // 设置要同步的hive表名
          .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition_merge_on_read")
          // 设置数据集注册并同步到hive
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
          // 设置当分区变更时,当前数据的分区目录是否变更
          .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
          // 设置要同步的分区列名
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
          // 设置jdbc 连接同步
          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
          // hudi表名称设置
          .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
          // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
          .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
          // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
          .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
          // 并行度参数设置
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          .mode(SaveMode.Append)
          .save("/tmp/hudi_merge_on_read");
      }
    

    与copy on write 操作一样,不同的是merge on read 会生成两个表后缀为_ro和_rt的外表。_ro为读优化视图,_rt为实时视图。

    5.2.3 Hive查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)

    /**
       * merge on read 实时视图查询
       */
      @Test
      def mergeOnReadRealtimeViewByHive(): Unit = {
        // 目标表
        val sourceTable = "test_partition_merge_on_read_rt"
    
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val prop = new Properties()
        prop.put("user", "hive")
        prop.put("password", "hive")
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
        val stmt = conn.createStatement
    
        val rs = stmt.executeQuery("select * from " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
      }
    
    
      /**
       * merge on read 读优化视图查询
       */
      @Test
      def mergeOnReadReadoptimizedViewByHive(): Unit = {
        // 目标表
        val sourceTable = "test_partition_merge_on_read_ro"
    
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val prop = new Properties()
        prop.put("user", "hive")
        prop.put("password", "hive")
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
        val stmt = conn.createStatement
    
        val rs = stmt.executeQuery("select * from " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
      }
    

    5.2.4 Presto查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)

    /**
       * presto merge on read 实时视图查询
       */
      @Test
      def mergeOnReadRealtimeViewByPresto(): Unit = {
        // 目标表
        val sourceTable = "test_partition_merge_on_read_rt"
        Class.forName("com.facebook.presto.jdbc.PrestoDriver")
        val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
        val stmt = conn.createStatement
        val rs = stmt.executeQuery("select * from  " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
      }
    
    
      /**
       * presto merge on read 读优化视图查询
       */
      @Test
      def mergeOnReadReadoptimizedViewByPresto(): Unit = {
        // 目标表
        val sourceTable = "test_partition_merge_on_read_ro"
        Class.forName("com.facebook.presto.jdbc.PrestoDriver")
        val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
        val stmt = conn.createStatement
        val rs = stmt.executeQuery("select * from  " + sourceTable)
        val metaData = rs.getMetaData
        val count = metaData.getColumnCount
    
        while (rs.next()) {
          for (i <- 1 to count) {
            println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
          }
          println("-----------------------------------------------------------")
        }
    
        rs.close()
        stmt.close()
        conn.close()
      }
    

    6. 问题整理

    1. merg on read 问题

    merge on read 要配置option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)才会生效,配置为option(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name())将不会生效。

    2. spark pom 依赖问题

    不要引入spark-hive 的依赖里面包含了hive 1.2.1的相关jar包,而hudi 要求的版本是2.x版本。如果一定要使用请排除相关依赖。

    3. hive视图同步问题

    代码与hive视图同步时resources要加入hive-site.xml 配置文件,不然同步hive metastore 会报错。

    git 测试代码地址:https://github.com/hj2016/hudi-test

    展开全文
  • 为了效率用到了on duplicate key update进行自动判断是更新还是新增,一段时间后发现该表的主键id(已设置为连续自增),不是连续的自增,总是跳跃的增加,这样就造成id自增过,已经超过最大值了,通过查找资料发现...

    项目中需要记录用户对资源的访问次数,实时写入数据库中。性能测试时数据量比较大,每次有10W次的访问次数,有update,也有insert。

    为了效率用到了on duplicate key update进行自动判断是更新还是新增(MySQL判断记录是否存在的依据是主键或者唯一索引,insert在主键或者唯一索引已经存在的情况下会插入失败,而InsertOrUpdate在主键或者唯一索引已经存在的情况下就变成了根据主键或唯一索引update的操作)。一段时间后发现该表的主键id(已设置为连续自增),不是连续的自增,总是跳跃的增加,这样就造成id自增过快,已经快超过最大值了,通过查找资料发现on duplicate key update有一个特性就是每次是更新的情况下id也是会自增加1的,比如说现在id最大值的5,然后进行了一次更新操作再进行一次插入操作时,id的值就变成了7而不是6。

    	 <insert id="insertorUpdateCount" parameterType="map">
    	 	insert into call_count (chapterId, contentId, platform, count, updateTime)
    		values(#{chapterid,jdbcType=VARCHAR}, #{contentid,jdbcType=VARCHAR}, #{platform,jdbcType=VARCHAR}, 
    		     #{count,jdbcType=INTEGER}, #{updatetime,jdbcType=TIMESTAMP}) 
    		
    		ON DUPLICATE KEY UPDATE 
    		chapterId=VALUES(chapterId),contentId=VALUES(contentId),platform=VALUES(platform),count=count+1
    	 </insert>
    

    解决这个问题,有两种方式:

    第一种:拆分成两个动作,先更新,更新无效再插入(使用)
    1、根据唯一索引来更新表

    	<update id="updateCount" parameterType="map">
    		update call_count
    		   set  count=count+1
    		where chapterId = #{chapterid,jdbcType=VARCHAR} 
    		and contentId = #{contentid,jdbcType=VARCHAR} 
    		and platform = #{platform,jdbcType=VARCHAR}    
    	</update>
    

    2、根据上一步的返回值,如果返回值大于0,说明更新成功不再需要插入数据,如果返回值不大于0则需要进行插入该条数据

    	<insert id="insertCount" parameterType="map">
    	    <selectKey keyProperty="id" order="AFTER" resultType="java.lang.Integer">
          		SELECT LAST_INSERT_ID()
        	</selectKey>
    		insert into call_count (chapterId, contentId, platform, count, updateTime)
    		values(#{chapterid,jdbcType=VARCHAR}, #{contentid,jdbcType=VARCHAR}, #{platform,jdbcType=VARCHAR}, 
    		     #{count,jdbcType=INTEGER}, #{updatetime,jdbcType=TIMESTAMP}) 
    	</insert>
    

    第二种:修改innodb_autoinc_lock_mode参数(未使用)
      innodb_autoinc_lock_mode中有3种模式,0,1,2,数据库默认是1的情况下,就会发生上面的那种现象,每次使用insert into … on duplicate key update 的时候都会把简单自增id增加,不管是发生了insert还是update
    innodb_autoinc_lock_mode参数详解
    tradition(innodb_autoinc_lock_mode=0) 模式:
    1、它提供了一个向后兼容的能力
    2、在这一模式下,所有的insert语句(“insert like”) 都要在语句开始的时候得到一个表级的auto_inc锁,在语句结束的时候才释放这把锁,注意呀,这里说的是语句级而不是事务级的,一个事务可能包涵有一个或多个语句。
    3、它能保证值分配的可预见性,与连续性,可重复性,这个也就保证了insert语句在复制到slave的时候还能生成和master那边一样的值(它保证了基于语句复制的安全)。
    4、由于在这种模式下auto_inc锁一直要保持到语句的结束,所以这个就影响到了并发的插入

    consecutive(innodb_autoinc_lock_mode=1) 模式:
    1、这一模式下去simple insert 做了优化,由于simple insert一次性插入值的个数可以立马得到确定,所以mysql可以一次生成几个连续的值,用于这个insert语句;总的来说这个对复制也是安全的(它保证了基于语句复制的安全)
    2、这一模式也是mysql的默认模式,这个模式的好处是auto_inc锁不要一直保持到语句的结束,只要语句得到了相应的值后就可以提前释放锁

    interleaved(innodb_autoinc_lock_mode=2) 模式:
    1、由于这个模式下已经没有了auto_inc锁,所以这个模式下的性能是最好的;但是它也有一个问题,就是对于同一个语句来说它所得到的auto_incremant值可能不是连续的。

    由于用户访问量较大,0模式虽然只有实际的发生insert的时候才增加,但是每次都会在语句执行期间锁表,并发性不太好,所以最终选择使用第一种解决方案,把更新插入语句分开写。

    推荐一篇mysql语句分析很到位的文章:避坑必看:很详尽的MyBatis返回自增主键实验(包括插入或更新SQL语句insert on duplicate key update的自增主键返回情况)

    展开全文
  • A 新增 WXML 面板支持自定义组件数据查看与实时修改 A 新增 WXML 面板支持使用键盘 (上下左右) navigate the DOM tree A 新增 WXML 面板支持右键操作 Hide element/Delete element/Scroll Into View/Collapse ...
  • 随着医疗设备的逐步更新,医疗数据存储成为了医院IT系统新的挑战。 据了解,医院的PACS数据量每年以15%的速率增长。三甲综合性医院或胸科、肺科、骨科等三甲专科医院,每年新增拍片量达到了60TB。与此同时,根据...
  • 数据更新方便 关系型数据库增删改查都能写,hive只能写查询和新增,因为hive数据存放到hdfs上面,hdfs是不支持随机写的 索引方面 关系型数据库是有索引的,hive也有索引,但是实现方式是不一样的. hive索引作用时是在进行...
  • 晓林事务提醒秘书是一款有快速生日提醒功能,无论农历还是阳历...*.[新增]新增大量节日数据。 *.[优化]优化数据载入失败后的处理,避免数据被意外覆盖。 *.[修复]修复极个别情况下,执行内置命令会弹出错误提示的BUG。
  • 2、新增快速新增数据源功能,快速为当前选中的数据库添加数据源。 3、新增打开目录功能,左键打开菜单选择要打开的目录,点击右键可以快速打开当前应用所在目录。 4、新增对使用驼峰命名的表名和字段名的支持,...
  • 更新日志:多备份2.2版本完全突破1.0版本印象,无论是在界面还是功能上都有大幅提升。1、客户端采用web界面,操作流程简单化。2、新增本地备份功能,可在本地直接备份文件和mysql、mssql、oracle等数据库3、新增混合...
  • 密码管理工具(软件)

    2021-02-05 18:06:52
    密码管理工具项目背景效果预览刚更新1.1.0版本,前后端数据交互对敏感信息进行加密处理 项目背景 由于目前信息化速度比较,工作、生活中很多网站都需要注册才能使用,导致账号密码越来越多,管理很麻烦,经常忘记...
  • 为获得快速、准确而精简的模糊神经网络,...在参数的学习阶段,所有隐含层神经元(无论是新增还是已有)的参数使用扩展的卡尔曼算法更新.通过仿真实验,该算法在达到与其它算法性能相当甚至更好的情况下,能获得更精简的结构.
  • MP3、MP4、FLV、WMA、RM等文件的播放、暂停、停止、进、退、上一首、下一首 声音的控制(大小、声道) 视频大小的控制(4种模式) 播放模式(随机、循环、单曲) 大背景、场景图片及动画、皮肤 设置 歌词 ...
  • 所以你修改后再看还是原来的设置 在播放器上右击 更新配置 选项 可立刻更新配置(设置)的缓存 更新列表(当前) 选项 可立刻更新当前歌曲列表的缓存 出于安全考虑也可以删除admin文件夹 直接手工编辑XML文件 演示URL...
  • VUE3对比VUE2的优势及新特性原理

    千次阅读 2020-10-27 10:00:09
    vue3新增了静态标记(patchflag)与上次虚拟节点对比时,只对比带有patch flag的节点(动态数据所在的节点);可通过flag信息得知当前节点要对比的具体内容 原因2:静态提升 vue2无论元素是否参与更新,每次都会重新...

空空如也

空空如也

1 2 3 4 5 ... 15
收藏数 297
精华内容 118
关键字:

更新数据快还是新增快