精华内容
下载资源
问答
  • Oracle CDC数据增量测试

    2017-12-13 17:49:47
    创建表空间 创建临时表空间===================================== create temporary ...tempfile 'e:\oracle\oradata\cdctest\test_temp01.dbf' size 32m autoextend on next 32m maxsize 2048m exte
    创建表空间
    


    创建临时表空间=====================================

    create temporary tablespace test_temp
    tempfile 'e:\oracle\oradata\cdctest\test_temp01.dbf'
    size 32m
    autoextend on
    next 32m maxsize 2048m
    extent management local;
    

    创建数据表空间======================================

    create tablespace test_data
    logging
    datafile 'e:\oracle\oradata\cdctest\test_data01.dbf'
    size 32m
    autoextend on
    next 32m maxsize 2048m
    extent management local;

    创建用户


    1)业务操作用户
    创建业务操作用户并指定表空间=========================

    create user appuser identified by appuser 
    default tablespace test_data
    temporary tablespace test_temp;

    给用户授予权限======================================

    grant connect,resource, create view to appuser


    2)发布用户
    创建发布用户并指定表空间=============================

    create user cdc_pub identified by cdc 
    default tablespace test_data
    temporary tablespace test_temp;

    给用户授予权限=======================================

    grant connect,resource to cdc_pub -- connect 连接权限,resource用于给开发人员用的角色
    grant SELECT_CATALOG_ROLE TO cdc_pub --可以查看一些数据字典的视图·
    GRANT EXECUTE_CATALOG_ROLE TO cdc_pub--执行目录角色,能够执行所有系统包
    GRANT EXECUTE ON DBMS_CDC_PUBLISH TO cdc_pub--用于定义发布操作
    grant create job to cdc_pub --创建任务

    3)订阅用户
    创建订阅用户并指定表空间=============================

    create user cdc_sub identified by cdc 
    default tablespace test_data
    temporary tablespace test_temp;

    给用户授予权限=======================================

    grant connect,resource to cdc_sub -- connect 连接权限,resource用于给开发人员用的角色
    grant execute on DBMS_CDC_SUBSCRIBE TO CDC_SUB --用于定义订阅操作
    2.创建业务表:以业务用户账户(APPUSER)登录

    create table SalesOrder(
    orderId int not null,
    customerId int not null,
    duedate date not null,
    deliverTo int not null,
    createddttm date default sysdate,
    constraint pk_salesOrder primary key (orderId)
    )
    
    select * from salesorder
    
    
    
    create table salesorderdetail(
    solineId int not null,
    orderId int not null,
    itemNumber varchar2(20) not null,
    quantity decimal(13,4),
    linePrice decimal(13,4),
    constraint pk_sodetail primary key (solineId)
    )
    
    select * from salesorderdetail

    3.创建发布:以发布者登录(CDC_PUB)
    1)创建发布集

    BEGIN
     DBMS_CDC_PUBLISH.CREATE_CHANGE_SET(
     change_set_name => 'CDC_SET_SO',  --改变集 
     description => 'Change set for SalesOrder, SalesOrderDetail',
     change_source_name => 'SYNC_SOURCE');
    END;

    2)创建发布表:一个发布集对应多个发布的表
    发布表即是用于存放变更了的数据的表。以下语句将在发布者(CDC_PUB)名下新建两个发布表:CT_SALESORDER和CT_SALESORDERDETAIL。

    BEGIN
       DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE( 
       DDL_MARKERS=>'n',
       owner => 'cdc_pub',  --发布表的Owner! 
       change_table_name => 'CT_SalesOrder',  --发布表名
       change_set_name => 'CDC_SET_SO',  --改变集
       source_schema => 'appuser',  --业务表的Owner
       source_table => 'SalesOrder',  --业务表
       column_type_list => 'OrderID int, CustomerID int, DueDate Date,  DeliverTo int, 
                   CreateDTTM Date',   --发布表中的列定义 
       capture_values => 'new',  -- 获取更改的值
       rs_id => 'n',
       row_id => 'n',
       user_id => 'n',
       timestamp => 'n',
       object_id => 'n',
       source_colmap => 'n',
       target_colmap => 'n',
       options_string => null );
    END;
    
    BEGIN
       DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE( 
       DDL_MARKERS=>'n',
       owner => 'cdc_pub',
       change_table_name => 'CT_SalesOrderDetail',
       change_set_name => 'CDC_SET_SO',
       source_schema => 'appuser',
       source_table => 'SalesOrderDetail',
       column_type_list => 'SOLineID int, OrderID int, ItemNumber    
               varchar2(20), Quantity decimal(13,4), LinePrice decimal(18,4)',
       capture_values => 'new',
       rs_id => 'n',
       row_id => 'n',
       user_id => 'n',
       timestamp => 'n',
       object_id => 'n',
       source_colmap => 'n',
       target_colmap => 'n',
       options_string => null );
    END;

    3)给订阅用户授权,使其对发布表有读权限

    grant select on CT_SALESORDER to cdc_sub
    grant select on CT_SalesOrderDetail to cdc_sub

    4.创建订阅:以订阅者(CDC_SUB)登录
    1)创建订阅:一个订阅中可订阅多个发布表

    BEGIN
      DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(
      change_set_name => 'CDC_SET_SO',   --改变集
      description => 'Change data for salesOrder, salesOrderDetail',
      subscription_name => 'CDC_SUB_SO');  --订阅的名称 
    END;



    2)订阅表:系统将针对每个发布表建立订阅视图,将来订阅时从这些视图读取数据

    begin
      DBMS_CDC_SUBSCRIBE.SUBSCRIBE(
        subscription_name => 'CDC_SUB_SO',  --订阅的名称
        source_schema => 'APPUSER',  --业务数据表的Owner
        source_table => 'SALESORDER', --业务数据表名
    	--订阅的列 
        column_list => 'OrderID, CustomerID, DueDate,  DeliverTo, 
                   CreateDTTM',
        --订阅试图的名称 
    subscriber_view => 'V_CDC_SalesOrder');
    END;
    
    begin
      DBMS_CDC_SUBSCRIBE.SUBSCRIBE(
        subscription_name => 'CDC_SUB_SO',
        source_schema => 'APPUSER',
        source_table => 'SALESORDERDetail',
        column_list => 'SOLINEID,ORDERID,ITEMNUMBER,QUANTITY,LINEPRICE',
        subscriber_view => 'V_CDC_SalesOrderDetail');
    END;

    5.激活订阅

    BEGIN
        DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(
            subscription_name => 'CDC_SUB_SO');
    END;

    至此,发布、订阅的管理工作完成。以下是进行测试。
    可以看出,Oracle 11g的同步CDC不是基于触发器的!从ALL_TRIGGERS找不到业务表上有触发器。


    6.操作业务表:用业务用户(APPUSER)登录

    insert into SalesOrder ( orderId, customerId, dueDate, deliverTo )
    values (1, 1, trunc(sysdate)+10, 1)
    
    insert into SalesOrderDetail( SoLineId, OrderId, ItemNumber, Quantity, linePrice)
    values ( 1, 1, 'Desk001', 2, 500)
    
    insert into SalesOrderDetail( SoLineId, OrderId, ItemNumber, Quantity, linePrice)
    values ( 2, 1, 'Chair001', 2, 350)
    
    --注意,可以试一下,在没有提交之前,在发布表中是没有数据的。这一点似乎也和Oracle 10g不同。
    commit

    7.测试订阅:以订阅用户(CDC_SUB)登录


    begin
      dbms_cdc_subscribe.extend_window( 
        subscription_name=>'CDC_SUB_SO');   --订阅名 
    end;
    
    -- 查询订阅视图
    select * from V_CDC_SalesOrder order  by commit_timestamp$
    
    select * from V_CDC_SalesOrderDetail order  by commit_timestamp$
    
    --完成本次订阅
    begin
    DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
        subscription_name => 'CDC_SUB_SO');  --订阅名 
    END;


    oracle 文档 ——
    https://docs.oracle.com/cd/E11882_01/appdev.112/e40758/d_cdcpub.htm#ARPLS023

    展开全文
  • oracle统计数据每日增量

    千次阅读 2021-03-03 17:22:05
    有个需求是统计数据库的每日数据增量oracle有个查询所有表的sql如下, selectt.table_name,t.num_rowsfromuser_tablest; 查询出来的结果如下图所示: 稍微改造一下就可以查出所有表的所有数据,如下: ...

    有个需求是统计数据库的每日数据增量,oracle有个查询所有表的sql如下,

    select t.table_name,t.num_rows from user_tables t;

    查询出来的结果如下图所示:

    稍微改造一下就可以查出所有表的所有数据,如下:

    SELECT sum(num_rows) sum from user_tables;

    然后想着写个存储过程,创建个表E_DATA_DAILY存储时间和数量,这里我存的表如下:

    建表语句:

    CREATE TABLE "E_DATA_DAILY" (
    "time" DATE NULL ,
    "count" NUMBER NULL 
    )

    存储过程编写的内容如下:作用是先查出当时的数据总量后插入到E_DATA_DAILY表中方便以后进行查看。

    CREATE OR REPLACE
    PROCEDURE "GETCOUNT" AS
    BEGIN
    EXECUTE IMMEDIATE 'insert into E_DATA_DAILY VALUES (sysdate,(SELECT sum(num_rows) sum from user_tables))';
    COMMIT;
    END;

    创建定时任务:

    DECLARE
      X NUMBER;
    BEGIN
      SYS.DBMS_JOB.SUBMIT
        ( job       => X 
         ,what      => 'GETCOUNT;'
         ,next_date => sysdate
         ,interval  => 'TRUNC(sysdate) + 1 +1/ (24)'
        );
    commit;
    END;

    查询定时任务:

    select * from user_jobs;

    这样就可以每天一点查出数据存入到表中!这样是不是很简单呢?喜欢请关注“蛋皮皮”微信公众号!有事没事点个赞,点个再看~~~

    展开全文
  • oracle增量获取数据

    千次阅读 2018-11-28 17:35:59
    转载: ... 一、增量数据采集概述 数据采集通常是指ETL过程中Extract-数据抽取部分。除了ETL外在不同应用系统之间通常也需要传递数据,在某些环境条件限制下不能将数据从一个系统直接移到...

    转载: https://blog.csdn.net/chensrao/article/details/6200338

    一、增量数据采集概述

    数据采集通常是指ETL过程中Extract-数据抽取部分。除了ETL外在不同应用系统之间通常也需要传递数据,在某些环境条件限制下不能将数据从一个系统直接移到另一个系统,只能借助文本来作为中间媒介传递数据,且文本的生成有时间窗口的限制,所以对数据采集即数据抽取的性能有一定的要求。对增加数据的采集的方法常用的有以下几种:

    1. 时间戳(Timestamps on rows)

    2. 版本号(Version Numbers on rows)

    3. 状态指示(Status indicators on rows)

    4. 时间戳、版本号、状态指示混合使用(Time/Version/Status on rows)

    5. 触发器 (Triggers on tables)

    6. 表差异(Table differencing)

    7. 数据库的日志扫描(Log scanners on databases)

    对于增量数据采集情况在Oracle中推出了两种主要方案,一种是我们熟悉的物化视图(materialized view),另一种就是本文将要介绍的CDC组件(Change Data Capture 改变数据捕获)。

    CDC 特性是在Oracle9i数据库中引入的。CDC能够帮助你识别从上次提取之后发生变化的数据。利用CDC,在对源表进行INSERT、UPDATE或 DELETE等操作的同时就可以提取数据,并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据,然后利用数据库视图以一种可控的方式提供给目标系统。

    二、CDC的发布订阅模型

    CDC体系结构基于发布者/订阅者模型。发布者捕捉变化数据并提供给订阅者。订阅者使用从发布者那里获得的变化数据。通常,CDC系统拥有一个发布者和多个订阅者。发布者首先需要识别捕获变化数据所需的源表。然后,它捕捉变化的数据并将其保存在特别创建的变化表中。它还使订阅者能够控制对变化数据的访问。订阅者需要清楚自己感兴趣的是哪些变化数据。一个订阅者可能不会对发布者发布的所有数据都感兴趣。 订阅者需要创建一个订阅者视图来访问经发布者授权可以访问的变化数据。

    CDC有几个重要的基本概念需要先明确一下:

    l 源表(Source Table),业务数据库的需要捕获数据的源表

    l 变化表(Change Table) ,保存从源表捕获的变化数据(包括各种DML产生的数据)

    l 变化集(Change Set),是保证事务一致性的数据集合。一个变化集对应多个变化表

    l 订阅视图(Subscription View),提供给读取变化表数据的视图

    l 订阅窗口(Subscription Window) ,定义了查看变化数据的时间范围.就象一个观察变化数据的滑动窗口。变化数据处理完成后,可以对清除订阅窗口。

    三、CDC的同步与异步模式

    l 同步模式,实时的捕获变化数据并存储到变化表中,发布者与订阅都位于同一数据库中,同步模式实际使用trigger的形式来捕捉变化数据。下图说明了同步模式的基本架构。

    clip_image002

    l 异步模式,以Oracle流复制技术为基础,从redo log中读取日志记录来捕捉变化数据。异步模式有三种:

    1. Asynchronous HotLog

    clip_image004 2. Asynchronous Distributed HotLog

    clip_image006

    3. Asynchronous AutoLog Mode

    3.1 Asynchronous Autolog Online Change Data Capture

    clip_image008

    3.2 Asynchronous AutoLog Archive Change Data Capture

    clip_image010

    四、CDC相关的数据库对象 (Package)

    l 包(Package)

    n DBMS_CDC_PUBLISH, 用于定义发布操作

    n DBMS_CDC_SUBSCRIBE,用于定义订阅操作

    l 角色

    n EXECUTE_CATALOG_ROLE

    n SELECT_CATALOG_ROLE

    n CREATE TABLE and CREATE SESSION privileges

    n EXECUTE on the DBMS_CDC_PUBLISH package

    l 视图

    n ALL_SOURCE_TABLES 源表

    n ALL_PUBLISHED_COLUMNS 发布的表列

    n All_Subscribed_Columns 订阅的表列

    n All_Subscriptions 所有订阅

    n All_Subscribed_Tables 已经订阅的表

    五、CDC的实施步骤

    同步模式以trigger形式来捕捉,这种实行没有自身来使用trigger来捕捉来的灵活,且其对性能的影响比从日志捕捉变化数据的性能影响大,所以本文以异步捕捉中的异步Hotlog来讲解CDC的具体实施步骤.

    1. 准备数据和用户

    SQL>show user

    scott

    SQL>create table t1 ( a_num number , b_varchar varchar2(10),c_char char(10));#测试表

    SQL>conn / as sysdba

    SQL>create user cdcpub identified by cdcpub; #发布用户(publisher)

    SQL>create user subscriber1 identified by subscriber1;#订阅者(subscriber)

    2. 调整数据库参数(需根据实际情况修改)

    compatible = 10.2.0

    java_pool_size = 50000000

    job_queue_processes = 2

    parallel_max_servers = + 5

    processes = + 7

    sessions = + 2

    streams_pool_size = + 21 MB

    undo_retention = 3600

    3. 修改数据库日志模式

    #数据库默认是在archivelog模式

    SQL>show user

    USER is “sys”

    SQL>ALTER DATABASE FORCE LOGGING; #此步骤是可选但建议执行;

    SQL>ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;#此步骤需执行;

    SQL>ALTER TABLE scott.t1

    ADD SUPPLEMENTAL LOG GROUP log_group_t1 (a_num,b_varchar,c_char) ALWAYS;

    #若要捕捉t1表中所有列,可用下列语句代替上述alter table语句:

    #ALTER TABLE scott.t1 ADD SUPPLEMENTAL LOG DATA (ALL) #COLUMNS;

    4. 授权

    ALTER USER cdcpub QUOTA UNLIMITED ON SYSTEM

    QUOTA UNLIMITED ON SYSAUX;

    GRANT CREATE SESSION TO cdcpub;

    GRANT CREATE TABLE TO cdcpub;

    GRANT CREATE TABLESPACE TO cdcpub;

    GRANT UNLIMITED TABLESPACE TO cdcpub;

    GRANT SELECT_CATALOG_ROLE TO cdcpub;

    GRANT EXECUTE_CATALOG_ROLE TO cdcpub;

    GRANT CREATE SEQUENCE TO cdcpub;

    GRANT DBA TO cdcpub;

    GRANT EXECUTE on DBMS_CDC_PUBLISH TO cdcpub;

    EXECUTE DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(GRANTEE => 'cdcpub');

    5. 准备源表(Source Table)

    SQL>show user

    USER is “SYS”

    SQL>

    BEGIN

    DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION(TABLE_NAME => 'scott.t1');

    END;

    /

    6. 创建变更集(Change Set)

    SQL>show user

    USER is “cdcpub”

    SQL>

    BEGIN

    DBMS_CDC_PUBLISH.CREATE_CHANGE_SET(

    change_set_name => 'cdc_test_cs',

    description => 'Change set for scott.t info',

    change_source_name => 'HOTLOG_SOURCE',

    stop_on_ddl => 'y'

    );

    END;

    /

    7. 创建变更表(Change Table)

    SQL>show user

    USER is “cdcpub”

    SQL>

    BEGIN

    DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE(

    owner => 'cdcpub',

    change_table_name => 't1_ct',

    change_set_name => 'cdc_test_cs',

    source_schema => 'scott',

    source_table => 't1',

    column_type_list =>'a_num number,b_varchar varchar2(10),c_char char(10)',

    capture_values => 'both',

    rs_id => 'y',

    row_id => 'n',

    user_id => 'n',

    timestamp => 'n',

    object_id => 'n',

    source_colmap => 'n',

    target_colmap => 'y',

    options_string => '');

    END;

    /

    8. 激活变更集(Change Set)

    SQL>show user

    USER is “cdcpub”

    SQL>

    BEGIN

    DBMS_CDC_PUBLISH.ALTER_CHANGE_SET(

    change_set_name => 'cdc_test_cs',

    enable_capture => 'y');

    END;

    /

    9. 授权给订阅者

    SQL>show user

    USER is “cdcpub”

    SQL> GRANT SELECT ON cdcpub.t1_ct TO subscriber1;

    SQL>show user

    USER is “SYS”

    SQL>GRANT CREATE TABLE TO subscriber1;

    SQL>GRANT CREATE SESSION TO subscriber1;

    SQL>GRANT CREATE VIEW TO subscriber1;

    SQL>GRANT UNLIMITED TABLESPACE TO subscriber1;

    10. 订阅变更数据

    i. 创建订阅集(subscription)

    SQL>show user

    USER is “subscriber1”

    SQL>

    BEGIN

    DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(

    change_set_name => 'cdc_test_cs',

    description => 'Change data for t1',

    subscription_name => 't1_sub');

    END;

    /

    ii. 订阅源表及源表中的相关字段

    SQL>show user

    USER is “subscriber1”

    SQL>

    BEGIN

    DBMS_CDC_SUBSCRIBE.SUBSCRIBE(

    subscription_name => 't1_sub',

    source_schema => 'scott',

    source_table => 't1',

    column_list => 'a_num,b_varchar,c_char',

    subscriber_view => 't1_view');

    END;

    /

    iii. 激活订阅

    SQL>show user

    USER is “subscriber1”

    SQL>

    BEGIN

    DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(

    subscription_name => 't1_sub');

    END;

    /

    iv. 扩展订阅窗口

    SQL>show user

    USER is “subscriber1”

    SQL>

    BEGIN

    DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(

    subscription_name => 't1_sub');

    END;

    /

    #extend_window功能说明:此过程是设置订阅窗口的高位边界,只要设置

    #了新数据才能看到新数据;

    v. 查看订阅视图内容

    SQL>show user

    USER is “subscriber1”

    SQL>select * from t1_view; #因scott.t1表中无数据变更所以当前视图无数据

    no rows selected

    SQL>conn scott/tiger

    SQL> insert into t1 values(2,'cdc_test1','cdc');

    SQL>commit;

    SQL>conn subscriber1/subscriber1

    SQL>

    BEGIN

    DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(

    subscription_name => 't1_sub');

    END;

    /

    SQL>select OPERATION$,a_num,b_varchar,c_char from t1_view

    OP A_NUM B_VARCHAR C_CHAR

    -- ---------- ---------- ----------

    I 2 cdc_test1 cdc

    vi. 清除变更集数据

    如果当前变更数据不在需要使用,可以清除为后续生成的数据腾出空间;

    BEGIN

    DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(

    subscription_name => 'SALES_SUB');

    END;

    /

    vii. 删除订阅:

    如果当前订阅不需要再使用可以删除;

    BEGIN

    DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIPTION(

    subscription_name => 't1_sub');

    END;

    /

    参考:

    Oracle® Database Data Warehousing Guide 10g Release 2 (10.2) B14223-02 Chapter 16

    http://en.wikipedia.org/wiki/Change_data_capture

    展开全文
  • Oracle相同表结构增量数据同步 Oracle强制结束正在运行SQL,死循环时使用
  • 通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统的数仓通过批量数据同步的方式,定期从OLTP系统中抽取数据。但是随着业务需求的升级,批量同步无论从实时性,还是对在线...

    背景

    在大数据时代,存在大量基于数据的业务。数据需要在不同的系统之间流动、整合。通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统的数仓通过批量数据同步的方式,定期从OLTP系统中抽取数据。但是随着业务需求的升级,批量同步无论从实时性,还是对在线OLTP系统的抽取压力,都无法满足要求。需要实时从OLTP系统中获取数据变更,实时同步到下游业务系统。

    本文基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。

    Kafka是一种高效的消息队列实现,通过订阅kafka的消息队列,下游系统可以实时获取在线Oracle系统的数据变更情况,实现业务系统。

    环境介绍

    组件版本

    本案例中使用到的组件和版本

    组件

    版本

    描述

    源端Oracle

    Oracle 12.2.0.1.0 Linux x64

    源端Oracle

    源端OGG

    Oracle GoldenGate 12.3.0.1.4 for Oracle Linux x64

    源端OGG,用于抽取源端Oracle的数据变更,并将变更日志发送到目标端

    目标端OGG

    OGG_BigData_Linux_x64_12.3.2.1.1

    目标端OGG,接受源端发送的Oracle事物变更日志,并将变更推送到kafka消息队列。

    目标端kafka

    kafka_2.12-2.2.0

    消息队列,接收目标端OGG推送过来的数据。

     

    整体架构图

    名词解释

    1.OGG Manager

    OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。

    2.数据抽取(Extract)

    抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下几种类型:

    • 本地抽取

    从本地数据库捕获增量变更数据,写入到本地Trail文件

    • 数据推送(Data Pump)

    从本地Trail文件读取数据,推送到目标端。

    • 初始数据抽取

    从数据库表中导出全量数据,用于初次数据加载

    3.数据推送(Data Pump)

    Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG

    4.Trail文件

    数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。

    5.数据接收(Collector)

    数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。

    6.数据复制(Replicat)

    数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。

    7.检查点(Checkpoint)

    检查点用于记录数据库事物变更。

     

    操作步骤

    源端Oracle配置

    1.检查归档

    使用OGG,需要在源端开启归档日志

    SQL> archive log list;
    
        Database log mode              Archive Mode
    
        Automatic archival             Enabled
    
        Archive destination            /u01/app/oracle/product/12.2.0/db_1/dbs/arch
    
        Oldest online log sequence     2576
    
        Next log sequence to archive   2577
    
        Current log sequence           2577

    2.检查数据库配置

    SQL> select force_logging, supplemental_log_data_min from v$database;
    
    
    
    FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
    
    ---------- ------------------------
    
    YES        YES
    
    
    
    如果没有开启辅助日志,需要开启:
    
    
    
    SQL> alter database force logging;
    
    SQL> alter database add supplemental log data;

    3.开启goldengate复制参数

    SQL> alter system set enable_goldengate_replication = true;

    4.创建源端Oracle账号

     SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
    
        SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
    
        SQL> grant dba to ggsadmin;

    5.创建测试表   

    SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
    
        SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
    
    SQL> select count(*) from baiyang.ora_to_kfk;
    
    
    
        COUNT(*)
    
        ----------
    
            436

     源端OGG配置

    1.检查源端OGG环境

        cd /oradata/oggorcl/ogg
    
        ./ggsci
    
       
    
        GGSCI (dtproxy) 1> info all
    
    
    
        Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    
    
        MANAGER     STOPPED

        2.创建相关文件夹    

    GGSCI (dtproxy) 2> create subdirs
    
    
    
        Creating subdirectories under current directory /oradata/oggorcl/ogg
    
    
    
        Parameter file                 /oradata/oggorcl/ogg/dirprm: created.
    
        Report file                    /oradata/oggorcl/ogg/dirrpt: created.
    
        Checkpoint file                /oradata/oggorcl/ogg/dirchk: created.
    
        Process status files           /oradata/oggorcl/ogg/dirpcs: created.
    
        SQL script files               /oradata/oggorcl/ogg/dirsql: created.
    
        Database definitions files     /oradata/oggorcl/ogg/dirdef: created.
    
        Extract data files             /oradata/oggorcl/ogg/dirdat: created.
    
        Temporary files                /oradata/oggorcl/ogg/dirtmp: created.
    
        Credential store files         /oradata/oggorcl/ogg/dircrd: created.
    
        Masterkey wallet files         /oradata/oggorcl/ogg/dirwlt: created.
    
        Dump files                     /oradata/oggorcl/ogg/dirdmp: created.

    3.配置源端Manager

      GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
    
        Successfully logged into database.
    
    
    
        GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
    
            -- 添加
    
            oggschema ggsadmin
    
    
    
        GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
    
            -- 添加
    
            PORT 7810 --默认监听端口
    
            DYNAMICPORTLIST  7811-7820 --动态端口列表
    
            AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --进程有问题,每3分钟重启一次,一共重启五次
    
            PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7  --*/
    
            LAGREPORTHOURS 1 --每隔一小时检查一次传输延迟情况
    
            LAGINFOMINUTES 30 --传输延时超过30分钟将写入错误日志
    
            LAGCRITICALMINUTES 45 --传输延时超过45分钟将写入警告日志
    
            PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
    
            ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW --设定172网段可连接
    
          
    
        -- 添加同步的表
    
        GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
    
        -- Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
    
        GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
    
        -- Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

    目标端OGG配置

    1.目标端检查环境

     GGSCI (172-16-101-242) 1> info all
    
        Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
        MANAGER     STOPPED 

     2.创建目录    

    GGSCI (172-16-101-242) 2> create subdirs
    
    
    
        Creating subdirectories under current directory /app/ogg
    
    
    
        Parameter file                 /app/ogg/dirprm: created.
    
        Report file                    /app/ogg/dirrpt: created.
    
        Checkpoint file                /app/ogg/dirchk: created.
    
        Process status files           /app/ogg/dirpcs: created.
    
        SQL script files               /app/ogg/dirsql: created.
    
        Database definitions files     /app/ogg/dirdef: created.
    
        Extract data files             /app/ogg/dirdat: created.
    
        Temporary files                /app/ogg/dirtmp: created.
    
        Credential store files         /app/ogg/dircrd: created.
    
        Masterkey wallet files         /app/ogg/dirwlt: created.
    
    Dump files                     /app/ogg/dirdmp: created.

    3.目标端Manager配置

    GGSCI (172-16-101-242) 3> edit params mgr
    
        -- 添加
    
        PORT 7810
    
        DYNAMICPORTLIST 7811-7820
    
        AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
    
        PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
    
        
    
        GGSCI (172-16-101-242) 4> edit  param  ./GLOBALS
    
    CHECKPOINTTABLE ggsadmin.checkpoint

    全量数据同步

    1.配置源端数据初始化

     -- 配置源端初始化进程
    
        GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
    
        
    
        -- 配置源端初始化参数
    
        GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
    
            -- 添加
    
        EXTRACT initkfk
    
        SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
    
        USERID ggsadmin,PASSWORD oracle
    
        RMTHOST 172.16.101.242, MGRPORT 7810
    
        RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
    
    table baiyang.ora_to_kfk;

    2.源端生成表结构define文件

     GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
    
            -- 添加
    
        defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
    
        userid ggsadmin,password oracle
    
        table baiyang.ora_to_kfk;
    
      
    
        -- 执行
    
        $./defgen paramfile dirprm/define_kfk.prm
    
        -- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
    
        
    
        -- 将此文件传输到目标段dirdef文件夹
    
        scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt

    3.配置目标端数据初始化进程

      -- 配置目标端初始化进程
    
        GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
    
        
    
        GGSCI (172-16-101-242) 6> edit params initkfk
    
            -- 添加
    
        SPECIALRUN
    
        end runtime
    
        setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
    
        targetdb libfile libggjava.so set property=./dirprm/kafka.props
    
        SOURCEDEFS ./dirdef/define_kfk.txt
    
        EXTFILE ./dirdat/ekfk000000
    
        reportcount every 1 minutes, rate
    
        grouptransops 10000
    
    map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;

    4.配置kafka相关参数

     -- 配置kafka 相关参数
    
        vi ./dirprm/kafka.props
    
        -- 添加
    
        gg.handlerlist=kafkahandler
    
        gg.handler.kafkahandler.type=kafka
    
        gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    
        gg.handler.kafkahandler.topicMappingTemplate=test_ogg
    
        gg.handler.kafkahandler.format=json
    
        gg.handler.kafkahandler.mode=op
    
        gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*  --*/
    
        
    
        vi custom_kafka_producer.properties
    
        -- 添加
    
        bootstrap.servers=172.16.101.242:9092
    
        acks=1
    
        compression.type=gzip
    
        reconnect.backoff.ms=1000
    
        value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    
        key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    
        batch.size=102400
    
        linger.ms=10000

    5.源端开启全量数据抽取

    -- 源端
    
    GGSCI (dtproxy) 20>  start mgr
    
    GGSCI (dtproxy) 21>  start initkfk

    6.目标端全量数据应用

    GGSCI (172-16-101-242) 13> start mgr
    
    ./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

    7.kafka数据验证

    使用kafka客户端工具查看topic的数据
    
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
    
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
    
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
    
    
    
    全量数据已经同步到目标kafka topic

    增量数据同步

    1.源端抽取进程配置

    GGSCI (dtproxy) 9> edit param extkfk
    
    -- 添加
    
    dynamicresolution
    
    SETENV (ORACLE_SID = "dtstack")
    
    SETENV (NLS_LANG = "american_america.AL32UTF8")
    
    userid ggsadmin,password oracle
    
    exttrail ./dirdat/to
    
    table baiyang.ora_to_kfk;
    
    -- 添加extract进程
    
    GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
    
    -- 添加trail文件的定义与extract进程绑定
    
    GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

    2.源端数据推送进程配置

    -- 配置源端推送进程
    
    GGSCI (dtproxy) 12> edit param pupkfk
    
    -- 添加
    
    extract pupkfk
    
    passthru
    
    dynamicresolution
    
    userid ggsadmin,password oracle
    
    rmthost 172.16.101.242 mgrport 7810
    
    rmttrail ./dirdat/to
    
    table baiyang.ora_to_kfk;
    
    
    
    -- 添加extract进程
    
    GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to
    
    -- 添加trail文件的定义与extract进程绑定
    
    GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

    3.配置目标端恢复进程

    -- 配置目标端恢复进程
    
    edit param repkfk
    
    -- 添加
    
    REPLICAT repkfk
    
    SOURCEDEFS ./dirdef/define_kfk.txt
    
    targetdb libfile libggjava.so set property=./dirprm/kafka.props
    
    REPORTCOUNT EVERY 1 MINUTES, RATE
    
    GROUPTRANSOPS 10000
    
    MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
    
    
    
    --添加trail文件到replicate进程
    
    add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint

    4.源端开启实时数据抓取

    ./ggsci
    
    GGSCI (dtproxy) 5> start extkfk
    
    
    
    Sending START request to MANAGER ...
    
    EXTRACT EXTKFK starting
    
    
    
    
    
    GGSCI (dtproxy) 6> start pupkfk
    
    
    
    Sending START request to MANAGER ...
    
    EXTRACT PUPKFK starting
    
    
    
    
    
    GGSCI (dtproxy) 7> status all
    
    
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    
    
    MANAGER     RUNNING
    
    EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
    
    EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00

    5.目标端开启实时数据同步

    ./ggsci
    
    GGSCI (172-16-101-242) 7> start replicat repkfk
    
    
    
    Sending START request to MANAGER ...
    
    REPLICAT REPKFK starting
    
    
    
    
    
    GGSCI (172-16-101-242) 8> info all
    
    
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    
    
    MANAGER     RUNNING
    
    REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00

    6.测试增量数据同步

    • Oracle插入增量数据
    SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000;
    
    SQL> commit;
    
    SQL> select count(*) from baiyang.ora_to_kfk;
    
    
    
    COUNT(*)
    
    ----------
    
        905
    • 查看Kafka消息队列消费数据
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
    
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
    •  源端Oracle删除数据
    SQL> delete from baiyang.ora_to_kfk ;
    
    906 rows deleted.
    
    SQL> commit;
    • 查看kafka消息队列消费数据
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
    • 源端插入数据 
    SQL> insert into  baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1');
    
    1 row created.
    
    SQL> commit;
    • 查看kafka消息队列消费数据
    {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

    总结

    使用OGG可以方便地将Oracle的数据变更情况实时同步到Kafka消息队列。下游业务系统通过订阅kafka的消息队列,能方便地实现各类实时数据的应用。

    展开全文
  • Oracle物化视图获取增量数据

    千次阅读 2019-09-23 23:55:02
    物化视图的快速刷新需要先构造物化视图日志,而物化视图日志中会记录表的dml操作,因此可以通过物化视图日志订阅Oracle增量。 1.物化视图日志名 物化视图日志名为MLOG$_表名。当表名长度超过20时,只取前20位;当...
  • 较为明了地介绍了oracle CDC 实现的方式,包含实际操作。
  • Logstash从Oracle全量、增量、定时抽取数据至ES下载LogstashOracle->ES全量Oracle->ES增量多管道运行任务 下载Logstash [https://www.elastic.co/cn/downloads/logstash] Oracle->ES全量 解压缩下载的...
  • 基于oracle增量数据采集

    千次阅读 2017-02-20 11:13:31
    之前调研了下基于mysql的数据增量采集方案,目前有阿里的canal和Open Replicator,均是基于MySQL binlog分析的开源项目。Open Replicator仅提供了binlog解析;canal基于数据库增量日志解析,提供增量数据订阅&...
  • 在两个ORACLE数据库之间实现数据增量同步
  • Oracle物化视图增量刷新的应用研究.pdf
  • Oracle BI基础之ETL数据增量抽取方案

    万次阅读 2015-12-11 10:05:52
    ETL数据增量抽取方案 一、 ETL 简介 数据集成是把不同来源、格式和特点的数据在逻辑上或物理上有机地集中,从而提供全面的数据共享,是企业商务智能、数据仓库系统的重要组成部分。ETL 是企业数据集成的主要...
  • 两个表做数据同步时增量添加数据的方法: Oracle10g中有一个隐含的默认字段ORA_ROWSCN,此字段是按时间递增的,可以通过ORA_ROWSCN &gt; '上次同步保存的值' 条件来增量增加数据。...
  • [b]现在有这样一种情况,源表数据量1000W,或者更大,属于总表,增量数据量百万级别,要求把增量数据更新到源表中。 我采用的是MERGE语句,根据条件判断是insert还是update,增量表300W的时候且全走UPDATE分支...
  • 以时间戳取增量,对源表删除的数据无能为力。 通过源表更新目标表的时候,通常是先判断 源表中的数据在目标表中是否存在(通过主键判断) 如果存在,那么就用源表的数据,更新目标表的数据。 如果不存在,那么就...
  • 在“基于oracle增量数据采集”一文中提出了基于[color=blue]触发器》物化视图》存储过程》java source》外部程序[/color]数据采集方案。本文初步对其进行了实现,利用maven-assembly-plugin进行打包,输出结构如下...
  • 数据oracle数据库只有读权限,没有时间字段,第一次全量抽取后,如何进行定期增量抽取?
  • 利用ogg实现oracle到kafka的增量数据实时同步

    万次阅读 多人点赞 2018-05-25 09:50:49
    ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json。 下面是我的源端和目标端的一些配置信息: - 版本 OGG版本 i...
  • OGG增量抽取Oracle业务数据到kafka-部署手册
  •  差异增量:是备份上级及同级备份以来所有变化的数据块,差异增量是默认增量备份方式  累积增量:是备份上级备份以来所有变化的块  因为累积增量是备份上级备份以来所有变化的数据块,所以累积增量需要更多的...
  • 主要介绍了shell脚本将Oracle服务器中数据定时增量刷新到ftp服务器中,非常不错,具有一定的参考借鉴价值 ,需要的朋友可以参考下
  • Oracle数据库逻辑增量备份之exp/imp

    热门讨论 2012-11-08 11:57:20
    Oracle数据库逻辑增量备份之exp/imp 1 一、实现需求 1 二、逻辑备份恢复工具exp/imp 2 1、逻辑备份原理 2 2、exp语法和参数 2 3、imp语法和参数 3 三、exp逻辑备份 4 1、exp表模式备份 4 2、exp用户模式备份 5 3、...
  • 另一个应用需要实时或接近实时地(如延时几秒)抽取数据库中的增量数据;但不能对数据库有性能影响(或则影响非常小,如下降千分之一以内),不能在数据库上做开发(如触发器存储过程之类),更不能修改数据库配置...
  • 有没有什么方案可以实现实时采集oracle和sqlserver的增量数据传入kafka,不能设置时间戳,触发器,最好是监控操作日志
  • 简介:  GreenPlum是一个基于... 目前有一个业务是需要将Oracle数据库中的基础数据增量同步到GreenPlum数据仓库,便于进行数据分析和处理。 规模:  每天产生60G左右数据,最大的表每天新增上亿条数据...
  • 各位大佬,最近做数据迁移,需要将Oracle数据迁移到mysql,然后还面临一个问题就是迁移过程中Oracle新增数据不会自动传输,所以要进行数据同步,网上搜资料看着kettle可以做到这一步,但是对于他们的帖子有点看不...
  • Windows下Oracle11g增量备份与恢复

    千次阅读 2016-11-25 13:40:57
    Windows下Oracle11g增量备份与恢复
  • 第一次遇到这个问题,现在有个需求,每隔5秒要统计从今天零点到现在的数据,要查询的表很大,除第一次查询外不能再从零点开始,只能以上一次查询的时间为起点,请问这个该怎么做?请大家提供下思路或方法,谢谢

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 38,592
精华内容 15,436
关键字:

oracle年数据增量