精华内容
下载资源
问答
  • 基于Flink1.11.2 多表join与维表join

    千次阅读 2020-09-29 14:57:26
    离线业务是多个表join,因为接入的myslqbinlog的数据,所以数据存在修改跟删除,这个是跟日志分析最大的区别,因为日志分析啥的数据只有新增,相对来说考虑的东西就少了很多。 2,首先我们要实现的是多表join,这...

    1,业务场景:之前离线的业务场景,想通过实时来实现。离线业务是多个表join,因为接入的myslq binlog的数据,所以数据存在修改跟删除,这个是跟日志分析最大的区别,因为日志分析啥的数据只有新增,相对来说考虑的东西就少了很多。

     

    2,首先我们要实现的是多表join,这个相对还是比较简单的,比如下面的SQL:

     

    3,但是3个表join可是全量join的,也就是说表的state是一直存在的,总会存在OOM爆炸的时候,那怎么办呢。

    在我们使用window窗口的时候,一个窗口结束之后会帮我们清除state,这里Flink支持通过API清除state,不过是全局的,细粒度的还没研究。

    因为业务是只管当天的数据,数据到kafka的时候会有延迟,但是不可能延迟一天,所以我这里设置TTL为1天到3天。

    当然,肯定要测试一下TTL,读取2个topic注册为表,然后join,30s之后发现join不上了。state过期了。这个大家手动去测试吧。

     

    4,几个表join之后,再加上设置了 TTL,我们还要关联维表数据,在做之前我就想,维表会不会受TTL的影响,脑壳发昏了 。

    经过鸡哥的提醒我才想起来了,维表读取的代码就是缓存,是guava。磕头谢罪。

     

    5,维表的join,维表的join语法在官网一目了然:

     

    我们需要给主流添加上processTime:

    完整代码如下:

    import java.util.concurrent.TimeUnit
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.runtime.state.StateBackend
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.environment.CheckpointConfig
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    /*
      todo 在这里做join 操作 ttl 验证、
     */
    object practice_join_mysqlJoin {
    
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
        val stenv = StreamTableEnvironment.create(env, bsSettings)
        val statement = stenv.createStatementSet()
    
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30L, TimeUnit.SECONDS)))
        env.enableCheckpointing(1 * 60 * 1000L)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000L)
        env.getCheckpointConfig.setCheckpointTimeout(2 * 60000L)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        env.setStateBackend(new RocksDBStateBackend("hdfs://dev-ct6-dc-master01:8020/flink/checkpoints/rocksDBStateBackend2", true))
        env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        //todo 设置状态过期时间
    //    stenv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(3))
        stenv.getConfig.setIdleStateRetentionTime(Time.seconds(30), Time.minutes(6))
    
        //todo 1,从mysql同步数据到kafka,此略
    
        //todo 2,我们直接从dwd层获取数据,创建为table
    
        val assure_orders =
          s"""
             |CREATE TABLE assure_orders (
             | order_id INT,
             | order_name STRING,
             | proctime as PROCTIME()
             |) WITH (
             | 'connector' = 'kafka',
             | 'topic' = 'doris_test.assure_orders',
             | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             | 'properties.group.id' = 'test1',
             | 'format' = 'canal-json',
    //         | 'format' = 'json',
             | 'scan.startup.mode' = 'earliest-offset'
             |)
           """.stripMargin
        stenv.executeSql(assure_orders)
    
        val order_company =
          s"""
             |CREATE TABLE order_company (
             | company_id INT,
             | company_name STRING
             |) WITH (
             | 'connector' = 'kafka',
             | 'topic' = 'doris_test.order_company',
             | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             | 'properties.group.id' = 'test1',
             | 'format' = 'canal-json',
    //         | 'format' = 'json',
             | 'scan.startup.mode' = 'earliest-offset'
             |)
           """.stripMargin
        stenv.executeSql(order_company)
    
    
        val order_address =
          s"""
             |CREATE TABLE order_address (
             | address_id INT,
             | address_name STRING
             |) WITH (
             | 'connector' = 'kafka',
             | 'topic' = 'doris_test.order_address',
             | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             | 'properties.group.id' = 'test1',
             | 'format' = 'canal-json',
    //         | 'format' = 'json',
             | 'scan.startup.mode' = 'earliest-offset'
             |)
           """.stripMargin
        stenv.executeSql(order_address)
    
        //todo ====================维表join====================
        val dim_base_province_create =
          """
            | CREATE TABLE `dim_base_province` (
            |      `id` INT,
            |      `name` STRING,
            |      `region_id` INT ,
            |      `area_code`STRING,
            |      PRIMARY KEY (id) NOT ENFORCED
            |    ) WITH (
            |     'connector' = 'jdbc',
            |    'url' = 'jdbc:mysql://192.168.5.24:3306/doris_test?useUnicode=true&characterEncoding=UTF-8',
            |    'table-name' = 'base_province_copy_copy', -- MySQL中的待插入数据的表
            |      'driver' = 'com.mysql.jdbc.Driver',
            |    'username' = 'root',
            |    'password' = '123456',
            |   'lookup.cache.max-rows'='1000',
            |   'lookup.cache.ttl' = '5s'
            |    )
          """.stripMargin
    
        stenv.executeSql(dim_base_province_create)
    
        //todo  ====================打印测试=====================
        val sink_print =
          s"""
             |create table sink_print (
             |a INT,
             |b STRING,
             |c STRING,
             |d STRING,
             |e STRING
             |) with ('connector' = 'print')
            """.stripMargin
    //    stenv.executeSql(sink_print)
    
        //todo 输出======================
        val inset_kafka =
          s"""
             |CREATE TABLE sink_print (
             |a INT,
             |b STRING,
             |c STRING,
             |d STRING,
             |e STRING
             |) WITH (
             | 'connector' = 'kafka',
             | 'topic' = 'cccc',
             | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
             | 'properties.group.id' = 'test1',
             | 'format' = 'changelog-json',
    //         | 'format' = 'json',
             | 'scan.startup.mode' = 'earliest-offset'
             |)
           """.stripMargin
    
        stenv.executeSql(inset_kafka)
    
        val insert_sql  =
          """
            |INSERT INTO sink_print
            | select d.order_id,d.company_name,d.address_name,d.order_name,dim_base_province.name from
            | (
            | SELECT a.*,b.*,c.*
            | FROM assure_orders a
            | JOIN
            | order_company b
            |  ON a.order_id = b.company_id
            | JOIN
            | order_address c
            | ON a.order_id = c.address_id
            |  ) d
            | LEFT JOIN
            | dim_base_province FOR SYSTEM_TIME AS OF d.proctime
            | ON d.order_id = dim_base_province.id
          """.stripMargin
      /* val join_sql =
         """
           |INSERT INTO sink_print
           |SELECT assure_orders.order_id,assure_orders.order_name,dim_base_province.name FROM assure_orders
           |LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF assure_orders.proctime
           |ON assure_orders.order_id = dim_base_province.id
         """.stripMargin*/
        stenv.executeSql(insert_sql)
    
      }
    }
    

     

     

    总结:

    代码虽然看着简单,也算是简单的写了一些sql,后面也提交到测试集群运行了 :

    后面的计划,我还是回归Streaming开发,SQL话开发虽然简单,但是对于调优跟底层的理解还是欠缺,回归streaming开发可能对我们的新人可能不太友好,但是对于深入理解,调优更有优势。

     

    展开全文
  • left显示的记录数是跟主有关系,也就是说主条,如果不加条件限制,结果跟主表条数一致 举例子: A: B: id | name id | email 1 | tom 1 | anything@126.com 2 | mary 1 | anything@163.com...

    https://segmentfault.com/q/1010000006812626

    http://www.xumenger.com/sql-join-20160630/

    on后面关联的条件如果是1对1的数量就不变,如果是1对多的数量就会增加
    通常的做法是,先把多的那个表聚合,或者取sum或取分组中的最大最小值
    再或者分组后将字符拼接,如id a,b,c

    左右表的主键外键对应的问题 ,出现了一对多。

    left显示的记录数是跟主表有关系,也就是说主表有几条,如果不加条件限制,结果跟主表条数一致

    举个例子:

        A表:                          B表:
    id   | name                id | email
     1   | tom                  1 | anything@126.com
     2   | mary                 1 | anything@163.com
                                1 | anything@gmail.com
    SELECT a.* FROM talbe_a AS a
    LEFT JOIN table_b AS b ON a.id = b.id

     

    如上,最终结果会有多少条数据呢?

     

     

    转载于:https://my.oschina.net/u/3672057/blog/1551081

    展开全文
  • postgresql大表join优化

    千次阅读 2020-06-05 09:47:40
    postgresql大表join优化 一、背景 1、数据量: 表名 数据量 f_invoice 87346130 f_invoice_item 97535867 2、索引: :f_invoice_item CREATE INDEX f_invoice_item_order_item_id_idx ON ...

    postgresql大表join优化

    一、背景

    1、数据量:

    表名数据量
    f_invoice87346130
    f_invoice_item97535867

    2、索引:

    表:f_invoice_item

    CREATE INDEX f_invoice_item_order_item_id_idx ON ins_dw_prd12.f_invoice_item USING btree (order_item_id)
    CREATE INDEX f_invoice_item_invoice_id_idx ON ins_dw_prd12.f_invoice_item USING btree (invoice_id) WITH (fillfactor='100')
    ​

    表:f_invoice

    CREATE INDEX idx_f_invoice_gin ON ins_dw_prd12.f_invoice USING gin (source_type, invoice_type, invoice_status, invoice_title, invoice_date, seller_taxer_code, shop_id, create_time)
    CREATE INDEX idx_f_invoice_invoice_date ON ins_dw_prd12.f_invoice USING btree (invoice_date) WITH (fillfactor='100')
    CREATE INDEX idx_f_invoice_seller_taxer_code ON ins_dw_prd12.f_invoice USING btree (seller_taxer_code) WITH (fillfactor='100')
    CREATE INDEX idx_invoice_createtime_btree ON ins_dw_prd12.f_invoice USING btree (create_time) WITH (fillfactor='100')
    ​

     

    二 、优化前

    sql:

    explain(analyse, timing)
    SELECT count(*)
    from (SELECT fi.invoice_id
          FROM ins_dw_prd12.f_invoice fi
          WHERE (fi.seller_taxer_code in ('91320200704046760T', '91340100149067617J', '91320214MA1YGE8F94') and
                 fi.create_time >= '2020-01-01 00:00:00' and fi.create_time <= '2020-01-31 00:00:00')) AS mm
             INNER JOIN ins_dw_prd12.f_invoice_item fit ON fit.invoice_id = mm.invoice_id
             inner join ins_dw_prd12.f_invoice m on m.invoice_id = mm.invoice_id

    执行计划:

    Finalize Aggregate  (cost=3083416.86..3083416.87 rows=1 width=8) (actual time=85251.980..85251.980 rows=1 loops=1)
      ->  Gather  (cost=3083416.44..3083416.85 rows=4 width=8) (actual time=85251.097..85269.008 rows=5 loops=1)
            Workers Planned: 4
            Workers Launched: 4
            ->  Partial Aggregate  (cost=3082416.44..3082416.45 rows=1 width=8) (actual time=85244.739..85244.739 rows=1 loops=5)
                  ->  Nested Loop  (cost=184106.68..3082211.80 rows=81856 width=0) (actual time=2308.041..85237.967 rows=57076 loops=5)
                        ->  Parallel Hash Join  (cost=184106.11..2879308.96 rows=81856 width=16) (actual time=2307.992..85029.464 rows=57076 loops=5)
                              Hash Cond: (fit.invoice_id = fi.invoice_id)
                              ->  Parallel Seq Scan on f_invoice_item fit  (cost=0.00..2631148.52 rows=24401652 width=8) (actual time=0.466..79746.085 rows=19507465 loops=5)
                              ->  Parallel Hash  (cost=183190.09..183190.09 rows=73282 width=8) (actual time=334.243..334.243 rows=54056 loops=5)
                                    Buckets: 524288  Batches: 1  Memory Usage: 14752kB
                                    ->  Parallel Index Scan using idx_invoice_createtime_btree on f_invoice fi  (cost=0.57..183190.09 rows=73282 width=8) (actual time=0.177..314.460 rows=54056 loops=5)
                                          Index Cond: ((create_time >= '2020-01-01 00:00:00'::timestamp without time zone) AND (create_time <= '2020-01-31 00:00:00'::timestamp without time zone))
                                          Filter: ((seller_taxer_code)::text = ANY ('{91320200704046760T,91340100149067617J,91320214MA1YGE8F94}'::text[]))
                                          Rows Removed by Filter: 455651
                        ->  Index Only Scan using f_invoice_pkey on f_invoice m  (cost=0.57..2.48 rows=1 width=8) (actual time=0.003..0.003 rows=1 loops=285380)
                              Index Cond: (invoice_id = fit.invoice_id)
                              Heap Fetches: 285380
    Planning Time: 8.120 ms
    Execution Time: 85269.153 ms
    ​

    分析:

    其中耗时最严重的点在:

    并行顺序扫描了表f_invoice_item,并且loops=5,每次扫描行数:rows=19507465;而表f_invoice_item数据量才9700万左右。

    ->  Parallel Seq Scan on f_invoice_item fit  (cost=0.00..2631148.52 rows=24401652 width=8) (actual time=0.466..79746.085 rows=19507465 loops=5)

     

    问题:表f_invoice_item上有索引f_invoice_item_invoice_id_idx,为什么会不走呢??

     

    三、优化后

    sql:

    explain(analyse, timing)
    SELECT count(*)
    FROM (select *
          from ins_dw_prd12.f_invoice fi
          where fi.seller_taxer_code in ('91320200704046760T', '91340100149067617J', '91320214MA1YGE8F94')
            and fi.create_time >= '2020-03-01 00:00:00'
            and fi.create_time <= '2020-03-31 00:00:00') m
             INNER JOIN (select *
                         from ins_dw_prd12.f_invoice_item
                         where invoice_id in (SELECT fi.invoice_id
                                              FROM ins_dw_prd12.f_invoice fi
                                              WHERE fi.seller_taxer_code in
                                                    ('91320200704046760T', '91340100149067617J', '91320214MA1YGE8F94')
                                                and fi.create_time >= '2020-03-01 00:00:00'
                                                and fi.create_time <= '2020-03-31 00:00:00')) fit
                        ON fit.invoice_id = m.invoice_id

    执行计划:

    Finalize Aggregate  (cost=428280.97..428280.98 rows=1 width=8) (actual time=2400.367..2400.367 rows=1 loops=1)
      ->  Gather  (cost=428280.55..428280.96 rows=4 width=8) (actual time=2399.218..2432.599 rows=5 loops=1)
            Workers Planned: 4
            Workers Launched: 4
            ->  Partial Aggregate  (cost=427280.55..427280.56 rows=1 width=8) (actual time=2394.585..2394.585 rows=1 loops=5)
                  ->  Nested Loop  (cost=203100.20..427279.71 rows=334 width=0) (actual time=1465.895..2388.019 rows=52988 loops=5)
                        ->  Parallel Hash Join  (cost=203099.63..405399.83 rows=299 width=16) (actual time=1459.954..1850.252 rows=47458 loops=5)
                              Hash Cond: (fi.invoice_id = fi_1.invoice_id)
                              ->  Parallel Index Scan using idx_invoice_createtime_btree on f_invoice fi  (cost=0.57..202088.56 rows=80840 width=8) (actual time=0.313..363.616 rows=47458 loops=5)
                                    Index Cond: ((create_time >= '2020-03-01 00:00:00'::timestamp without time zone) AND (create_time <= '2020-03-31 00:00:00'::timestamp without time zone))
                                    Filter: ((seller_taxer_code)::text = ANY ('{91320200704046760T,91340100149067617J,91320214MA1YGE8F94}'::text[]))
                                    Rows Removed by Filter: 601517
                              ->  Parallel Hash  (cost=202088.56..202088.56 rows=80840 width=8) (actual time=1459.076..1459.076 rows=47458 loops=5)
                                    Buckets: 524288  Batches: 1  Memory Usage: 13472kB
                                    ->  Parallel Index Scan using idx_invoice_createtime_btree on f_invoice fi_1  (cost=0.57..202088.56 rows=80840 width=8) (actual time=1.947..1438.735 rows=47458 loops=5)
                                          Index Cond: ((create_time >= '2020-03-01 00:00:00'::timestamp without time zone) AND (create_time <= '2020-03-31 00:00:00'::timestamp without time zone))
                                          Filter: ((seller_taxer_code)::text = ANY ('{91320200704046760T,91340100149067617J,91320214MA1YGE8F94}'::text[]))
                                          Rows Removed by Filter: 601517
                        ->  Index Only Scan using f_invoice_item_invoice_id_idx on f_invoice_item  (cost=0.57..70.85 rows=233 width=8) (actual time=0.011..0.011 rows=1 loops=237290)
                              Index Cond: (invoice_id = fi_1.invoice_id)
                              Heap Fetches: 264945
    Planning Time: 0.591 ms
    Execution Time: 2432.666 ms
    ​

     

    效果

    从优化前85秒到优化后2.4秒,性能提升接近40倍。

     

     

    展开全文
  • spark亿数据join优化

    万次阅读 2017-12-14 11:25:41
    转:...任务很简单,就是join两张A ship有几千万行,包含每日寄出去的包裹的信息,B item有几十亿行,包括所有商品的属性,我们需要把商品的属性信息加到每包裹里面的商品上。

    转:https://daizuozhuo.github.io/spark-join/


    最近在项目中用Spark join了几十亿的数据,在debug和不断优化性能中感觉收获良多,特此记录一下。

    任务很简单,就是join两张表,表A ship有几千万行,包含每日寄出去的包裹的信息,表B item有几十亿行,包括所有商品的属性,我们需要把商品的属性信息加到每个包裹里面的商品上。

    一开始我就是把它当成一个很简单的任务,不就是一个简单的left join吗?于是写下了如下代码:

    Dataset<Row> shipItems = getSpark().sql("select " +
            "ship.*, item.* " +
            "from ship left join item " +
            "on ship.asin = item.item_asin")
            .drop("item_asin");
    shipItems.createOrReplaceTempView("ship_items");
    

    但是一经实践就遇到一下错误:

    17/07/10 02:26:14 ERROR YarnClientSchedulerBackend: Yarn application has already exited with state KILLED!
    17/07/10 02:26:14 INFO SparkUI: Stopped Spark web UI at http://172.31.5.203:4040
    17/07/10 02:26:14 ERROR TransportClient: Failed to send RPC 8654033690236908099 to /172.31.5.177:42830: java.nio.channels.ClosedChannelException
    java.nio.channels.ClosedChannelException
            at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
    17/07/10 02:26:14 ERROR YarnSchedulerBackend$YarnSchedulerEndpoint: Sending RequestExecutors(0,0,Map()) to AM was unsuccessful
    java.io.IOException: Failed to send RPC 8654033690236908099 to /172.31.5.177:42830: java.nio.channels.ClosedChannelException
    

    这个错误表明spark的任务在Yarn的executor中被kill了,然后在看一下Yarn executor中的log :”WARN: Received singal SIG_TERM”, 果然因为内存使用太多被Yarn kill了。

    这是为什么呢?来估算一下完成这个任务需要多少计算资源吧。假设每个表大概有50字段,每个字段占用10个字符,那么当Spark将表A load进内存时需要501010^7/10^9 = 5G,而将表B load进内存时需要:501010^9/1000^9 = 500G。但是当我把整个集群的内存加到600G的时候这个错误仍然没有解决。猜测内存600G的集群仍然无法join表A和表B可能是因为在join过程中可能生成了多份数据而超过了表A和表B本来的大小,这个时候如果再继续提高集群大小就有点不划算了,所以我们开始考虑如何对程序进行优化。

    此时该程序所需内存如下所示:original join其中深蓝色代表已经存在于集群中内存中的数据,浅蓝色代表正在生成中的数据,那么此时整个集群所需要的内存为两倍于表A和表B大小:(500 + 5) * 2 = 2010 G.

    因为我们表A left join 表B 之后的结果和表A的大小是一样的,所以实际上大部分表B的数据是没有用的,那么我们可不可以先将一部分表B的数据去掉呢?我们可以确定的是只有存在于表A中物品才会出现在结果中,所以我们可以将表A中的所有商品ID取出来做成一个集合,这个集合的大小为 20 * 10^7/1000/1000 = 200 M,然后将这个集合broadcast到每一个slave节点进行filter,这样可以得到一个大大缩减版的表B。更重要的是,我们得到缩减版的表B之后,原来那个巨大的表B就可以从内存中删除了,这样可以大大减少内存的使用,最终使得程序成功运行。优化之后的代码如下:

    Dataset<Row> distinctAsin = getSpark()
            .sql("select distinct asin from ship")
            .persist(StorageLevel.DISK_ONLY())
    //only keep items appeared in shipments
    Dataset<Row> filteredItems = getSpark()
            .sql("select * from item")
            .withColumnRenamed("asin", "item_asin")
            .join(functions.broadcast(distinctAsin),
                    functions.col("item_asin").equalTo(functions.c("asin")), "leftsemi")
            .persist(StorageLevel.DISK_ONLY());
    filteredItems.createOrReplaceTempView("filter_item");
    Dataset<Row> shipItems = getSpark().sql("select " +
            "ship.*, filter_item.* " +
            "from ship left join filter_item " +
            "on ship.asin = filter_item.item_asin")
            .drop("item_asin");
    shipItems.createOrReplaceTempView("ship_items");
    

    此时该程序所需的内存如下:optimized join其中深蓝色代表已经存在于集群中内存中的数据,浅蓝色代表正在生成中的数据,蓝色阴影代表已经处理过从内存中删除的数据,那么此时整个集群所需要的内存为两倍于表A和表C大小:(5 + 5) * 2 = 20 G.

    memory usage通过gangalia监控到的集群内存使用量.

    如果觉得有用,请点star


    展开全文
  • 目录一、连接二、多删除一、join连接 连接条件 三种连接类型示意图 1、内连接:inner join mysql> select a.id,a.name from ceshi AS a inner join ceshi_two AS b on a.name = b.name;2、左外连接 mysql> select ...
  • 记得5年前遇到一个SQL,就是一个简单的两表关联,SQL跑了差不多一天一夜,这两个表都非常巨大,每个表都有几十个G,数据量每个表有20多亿,表的字段也特别多。 相信大家也知道SQL慢在哪里了,单个进程的PGA 是绝对...
  • big table:streamed small table:buffered reduce the memory need and job count and w/r 
  • phoenix 大 顺序 join查询

    千次阅读 2018-11-10 17:36:04
    粗糙的全扫描例子: ...T_EXTENSION_ALL_DATAS_SHOW 小几十条数据。   select T1.LOGIN_DATE,T1.COUNTRY,T1.IP,T1.BROWSER,T1.USER_NAME,T1.GENDER,T1.EMAIL,T2.TIME_SPEND,T2.CAM_SI...
  • flink 多表join的例子

    千次阅读 2019-05-22 16:57:32
    今天写了一稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下MapToString 参考bug 那篇博客 public static void main(String[] arg) throws Exception { final ExecutionEnvironment env = ...
  • Hive是基于Hadoop的一数据仓库工具,可以将结构化的数据文件映射为一张数据库,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 sql中的连接查询有inner join(内连接)、left join(左...
  • select * from a,b where a.id=b.id; a100GB b50MB 如何优化?...A100GB 是不是几十亿的数据??如果是标量子查询 A返回一条 B就会扫描1次 相当于B被扫描几十亿次 即使B走索引 走索引至少读3块 然后回
  • Mycat catlet跨库JOIN与全局JOIN

    万次阅读 2017-06-14 10:38:00
    当表设置为全局表后可以与任意一个表进行JOIN操作。Mysql分库这里我们将基础和业务分别放在不同的数据库分片上,创建m和n数据库实例名。m基础配置数据库:包含t_usern业务数据数据库:包含t_servicem&n数据库建表...
  • 从这两单词的角度分析,Fork是分叉的意思,可以引申为切分,Join是加入的意思,可以引申为合并。Fork的作用是把大任务切分为小任务,Join则是把这些小任务的执行结果进行合并的过程。以计算1+2+3+4为例,假设阈值...
  • 种 hive join 类型简介

    千次阅读 2017-09-01 15:35:24
    传统DBMS 数据库已经将各种算法优化到了极致,而对于hadoop 使用的mapreduce 所进行的join 操作,去年开始也是有各种不同的算法论文出现,讨论各种算法的适用场景和取舍条件,本文讨论hive 中出现的join 优化,...
  • left join 速度很慢,全查询?

    千次阅读 2020-09-11 15:21:41
    left join 速度很慢,全查询? 有小伙伴问我为什么十几万条数据进行left join关联查询耗时很多。 观察两张发现,关联字段未加索引。 增加索引后速度提升明细。
  • 五.Sql server中join的使用方法

    千次阅读 2012-11-15 17:11:38
    这一节讲解下sql中join的使用,join指令来用来多查询,它可以实现将多表格连接起来,有如下种使用方法: 1. 左连接 left join或left outer join 2. 右连接 right join或right outer join 3. 全连接 ...
  • Join连接是大数据处理的重要手段,它基于表之间的共同字段将来自两个或多个表的行结合起来。如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在...
  • 但是这些多线程不是完全独立运行的,因为你需要在进入下一阶段前将这一阶段的所有数据处理完,而不能让一线程先进入下一阶段。所以如何设计Join算法来减少overhead就是非常关键的事情了。 这篇文章我们将...
  • pthread_join()函数

    千次阅读 2014-12-19 20:08:14
    pthread_join
  • Hash Join只能用于相等连接,且只能在CBO... Hash Join的执行计划第1是hash(build table),第2探查(probe table),一般不叫内外表,nested loop才有内外表  Hash也就是所谓的内,探查所谓的外表  
  • 如果没有优化过SQL,哪里能练出火眼金睛,注意看跑得慢的SQL是HASHJOIN,跑得快的SQL是HASHJOINR...
  • SQL优化:化解关联的多对多join

    千次阅读 2016-12-22 14:34:53
    昨天在写了一存储过程,写好后执行存储过程时,发现非常慢,但是当前系统里的数据量就几十万条,数据量是比较小的。 代码如下: declare @biz_date varchar(7) set @biz_date = '2016-10' select '达成率' kpi, ...
  • hash join概念

    千次阅读 2010-07-12 19:53:00
    在Oracle中,从7.3开始引入Hash Join,以代替sort-merge和nested-loop join方式,提高效率。...  Hash join的主要资源消耗在于CPU,而merge join的资源消耗主要在于此盘IO(扫描或索引)。 在
  • 利用join方法,让多线程有序执行

    千次阅读 2018-01-29 13:51:39
    和wait()有点类似,join()方法可以让多线程之间排队等待,按照一定的顺序执行。join方法是阻塞的,会一直等到取消或超时为止。假如现在有三线程,main,t0,t1,要在main线程启动之后相继执行t0,t1,那么可以在...
  • left join 效率问题

    万次阅读 2017-09-04 10:27:06
    问题:两张表关联 数据库使用oracle,left join,第一张表30w条数据,第二张表300条数据,开始使用第一张表关联第二张表,查询20个字段信息,导致oracle崩溃。...将要关联的两个表信息进行update整合成一张表,然后查
  • hive中join导致的数据倾斜问题排查

    万次阅读 多人点赞 2017-08-10 22:55:22
    hive中大key导致的join数据倾斜问题1、场景如果某个key下记录数...本例子SQL如下:查询每appid打开的次数,需要排除掉作弊的imei。selectappid,count(*)from ( select md5imei,appid from ( select t1.md5im...
  • mysql减少join种通用方法

    千次阅读 2019-01-25 17:20:00
    过多的join使用(有时系统可能会查询超过十几个join)反而会带来极低的查询效率,所以阿里开发规范有规定: join的数量不允许超过3. 要求是有了,但关于如何减少,有些人不知道该怎么做,本文给出3较为通用的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 68,999
精华内容 27,599
关键字:

几十个表join