精华内容
下载资源
问答
  • hive增量更新

    千次阅读 2018-06-20 20:03:01
    但是hive频繁更新与hive的设计原则相反,并且hive增量更新很缓慢。为实现增量更新,我们可以采用union all进行关联或在一个分区表中求最新的日期的数据。select b.id,b.content,b.update_date from (select a.*,row_...

    很多数据需要进行更新,如用户信息修改。hive0.11之后开始支持update和delete。但是hive频繁更新与hive的设计原则相反,并且hive增量更新很缓慢。为实现增量更新,我们可以采用union all进行关联或在一个分区表中求最新的日期的数据。

    select b.id,b.content,b.update_date from (

    select a.*,row_number() over (distribute by id sort by update_date) as rn
    from (select id,content,update_date from t1 union all select id,content,update_date from t2) a
    ) b

    where b.rn=1;

    或者

    select b.id,b.content,b.update_date from 

    (select a.* ,row_number() over(distribute by id sort by update_date) as rn from a) b

    where b.rn=1;


    展开全文
  • Hive增量更新方案

    千次阅读 2019-01-25 10:40:35
    Hive增量更新方案 方案一(总结出来业界可行方案): 1、Hive原始表提前规划好以时间分区,初始化装载源库记录为base_table(最新数据) 2、每个相关表都会有一个timestamp列,对每一行操作做了修改,都会重置这列...

    Hive增量更新方案
    方案一(总结出来业界可行方案):
    1、Hive原始表提前规划好以时间分区,初始化装载源库记录为base_table(最新数据)
    2、每个相关表都会有一个timestamp列,对每一行操作做了修改,都会重置这列timestamp为当前时间戳;
    3、新增数据通过sqoop(支持当天抽取)或者导出当天文件两种形式,抽取或导入到hive表,记录为新增表incremental_table
    4、(1)如果incremental_table表中数据不涉及到更新,直接导入到以时间分区的base_table表中   (2)如果某几个分区涉及到更新操作,将这段时间分区内的base_table和incremental_table数据进行合并,有相同主键的取timestamp最新的那条,合并后新数据重新写回base_table分区中;
    (3)如果涉及到删除数据操作,分软删除(打标签)和硬删除;如果是软删除,记录数据也会存在incremental_table,只是在合并base_table和incremental_table表中过滤掉此记录写回base_table即可。如果是硬删除,需将删除的数据行放入额外审计表中audit_table,与base_table和incremental_table一起进行合并过滤处理后,再写回base_table。

    采用Oozie、NiFi或者Shell脚本等方式,将上述流程统一做成一个工作流,方便调度。 

    方案二(如有业务诉求,需要对Hive表具体某条记录进行delete或update操作):
    如果一个HIVE表要实现update和delete功能,该表就必须支持ACID,需开启ACID,同时必须满足以下条件:
    1、表的存储格式必须是ORC(STORED AS ORC);
    2、表必须进行分桶(CLUSTERED BY (col_name, col_name, ...)  INTO num_buckets BUCKETS);
    3、Table property中参数transactional必须设定为True(tblproperties('transactional'='true'));
    4、以下配置项必须被设定:
         Client端:
    hive.support.concurrency – true
    hive.enforce.bucketing – true
    hive.exec.dynamic.partition.mode – nonstrict  
    hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager  
         服务端:
    hive.compactor.initiator.on – true
    hive.compactor.worker.threads – 1
    hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager(经过测试,服务端也需要设定该配置项)

    展开全文
  • 4 步搞定 Hive 增量更新

    万次阅读 2018-05-22 23:34:13
    Hive更新很有趣。 Hive 的表有两种,一种是 managed table, 一种是 external table. managed table 是 Hive 自动帮我们维护的表,自动分割底层存储文件,自动分区,这些自动化的操作,都是 Hive 封装了与 ...

    Hive 的更新很有趣。

    Hive 的表有两种,一种是 managed table, 一种是 external table.

    managed table 是 Hive 自动帮我们维护的表,自动分割底层存储文件,自动分区,这些自动化的操作,都是 Hive 封装了与 Hadoop 交互的接口。

    external table 只是一种在 Hive 维护的与外部文件的映射。

    managed table 与 external table 最大的区别在于删除的时候,external table 默认情况下只是删除表定义,而数据依旧在hadoop 上存储着;managed table 则是表定义连着表数据一起被删除了。

    早期的时候, Hive 支持的表操作只有两种:OverWrite 和 Appand

    Overwrite 并不是对某一行的数据做更新,而是对整张表做覆盖,所以感觉上 Hive 更像是在做 ETL 里面的 Staging, 而不像是最终存储计算结果的地方。Hive 超强的计算能力可以做为大数据量转换的工具,最终结果将被送到关系型数据库或者其他 Hive 实例上存储。

    hortonworks 有一篇提出相关解决方案的文章,介绍了 4步走解决增量更新 Hive 表:

    url如下:

    https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive

    1. Ingest

    2. Reconcile

    3. Compact

    4. Purge

    过程中,用到了四个 Hive 表,分别是:

    base_table: 初始化装载源库来的表数据,表示最新数据

    incremental_table:用来装载上一次增量更新以来,发生过更改的数据,包括新增,更新,和删除

    reconcile_view:以 base_table, incremental_table 计算出来的最新数据,涉及到的操作,有删除,更新,和新增。每一次都要重复计算是不是有些多余,浪费很多对没有变更的数据的重复计算。如果有对数据有分区,只要对有数据更新的分区做增量更新,会有很大效率的提高。

    reporting_table:将reconcile_view的数据,装载到 reporting_table中,用它来替换掉 base_table中的数据。

    一) 取决于源数据库的服务是否支持直连抽取数据,可以有两种方法完成第一步 ingest, 即 Extract.

    1. File Processing: 由源数据库自发的输出,以文件方式在合理的时间窗口导出

    2. RDBMS Processing (Database Client based ETL): 由 Sqoop 来完成抽取; ETL 工具, kettle, Informatica等;

    File Processing :

    1. 由数据库软件自带的导入导出,将文件导出一定分隔符分割的文本文件

    2. 将这些文本文件放到 Hive 映射的文件夹下面

    RDBMS Processing (Database Client based ETL):

    1. SQOOP: 既可以实现初始化导入,也可以完成增量导入,增量导入的实现,依赖于Sqoop 本身的 check-sum 机制。check-sum 是对 Hive 表中的一行用来做校验数据做了 hash 计算,根据匹配是否来做增量更新。

    以下是文章的原文,展示了 Sqoop 的具体用法:

    SQOOP is the JDBC-based utility for integrating with traditional
    databases.

    A SQOOP Import allows for the movement of data into either HDFS (a
    delimited format can be defined as part of the Import definition) or
    directly into a Hive table.

    The entire source table can be moved into HDFS or Hive using the
    “–table” parameter.

    sqoop import
    
    --connect jdbc:teradata://{host name or ip address}/Database=retail
    
    --connection-manager org.apache.sqoop.teradata.TeradataConnManager
    
    --username dbc
    
    --password dbc
    
    --table SOURCE_TBL
    
    --target-dir /user/hive/incremental_table -m 1
    

    注**

    –table source_TBL: 是指关系型数据库里的原表

    –target-dir :Hive 中表对应的存储目录

    After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.

    sqoop import
    
    --connect jdbc:teradata://{host name or ip address}/Database=retail
    
    --connection-manager org.apache.sqoop.teradata.TeradataConnManager
    
    --username dbc
    
    --password dbc
    
    --table SOURCE_TBL
    
    --target-dir /user/hive/incremental_table -m 1
    
    --check-column modified_date
    
    --incremental lastmodified
    
    --last-value {last_import_date|last_import_value}
    

    注**

    –check-column : 是指定原表中用来做增量判断条件的那一字段

    –incremental lastmodified: 指定增量的模式,append 或者 lastmodified.

    在数据仓库中,无论是维度表还是事实表,我们总会设计一栏自增列,作为代理键或者主键。这个时候这些键值总是自增长的,因此适合采用 append 形式,指定check-sum 列为自增列,如果有比 {last_import_value}大的值,就会被 sqoop 导入进来;

    在设计数据库的时候,为了审计,我们通常也会设计一列为 timestamp 列,每对一行做了修改,就会重置这列 timestamp 为当前时间戳。如果是针对这类行数据,我们要指定的便是 lastmodified, 配合 check-sum 设置为 timestamp 列,sqoop 就会导入比{last_import_date} 大的数据行。

    这里写图片描述

    –last-value { last_import_date } 这是需要从程序外面传进来的

    考虑到这是增量更新,那么理应把 sqoop 做成一个 Job 来自动化运行,并且记录每一次的时间,作为下次运行时要传入的 {last_import_date} 或者{last_import_value}

    Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.

    sqoop import
    
    --connect jdbc:teradata://{host name or ip address}/Database=retail
    
    --connection-manager org.apache.sqoop.teradata.TeradataConnManager
    
    --username dbc
    
    --password dbc
    
    --target-dir /user/hive/incremental_table -m 1
    
    --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’
    

    Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.

    注**

    这是前面两种全量和增量的替代写法,用指定的查询,从原关系型数据库导出数据,不同的是,全量的时候,要指定导入的 Hive 目标表是 base_table, 而增量的时候,导入的是 incremental_table.

    二) Reconciliation 将新旧数据综合起来

    初始化时,装载最终的目标表没有多少难度。

    在这段中,主要解决的问题是增量与初始化的融合。

    初始化的数据,存储在 base_table 中, 而增量数据我们已经装载到了 incremental_table 中。

    将两者的数据合二为一,就可以生成与源数据库一致的最新数据。

    前提是源数据库的任何数据行不接受硬删除即delete 操作,而是在行上打了一个软删除的标签,表示该行已删除。

    如果是做了硬删除,那么同时也要做好删除的审计,将删除的数据行放入审计表中,一同发送给 incremental_table .

    base_table

    CREATE TABLE base_table (
    
    id string,
    
    field1 string,
    
    field2 string,
    
    field3 string,
    
    field4 string,
    
    field5 string,
    
    modified_date string
    
    )
    
    ROW FORMAT DELIMITED
    
    FIELDS TERMINATED BY ','
    
    LOCATION '/user/hive/base_table';
    

    incremental_table

    CREATE EXTERNAL TABLE incremental_table (
    
    id string,
    
    field1 string,
    
    field2 string,
    
    field3 string,
    
    field4 string,
    
    field5 string,
    
    modified_date string
    
    )
    
    ROW FORMAT DELIMITED
    
    FIELDS TERMINATED BY ','
    
    LOCATION '/user/hive/incremental_table';
    

    reconcile_view

    CREATE VIEW reconcile_view AS
    
    SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date FROM
    
    (SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn
    
    FROM (SELECT * FROM base_table
    
    UNION ALL
    
    SELECT * FROM incremental_table)
    
    t1) t2
    
    WHERE rn = 1;
    

    从最后一个view定义来解说,incremental_table 必须拥有增量记录的全部,因此硬删除操作就不会反应在 incremental_table 里头。

    但是 reconcile_view 所涉及的量毕竟有限,浪费明明不会更改的那部分数据的计算。

    因此如果能做好分区,仅仅对某几个分区做全量更新会更高效。

    三) Compact: 物化视图,即将reconciliation_view 装载到 reporting_table 里面去

    reporting_table

    DROP TABLE reporting_table;
    
    CREATE TABLE reporting_table AS
    
    SELECT * FROM reconcile_view;
    

    首先是要将之前的 reporting_table 删除,再重建 reporting _table, 用 reconciliation_view 填充这张表。

    在这张表的基础上,可以做很多聚合,过滤等操作,进行数据二次加工。

    四) Purge: 将多余的表数据清空

    base_table :应该当换成 reporting_table 里面的数据

    incremental_table: 清空

    DROP TABLE base_table;
    
    CREATE TABLE base_table AS
    
    SELECT * FROM reporting_table;
    
    hadoop fs –rm –r /user/hive/incremental_table/*
    

    总结:

    1. Oozie 可以将这4步统一做成一个工作流,方便调度

    2. 可以用脚本自定义工作流,就像数据仓库的 ETL 一样

    展开全文
  • hive增量更新的新方案

    千次阅读 2016-11-14 20:36:48
    之前是采用的join的方法来增量更新。详情见: http://blog.csdn.net/qq_20641565/article/details/52763663现在有一种新方案如下:Select b.id,b.name,b.addr,b.updated_date From ( select a.*,row_number() over...

    之前是采用的join的方法来增量更新。详情见:
    http://blog.csdn.net/qq_20641565/article/details/52763663

    现在有一种新方案如下:

    Select b.id,b.name,b.addr,b.updated_date 
    From
    (
    select a.*,row_number() over(distribute by a.id sort by updated_date) as rn
    from
    (
            Select id,name,addr, updated_date from test
    Union all
    Select id,name,addr, updated_date from test_delta 
    where y=’2016and m=’10and d=’09’
    ) a
    ) b
    Where b.rn = 1
    

    把中间表和最终表进行union all合并,然后取出相同主键的那条数据updated_date最新的那条数据。

    这种方法和前面left join的方法都很消耗性能,其中left join是合并的时候消耗性能,而union all 是排序的时候消耗性能。总体测试上面来看,union all 略优于 left join

    展开全文
  • hive 实现增量更新

    2020-06-08 16:06:34
    创建Hive的表: create table customer ( id int, age tinyint, name string ) partitioned by(dt string) row format delimited fields terminated by '|' stored as textfile; 导入初始化数据: load data local ...
  • 大数据hive实现原理.zip
  • 1. 背景 2. sql #!/bin/sh version_now=$(date -d"-2 ...hive -e "DROP TABLE IF EXISTS app.tmp_xz_jimi3_sku_description_delete" hive -e " CREATE TABLE app.tmp_xz_jimi3_sku_description_delete AS SELECT
  • Hive中实现增量更新

    千次阅读 2016-12-05 19:16:26
    hive增量
  • Hive 更新增量

    千次阅读 2019-05-27 09:41:13
    insertoverwritetableerp.tsor_BKPF--要更新此表 SELECTtd.*FROM( select ta.* fromerp.tsor_BKPF ta--先要把原来中未更新的数据捞出来 leftjoin(selecttc.MANDT,tc.BUKRS,t...
  • hive增量对比后将增量数据插入原表

    千次阅读 2016-04-29 09:17:12
    作为核准原表和增量表(增加和修改的记录)中审核的标准,以下图为例 下图用 stu1原表 stu2增量表模拟上面业务:                   以业务主键为关联条件,案例如下:    ...
  • hive全量与增量~的思考

    千次阅读 2019-05-19 11:30:35
    数据如果保留多份,就会存在一致性问题,就需要同步,同步分为两大类:全量和增量 2. 概述 数据如果要保留副本,要么同时写(就是多写),或者进行复制:异步写(即从主数据拷贝到副本); 同时写(多写),引出...
  • 要求将只存在于u1而不存在于u2的的ID记录全部插入u2中,并用u1中的记录更新u2中相同ID的记录。 不要被题目误导了,这个应该先更新数据,然后再插入,不要被题目的顺序误导 数据源 drop table u1; create table if not ...
  • 方案一、如果业务库没有删除操作,并且更新时间完整,使用更新时间做增量同步,sqoop只同步更新时间变化的数据,合并到ODS层表 方案二、如果业务库有删除操作,可以先解析数据库操作日志,存到hdfs,T+1同步数据后,...
  • hive增量数据处理

    2020-08-03 09:39:34
    select * from (select *, row_number() over (partition by ${primary_key} order by updated desc) as updated_n from adata ) tmp where updated_n = 1 and (deleted is null or deleted != 1)
  • 一,全量和增量 我们写数据,会不会就需要保存数据呢?为了保证一份数据丢失...增量的基础是全量,就是用某种方式先将全量数据拷贝过来,然后采用增量方式同步更新增量是指抓取某个时刻(更新时间)或者检查点(chec
  • hive增量抽取方案

    千次阅读 2017-06-27 00:50:00
    一、使用sqoop从mysql中抽取数据到hive,查看sqoop官方文档,有如下两种方案: 7.2.9. Incremental Imports Sqoop provides an incremental import mode which can be used to retrieve only rows newer than some ...
  • sqoop从mysql到hive实现定时增量导入

    千次阅读 2018-10-11 11:22:09
    1.第一次全量抽取,并创建hive表 sqoop import --connect jdbc:mysql://localhost:3306/test --username xxx --password xxx --direct --fields-terminated-by '\t' --target-dir /data/sqoop/shop -...
  • hive数据增量同步方案

    2021-09-15 13:09:56
    目录1-每天全量同步2-每天增量同步3-不变的数据增量同步 1-每天全量同步 如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也...
  • sqoop增量更新使用问题

    千次阅读 2018-11-14 15:58:39
    使用sqoop增量更新数据到hive中,使用参数–increment,append为根据递增列更新数据,lastmodified为根据更新时间进行增量更新,–check-column 指定增量更新的基准列,–last-value为更新比较值。 sqoop import -...
  •  存在这样的情况,某个流水号srl_id在...增量 一些其他帖子 full join :https://blog.csdn.net/magicharvey/article/details/20692829  https://my.oschina.net/sniperLi/blog/755273 业务不会变化的字段:...
  • 只是之前每天汇总了trace表数据后,再统计一次今天所有的数据,但是现在甲方有些需求需要我们每个小时更新一次,比如在某小时内是否有同学出现了异常消费。所以之前的博客都是做一些铺垫。 2、hive与MySQL中创建的...
  • 通过解析RDS的binlog将RDS的增量数据同步到HDFS下,并映射加载到Hive外部分区表 由于RDS表中的第二个字段都为datetime字段,所以刚才以该字段作为Hive的分区字段 配置文件介绍 doc/creat table.sql:Hive表的建表...
  • 1.需求 已知用户的月度点击次数信息,如下图,第一列用户名称,第二列月份第三列... 2.hive sql的统计分析 创建月度点击统计表 CREATE TABLE use_click_month( use_name string, date_month string, count int )r...
  • 增量增量表,就是记录每天新增数据的表,比如说,从24号到25号新增了那些数据,改变了哪些数据,这些都会存储在增量表的25号分区里面。上面说的快照表的25号分区和24号分区(都是t+1,实际时间分别对应26号和25...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 10,548
精华内容 4,219
关键字:

hive增量更新