精华内容
参与话题
问答
  • 在《spark sql 写入hive较慢原因分析》中已经分析了spark sql 写入hive分区文件慢的原因,笔者提供几种优化思路供参考:(1)spark 直接生成hive库表底层分区文件,然后再使用add partion语句添加分区信息spark.sql(s...

    在《spark sql 写入hive较慢原因分析》中已经分析了spark sql 写入hive分区文件慢的原因,笔者提供几种优化思路供参考:

    (1)spark 直接生成hive库表底层分区文件,然后再使用add partion语句添加分区信息

    spark.sql(s"alter table legend.test_log_hive_text add partition (name_par='${dirName}')")

    (2)spark 生成文件存放到HDFS目录下,使用hive脚本命令,load数据到hive中

    hive -e "load data inpath '/test/test_log_hive/name_par=test$i' overwrite into table legend.test_log_hive_text partition(name_par='test$i') "

    (3)修改spark配置文件,指定hive metastore版本及jar所在位置,查看spark源码可看到spark支持的hive版本在0.12.0-2.3.3版本之间,修改参数spark.sql.hive.metastore.version及spark.sql.hive.metastore.jars参数

    private[spark] object HiveUtils extends Logging {

    def withHiveExternalCatalog(sc: SparkContext): SparkContext = {

    sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")

    sc

    }

    /** The version of hive used internally by Spark SQL. */

    val builtinHiveVersion: String = "1.2.1"

    val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")

    .doc("Version of the Hive metastore. Available options are " +

    s"0.12.0 through 2.3.3.")

    .stringConf

    .createWithDefault(builtinHiveVersion)

    // A fake config which is only here for backward compatibility reasons. This config has no effect

    // to Spark, just for reporting the builtin Hive version of Spark to existing applications that

    // already rely on this config.

    val FAKE_HIVE_VERSION = buildConf("spark.sql.hive.version")

    .doc(s"deprecated, please use ${HIVE_METASTORE_VERSION.key} to get the Hive version in Spark.")

    .stringConf

    .createWithDefault(builtinHiveVersion)

    val HIVE_METASTORE_JARS = buildConf("spark.sql.hive.metastore.jars")

    .doc(s"""

    | Location of the jars that should be used to instantiate the HiveMetastoreClient.

    | This property can be one of three options: "

    | 1. "builtin"

    | Use Hive ${builtinHiveVersion}, which is bundled with the Spark assembly when

    | -Phive is enabled. When this option is chosen,

    | spark.sql.hive.metastore.version must be either

    | ${builtinHiveVersion} or not defined.

    | 2. "maven"

    | Use Hive jars of specified version downloaded from Maven repositories.

    | 3. A classpath in the standard format for both Hive and Hadoop.

    """.stripMargin)

    .stringConf

    .createWithDefault("builtin")

    笔者根据自己需求实际情况采用的是第二种方法,笔者实际使用场景:Oracle GG实时读取上游DB日志数据,推送到kafka,流处理程序实时保存变化日志到hbase表中,hbase表每天合并操作日志生成T-1日切片表,再使用spark读取hbase表数据,同步到离线库中供离线分析使用(主要是借用hbase完成数据的更新,删除)以下是demo

    package cn.com.spark.hbase.hive

    import java.net.URI

    import java.util

    import org.apache.hadoop.conf.Configuration

    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.apache.hadoop.hbase.client.{Result, Scan}

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    import org.apache.hadoop.hbase.protobuf.ProtobufUtil

    import org.apache.hadoop.hbase.util.{Base64, Bytes}

    import org.apache.spark.SparkConf

    import org.apache.spark.sql.types.{DataTypes, StringType, StructField}

    import org.apache.spark.sql.{RowFactory, SparkSession}

    import org.slf4j.LoggerFactory

    import scala.collection.mutable

    import scala.collection.mutable.ArrayBuffer

    object HbaseToHive {

    val log = LoggerFactory.getLogger(HbaseToHive.getClass)

    // private val hdfsPath = "/user/hive/warehouse/legend.db/test_log_hive_text"

    private val hdfsPath = "/test/test_log_hive"

    def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()

    .setAppName(s"${this.getClass.getSimpleName}")

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    sparkConf.set("spark.broadcast.compress", "true")

    sparkConf.set("spark.rdd.compress", "true")

    sparkConf.set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")

    // sparkConf.set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

    sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))

    val spark = SparkSession

    .builder()

    .config(sparkConf)

    .appName(s"${this.getClass.getSimpleName}")

    .enableHiveSupport()

    .getOrCreate()

    val conf = HBaseConfiguration.create()

    // conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181")

    conf.set("hbase.zookeeper.quorum", "30.4.137.224:2181,30.4.137.228:2181,30.4.137.229:2181")

    conf.set(TableInputFormat.INPUT_TABLE, "test:test_log_hive")

    val scan = new Scan()

    val proto = ProtobufUtil.toScan(scan)

    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray))

    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(

    conf,

    classOf[TableInputFormat],

    classOf[ImmutableBytesWritable],

    classOf[Result])

    val list = new util.ArrayList[StructField]()

    val rowKey = DataTypes.createStructField("rowKey", StringType, true)

    val name = DataTypes.createStructField("name", StringType, true)

    val age = DataTypes.createStructField("age", StringType, true)

    val mobile = DataTypes.createStructField("mobile", StringType, true)

    val addr = DataTypes.createStructField("addr", StringType, true)

    list.add(rowKey)

    list.add(name)

    list.add(age)

    list.add(mobile)

    list.add(addr)

    val schema = DataTypes.createStructType(list)

    val mapHbaseRDD = hBaseRDD.map(x => {

    val result = x._2

    val rowKey = Bytes.toString(result.getRow)

    val name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))

    val age = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))

    val mobile = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("mobile"))

    val addr = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("addr"))

    RowFactory.create(rowKey, Bytes.toString(name), Bytes.toString(age), Bytes.toString(mobile), Bytes.toString(addr))

    })

    val df = spark.createDataFrame(mapHbaseRDD, schema)

    df.rdd.map(r => {

    (r.getString(1), r.mkString(","))

    }).repartition(3).saveAsHadoopFile(hdfsPath, classOf[String], classOf[String],

    classOf[RDDMultipleTextOutputFormat])

    val dirs = getDirs(hdfsPath)

    val loadSql = dirs.map(dir => {

    val dirNames = dir.split("/")

    val dirName = dirNames(dirNames.length - 1)

    s"load data inpath '${dir}' overwrite into table legend.test_log_hive_text partition (name_par='${dirName}')"

    })

    val loadSqlMap = spliceList(loadSql.toList, 30)

    val loadSqlGroups = new ArrayBuffer[String]

    loadSqlMap.foreach(x => {

    loadSqlGroups += x._2.mkString(";") + System.lineSeparator()

    })

    spark.sparkContext.makeRDD(loadSqlGroups).repartition(1).saveAsTextFile(hdfsPath + "/" + "load_sql")

    //

    // spark.sql("use legend")

    // spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // spark.sql("create table test_log_hive_text(rowKey STRING, name STRING,age STRING,mobile STRING,addr " +

    // "STRING) partitioned by(name_par STRING) row format delimited fields terminated by ','")

    //

    // for (dirPath

    // val dirNames = dirPath.split("/")

    // val dirName = dirNames(dirNames.length - 1).split("=")(1)

    // spark.sql(s"alter table legend.test_log_hive_text add partition (name_par='${dirName}')")

    // }

    // df.repartition(5)

    // df.createTempView("result")

    //

    // spark.sql("use legend")

    // spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // spark.sql("insert into legend.test_log_hive partition(name_par) select rowKey,name,age,mobile,addr,name as name_par from result")

    // df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("name").insertInto("test.test_log")

    // spark.sql("use legend")

    // spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // spark.sql("load data inpath '/test/test_log_hive' OVERWRITE INTO TABLE legend.test_log_hive_text PARTITION

    // " +

    // "(create_day='2019-04-28') ")

    // spark.sql("insert overwrite table legend.test_log_hive_orc PARTITION(name_par) select rowKey,name,age,

    // mobile," +

    // "addr,name as name_par from test_log_hive_text where create_day='2019-04-28' ")

    }

    //获取目录下的一级目录

    def getDirs(path: String): Array[String] = {

    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory)

    .map(_.toString)

    }

    //获取目录下的一级文件和目录

    def getFilesAndDirs(path: String): Array[Path] = {

    val fs = getHdfs(path).listStatus(new Path(path))

    FileUtil.stat2Paths(fs)

    }

    //生成FileSystem

    def getHdfs(path: String): FileSystem = {

    val conf = new Configuration()

    FileSystem.get(URI.create(path), conf)

    }

    /**

    * 拆分集合

    *

    * @param datas

    * @param splitSize

    * @return

    */

    def spliceList(datas: List[String], splitSize: Int): mutable.HashMap[String, List[String]] = {

    if (datas == null || splitSize < 1) return null

    val totalSize = datas.size

    val count = if (totalSize % splitSize == 0) totalSize / splitSize

    else totalSize / splitSize + 1

    val map = new mutable.HashMap[String, List[String]]();

    for (i

    val cols = datas.slice(i * splitSize, if (i == count - 1) totalSize

    else splitSize * (i + 1))

    map(i.toString) = cols

    }

    map

    }

    }

    package cn.com.spark.hbase.hive

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {

    // ("name_par=" + key + "/" + name)

    (key + "/" + name)

    }

    override def generateActualKey(key: Any, value: Any): String = {

    null

    }

    }

    demo主要功能是读取hbase数据并按照分区字段值,分别保存到hdfs目录上,最后使用hive命令脚本load数据到hive表中

    展开全文
  • Hive优化

    2020-04-15 14:31:10
    hive调优是比较大的专题,需要结合实际的业务,数据的类型,分布,质量状况等来实际的考虑如何进行系统性的优化hive底层是mapreduce,所以hadoop调优也是hive调优的一个基础,hvie调优可以分为几个模块进行考虑,...

    hive调优是比较大的专题,需要结合实际的业务,数据的类型,分布,质量状况等来实际的考虑如何进行系统性的优化,hive底层是mapreduce,所以hadoop调优也是hive调优的一个基础,hvie调优可以分为几个模块进行考虑,数据的压缩与存储,sql的优化,hive参数的优化,解决数据的倾斜等。

    一,数据的压缩与存储格式

    对分析的数据选择合适的存储格式与压缩方式能提高hive的分析效率:

    1)压缩方式

         压缩可以节约磁盘的空间,基于文本的压缩率可达40%+; 压缩可以增加吞吐量和性能量(减小载入内存的数据量),但是在压缩和解压过程中会增加CPU的开销。所以针对IO密集型的jobs(非计算密集型)可以使用压缩的方式提高性能。 几种压缩算法:

    注意:选择压缩算法的时候需要考虑到是否可以分割,如果不支持分割(切片的时候需要确定一条数据的完整性),则一个map需要执行完一个文件,如果文件很大,则效率很低。一般情况下hdfs一个块(128M)就是一个map的输入切片,而block是按物理切割的,可能一条数据会被切到两个块中去,而mapde 切片如何确保一条数据在一个切片中呢?这就是看压缩算法支不支持分割了,具体的实现机制需要看源码研究。

    2)存储格式(行存与列存)

    1. TextFile

    Hive数据表的默认格式,存储方式:行存储。 可以使用Gzip压缩算法,但压缩后的文件不支持split 在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。

    2.Sequence Files

    Hadoop中有些原生压缩文件的缺点之一就是不支持分割。支持分割的文件可以并行的有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。Sequence File是可分割的文件格式,支持Hadoop的block级压缩。 Hadoop API提供的一种二进制文件,以key-value的形式序列化到文件中。存储方式:行存储。 sequencefile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,RECORD是默认选项,通常BLOCK会带来较RECORD更好的压缩性能。 优势是文件和hadoop api中的MapFile是相互兼容的

    3. RCFile

    存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:

    首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低 其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取 数据追加:RCFile不支持任意方式的数据写操作,仅提供一种追加接口,这是因为底层的 HDFS当前仅仅支持数据追加写文件尾部。 行组大小:行组变大有助于提高数据压缩的效率,但是可能会损害数据的读取性能,因为这样增加了 Lazy 解压性能的消耗。而且行组变大会占用更多的内存,这会影响并发执行的其他MR作业。

    4.ORCFile

    存储方式:数据按行分块,每块按照列存储。
    压缩快,快速列存取。效率比rcfile高,是rcfile的改良版本。

    5.Parquet

    Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间

    6、自定义格式

    可以自定义文件格式,用户可通过实现InputFormat和OutputFormat来自定义输入输出格式。

    mapreduce可以选择压缩的地方:map阶段的输出和reduce阶段的输出。

    设置方式:

    1. map阶段输出数据压缩 ,在这个阶段,优先选择一个低CPU开销的算法。

    set hive.exec.compress.intermediate=true
    set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec
    set mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec;

    2.hive.exec.compress.output:用户可以对最终生成的Hive表的数据通常也需要压缩。该参数控制这一功能的激活与禁用,设置为true来声明将结果文件进行压缩。 (也可以在建表的时候进行设置)

    set hive.exec.compress.output=true 
    set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec

    结论,一般选择orcfile/parquet + snappy 的方式

    建表语句:

    create table tablename (
     xxx,string
     xxx, bigint
    )
    ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
    STORED AS orc tblproperties("orc.compress" = "SNAPPY")

    二、创建分区表,桶表,拆分表

    1)创建分区表:(分区表相当于hive的索引,加快查询速度)

    CREATE external TABLE table_name    
    (col1 string,  col2 double) 
    partitioned by (date string)  
     ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' Stored AS TEXTFILE
    location ‘xxxxx’;
     
    alter table table_name add partitions(key = value) location 'xxxx' (收到设置分区,静态分区)

     设置动态分区

    set hive.exec.dynamic.partition=true;(可通过这个语句查看:set hive.exec.dynamic.partition;) 
    set hive.exec.dynamic.partition.mode=nonstrict; (它的默认值是strick,即不允许分区列全部是动态的)
    SET hive.exec.max.dynamic.partitions=100000;(如果自动分区数大于这个参数,将会报错)
    SET hive.exec.max.dynamic.partitions.pernode=100000;

     

    2)创建桶表

    对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。

    把表(或者分区)组织成桶(Bucket)有两个理由:

    (1)获得更高的查询处理效率。桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。

    (2)使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。

    create table bucketed_user(id int,name string) clustered by (id)
     sorted by(name) into 4 buckets row format delimited fields terminated by '\t'
     stored as textfile;

    3)拆分表

    当你需要对一个很大的表做分析的时候,但不是每个字段都需要用到,可以考虑拆分表,生成子表,减少输入的数据量。并且过滤掉无效的数据,或者合并数据,进一步减少分析的数据量

    create table tablename 
    ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
    STORED AS orc tblproperties("orc.compress" = "SNAPPY")
    as select XXX from XXXX

     

    三、hive参数优化

    1)fetch task 为执行hive时,不用执行MapReduce,如select * from emp;

    Hive.fetch.task.conversion 默认为minimal
     
    修改配置文件hive-site.xml
    <property>
      <name>hive.fetch.task.conversion</name>
      <value>more</value>
      <description>
        Some select queries can be converted to single FETCH task 
        minimizing latency.Currently the query should be single 
        sourced not having any subquery and should not have
        any aggregations or distincts (which incurrs RS), 
        lateral views and joins.
        1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
        2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
      </description>
    </property> 
     
     
    或者当前session修改
    hive> set hive.fetch.task.conversion=more;
    执行SELECT id, money FROM m limit 10; 不走mr
     

    2)并行执行

    当一个sql中有多个job时候,且这多个job之间没有依赖,则可以让顺序执行变为并行执行(一般为用到union all )

    // 开启任务并行执行
     set hive.exec.parallel=true;
     // 同一个sql允许并行任务的最大线程数 
    set hive.exec.parallel.thread.number=8;

    3)jvm 重用

     JVM重用对hive的性能具有非常大的 影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。

     

    set mapred.job.reuse.jvm.num.tasks=10; 

    JVM的一个缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡“的job中有几个 reduce task 执行的时间要比其他reduce task消耗的时间多得多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

    4)设置reduce的数目

    reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定: hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,在Hive 0.14.0版本之前默认值是1G(1,000,000,000);而从Hive 0.14.0开始,默认值变成了256M(256,000,000) ) hive.exec.reducers.max(每个任务最大的reduce数,在Hive 0.14.0版本之前默认值是999;而从Hive 0.14.0开始,默认值变成了1009 ) 计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1) 即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;

    调整reduce个数方法一: 调整hive.exec.reducers.bytes.per.reducer参数的值:

    set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

    调整reduce个数方法二:

    set mapred.reduce.tasks = number

     reduce个数并不是越多越好; 同map一样,启动和初始化reduce也会消耗时间和资源; 另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题 -

    5) 推测执行

       什么是推测执行?
    所谓的推测执行,就是当所有task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node机器配置比较低或者CPU load很高(原因很多),导致任务执行比总体任务的平均执行要慢,此时Job Tracker会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill掉
     
    怎么配置推测执行参数?
    推测执行需要设置Job的两个参数:
        mapred.map.tasks.speculative.execution
        mapred.reduce.tasks.speculative.execution
    两个参数的默认值均为true.

    四、优化sql

    (1)where条件优化

    优化前(关系数据库不用考虑会自动优化):

    select m.cid,u.id from order m join customer u on( m.cid =u.id )where m.dt='20180808';

    优化后(where条件在map端执行而不是在reduce端执行):

    select m.cid,u.id from (select * from order where dt='20180818') m join customer u on( m.cid =u.id);

    (2)union优化

    尽量不要使用union (union 去掉重复的记录)而是使用 union all 然后在用group by 去重

    (3)count distinct优化

    不要使用count (distinct   cloumn) ,使用子查询

    select count(1) from (select id from tablename group by id) tmp;

    (4) 用in 来代替join

    如果需要根据一个表的字段来约束另为一个表,尽量用in来代替join

    select id,name from tb1  a join tb2 b on(a.id = b.id);
    select id,name from tb1 where id in(select id from tb2); in 要比join 快

     

    (5)消灭子查询内的 group by 、 COUNT(DISTINCT),MAX,MIN,可以减少job的数量。

      (6) join 优化:

       Common/shuffle/Reduce JOIN 连接发生的阶段,发生在reduce 阶段, 适用于大表 连接 大表(默认的方式)

        Map join : 连接发生在map阶段 , 适用于小表 连接 大表
                           大表的数据从文件中读取
                           小表的数据存放在内存中(hive中已经自动进行了优化,自动判断小表,然后进行缓存)

    set hive.auto.convert.join=true;  

    SMB join
       Sort -Merge -Bucket Join  对大表连接大表的优化,用桶表的概念来进行优化。在一个桶内发送生笛卡尔积连接(需要是两个桶表进行join)

    set hive.auto.convert.sortmerge.join=true;  
    set hive.optimize.bucketmapjoin = true;  
    set hive.optimize.bucketmapjoin.sortedmerge = true;  
    set hive.auto.convert.sortmerge.join.noconditionaltask=true;

    五、数据倾斜

    表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。

    原因:某个reduce的数据输入量远远大于其他reduce数据的输入量

    1)、key分布不均匀

    2)、业务数据本身的特性

    3)、建表时考虑不周

    4)、某些SQL语句本身就有数据倾斜
    解决方案:

    (1)参数调节

    set hive.map.aggr=true
    set hive.groupby.skewindata=true

    (2) 熟悉数据的分布,优化sql的逻辑,找出数据倾斜的原因。

    六、合并小文件

    小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:

    设置map输入的小文件合并

    set mapred.max.split.size=256000000;  
    //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
    set mapred.min.split.size.per.node=100000000;
    //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)  
    set mapred.min.split.size.per.rack=100000000;
    //执行Map前进行小文件合并
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

     设置map输出和reduce输出进行合并的相关参数:

    //设置map端输出进行合并,默认为true
    set hive.merge.mapfiles = true
    //设置reduce端输出进行合并,默认为false
    set hive.merge.mapredfiles = true
    //设置合并文件的大小
    set hive.merge.size.per.task = 256*1000*1000
    //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
    set hive.merge.smallfiles.avgsize=16000000

    七、查看sql的执行计划

    explain sql

    学会查看sql的执行计划,优化业务逻辑 ,减少job的数据量。

    以上为我在工作中的经验和网上查阅资料所整理出来的hive调优,后面会继续补充

     

    ————————————————
    版权声明:本文为CSDN博主「文艺攻城狮」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_36753550/article/details/82825207

     

     

    展开全文
  • 苏先生之Hive底层调优

    2019-02-21 16:31:15
    hive将其转换成MapReduce job,大多数情况我们不需要了解hive的内部工作,内部复杂的查询解析、优化和执行过程大部分时间我们是可以忽视的,不过想要彻底的掌握hive,我们就需要学习hive的理论知识以及底层的实现...

    一、为什么要了解底层

    HiveQL是一种声明式语言,用户会提交查询,而hive将其转换成MapReduce job,大多数情况我们不需要了解hive的内部工作,内部复杂的查询解析、优化和执行过程大部分时间我们是可以忽视的,不过想要彻底的掌握hive,我们就需要学习hive的理论知识以及底层的实现,这样会让用户更加高效的使用hive。

    二、hive的各种调优手段

    1、学会使用explain

    对于explain,可以帮助我们学习hive是如何将查询转换成MapReduce任务的。
    如:

    explain select sum(id) from number
    

    在一个普通的查询语句前添加explain关键字,查询语句是不会执行的,它会打印出抽象语法树:
    在这里插入图片描述
    在图中我们可以看到有表名number,列名id,还有sum函数等,一个hive任务包含有多个stage阶段,不同的stage会存在依赖,查询越复杂通常stage会越多,通常也会启动更多的任务来完成。
    stage plan的部分比较冗长也比较复杂,stage-1包含了这个job大部分的处理过程,而且会出发一个MapReduce,TableScan以这个表作为输入,然后产生一个id字段的输出,Group By Operator 会应用到sum(id),然后会产生一个输出的字段_col0(这是为临时结果字段按规则起的临时名字),这些都是map端的。而reduce这边,也就是Reduce Operator Tree下面,也有相同的Group By Operator,但是这次得到的是_col0字段进行的sum操作,最后我们看到了File Output Operator,说明输出是文本格式,是基于字符串输出格式:HiveIgnoreKeyTextOutputFormat
    在这里插入图片描述
    最后,因为该job没有limit,所以stage-0阶段没有任何操作

    2、学会限制调整

    大家在用SQL语句的时候limit语句是经常用到的,有时候对大量数据,我们并不需要把所有数据查询出来,所以我们会用到limit来限制行数,返回部分结果。通常情况下limit还是要先执行整条查询语句再返回部分结果,这种情况通常是浪费的,所以我们应该尽可能的避免,hive有一个配置属性的开启,当使用limit语句时,可以对其数据进行抽样

    	<property>
    		<name>hive.limit.optimize.enble</name>
    		<value>true</value>
    	</property>
    

    一旦 hive.limit.optimize.enble 属性设置为true时,那么将会有两个参数控制这个操作:

    	<property>
    		<name>hive.limit.row.max.size</name>
    		<value>100000</value>
    	</property>
    		<property>
    		<name>hive.limit.optimize.limit.file</name>
    		<value>10</value>
    	</property>
    

    hive.limit.row.max.size:设置最小采样容量
    hive.limit.optimize.limit.file:最大采样样本数
    该功能有一个缺点就是,有可能有部分数据永远不会被处理掉。

    3、学会join优化

    (1)将大表放在后边

    hive中如果是多个表查询,它会将前面的表缓存起来,然后扫描最后一个表,因此我们通常需要表大的那个表放到后边,或者我们可以用/*streamtable(table_name) */ 指出哪个是大表。

    (2)使用相同的连接键

    当有三个及以上的表连接时,如果每个on子句都使用相同的连接键时,只会产生一个MapReduce job。

    (3)尽早的过滤数据

    尽早的过滤不需要的数据,减少每个阶段的数据量,同时只选用能使用到的数据,对于分区表要加分区。

    (4)尽量原子化操作

    尽量避免一个SQL包含复杂逻辑,可以使用中间表来处理复杂的逻辑

    (5)map-side JOIN

    如果所有表中,有一张表是小表,那么可以在大表通过mapper时,将小表完全放到内存,Hive可以在map端执行连接过程,这是因为Hive可以和内存中的小表进行逐一匹配,从而可以省略掉常规连接操作所欲要的reduce过程。该操作不仅减少了reduce过程,而且有时候还可以同时减少map过程的执行步骤。

    4、使用本地模式

    有时候hive的输入数据量是很小的,这种情况下,为查询触发执行任务的时间消耗可能比实际job执行的时间多得多。对于大多数这种情况,hive可以通过本地模式在单台机器上处理所有任务,对于小数据集,执行时间会明显缩短。
    set hive.exec.mode.local.auto = true
    当一个job满足如下条件时,才能真正执行本地模式:
    1).job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
    2).job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
    3).job的reduce数必须为0或者1
    可用参数hive.mapred.local.mem(默认0)控制child jvm使用的最大内存数。

    5、并行执行

    hive会将一个查询转换成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段等。默认情况下,hive一次只会执行一个阶段,不过某些特定的job可能包含多个阶段,而这些阶段也并非完全依赖,也就是说有的阶段是可以并行执行的,这样可以使整job的执行时间缩短。
    开启并发执行:
    set hive.exec.parallel=true;
    同一个SQL允许的最大并行度,默认为8:
    set hive.exec.parallel.thread.number=16

    6、严格模式

    Hive提供了一个严格模式,可以防止用户执行那些可能产生意想不到的不好的影响的查询。
    严格模式设置属性为strict可以禁止3种类型的查询
    set hive.mapred.mode = strict
    1).对于分区表,不加分区过滤字段不能查询
    2).对于order by语句,必须使用limit
    3).限制笛卡尔积的查询(join的时候不使用on而使用where)

    7、合理设置mapper和reduce个数

    Hive通过将查询划分成一个或者多个MapReduce任务达到并行的目的。每个任务都可能具有多个mapper和reducer任务,其中一些是可以并行执行的。确定最佳的mapper和reducer个数取决于多个变量

    1).map端优化

    I.map执行时间 = map任务启动和初始化时间+逻辑处理时间
    通常情况下,作业会通过input产生一个或多个map,主要决定因素有文件大小、文件个数、还有集群设置的文件块大小(默认128M)
    如:一个文件如果有129M,那么集群会将文件拆分成128M和1M两个块,所以会产生两个map,如果有两个文件大小分别为1M,2M那么集群也会产生两个map。

    II.map并不是越多越好
    如果一个任务有很多远小于128M的文件,而且每个文件都生成一个map,那么map启动任务和初始化的时间远远高于逻辑处理的时间,会造成很大的资源浪费,而且可执行的map的数量是有限的,如果一个文件大小接近128M,但是处理的业务逻辑十分复杂,也不适合只用一个map去执行

    针对以上问题我们可以选择适当增减map的数量:
    减少map数量,即合并大量小文件:
    set mapred.max.split.size=100000000;//100M
    set mapred.min.split.size.per.node=100000000;
    set mapred.min.split.size.per.rack=100000000;
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    这个参数表示执行前进行小文件合并,
    前面三个参数确定了合并文件的大小,大于128M的按照128M分隔,大于100M小于128M的按照100M分隔,小于100M的和那些大文件分隔后留下的小文件一起合并

    如何增加map个数?
    当单个文件比较大,业务逻辑比较复杂的时候,map会执行的比较慢,所以这个时候可以考虑考虑增加map来工作,可以考虑将文件合理拆分成多个
    set mapred.reduce.tasks=10;

    2).reduce端优化

    I.如何确定reduce的个数
    reduce的个数极大的影响着,不指定reduce个数的情况下,hive会猜测确定一个reduce的个数,基于以下设定:
    hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
    hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
    即如果map的输出,reduce的输入不超过1g的话则会有一个reduce任务
    调整reduce个数的方法:
    set hive.exec.reducers.bytes.per.reducer=500000000; (500M)//设置每个reduce处理任务的数据量
    set mapred.reduce.tasks = 15; //直接设置reduce的个数

    II.只有一个reduce的情况
    a.SQL语句中没有group by的汇总
    b.用了order by
    c.使用笛卡尔积

    8、JVM重用

    jvm重用对hive性能具有非常大的影响,特别是对很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短,因为hive调起MapReduce任务,JVM启动过程会造成很大开销,JVM重用会使JVM实例再同一个job中使用多次
    设置jvm重用个数:
    set mapred.job.reuse.jvm.num.tasks=10; --10为重用个数
    缺点:开启JVM重用将会一直占用使用到的task插槽,以便重用,知道任务完成才能释放,如果某个job中的reduce任务比其他的reduce任务花的时间要长的多的话,那么保留的插槽就会一直空着无法被其他job使用,知道reduce任务执行结束。

    9、动态分区调整

    ----开启动态分区功能:
    hive.exec.dynamic.partition=true;
    ----必须保证有一个分区是静态的:
    hive.exec.dynamic.partition.mode=strict;//设置为nonstrict则所有分区都是动态的
    ----每个mapper或reducer可创建的最大分动态区数:
    hive.exec.max.dynamic.partitions.pernode=100;
    ----一个动态分区创建语句可创建的最大动态分区数:
    hive.exec.max.dynamic.partitions=1000;
    ----全局可创建的最大文件个数:
    hive.exec.max.created.files=100000;
    ----控制DataNode一次可打开的文件个数:(在hdfs-site.xml中设置)

    <property>
        <name>dfs.datanode.max.xcievers</name>
        <value>8192</value>
    </property>
    

    10、推测执行

    推测执行是Hadoop中的一个功能,可以触发执行一些重复的任务,尽管这样会重复对数据进行计算而消耗更多的计算资源,不过这个功能的目标是通过加快获取单个task的结果以及将执行慢的TaskTracker加入到黑名单的方式来提高整体的效率。

    (1)修改 $HADOOP_HOME/conf/mapred-site.xml文件

         <property>
                   <name>mapred.map.tasks.speculative.execution </name>
                   <value>true</value>
         </property>
         <property>
                   <name>mapred.reduce.tasks.speculative.execution </name>
                   <value>true</value>
         </property>
    

    (2)修改hive配置
    set hive.mapred.reduce.tasks.speculative.execution=true;

    11、单个MapReduce中多个group by

    一个特别的优化试图将查询中的多个Group By 操作组装到单个MapReduce任务中,如果想启动这个优化,需要一组常用的Group By 键:

    <property>
                   <name>hive.multigroupby.singlemr </name>
                   <value>false</value>
    </property>
    

    12、虚拟列

    Hive提供了两种虚拟列:一种用于将要进行划分的输入文件名,另一种用于文件中的块内偏移量。当hive产生了非预期的或null的返回结果时,可以通过这些虚拟列诊断查询。通过查询这些“字段”,用户可以查看到哪个文件甚至哪行数据导致的问题
    set hive.exec.rowoffset = true

    13、数据倾斜

    当一些任务执行进度长期维持在99%或100%,查看任务监控页面,发现只有几个少量的reduce子任务未完成,因为其处理的数据量和其他子任务差异过大,单一的reduce任务数据量和平均数据量差异过大,所以时长远大于平均时长。
    可能原因:
    1).key之分布不均匀
    2).业务数据本身特点
    3).建表时考虑不周
    4).某些SQL语句本身就有数据倾斜
    在这里插入图片描述
    参数调节:
    hive.map.aggr=true
    在Map端做combiner,假如map各条数据基本上不一样, 聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数hive.groupby.mapaggr.checkinterval = 100000 (默认)hive.map.aggr.hash.min.reduction=0.5(默认),预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合

    hive.groupby.skewindata=true
    生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce做次预汇总,减少某些key值条数过多某些key条数过小造成的数据倾斜问题

    展开全文
  • HIVE优化

    2018-09-17 13:58:26
    HIVE优化 ...HIVE底层运行的是MR程序,所以也要对MR进行优化。 并行执行: 在map运行过程中,有的时候需要存在依赖关系。 例如:做单词统计,一个map做完单词统计,下一个map的输入正好是...

    HIVE优化


    1、表和sql语句的优化

    思想:
    大表拆分成小表,分区表,(动态分区)有可能产生大量的分区,外部表(保证数据安全),临时表是将表的数据清洗,获得想要的字段。

    2、MR优化

    思想:
    HIVE中底层运行的是MR程序,所以也要对MR进行优化。

    并行执行:
    在map运行过程中,有的时候需要存在依赖关系。
    例如:做单词统计,一个map做完单词统计,下一个map的输入正好是上一个map的输出,这时候就是一个map依赖于另一个map的时候。
    在很多时候map task之间是没有关系的,那么就可以一起并行运行。Map的并行功能默认是关闭的,首先先开启并行功能:

    set hive.exec.parallel=true;
    

    设置并行执行的线程个数:

    set hive.exec.parallel.thread.number=8;
    

    PS: 一般在工作中,都会选择开启这个功能。

    3、JVM重用

    思想:
    对于job程序来说,频繁使用开关JVM,在这个过程中会造成大量的资源浪费,可以设置JVM重用在开启的时候可以多跑一些程序。
    经过测试选择合理的数目:

    set Mapreduce.job.jvm.numbertasks=4;
    

    推测执行:
    当某个任务出现迟迟不结束的情况下,可以开启推测执行,开启一个同样的任务,任务谁先完成谁就会去关闭另一个同样的任务。
    缺点:
    1、过多的占用资源
    2、有可能发生重复写入的情况,发生异常。

    4、HIVE的本地模式

    概述:
    如果在HIVE中运行的job数据量不大,提交到集群中,拉去资源的时候很浪费时间,这个时候就可以考虑使用本地模式
    例如:
    HIVE在3个集群中的任意一个集群上,提交的job就在HIVE本地的机器上。
    前提:
    1、处理小数据集。
    2、输入1的数据块大小不能超过128MB。
    3、Reduce的个数不超过1个。
    设置本地模式的参数:

    set hive.exec.mode.local.auto=true;
    
    展开全文
  • (1)背景目前使用Python读取Hive表,解析转换之后并发插入Redis,使用fetchone方法读取速度较慢,Python转换处理的速度也较慢。所以需要优化插入Redis的流程。考虑使用SparkSQL读取数据插入Redis。(2)优化思路步骤1)...
  • Hive优化

    2018-12-06 19:20:52
    Hive优化 Hive优化思想: Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具,...Hive性能优化时,把HiveQL当做M/R程序来读,即从M/R的运行角度来考虑优化性能,从更底层思考如何优化运算...
  • HIVE SQL优化

    2019-06-19 20:05:00
    作为数据仓库的利器,大部分的数仓工作者大部分时间都在写Hive Sql,根据作者经验整理出单纯的Hive Sql调优。 一. 普通场景下的sql优化 1. 列裁剪 由于数仓底层存储大都采用列式存储,如ORC/PARQUET,所以可以采用列...
  • hive优化

    2018-11-23 11:35:25
    hive底层是MapReduce运行的。所以可以以MapReduce的方式进行对Hive优化。 一 map端的压缩: set hive.exec.compress.intermediate=true; set mapreduce.map.output.compress=true; set mapreduce.map.output....
  • Hive优化(十三)

    2019-09-23 22:04:07
    Hive底层是MapReduce,当数据量太大时,往往可以通过并行来提高效率,比如通过Partition实现运行多个Reduce,可是如果处理不当则容易引发数据倾斜,从而导致效率降低,这就涉及Hive优化Hive优化主要分为Map....
  • Hive SQL优化之 Count Distinct

    千次阅读 2017-03-21 12:24:42
    Hive SQL优化之 Count Distinct ...Hive是Hadoop的子项目,它提供了对数据的结构化管理和类SQL语言的查询功能。SQL的交互方式极大程度地降低了Hadoop生态环境中数据处理的门槛,用户...目前,Hive底层使用MapRed
  • Hive join 优化实战

    千次阅读 2016-08-02 14:12:28
    由于 hive 与传统关系型数据库面对的业务场景及底层技术架构都有着很大差异,因此,传统数据库领域的一些技能放到 Hive 中可能已不再适用。关于 hive优化与原理、应用的文章,前面也陆陆续续的介绍了一些,但大多...
  • 1 与会者目标=>...hive 建立在Hadoop体系架构上的一层SQL抽象,HiveSQL实际上先被SQL解析器解析后被Hive框架解析成一个MapReduce可执行计划,并按照该计划生成MapReduce任务后交给Hadoop集群处理 Ma.
  • hive性能优化时,把HiveQL当做M/R程序来读,即从M/R的运行角度来考虑优化性能,从更底层思考如何优化运算性能,而不仅仅局限于逻辑代码的替换层面。 RAC(Real Application Cluster)真正应用集群就像一辆机动...
  • HIVE优化

    2020-01-13 15:27:35
    由于Hive的执行依赖于底层的MapReduce作业,因此对Hadoop作业的优化或者对MapReduce作业的调整是提高Hive性能的基础。所以我们可以通过一系列的调优方法,来提高大幅度地Hive查询的性能。 1、启用压缩 压缩可以使...
  • Hive性能优化(全面)

    2018-09-11 12:26:00
    1.介绍 首先,我们来看看Hadoop的计算框架特性,在此特性下会衍生哪些...hive性能优化时,把HiveQL当做M/R程序来读,即从M/R的运行角度来考虑优化性能,从更底层思考如何优化运算性能,而不仅仅局限于逻辑代码的...
  • 在阐述Hive Join具体的优化方法之前,首先看一下Hive Join的几个重要特点,在实际使用时也可以利用下列特点做相应优化: 1.只支持等值连接 2.底层会将写的HQL语句转换为MapReduce,并且reduce会将join语句中除最后一...
  • 3.6 Hive优化

    2019-12-04 10:03:41
    好多优化要基于底层 Hadoop—JVM 把HiveSQL 当做Mapreduce程序去优化 Fetch hive-default.xml.template hive.fetch.task.conversion → 默认more 如果 none那么每次执行select * from 都是执行MapReduce 本地...
  • Hive优化策略

    2018-04-12 18:38:41
    所以,在学习了解MR原理是必要的,清楚了Hive底层优化过程,会大大增加Hive的执行效率。Hive对于OLAP类型的应用有很大的局限性,它不适合需要立即返回查询结果的场景。然而,通过实施下面一系列的调优方法,Hive...
  • 由于Hive的执行依赖于底层的MapReduce作业,因此对Hadoop作业的优化或者对MapReduce作业的调整是提高Hive性能的基础。 所以我们可以通过一系列的调优方法来使其提高效率。 为什么Hive运行性能低下? 简单来说hive用...

空空如也

1 2 3 4 5 ... 10
收藏数 183
精华内容 73
关键字:

hive底层优化