精华内容
下载资源
问答
  • 创建追加查询的数据来源
    2021-05-07 12:16:23

    视图:不是表的表;

    如果视图只来源于一张表,可以直接对视图进行update操作;

    创建视图:

    CREATE OR REPLACE VIEW v_payment_info_status AS

    SELECT * from t_payment_info_status ;//支付状态表

    视图创建成功后可以通过:SELECT * from v_payment_info_status FOR UPDATE;追加数据,但追加成功后t_payment_info_status 也对应追加了。

    t_payment_info_status 现有数据:

    STATUS  STATUS_DESC

    1 書類返送待ち

    2 承認待ち

    3 支払待ち

    4 支払済み

    5 入力者戻し

    6 取消

    现要求在检索画面的支付状态下拉列表中追加显示以下3中状态:

    未払金計上

    据置金計上

    支払対象外

    但因为项目上其他画面也用到t_payment_info_status作为code表,为了不影响其他画面的显示,先采用hello word的方式直接union all要追加的数据:

    CREATE OR REPLACE VIEW v_payment_info_status AS SELECT * from t_payment_info_status  UNION ALL SELECT 7,'未払金計上' FROM dual UNION ALL SELECT 8,'据置金計上' FROM dual UNION ALL SELECT 9,'支払対象外' FROM dual;

    更多相关内容
  • react向数组中如何追加数据

    千次阅读 2021-06-10 12:55:28
    react向数组中如何追加数据发布时间:2020-12-04 11:23:27来源:亿速云阅读:179作者:小新这篇文章给大家分享的是有关react向数组中如何追加数据的内容。小编觉得挺实用的,因此分享给大家做个参考。一起跟随小编...

    react向数组中如何追加数据

    发布时间:2020-12-04 11:23:27

    来源:亿速云

    阅读:179

    作者:小新

    这篇文章给大家分享的是有关react向数组中如何追加数据的内容。小编觉得挺实用的,因此分享给大家做个参考。一起跟随小编过来看看吧。

    首先渲染一个随机数,使其每间隔一秒变换一次。

    代码如下:

    数组追加元素

    class Xpf extends React.Component {

    constructor(props){

    super(props);

    this.state = {

    random:Math.random()

    }

    }

    componentWillMount(){

    setInterval(() => {

    this.setState({

    random:Math.random()

    })

    }, 1000);

    }

    render() {

    let {random} = this.state;

    return (

    {random}

    );

    }

    }

    ReactDOM.render(

    ,

    document.getElementById('xpf')

    );

    注意:组件更新有两种方式:props或state的改变,而改变state一般是通过setState()方法来的,只有当state或props改变,render方法才能再次调用,即组件更新

    代码如下:

    数组追加元素

    class Xpf extends React.Component {

    constructor(props){

    super(props);

    this.state = {

    random:Math.random(),

    arr:[1,2,3]

    }

    }

    componentWillMount(){

    setInterval(() => {

    this.setState({

    random:Math.random(),

    arr:[...this.state.arr,Math.random()]

    })

    }, 1000);

    }

    render() {

    let {random,arr} = this.state;

    return (

    {random}

    {

    arr.map((item,index)=>{

    return (

    {item})

    })

    }

    );

    }

    }

    ReactDOM.render(

    ,

    document.getElementById('xpf')

    );

    使用...this.state.arr将arr解构出来,再将随机数加进去

    注意:不能使用 arr : this.state.arr.push(Math.random()),不能使用在原数组的基础上修改的方法,例如push之类,可以使用concat方法或者ES6数组拓展语法

    感谢各位的阅读!关于react向数组中如何追加数据就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到吧!

    展开全文
  • Blink SQL之创建数据结果表

    千次阅读 2022-03-31 20:28:17
    Blink SQL至创建Sink结果表。

    Blink创建数据结果表

    概述

    Blink使用CREATE TABLE作为输出结果数据的格式定义,同时定义数据如何写入到目的数据结果表。

    结果表有两种类型:

    • Append类型:输出存储是日志系统、消息系统、操作日志类的RDS数据库等,数据流输出结果追加到存储中,不会修改原有数据。
    • Update类型:输出存储生命了主键的数据库(如:RDS、HBase等),数据流的输出存在Upsert操作。

    结果表语法

    CREATE TABLE tableName
        (columnName dataType [, columnName dataType ]*)
        [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
    

    结果表示例

    CREATE TABLE rds_output(
    id INT,
    len INT,
    content VARCHAR,
    PRIMARY KEY(id)
    ) WITH (
    type='rds',
    url='yourDatabaseURL',
    tableName='yourTableName',
    userName='yourDatabaseUserName',
    password='yourDatabasePassword'
    );
    

    创建Oracle数据库结果表

    注意事项

    • 实时计算将每行结果数据拼成一行SQL写入目标数据库。
    • 数据库中存在需要的表时,Oracle结果表会向该表写入或更新数据;不存在需要的表时,Oracle结果表会新建一个用于写入结果的表。
    • 逻辑表和物理表的主键不必一致,但是逻辑表的主键必须包含物理表主键。
    • 如果未定义主键,以Append方式插入数据;如果已经定义主键,以Upsert方式插入数据。

    语法示例

    CREATE TABLE oracle_sink(
      employee_id BIGINT,
      employee_name VARCHAR,
      employee_age INT,
      PRIMARY KEY(employee_id)
    ) WITH (
        type = 'oracle',
        url = '<yourUrl>',
        userName = '<yourUserName>',
        password = '<yourPassword>',
        tableName = '<yourTableName>'
    );
    

    with参数

    参数描述是否必选示例值
    type结果表类型固定值为oracle。
    url数据库连接串jdbc:oracle:thin:@192.168.171.62:1521:sit0
    userName登录数据库的用户名
    password登录数据库的密码
    tableName数据库的表名
    maxRetryTimes向结果表插入数据的最大尝试次数默认值为10。
    batchSize单次写入数据的批次大小。默认值为50。
    bufferSize去重的缓存大小。默认值为500。
    flushIntervalMs写超时时间,单位为毫秒。默认值为500。
    excludeUpdateColumns更新表的某行数据时,是否不更新该行指定的列数据。默认不填写。
    ignoreDelete是否忽略删除操作。默认值为false。

    注意

    • batchSize和bufferSize在指定主键后参数才生效。参数在达到任一阈值时都会触发数据写入。
    • flushIntervalMs 如果在写超时时间内没有向数据库写入数据,系统会将缓存的数据再写一次。
    • excludeUpdateColumns在更新表的某行数据时,默认不更新主键数据。

    类型映射

    Oracle 字段类型实时计算Flink版字段类型
    CHAR、VARCHAR、VARCHAR2VARCHAR
    FLOATDOUBLE
    NUMBERBIGINT
    DECIMALDECIMAL

    代码示例

    CREATE TABLE oracle_source (
      employee_id BIGINT,
      employee_name VARCHAR,
      employ_age INT
    ) WITH (
      type = 'random'
    );
    
    CREATE TABLE oracle_sink(
      employee_id BIGINT,
      employee_name VARCHAR,
      employ_age INT,
      primary key(employee_id)
    )with(
      type = 'oracle',
      url = 'jdbc:oracle:thin:@192.168.171.62:1521:sit0',
      userName = 'blink_test',
      password = 'blink_test',
      tableName = 'oracle_sink'
    );
    
    INSERT INTO oracle_sink
    SELECT * FROM oracle_source;
    

    创建交互式分析Hologres结果表

    注意事项

    由于Hologres是异步写入数据的,因此需要添加blink.checkpoint.fail_on_checkpoint_error=true作业参数,作业异常时才会触发Failover。

    语法示例

    create table Hologres_sink(
      name varchar,
      age BIGINT,
      birthday BIGINT
    ) with (
      type='hologres',
      dbname='<yourDbname>',
      tablename='<yourTablename>',
      username='<yourUsername>',
      password='<yourPassword>',
      endpoint='<yourEndpoint>',
      field_delimiter='|' --该参数可选。
    );
    

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为hologres。
    dbname数据库名称。
    tablename表名称
    username用户名
    password密码
    endpointHologres VPC 端点信息参考访问域名列表。
    field_delimiter导出数据时,不同行之间使用的分隔符。默认值为"\u0002"。
    mutateType流式写入语义。默认值为insertorignore。
    partitionrouter分区表写入默认值为false。
    ignoredelete是否忽略撤回消息。默认值为false。
    createPartTable当写入分区表时,是否根据分区值自动创建不存在的分区表。false(默认值):不会自动创建。true:自动创建。

    注意

    • 如果Schema不为Public时,则tableName需要填写为schema.tableName
    • field_delimiter参数不能在数据中插入分隔符,且需要与bulkload语义一同使用。
    • createPartTable参数如果分区值中存在短划线(-),暂不支持自动创建分区表。

    流式语义

    流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。

    根据Hologres Sink的配置和Hologres表的属性,流式语义分为以下两种:

    • Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。
    • At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不必进行后续重试。

    在Hologres结果表中使用流式语义,注意事项:

    • 如果Hologres物理表未设置主键,则Hologres Sink使用At-least-once语义。

    • 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,需要设置mutateType参数确定更新结果表的方式。

    mutateType取值如下:

    • insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。仅第一次数据流触发更新,后续忽略。无法实现局部更新。
    • insertorreplace:使用后续出现的数据整行替换已有数据。根据主键覆盖更新。无法实现局部更新。
    • insertorupdate:使用后续出现的数据选择性替换已有数据。根据主键覆盖更新。可以局部更新。

    默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。可以设置参数partitionRouter为true,开启自动将数据路由到对应分区表的功能。

    注意事项:

    • tablename参数需要填写为父表的表名。
    • Blink Connector不会自动创建分区表,需要提前手动创建需要导入数据的分区表,否则会导入失败。

    宽表Merge和局部更新功能

    在把多个流的数据写到一张Hologres宽表的场景中,会涉及到宽表Merge和数据的局部更新。示例如下:

    假设有两个Flink数据流,一个数据流中包含A、B和C字段,另一个数据流中包含A、D和E字段,Hologres宽表WIDE_TABLE包含A、B、C、D和E字段,其中A字段为主键。具体操作如下:

    1. 使用Flink SQL创建两张Hologres结果表,其中一张表只声明A、B和C字段,另一张表只声明A、D和E字段。这两张表都映射至宽表WIDE_TABLE。
    2. 两张结果表的属性设置:
      • mutatetype设置为insertorupdate,根据主键更新数据。
      • ignoredelete设置为true,防止回撤消息产生Delete请求。
    3. 将两个Flink数据流的数据分别INSERT至对应的结果表中。

    注意

    • 宽表必须有主键。
    • 每个数据流的数据都必须包含完整的主键字段。
    • 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary encoding功能。

    类型映射

    HologresBLINK
    INTINT
    INT[]ARRAY<INT>
    BIGINTBIGINT
    BIGINT[]ARRAY<BIGINT>
    REALFLOAT
    REAL[]ARRAY<FLOAT>
    DOUBLE PRECISIONDOUBLE
    DOUBLE PRECISION[]ARRAY<DOUBLE>
    BOOLEANBOOLEAN
    BOOLEAN[]ARRAY<BOOLEAN>
    TEXTVARCHAR
    TEXT[]ARRA<VARCHAR>
    NUMERICDECIMAL
    DATEDATE
    TIMESTAMP WITH TIMEZONETIMESTAMP

    创建云原生数据仓库AnalyticDB MySQL版2.0结果表

    什么是云原生数据仓库AnalyticDB MySQL版

    云原生数据仓库AnalyticDB MySQL版是阿里巴巴自主研发的海量数据实时高并发在线分析(Realtime OLAP)云计算服务,支持在毫秒级单位时间内,对千亿级数据进行实时地多维分析透视和业务探索。

    DDL定义

    CREATE TABLE stream_test_hotline_agent (
    id INTEGER,
    len BIGINT,
    content VARCHARPRIMARY KEY(id)
    ) WITH (
    type='ads',
    url='yourDatabaseURL',
    tableName='<yourDatabaseTableName>',
    userName='<yourDatabaseUserName>',
    password='<yourDatabasePassword>',
    batchSize='20'
    );
    

    WITH参数

    参数说明备注
    type结果表类型固定值为ads。
    urlJDBC连接地址云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url ='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
    tableName表名
    username账号
    password密码
    maxRetryTimes写入重试次数可选,默认值为10。
    bufferSize流入多少条数据后开始输出。可选,默认值为5000,表示输入的数据达到5000条就开始输出。
    batchSize一次批量写入的条数可选,默认值为1000。
    batchWriteTimeoutMs写入超时时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    connectionMaxActive单连接池最大连接数可选,默认值为30。
    ignoreDelete是否忽略delete操作默认值为false。

    注意

    • 如果错误码是20015,则表示batchSize设置过大。云原生数据仓库AnalyticDB MySQL版batchSize不能超过1 MB。例如,batchSize设置为1000,则平均每条记录大小不能超过1 KB。

    类型映射

    云原生数据仓库AnalyticDB MySQL版字段类型实时计算Flink版字段类型
    BOOLEANBOOLEAN
    TINYINT、SMALLINT、INTINT
    BIGINTBIGINT
    DOUBELDOUBLE
    VARCHARVARCHAR
    DATEDATE
    TIMETIME
    TIMESTAMPTIMESTAMP

    创建数据总线DataHub结果表

    DDL定义

    create table datahub_output(
      name VARCHAR,
      age BIGINT,
      birthday BIGINT
    )with(
      type='datahub',
      endPoint='<yourEndpoint>,
      project='<yourProjectName>',
      topic='<yourTopicName>',
      accessId='<yourAccessId>',
      accessKey='<yourAccessKey>',
      batchSize='<yourBatchSize>',
      batchWriteTimeoutMs='1000'
    );
    

    WITH参数

    参数参数说明是否必填备注
    type源表类型固定值为datahub
    endPointendPoint地址DataHub的endPoint地址,请参考DataHub域名列表
    projectDataHub项目名称
    topicDataHub中Topic名称
    accessIdAccessKey ID
    accessKeyAccessKey Secret
    maxRetryTimes读取最大重试次数Blink 2.2.7以下版本:默认3;Blink 2.2.7及以上版本:默认20
    batchSize一次批量写入的条数Blink 3.3以下版本:默认300;Blink 3.3及以上版本:默认100
    batchWriteTimeoutMs缓存数据的超时时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    maxBlockMessages每次写入的最大Block数默认值为100。
    reserveMilliSecondTIMESTAMP类型是否保留毫秒默认值为false。
    partitionBy写入结果表前会根据该值进行Hash分类,数据会流向对应的结果表。默认值为空,随机进行数据分配。 partitionBy决定数据流到Blink的哪个Subtask。
    hashFields指定了列名之后,相同列的值会写入到同一个Shard。默认值为Null,即随机写入。可以指明多个列值,用逗号(,)分隔。例如,hashFields='a,b'hashFields决定数据流写到DataHub的哪个Shard。

    类型映射

    DataHub字段类型实时计算字段类型
    BIGINTBIGINT
    TIMESTAMPBIGINT
    STRINGVARCHAR
    DOUBLEDOUBLE
    BOOLEANBOOLEAN
    DECIMALDECIMAL

    注意

    DataHub的TIMESTAMP精确到微秒,在Unix时间戳中为16位,但实时计算定义的TIMESTAMP精确到毫秒,在Unix时间戳中为13位,所以建议您使用BIGINT进行映射。如果您需要使用TIMESTAMP,建议使用计算列进行转换。

    代码示例

    create table datahub_input(
      name VARCHAR
    ) with (
      type='datahub',
      endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
      project='test1',
      topic='topic1',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      startTime='2018-06-01 00:00:00'
    );
    
    create table datahub_output(
      name varchar
    )with(
      type='datahub',
      endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
      project='test2',
      topic='topic2',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      batchSize='1000',
      batchWriteTimeoutMs='500'
    );
    
    INSERT INTO datahub_output
    SELECT 
      LOWER(name)
    from datahub_input;
    

    创建日志服务SLS结果表

    注意事项

    日志服务SLS结果表仅支持VARCHAR类型的字段。

    什么是日志服务

    日志服务SLS是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。日志服务本身是流数据存储,实时计算Flink版能将其作为流式数据的输入。

    DDL定义

    create table sls_stream(
     `name` VARCHAR,
     age BIGINT,
     birthday BIGINT
    )with(
     type='sls',
     endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
     accessId='<yourAccessId>',
     accessKey='<yourAccessKey>',
     project='<yourProjectName>',
     logstore='<yourLogstoreName>'
    );
    

    WITH参数

    参数注释说明是否必填备注
    endPointEndPoint地址服务入口
    project项目名
    logstore表名
    accessIdAccessKey ID
    accessKeyAccessKey Secret
    topic属性字段默认值为空,可以将选定的字段作为属性字段topic填充。
    timestampColumn属性字段默认值为空,可以将选定的字段作为属性字段timestamp填充(类型必须为INT)。如果未指定,则默认填充当前时间。
    source属性字段。日志的来源地,例如产生该日志机器的IP地址。默认值为空,可以将选定的字段作为属性字段source填充。
    partitionColumn分区列如果modepartition,则该参数必填。
    flushIntervalMs触发数据写入的周期默认值为2000,单位为毫秒。
    reserveMilliSecondTIMESTAMP类型是否保留毫秒值。默认值为false,不保留。

    类型映射

    日志服务字段类型实时计算Flink版字段类型
    STRINGVARCHAR

    代码示例

    CREATE TABLE random_input (
      a VARCHAR, 
      b VARCHAR) with (
        type = 'random'
    );
    
    create table sls_output(
     a varchar,
     b varchar
    )with(
     type='sls',
     endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
     accessId='<yourAccessId>',
     accessKey='<yourAccessKey>',
     project='ali-cloud-streamtest',
     logStore='stream-test2'
    );
    
    INSERT INTO sls_output
    SELECT a, b
    FROM random_input;
    

    创建消息队列MQ结果表

    CSV格式

    CREATE TABLE stream_test_hotline_agent (
    id INTEGER,
    len BIGINT,
    content VARCHAR
    ) WITH (
    type='mq',
    endpoint='<yourEndpoint>',
    accessID='<yourAccessId>',
    accessKey='<yourAccessSecret>',
    topic='<yourTopicName>',
    producerGroup='<yourGroupName>',
    tag='<yourTagName>',
    encoding='utf-8',
    fieldDelimiter=',',
    retryTimes='5',
    sleepTimeMs='500'
    );
    

    二进制格式

    CREATE TABLE source_table (
      commodity VARCHAR
    )WITH(
      type='random'
    );
    
    CREATE TABLE result_table (
      mess VARBINARY
    ) WITH (
      type = 'mq',
      endpoint='<yourEndpoint>',
      accessID='<yourAccessId>',
      accessKey='<yourAccessSecret>',
      topic='<yourTopicName>',
      producerGroup='<yourGroupName>'
    );
    
    INSERT INTO result_table
    SELECT 
    CAST(SUBSTRING(commodity,0,5) AS VARBINARY) AS mess   
    FROM source_table;
    

    WITH参数

    参数说明备注
    type结果表类型固定值为mq。
    topicMessage Queue队列名称
    endpoint地址阿里云消息队列提供内网服务MQ(非公网region)和公网服务MQ(公网region)两种类型。
    accessIDAccessKey ID
    accessKeyAccessKey Secret
    producerGroup写入的群组
    tag写入的标签可选,默认值为空。
    fieldDelimiter字段分割符可选,默认值为\u0001 。分隔符的使用情况如下所示:1.只读模式:以 \u0001作为分隔符,\u0001在只读模式不可见。2.编辑模式:以^A作为分隔符。
    encoding编码类型可选,默认值为utf-8
    retryTimes写入的重试次数可选,默认值为10。
    sleepTimeMs重试间隔时间可选,默认值为1000(毫秒)。
    instanceIDMQ实例ID如果MQ实例无独立命名空间,则不可以使用instanceID参数。如果MQ实例有独立命名空间,则instanceID参数必选。

    创建表格存储Tablestore结果表

    什么是表格存储Tablestore

    表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。其本质上就类似于市面上开源的HBase。

    DDL定义

    CREATE TABLE stream_test_hotline_agent (
     name VARCHAR,
     age BIGINT,
     birthday BIGINT,
     PRIMARY KEY (name,age)
    ) WITH (
     type='ots',
     instanceName='<yourInstanceName>',
     tableName='<yourTableName>',
     accessId='<yourAccessId>',
     accessKey='<yourAccessSecret>',
     endPoint='<yourEndpoint>',
     valueColumns='birthday'
    ); 
    

    注意

    • valueColumns值不能是声明的主键,可以是主键之外的任意字段。
    • Tablestore结果表声明中,除主键列外,至少包含一个属性列。

    WITH参数

    参数说明备注
    type结果表类型固定值为ots。
    instanceName实例名
    tableName表名
    endPoint实例访问地址参见服务地址
    accessIdAccessKey ID
    accessKeyAccessKey Secret
    valueColumns指定插入的字段列名插入多个字段以英文逗号(,)分割。例如'ID,NAME'
    bufferSize流入多少条数据后开始输出可选,默认值为5000,表示输入的数据达到5000条就开始输出。
    batchWriteTimeoutMs写入超时的时间可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    batchSize一次批量写入的条数可选,默认值为100。
    retryIntervalMs重试间隔时间可选,单位毫秒,默认值为1000。
    maxRetryTimes最大重试次数可选,默认值为100。
    ignoreDelete是否忽略DELETE操作默认值为False。

    注意

    • bufferSize根据Tablestore主键对结果数据进行去重后,再在bufferSize的基础上进行batchSize。
    • Tablestore结果表必须定义有Primary Key,以Update方式写入结果数据到Tablestore表。

    类型映射

    Tablestore字段类型实时计算Flink版字段类型
    INTEGERBIGINT
    STRINGVARCHAR
    BOOLEANBOOLEAN
    DOUBLEDOUBLE

    创建云数据库RDS MySQL版结果表

    语法示例

    CREATE TABLE rds_output(
       id INT,
       len INT,
       content VARCHAR,
       PRIMARY KEY (id,len)
    ) WITH (
       type='rds',
       url='<yourDatabaseURL>',
       tableName='<yourDatabaseTable>',
       userName='<yourDatabaseUserName>',
       password='<yourDatabasePassword>'
    );        
    

    注意

    • 实时计算写入RDS MySQL数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库,然后执行。如果使用批量写,需要在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
    • 如果实时计算写入数据支持自增主键,则在DDL中不声明该自增字段即可。例如,ID是自增字段,实时计算Flink版DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
    • 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里**PRIMARY KEY()**中声明,否则拆分的表无法写入。
    • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为rds
    urlJDBC(Java DataBase Connectivity)连接地址URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中databaseName为对应的数据库名称。
    tableName表名
    userName用户名
    password密码
    maxRetryTimes最大重试次数默认值为10。
    batchSize一次批量写入的条数默认值为4096。
    bufferSize流入多少条数据后开始去重默认值为10000。
    flushIntervalMs清空缓存的时间间隔默认值为2000,单位为毫秒。表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    excludeUpdateColumns忽略指定字段的更新默认值为空(默认忽略PRIMARY KEY字段)。表示更新主键值相同的数据时,忽略指定字段的更新。
    ignoreDelete是否忽略delete操作默认值为false,表示支持delete功能。
    partitionBy分区默认为空。表示写入Sink节点前,会根据该值进行Hash分区,数据会流向相应的Hash节点。

    类型映射

    RDS字段类型实时计算Flink版字段类型
    BOOLEANBOOLEAN
    TINYINTTINYINT
    SMALLINTSMALLINT
    INTINT
    BIGINTBIGINT
    FLOATFLOAT
    DECIMALDECIMAL
    DOUBLEDOUBLE
    DATEDATE
    TIMETIME
    TIMESTAMPTIMESTAMP
    VARCHARVARCHAR
    VARBINARYVARBINARY

    JDBC连接参数

    参数名称说明默认值最低版本要求
    useUnicode是否使用Unicode字符集,如果参数characterEncoding设置为GB2312或GBK,本参数值必须设置为true。false1.1g
    characterEncoding当useUnicode设置为true时,指定字符编码。例如可设置为GB2312或GBK。false1.1g
    autoReconnect当数据库连接异常中断时,是否自动重新连接。false1.1
    autoReconnectForPools是否使用针对数据库连接池的重连策略。false3.1.3
    failOverReadOnly自动重连成功后,连接是否设置为只读。true3.0.12
    maxReconnectsautoReconnect设置为true时,重试连接的次数。31.1
    initialTimeoutautoReconnect设置为true时,两次重连之间的时间间隔,单位为秒。21.1
    connectTimeout和数据库服务器建立socket连接时的连接超时时长,单位为毫秒。0表示永不超时,适用于JDK 1.4及以上版本。03.0.1
    socketTimeoutsocket操作(读写)超时,单位:毫秒。0表示永不超时。03.0.1

    代码示例

    CREATE TABLE source (
       id INT,
       len INT,
       content VARCHAR
    ) with (
       type = 'random'
    );
    
    CREATE TABLE rds_output(
       id INT,
       len INT,
       content VARCHAR,
       PRIMARY KEY (id,len)
    ) WITH (
       type='rds',
       url='<yourDatabaseURL>',
       tableName='<yourDatabaseTable>',
       userName='<yourDatabaseUserName>',
       password='<yourDatabasePassword>'
    );
    
    INSERT INTO rds_output
    SELECT id, len, content FROM source;
    

    创建MaxCompute结果表

    MaxCompute中的Clustered Table表不支持作为MaxCompute结果表。

    实现原理

    MaxCompute Sink可以分为以下两个阶段:

    1. **写入数据。**调用MaxCompute SDK中的接口将数据写入缓冲区,在缓冲区大小超过64 MB或者每隔指定的时间间隔时,上传数据到MaxCompute的临时文件中。
    2. 提交会话。在任务进行Checkpoint时, MaxCompute Sink会调用Tunnel的Commit方法,提交会话,移动临时文件到MaxCompute表的数据目录,并修改元数据。

    Commit方法不能提供原子性。因此,MaxCompute Sink提供的是At least Once方式,而不是Exactly Once方式。

    语法示例

    DDL中定义的字段需要与MaxCompute物理表中的字段名称、顺序以及类型保持一致,否则可能导致MaxCompute物理表中查询的数据为/n

    create table odps_output(
      id INT,
      user_name VARCHAR,
      content VARCHAR
    ) with (
      type = 'odps',
      endPoint = '<YourEndPoint>',
      project = '<YourProjectName>',
      tableName = '<YourtableName>',
      accessId = '<yourAccessKeyId>',
      accessKey = '<yourAccessKeySecret>',
      `partition` = 'ds=2018****'
    );
    

    WITH参数

    参数说明是否必填备注
    type结果表类型。固定值为odps
    endPointMaxCompute服务地址。请参见Endpoint
    tunnelEndpointMaxCompute Tunnel服务的连接地址。请参见Endpoint
    projectMaxCompute项目名称。无。
    tableName表名。无。
    accessIdAccessKey ID。无。
    accessKeyAccessKey Secret。无。
    partition分区名。参见注意事项。
    flushIntervalMsOdps tunnel writer缓冲区Flush间隔,单位毫秒。MaxCompute Sink写入记录时,先放入数据到MaxCompute的缓冲区中,等缓冲区溢出或者每隔一段时间(flushIntervalMs)时,再把缓冲区里的数据写到目标 MaxCompute表。默认值是30000毫秒,即30秒。
    partitionBy写入Sink节点前,系统会根据该值做Hash Shuffle,数据就会流向对应的Sink节点。系统按照多个列进行Hash Shuffle,各个列名之间使用逗号(,)分割。默认为空。
    isOverwrite写入Sink节点前,是否将结果表清空。默认参数值为false。在声明MaxCompute的流式作业结果表时,isOverwrite参数必须为false,否则在编译时会抛出异常。
    dynamicPartitionLimit分区数目最大值。默认值是100,内存中会把出现过的分区和Tunnel/writer的映射关系维护到一个Map里,如果这个Map的大小超过了dynamicPartitionLimit设定值,则会出现Too many dynamic partitions: 100, which exceeds the size limit: 100报错。

    注意

    如果存在分区表,则必填partition参数:

    • 固定分区:例如``partition= 'ds=20180905'表示将数据写入分区ds= 20180905

    • 动态分区:如果不明文显示分区的值,则会根据写入数据中的分区列具体的值,写入到不同的分区中。例如``partition='ds'表示根据ds字段的值写入分区。

      如果要创建多级动态分区,With参数中Partition的字段顺序和结果表的DDL中的分区字段顺序,必须与物理表一致,各个分区字段之间使用逗号(,)分割。

      动态分区列需要显式写在建表语句中。

    Stream模式的MaxCompute结果表具备At Least Once数据保障机制,在作业运行失败后,可能会出现数据重复。

    MaxCompute字段类型实时计算Flink版字段类型
    TINYINTTINYINT
    SMALLINTSMALLINT
    INTINT
    BIGINTBIGINT
    FLOATFLOAT
    DOUBLEDOUBLE
    BOOLEANBOOLEAN
    DATETIMETIMESTAMP
    TIMESTAMPTIMESTAMP
    VARCHARVARCHAR
    STRINGVARCHAR
    DECIMALDECIMAL

    代码示例

    • 写入固定分区

      CREATE TABLE source (
         id INT,
         len INT,
         content VARCHAR
      ) with (
         type = 'random'
      );
      
      create table odps_sink (
         id INT,
         len INT,
         content VARCHAR
      ) with (
         type = 'odps',
         endPoint = '<yourEndpoint>', 
         project = '<yourProjectName>',
         tableName = '<yourTableName>',
         accessId = '<yourAccessId>',
         accessKey = '<yourAccessPassword>',
         `partition` = 'ds=20180418'
      );
      
      INSERT INTO odps_sink 
      SELECT 
         id, len, content 
      FROM source;
      
    • 写入动态分区

      CREATE TABLE source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP 
      ) with (
        type = 'random'
      );
      
      create table odps_sink (
        id INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --动态分区列需要显式写在建表语句中。
      ) with (
        type = 'odps',
        endPoint = '<yourEndpoint>', 
        project = '<yourProjectName>',
        tableName = '<yourTableName>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessPassword>',
        `partition`='ds' --不写分区的值,表示根据ds字段的值写入不同分区。
      );
      
      INSERT INTO odps_sink 
      SELECT 
         id, 
         len, 
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM source;
      

    创建云数据库HBase版结果表

    实时计算HBase结果表不支持自建的开源HBase。

    DDL定义

    HBase标准版示例代码如下。

    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        zkQuorum = '2',  
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500'
    );
    

    HBase增强版示例代码如下。

    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        endPoint = '<host:port>', ----HBase增强版的Java API访问地址。
        userName  = 'root', --用户名。
        password = 'root', --密码。
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500'
    );
    

    Blink 3.5.0及以上HBase增强版示例代码如下。

    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        zkQuorum = '<host:port>', ----HBase增强版的Java API访问地址。
        userName  = 'root', --用户名。
        password = 'root', --密码。
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500'
    );
    

    Blink 3.5.0及以上HBase写入主备切换示例代码如下。

    create table liuxd_user_behavior_test_front (
        row_key varchar,
        from_topic varchar,
        origin_data varchar,
        record_create_time varchar,
        primary key (row_key)
    ) with (
        type = 'cloudhbase',
        zkQuorum = '<host:port>', ----HBase高可用访问地址。
        haClusterID = 'ha-xxx', ----HBase高可用实例ID。
        userName  = 'root', --用户名。
        password = 'root', --密码。
        columnFamily = '<yourColumnFamily>',
        tableName = '<yourTableName>',
        batchSize = '500' 
    );
    

    注意

    • PRIMARY KEY支持定义多字段。多字段以rowkeyDelimiter(默认为:)作为分隔符进行连接。
    • HBase执行撤回删除操作时,如果COLUMN定义了多版本,将清空所有版本的COLUMN值。
    • HBase标准版和HBase增强版DDL的区别为连接参数不同:
      • HBase标准版使用连接参数zkQuorum
      • HBase增强版使用连接参数endPoint

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为cloudhbase。
    zkQuorumHBase集群配置的zk地址可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。
    zkNodeParent集群配置在zk上的路径可以在hbase-site.xml文件中查看hbase.zookeeper.quorum相关配置。仅在HBase标准版中生效。
    endPointHBase地域名称可在购买的HBase实例控制台中获取。仅在HBase增强版中生效。
    userName用户名仅在HBase增强版中生效。
    password密码仅在HBase增强版中生效。
    tableNameHBase表名
    columnFamily列族名仅支持插入同一列族。
    maxRetryTimes最大尝试次数默认值为10。
    bufferSize流入多少条数据后进行去重默认值为5000。
    batchSize一次批量写入的条数默认值为100。建议batchSize参数值为200~300。过大的batchSize值可能导致任务OOM(内存不足)报错。
    flushIntervalMs周期性清理buffer的间隔,可以减少写入HBase的延迟。默认值为2000,单位为毫秒。
    writePkValue是否写入主键值默认值为false。
    stringWriteMod是否都按照STRING插入默认值为false。
    rowkeyDelimiterrowKey的分隔符默认值为冒号(:)。
    isDynamicTable是否为动态表默认值为false。
    haClustserIDHBase高可用实例ID只有访问同城主备实例时才需要配置。

    动态表

    实时计算部分结果数据需要按某列的值,作为动态列输入HBase。HBase中,以每小时的成交额作为动态列的数据,示例如下。

    rowkeycf:0cf:1cf:2
    20170707100cf:1300

    isDynamicTable参数值为true时,表明该表为支持动态列的HBase表。

    动态表仅支持3列输出,例如,ROW_KEY、COLUMN和VALUE。此时第2列(本示例中的COLUMN)为动态列,动态表中的其它参数与HBase的WITH参数一致。

    使用动态表时,所有数据类型需要先转换为STRING类型,再进行输入。

    CREATE TABLE stream_test_hotline_agent (
      name varchar,
      age varchar,
      birthday varchar,
      primary key (name)
    ) WITH (
      type = 'cloudhbase',
      ...
      columnFamily = 'cf',
      isDynamicTable ='true'
    );
    

    以上声明将把birthday插入到以name为ROW_KEY的cf:age列中。例如,(wang,18,2016-12-12)会插入ROW_KEY为wang的行,cf:18列。

    DDL中必选按照从上到下的顺序,声明ROW_KEY(本示例中的name)、COLUMN(本示例中的age)和VALUE(本示例中的birthday),且声明ROW_KEY为PRIMARY KEY。

    代码示例

    create table source (
      id   TINYINT,
      name BIGINT
    ) with (
      type = 'random'
    );
    
    create table sink (
      id    TINYINT,
      name  BIGINT,
      primary key (id)
    ) with (
      type = 'cloudhbase',
      zkQuorum = '<yourZkQuorum>',  
      columnFamily = '<yourColumnFamily>',
      tableName = '<yourTableName>'
    );
    
    INSERT INTO sink
    SELECT id, name FROM source;
    

    创建Elasticsearch结果表

    DDL定义

     CREATE TABLE es_stream_sink(
      field1 LONG,
      field2 VARBINARY,
      field3 VARCHAR,
      PRIMARY KEY(field1)
    )WITH(
      type ='elasticsearch',
      endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
      accessId = '<yourUsername>',
      accessKey = '<yourPassword>',
      index = '<yourIndex>',
      typeName = '<yourTypeName>'
    );
    
    • ES支持根据PRIMARY KEY进行UPDATE,且PRIMARY KEY只能为1个字段。
    • 在指定PRIMARY KEY后,Document的ID为PRIMARY KEY字段的值。
    • 在未指定PRIMARY KEY时,Document的ID为随机。
    • 在full更新模式下,新增的doc会完全覆盖已存在的doc。
    • 在inc更新模式下,系统会依据输入的字段值更新对应的字段。
    • 所有的更新默认为UPSERT语义,即INSERT或UPDATE。

    WITH参数(通用配置)

    参数说明是否必选默认值
    typeconnector类型。elasticsearch
    endPointServer地址,例入:http://127.0.0.1:9211。
    accessId创建ES时的登录名。如果通过Kibana插件操作ES,请填写Kibana登录ID。
    accessKey创建ES时的登录密码。如果通过Kibana插件操作ES,请填写Kibana登录密码。
    index索引名称。
    typeName文档类型。_doc
    bufferSize流入多少条数据后开始去重。1000
    maxRetryTimes异常重试次数。30
    timeout读超时,单位为毫秒。600000
    discovery是否开启节点发现。如果开启,客户端每5分钟刷新一次Server List。false
    compression是否使用GZIP压缩Request Bodies。true
    multiThread是否开启JestClient多线程。true
    ignoreWriteError是否忽略写入异常。false
    settings创建Index的Settings配置。
    updateMode指定主键(PRIMARY KEY)后的更新模式:full:全量覆盖。inc:增量更新。full

    WITH参数(动态索引相关)

    参数说明是否必选备注
    dynamicIndex是否开启动态索引。参数取值如下:true:开启。false(默认值):不开启。
    indexField抽取索引的字段名。dynamicIndex为true时必填。只支持TIMESTAMP(以秒为单位)、DATE和LONG3种数据类型。
    indexInterval切换索引的周期。dynamicIndex为true时必填。参数值如下:d(默认值):天。m:月。w:周
    mapping启用动态索引时,设置文档各字段的类型与格式。例如,设置名为sendTime字段的格式:{ "properties": { "sendTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } } }默认值为空。
    • 当开启动态索引后,基本配置中的index名称会作为后续创建索引的统一Alias,Alias和索引为一对多关系。
    • 不同的indexInterval对应的真实索引名称:
      • d -> Alias "yyyyMMdd"
      • m -> Alias "yyyyMM"
      • w -> Alias "yyyyMMW"
    • 对于单个的真实索引可使用Index API进行修改,但对于Alias只支持get功能。

    代码示例

    CREATE TABLE es_stream_sink(
      field1 LONG,
      field2 VARBINARY,
      field3 TIMESTAMP,
      PRIMARY KEY(field1)
    )WITH(
      type ='elasticsearch',
      endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
      accessId = '<yourAccessId>',
      accessKey = '<yourAccessSecret>',
      index = '<yourIndex>',
      typeName = '<yourTypeName>',
      dynamicIndex = 'true',
      indexField = 'field3',
      indexInterval = 'd'
    );
    

    创建时序数据库结果表

    实时计算引用时序数据库(TSDB)结果表需要配置数据存储白名单。

    什么是时间序列数据库(TSDB)

    阿里云时序数据库(Time Series Database,简称TSDB)是一种集时序数据高效读写、压缩存储、实时计算能力为一体的数据库服务,可以广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。

    DDL定义

    CREATE TABLE stream_test_hitsdb (
        metric      VARCHAR,   
        `timestamp` INTEGER,
        `value`     DOUBLE,
        tagk1       VARCHAR,     
        tagk2       VARCHAR,
        tagk3       VARCHAR
    ) WITH (
        type='hitsdb',
        host='<yourHostName>',
        virtualDomainSwitch = 'false',
        httpConnectionPool = '20',
        batchPutSize = '1000'
    );
    

    建表默认格式:

    • 第0列:metric(VARCHAR)。
    • 第1列:timestamp(INTEGER),单位为秒。
    • 第2列:value(DOUBLE)。
    • 第3~N列:TagKey,即时间序列数据库中的fieldName。
    • tag可以为多列。
    • 必须声明metrictimestampvalue,且字段名称、字段顺序和字段数据类型必须和TSDB保持完全一致。

    WITH参数

    参数说明备注
    type结果表类型固定值为hitsdb
    hostIP或VIP域名填写注册实例的Host。
    port端口默认值为8242。
    virtualDomainSwitch是否使用VIPServer默认值为false,如果需要使用VIPServer,则virtualDomainSwitch设置为true。
    httpConnectionPoolHTTP连接池默认值为10。
    httpCompress使用GZIP压缩默认值为false,即不压缩。
    httpConnectTimeoutHTTP连接超时时间默认值为0。
    ioThreadCountIO线程数默认值为1。
    batchPutBufferSize缓冲区大小默认值为10000。
    batchPutRetryCount写入重试次数默认值为3。
    batchPutSize每次提交数据量默认每次提交500个数据点。
    batchPutTimeLimit缓冲区等待时间默认值为200,单位为毫秒。
    batchPutConsumerThreadCount序列化线程数默认值为1。

    Blink写入TSDB模型

    Blink 3.2 及以上版本支持6种Blink写入TSDB的模型,分别是:

    • 支持单值无tag数据点的写入,其schema格式如下所示,包括3个字段,这3个字段名称不可使用其他名称代替。

      metric,timestamp,value
      
    • 支持单值带tag数据点的写入,其schema格式如下所示,除metric、timestamp和value关键词名字必须保持一致外,tag的名称可以任意指定。

      metric,timestamp,value,tagKey1,....,tagKeyN
      
    • 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示,包括4个字段,这4个字段名称不可使用其他名称代替。

      metric,timestamp,value,tags
      

      其中,tags的内容为形如如下格式的JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。

      {"tagKey1":"tagValue1","tagKey2":"tagValue2",……,"tagKeyN":"tagValueN"}
      
    • 支持多值无tag数据点的写入,其schema格式如下所示。

      metric,timestamp,field_name1,field_name2,……,field_nameN
      

      其中metric和timestamp字段名称不可使用其他名称代替。对于多值的fields,由于需要区分tag和field,同时兼容之前的单值写入,这里约定需要给每个field加上固定前缀field,自动识别field字段。例如,对于一个field名称field_name1,在写入TSDB时,会自动将前缀field_去掉,只保留name1,即对于上面的schema,实际写入TSDB的格式如下,name1和name2是多值field的名称。

      metric,timestamp,name1,name2,……,nameN
      
    • 支持多值带tag数据点的写入,其schema格式如下所示,除metric和timestamp关键词名字必须保持一致外,tag的名称可以任意指定。

      metric,timestamp,tagKey1,....,tagKeyN,field_name1,field_name2,……,field_nameN
      
    • 支持单值带不确定tag个数的数据点的写入,其schema格式如下所示。

      metric,timestamp,tags,field_name1,field_name2,……,field_nameN
      

      其中,tags的内容类似如下JSON字符串,便于绕过Blink table schema需要固定tag个数的限制。

      {"tagKey1":"tagValue1","tagKey2":"tagValue2",……,"tagKeyN":"tagValueN"}
      

    创建消息队列Kafka结果表

    Kafka结果表支持写入自建Kafka集群。

    DDL定义

    create table sink_kafka (
      messageKey VARBINARY,
      `message` VARBINARY,
      PRIMARY KEY (messageKey)
    ) with (
      type = 'kafka010',
      topic = '<yourTopicName>',
      bootstrap.servers = '<yourServerAddress>'
    );
    
    • 创建Kafka结果表时,必须明文指定PRIMARY KEY (messageKey)

    WITH参数

    • 通用配置

      参数说明是否必选备注
      typeKafka对应版本必须是Kafka08、Kafka09、Kafka010、Kafka011中的一种。
      topicTopic名称
    • 必选配置

      • Kafka08必选配置

        参数说明
        zookeeper.connectzk连接ID
      • Kafka09/Kafka010/Kafka011必选配置

        参数说明
        bootstrap.serversKafka集群地址
    • 可选配置参数

      • consumer.id
      • socket.timeout.ms
      • fetch.message.max.bytes
      • num.consumer.fetchers
      • auto.commit.enable
      • auto.commit.interval.ms
      • queued.max.message.chunks
      • rebalance.max.retries
      • fetch.min.bytes
      • fetch.wait.max.ms
      • rebalance.backoff.ms
      • refresh.leader.backoff.ms
      • auto.offset.reset
      • consumer.timeout.ms
      • exclude.internal.topics
      • partition.assignment.strategy
      • client.id
      • zookeeper.session.timeout.ms
      • zookeeper.connection.timeout.ms
      • zookeeper.sync.time.ms
      • offsets.storage
      • offsets.channel.backoff.ms
      • offsets.channel.socket.timeout.ms
      • offsets.commit.max.retries
      • dual.commit.enabled
      • partition.assignment.strategy
      • socket.receive.buffer.bytes
      • fetch.min.bytes

    Kafka官方文档:

    Kafka版本对应关系

    typeKafka版本
    Kafka080.8.22
    Kafka090.9.0.1
    Kafka0100.10.2.1
    Kafka0110.11.0.2及以上

    示例

    create table datahub_input (
      id VARCHAR,
      nm VARCHAR
    ) with (
      type = 'datahub'
    );
    
    create table sink_kafka (
      messageKey VARBINARY,
      `message` VARBINARY,
      PRIMARY KEY (messageKey)
    ) with (
      type = 'kafka010',
      topic = '<yourTopicName>',
      bootstrap.servers = '<yourServerAddress>'
    );
    
    INSERT INTO
      sink_kafka
    SELECT
      cast(id as VARBINARY) as messageKey,
      cast(nm as VARBINARY) as `message`
    FROM
      datahub_input;
    

    创建云数据库HybridDB for MySQL结果表

    什么是云数据库HybridDB for MySQL

    云数据库HybridDB for MySQL(原名PetaData)是同时支持海量数据在线事务(OLTP)和在线分析(OLAP)的HTAP(Hybrid Transaction/Analytical Processing)关系型数据库。HybridDB for MySQL采用一份数据存储来进行OLTP和OLAP处理,避免把一份数据复制多次进行数据分析,极大地降低了数据存储的成本。

    DDL定义

    create table petadata_output(
     id INT,
     len INT,
     content VARCHAR,
     primary key(id,len)
    ) with (
     type='petaData',
     url='yourDatabaseURL',
     tableName='yourTableName',
     userName='yourDatabaseUserName',
     password='yourDatabasePassword'
    );
    
    • 实时计算写入PetaData数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。
    • bufferSize默认值是1000,如果到达bufferSize阈值,则会触发写出。因此配置batchSize的同时还需要配置bufferSize。bufferSize和batchSize大小相同即可。
    • batchSize数值不建议设置过大,建议设置batchSize='4096'

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为petaData。
    url地址切换网络类型
    tableName表名
    userName用户名
    password密码
    maxRetryTimes最大重试次数默认值为3。
    batchSize一次批量写入的条数默认值为1000,表示每次写多少条。
    bufferSize流入多少条数据后开始去重
    flushIntervalMs清空缓存的时间间隔单位为毫秒。默认值为3000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    ignoreDelete是否忽略Delete操作默认值为false。

    创建云数据库RDS SQL Server版结果表

    语法示例

    create table ss_output(
      id INT,
      len INT,
      content VARCHAR,
      primary key(id,len)
    ) with (
      type='jdbc',
      url='jdbc:sqlserver://ip:port;database=****',
      tableName='<yourDatabaseTableName>',
      userName='<yourDatabaseUserName>',
      password='<yourDatabasePassword>'
    );
    
    • 实时计算Flink版写入RDS和DRDS数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库。如果使用批量写,需要在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
    • RDS SQL Server数据库支持自增主键。如果需要让实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。
    • 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里**primary key()**中声明,否则拆分的表无法写入。
    • DDL声明的字段必选至少存在一个非主键的字段,否则产生报错。

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为jdbc。
    urlJDBC(Java DataBase Connectivity)连接地址
    tableName表名
    username账号
    password密码
    maxRetryTimes写入重试次数默认值为10。
    bufferSize流入多少条数据后开始去重默认值为10000,表示输入的数据达到10000条开始去重。
    flushIntervalMs清空缓存的时间间隔单位为毫秒,默认值为2000,表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    excludeUpdateColumns忽略指定字段的更新可选,默认值为空(默认忽略Primary Key字段),表示更新主键值相同的数据时,忽略指定字段的更新。
    ignoreDelete是否忽略Delete操作默认值为False。

    类型映射

    RDS字段类型实时计算Flink版字段类型
    BOOLEANBOOLEAN
    TINYINTTINYINT
    SMALLINTSMALLINT
    INTINT
    BIGINTBIGINT
    FLOATFLOAT
    DECIMALDECIMAL
    DOUBLEDOUBLE
    DATEDATE
    TIMETIME
    TIMESTAMPTIMESTAMP
    VARCHARVARCHAR
    VARBINARYVARBINARY

    JDBC连接参数

    参数说明默认值最低版本要求
    useUnicode是否使用Unicode字符集。如果参数CharacterEncoding设置为GB2312或GBK,useUnicode值必须设置为True。False1.1g
    characterEncoding当useUnicode设置为True时,指定字符编码。例如可设置为GB2312或GBK。False1.1g
    autoReconnect当数据库连接异常中断时,是否自动重新连接。False1.1
    autoReconnectForPools是否使用针对数据库连接池的重连策略。False3.1.3
    failOverReadOnly自动重连成功后,连接是否设置为只读。True3.0.12
    maxReconnectsautoReconnect设置为True时,重试连接的次数。31.1
    initialTimeoutautoReconnect设置为True时,两次重连之间的时间间隔,单位为秒。21.1
    connectTimeout和数据库服务器建立socket连接时的超时,单位为毫秒。0,表示永不超时,适用于JDK 1.4及以上版本。3.0.1
    socketTimeoutsocket操作(读写)超时,单位为毫秒。0,表示永不超时。3.0.1

    代码示例

    CREATE TABLE source (
      id INT,
      len INT,
      content VARCHAR
    ) with (
      type = 'random'
    );
    
    CREATE TABLE rds_output(
      id INT,
      len INT,
      content VARCHAR,
      PRIMARY KEY (id,len)
    ) WITH (
      type='jdbc',
      url='<yourDatabaseURL>',
      tableName='<yourDatabaseTable>',
      userName='<yourDatabaseUserName>',
      password='<yourDatabasePassword>'
    );
    
    INSERT INTO rds_output
    SELECT id, len, content FROM source;
    

    创建云数据库Redis版结果表

    什么是云数据库Redis版

    阿里云数据库Redis版是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。

    语法示例

    云数据库Redis版结果表支持5种Redis数据结构,其DDL定义如下:

    STRING类型

    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为set key value

    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'string',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认值为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除已插入的数据,默认值为false。
    );
    

    LIST类型

    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为lpush key value

    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'list',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认值为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除已插入的数据,默认值为false。
    );
    

    SET类型

    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为sadd key value

    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'set',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认值为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除已插入的数据,默认值为false。
    );
    

    HASHMAP类型

    DDL为三列:第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_value

    create table resik_output (
      a varchar,
      b varchar, 
      c varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'hashmap',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认值为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除已插入的数据,默认值为false。
    );
    

    SORTEDSET类型

    DDL为三列:第1列为key,第2列为score,第3列为value。Redis插入数据的命令为add key score value

    create table resik_output (
      a varchar,
      b double,  --必须为DOUBLE类型。
      c varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'sortedset',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认值为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除已插入的数据,默认值为false。
    );
    

    WITH参数

    参数参数说明是否必填取值
    type结果表类型固定值为redis
    mode对应Redis的数据结构取值:string、list、set、hashmap、sortedset
    hostRedis Server对应地址取值示例:127.0.0.1
    portRedis Server对应端口默认值为6379。
    dbNumRedis Server对应数据库序号默认值为0。
    ignoreDelete是否忽略Retraction消息默认值为false,可取值为true或false。如果设置为false,收到Retraction时,同时删除数据对应的key及已插入的数据。
    passwordRedis Server对应的密码默认值为空,不进行权限验证。
    clusterModeRedis是否为集群模式取值如下:true:Redis为集群模式。false(默认值):Redis为单机模式。

    类型映射

    Redis字段类型实时计算Flink版字段类型
    STRINGVARCHAR
    SCOREDOUBLE

    因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

    代码示例

    CREATE TABLE random_stream (
      v VARCHAR, 
      p VARCHAR
    ) with (
        type = 'random'
    );
    
    create table resik_output (
      a VARCHAR,
      b VARCHAR,
      primary key(a) 
    ) with (
      type = 'redis',
      mode = 'string',
      host = '<yourRedisHost>',
      password = '<yourRedisPassword>'
    );
    
    INSERT INTO resik_output 
    SELECT v, p
    FROM random_stream;
    

    创建云数据库MongoDB版结果表

    MongoDB结果表不支持主键更新,数据输入形式为重复插入。

    DDL定义

    CREATE TABLE mongodb_sink (
      `a`  VARCHAR
    ) WITH (
       type = 'mongodb',
       database = '<yourDatabaseName>',
       collection= '<yourCollectionName>',
       uri='mongodb://{<databaseAccount>}:{<atabasePassword>}@{host}:****?replicaSet=mgset-1224****',
       keepAlive='true',
       maxConnectionIdleTime='20000',
       batchSize='2000'
    );
    

    WITH参数

    参数说明是否必填备注
    typeConnector类型固定值为mongodb。
    database数据库名称
    collection数据集合
    uriMongoDB连接串
    keepAlive是否保持长连接默认值为true。
    maxConnectionIdleTime连接超时时长整型值,不能为负数,单位为毫秒。默认值为60000。0表示无连接超时限制。
    batchSize每次批量写入的条数整型值,默认值为1024。系统会设定一个大小为batchSize的缓冲条数,当数据的条数达到batchSize时,触发数据的输出。

    当Checkpoint时间达到时,即使数据未到达batchSize值,也将触发数据的输出。

    创建云原生数据仓库AnalyticDB MySQL版3.0结果表

    云原生数据仓库AnalyticDB MySQL版3.0结果表暂不支持注册存储功能。

    云原生数据仓库AnalyticDB MySQL版3.0数据库支持自增主键。如果实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段。

    DDL定义

    CREATE TABLE rds_output (
    id INT,
    len INT,
    content VARCHARPRIMARY KEY(id,len)
    ) WITH (
    type='ADB30',
    url='jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
    tableName='<yourDatabaseTableName>',
    userName='<yourDatabaseUserName>',
    password='<yourDatabasePassword>'
    );
    

    实现原理

    实时计算Flink版写入云原生数据仓库AnalyticDB MySQL版3.0结果表分为以下两个阶段:

    1. 将实时计算Flink版每行的结果数据拼接为一行SQL。
    2. 在目标数据库执行拼接后的SQL。

    WITH参数

    参数注释说明是否必选备注
    typeconnector类型固定值为ADB30
    urljdbc连接地址云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
    tableName表名
    username账号
    password密码
    maxRetryTimes写入重试次数默认值为3。
    bufferSize流入多少条数据后开始去重默认值为1000,表示输入的数据达到1000条则开始输出。
    batchSize一次批量写入的条数默认值为1000。
    flushIntervalMs清空缓存的时间间隔单位为毫秒,默认值为3000。表示如果缓存中的数据在等待3秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    ignoreDelete是否忽略DELETE操作默认值为false,表示支持DELETE功能。
    replaceMode是否采用replace into语法插入数据取值:true(默认):采用。 false:采用insert into on duplicate key语法插入数据。
    excludeUpdateColumns在更新主键值相同的数据时,忽略指定字段的更新。如果要忽略的字段为多个,则需要使用英文逗号(,)分割。例如excludeUpdateColumns=column1,column2
    reserveMilliSecondTimeStamp类型是否保留毫秒默认值为false,不保留毫秒数值。

    注意bufferSizebatchSize参数需要指定主键后才能生效。

    创建自定义结果表

    接口说明

    自定义结果表Class需要继承自定义Sink插件的基类CustomSinkBase,并使用如下方法实现。

    protected Map<String,String> userParamsMap;// userParamsMap是自定义SQL的WITH语句中定义的键值对,所有的键均为小写。
    protected Set<String> primaryKeys;// primaryKeys是自定义的主键字段名。
    protected List<String> headerFields;// headerFields是标记为header的字段列表。
    protected RowTypeInfo rowTypeInfo;//字段类型和名称。
    /**
     * 初始化方法。每次初始建立和Failover的时候会调用一次。
     * 
     * @param taskNumber taskNumber为当前节点的编号。
     * @param numTasks   numTasks为Sink节点的总数。
     * @throws IOException
     */
    public abstract void open(int taskNumber,int numTasks) throws IOException;
    
    /**
     * close方法,释放资源。
     *
     * @throws IOException
     */
    public abstract void close() throws IOException;
    
    /**
     * 处理插入单行数据。
     *
     * @param row
     * @throws IOException
     */
    public abstract void writeAddRecord(Row row) throws IOException;
    
    /**
     * 处理删除单行数据。
     *
     * @param row
     * @throws IOException
     */
    public abstract void writeDeleteRecord(Row row) throws IOException;
    
    /**
     * 如果进行批量插入,该方法需要把线程中缓存的数据全部刷入下游存储;如果不进行批量插入,可以不使用该方法。
     *
     * @throws IOException
     */
    public abstract void sync() throws IOException;
    
    /** 
    * 返回类名。 
    */ 
    public String getName();
    

    自定义Redis结果表示例

    进入blink_customersink_3x目录,执行mvn clean package命令,再在实时计算Flink版开发控制台上传刚编译成功后的JAR包blink_customersink_3x/target/blink-customersink-3.x-1.0-SNAPSHOT-jar-with-dependencies.jar,引用资源之后,对于自定义的Sink插件,需要指明type = 'custom',并且指明实现接口的Class。

    create table in_table(
        kv varchar 
    )with(
        type = 'random'
    );
    
    create table out_table(
        `key` varchar,
        `value` varchar
    )with(
        type = 'custom',
        class = 'com.alibaba.blink.customersink.RedisSink',
        -- 1. 可以定义更多自定义参数, 在open函数中通过userParamsMap获取。
        -- 2. with参数里key大小写不敏感。在实时计算Flink版中,参数key的值直接处理为全小写。建议您在引用数据存储的DDL中使用小写声明key。
        host = 'r-uf****.redis.rds.aliyuncs.com',
        port = '6379',
        db = '0',
        batchsize = '10',
        password = '<yourHostPassword>'
    );
    
    insert into out_table
    select
    substring(kv,0,4) as `key`,
    substring(kv,0,6) as `value`
    from in_table;
    

    Redis Sink插件的参数说明如下。

    参数说明是否必填备注
    hostRedis实例内网连接地址(host)
    portRedis实例端口号
    passwordRedis连接密码
    dbRedis Database编号默认值为0,表示db0。
    batchsize每次批量写入的条数默认值为1,表示不批量写入。

    创建InfluxDB结果表

    DDL定义

    create table stream_test_influxdb(
        `metric` varchar,
        `timestamp` BIGINT,
        `tag_value1` varchar,
        `field_fieldValue1` Double
    )with(
        type = 'influxdb',
        endpoint = 'http://service.cn.influxdb.aliyuncs.com:****',
        database = '<yourDatabaseName>',
        batchPutsize = '1',
        username = '<yourDatabaseUserName>',
        password = '<yourDatabasePassword>'
    );
    

    建表默认格式:

    • 第0列:metric(VARCHAR),必填。

    • 第1列:timestamp(BIGINT),必填,单位为毫秒。

    • 第2列:tag_value1(VARCHAR),必填,最少填写一个。

    • 第3列:field_fieldValue1(DOUBLE),必填,最少填写一个。

      写入多个field_fieldValue时,需要按照如下格式填写。

      field_fieldValue1 类型,
      field_fieldValue2 类型,
      ...      
      field_fieldValueN 类型
      

      示例如下。

      field_fieldValue1 Double,
      field_fieldValue2 INTEGER,
      ...      
      field_fieldValueN INTEGER
      

      结果表中只支持metrictimestamptag_*field_*,不能出现其他的字段。

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为InfluxDB。
    endpointInfluxDB的Endpoint在InfluxDB中,Endpoint是VPC网络地址,例如:https://localhost:3242或http://localhost:8086。Endpoint支持HTTP和HTTPS。
    databaseInfluxDB的数据库名例如:db-blink或者blink。
    batchPutSize批量提交的记录条数默认每次提交500个数据点。
    usernameInfluxDB的用户名需要对写入的数据库有写权限。
    passwordInfluxDB的密码默认值为0。
    retentionPolicy保留策略为空时,默认填写为每个database的默认保留策略。

    类型映射

    InfluxDB字段类型实时计算Flink版字段类型
    BOOLEANBOOLEAN
    INTINT
    BIGINTBIGINT
    FLOATFLOAT
    DECIMALDECIMAL
    DOUBLEDOUBLE
    DATEDATE
    TIMETIME
    TIMESTAMPTIMESTAMP
    VARCHARVARCHAR

    创建Phoenix5结果表

    Phoenix5是云数据库HBase实例中的一种HBase SQL服务,须在云数据库HBase实例中开启HBase SQL服务后才可以使用Phoenix5。

    DDL定义

    create table US_POPULATION_SINK (
      `STATE` varchar,
      CITY varchar,
      POPULATION BIGINT,
      PRIMARY KEY (`STATE`, CITY)  --主键必填。
    ) WITH (
      type = 'PHOENIX5',
      serverUrl = '<yourserverUrl>',
      tableName = '<yourTableName>'
    );
    

    WITH参数

    参数说明是否必填备注
    type结果表类型固定值为PHOENIX5
    serverUrlPhoenix5的Query Server地址:如果集群中创建的,则是负载均衡服务的URL地址。如果是在单机中创建的,则是单机的URL地址。serverUrl格式为http://host:port,其中:host:Phoenix5服务的域名。port:Phoenix5服务的端口号,固定值为8765。
    tableName读取Phoenix5表名。Phoenix5表名格式为SchemaName.TableName,其中:SchemaName:模式名,可以为空,即不写模式名,仅写表名,表示使用数据库的默认模式。TableName:表名。

    代码示例

    create table `source` (
      `id` varchar,
      `name` varchar,
      `age` varchar,
      `birthday` varchar 
    ) WITH (
       type = 'random'
    );
    
    create table sink (
      `id` varchar,
      `name` varchar,
      `age` varchar,
      `birthday` varchar,
      primary key (id)
    ) WITH (
      type = 'PHOENIX5',
      serverUrl = '<yourserverUrl>',
      tableName = '<yourTableName>'
    );
    
    INSERT INTO sink
      SELECT  `id` ,`name` , `age` ,`birthday` 
    FROM `source`;
    

    创建分析型数据库PostgreSQL版结果表

    实现原理

    实时计算Flink版写入分析型数据库PostgreSQL结果表分为以下两个阶段:

    1. 将实时计算Flink版每行的结果数据拼接为一行SQL。
    2. 在目标数据库执行拼接后的SQL。

    DDL定义

    create table rds_output(
      id INT,
      len INT,
      content VARCHAR,
      PRIMARY KEY(id)
    ) with (
      type='adbpg',
      url='jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
      tableName='<yourDatabaseTableName>',
      userName='<yourDatabaseUserName>',
      password='<yourDatabasePassword>'
    );
    

    WITH参数

    参数说明是否必填备注
    type源表类型。固定值为adbpg
    urlJDBC连接地址。分析型数据库PostgreSQL版数据库的JDBC连接地址。格式为'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',其中:yourNetworkAddress:内网地址。PortId:连接端口。yourDatabaseName:连接的数据库名称。示例:url='jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres'
    tableName表名。无。
    username账号。无。
    password密码。无。
    maxRetryTimes写入重试次数。默认值为3。
    useCopy是否采用Copy API写入数据。参数取值如下:0(默认值):采用INSERT方式写入数据。1:采用copy API方式写入数据。
    batchSize一次批量写入的条数。默认值为5000。
    exceptionMode数据写入过程中出现异常时的处理策略。支持以下两种处理策略:ignore(默认值):忽略出现异常时写入的数据。strict:数据写入异常时,Failover报错。
    conflictMode当主键冲突或唯一索引出现冲突时的处理策略。支持以下三种处理策略:ignore(默认值):忽略主键冲突,保留之前的数据。strict:主键冲突时,Failover报错。update:主键冲突时,更新新到的数据。
    targetSchemaSchema名称。默认值为public。
    connectionMaxActive单个task允许的最大连接数。请根据实际的并发task个数,以及目标端数据库允许的最大连接数进行设置。

    类型映射

    分析型数据库PostgreSQL版字段类型实时计算Flink版字段类型
    BOOLEANBOOLEAN
    SMALLINTTINYINT
    SMALLINTSMALLINT
    INTINT
    BIGINTBIGINT
    DOUBLE PRECISIONDOUBLE
    TEXTVARCHAR
    TIMESTAMPDATETIME
    DATEDATE
    REALFLOAT
    DOUBLE PRECISIONDECIMAL
    TIMETIME
    TIMESTAMPTIMESTAMP
    展开全文
  • 易语言 文本的创建追加

    千次阅读 2018-08-13 10:40:30
    在易语言中创建文本的命令,就是:写到文件(),要追加文本,思路上就是先把文件中的数据读入变量,再把要追加的文本加在变量中,一起写到文件就是了。 具体的代码如下: 注:原贴地址来源于精易论坛,如有不妥...

    文本的创建和追加 就是往一个文本写内容

    文件格式常用为:*.txt,*.csv

    在易语言中创建文本的命令,就是:写到文件(),要追加文本,思路上就是先把文件中的数据读入变量,再把要追加的文本加在变量中,一起写到文件就是了。

    具体的代码如下:

    注:原贴地址来源于精易论坛,如有不妥,请删贴处理。

    原贴地址:https://bbs.125.la/forum.php?mod=viewthread&tid=14153349

     

    展开全文
  • access数据综合查询

    2020-12-24 01:41:53
    实验名称:数据综合查询实验目的:熟练掌握数据的综合查询方法实验内容:(1)利用“HappyYouAccess实验07学籍01....(要求:数据来源于cjb、kcb)(B)做一参数查询名称为“综合期末成绩查询”,输入要查询的期未成绩的...
  • 3.SQL--创建教师表和向表内插入数据

    千次阅读 2019-05-16 17:20:30
    --创建教师表,并向表内插入数据create table Teacher(Tid varchar(10),Tname nvarchar(10))--向表内插入数据insert into Teacher values('01' , '韩寒')insert into Teacher values('02' , '李玉')insert into ...
  • 在项目的开发过程中,我们会遇到一些特殊的业务需求,如接下来讲的业务需求:根据多个字段,如果多个字段相同就把两条数据合并为一条数据,但是两条数据创建人的追加在一条数据中,返回给前端。 话不多说上图...
  • 6.创建Spoop任务同步商品表数据 7.历史数据和增量数据合并 8.Java的nanoTime() 9.创建视图完成按分钟级别更新数仓中的业务表 10.创建定时调度作业 11.总结 0. 相关文章链接 开发随笔文章汇总 1. 为什么要...
  • 项目三 创建与维护MySQL数据

    千次阅读 2022-03-13 14:36:36
    1.创建数据表 create table 表名( 字段名1 字段数据类型1, 字段名2 字段数据类型2, ..... 字段名n 字段数据类型n ); 运行结果: 2. 查看数据库中的所有数据表 show tables; 运行结果: 3.查看...
  • 数据内容动态添加到HTML中

    千次阅读 2021-06-09 14:07:42
    // 申明一个数组用来装遍历的元素var li = [];//遍历元素并加载到标签中for(var i = 0; ivar nav_li = ''+navGroup.self_...JS从后台获取数据,前台动态添加tr标签中的td标签功能描述: 要求从后台查询该省份的所有城...
  • 数据分区是一种物理数据库的设计技术,它的目的是为了在特定的SQL操作中减少数据读写的总量以缩减响应时间。 分区并不是生成新的数据表,而是将表的数据均衡分摊到不同的硬盘,系统或是不同服务器存储介子中,实际上...
  • 一、介绍 在构建数据湖时,也许没有比数据格式存储更具有...原子事务–保证对数据湖的更新或追加操作不会中途失败,产生脏数据。 一致的更新–防止在写入过程中读取失败或返回不完整的结果。同时处理潜在的并发写入冲
  • hive内部表外部表的创建及load数据

    千次阅读 2019-04-10 18:03:55
    创建hive内部表 create table test_01(id bigint, name string ) row format delimited fields terminated by ','; 默认记录和字段分隔符: \n 每行一条记录 ^A 分隔列(八进制 \001) ^B 分隔ARRAY或者STRUCT中...
  • golang 切片追加问题

    千次阅读 2020-08-20 15:05:11
    Golang 内置方法append用于向切片中追加一个或多个元素,实际项目中比较常见。 其原型如下: func append(slice []Type, elems ...Type) []Type 本节不会对append的使用方式详细展开,而是重点介绍几个使用中...
  • 数据仓库介绍

    千次阅读 2022-05-10 14:42:20
    它出于分析性报告和决策支持目的而创建。 为需要业务智能的企业,提供指导业务流程改进、监视时间、成本、质量以及控制。 数据仓库的特点 1.数据仓库的数据是面向主题的 与传统数据库面向应用进行数据组织的特点...
  • 前期回顾: ⼤数据是如何产⽣的?...元数据具体的工作内容元数据分为技术元数据和业务元数据7 数据治理脏数据的种类数据治理原则知识拓展(数据集市)结束语 数据仓库(数据是如何存储的) 1 什么是数据
  • 大数据--数据仓库

    千次阅读 2022-04-24 15:39:49
    数据仓库概念数据仓库特点数据仓库分层数据仓库建模模型选择数仓建模流程数仓建模过程模型设计的思路模型落地实现事实表设计事实表设计原则事实表设计方法三种事实表多维体系结构维度设计六范式与反范式元数据数据...
  • 今年暑假系里给每个同学布置了一篇学年论文,恰好我选的题目是人民汇率的变动对我国物价水平波动的影响,就想着怎么也得弄点数据撑撑排面吧,一开始我是不想爬的,然后就在网上各种找接口,找了半天,不是不能用就是...
  • 数据仓库原理

    千次阅读 2022-03-29 16:30:43
    数据仓库原理 ODS>DWD>DWS>ADS
  • 商业数据的获取与清洗

    千次阅读 2022-04-27 16:52:49
    分析需求的数据数据描述 内部数据 外部数据 逻辑描述 表态数据 动态数据 数据词典 数据源前期准备 PowerBI获取外部数据 方法一:直接抓取表格数据 方法二:使用示例抓取数据 方法三:抓取JSON格式 ...
  • 通过“字段选择”选择要最终输出的字段到下一个步骤“追加流”(实验中必须保证每个数据源经过字段选择后,输出的字段都是一致的),再通过“追加流”设置2个数据源的合并顺序,然后通过“追加流”进行多个据源的...
  • 数据仓库 & OLAP

    千次阅读 2022-03-28 21:02:57
    一、数据库 vs. 数据仓库 1. 构建目的不同:数据库主要用于实现企业的日常业务管理,提高业务运营的效率 ...除了要通过创建事务和处理的正确性外,还需复杂的并发控制机制保证事务运行的隔离性。(更新操
  • 数据中台(一)数据中台详解

    千次阅读 2021-06-25 23:18:08
    1.数据中台的由来 数据库阶段 ---> 传统数仓 ---> 大数据平台 ----> 大数据中台 1.1.数据存储起源:数据库 1979年:Oracle1.0商用数据库发布 1996年:MySQL1.0发布,到2000年以后开始火起来。 特点:...
  • 5.1 数据抽取方式 5.1.1 基于源数据的CDC 5.1.2 基于触发器的CDC 5.1.3 基于快照的CDC 5.1.4 基于日志的CDC 5.2 MySQL数据复制 5.2.1 复制的用途 5.2.2 二进制日志 5.2.3 复制步骤 5.3 使用Kafka 5.3.1 ...
  • python 向 mysql 中 添加 数据

    千次阅读 2018-10-21 17:52:24
    链接:python 连接 mysql 查询 数据 及 表结构 python 向 mysql 中添加数据 import pymysql import json #第一步:连接到mysql数据库(ishop1数据库) conn = pymysql.connect(host='localhost',user='root',...
  • 通过以下方法来实现 优先支持在文件级原子更新数据,而无需重写整个表/分区 能够只读取更新的部分,而不是进行低效的扫描或搜索 严格控制文件大小来保持出色的查询性能(小的文件会严重损害查询性能)。 读时合并...
  • 一、Kettle数据抽取概览 1. 文件抽取 (1)处理文本文件 (2)处理XML文件 2. 数据库抽取 二、变化数据捕获 1. 基于源数据的CDC 2. 基于触发器的CDC 3. 基于快照的CDC 4. 基于日志的CDC 三、使用Sqoop抽取...
  • 1Access应用基础—查询设计一.查询的基础知识所谓查询是指根据用户指定的一个或多...用户通过查询告诉Access检索条件,Access根据用户提供的条件将查询到的数据反馈给用户。1.查询的类型选择查询:是最常用的查询方...
  • 文档大纲:一、数仓基本概念1. 数据仓库架构我们在谈数仓之前,为了让大家有直观的认识,先来谈数仓架构,“架构”是什么?这个问题从来就没有一个准确的答案。这里我们引用一段话:在软件行业,一种...
  • Hive开发要知道数据仓库的四个层次设计

    万次阅读 多人点赞 2018-02-12 18:11:09
    数据仓库:数据仓库全面接收源系统数据,ETL进程对数据进行规范化、验证、清洗,并最终装载进入数据集市,通过数据集市支持系统进行数据查询、分析,整个数据仓库包含四大层次。 1.数据仓库的四个操作  ETL...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 28,381
精华内容 11,352
热门标签
关键字:

创建追加查询的数据来源