精华内容
下载资源
问答
  • 最近遇到这样的一个场景: 存在两个Hadoop集群,需要将一个集群中的...Java读取源hive—>我司kafka—>sparkStreaming读取kafka—>目标端hive 代码示例: Java获取其他公司hive数据: package com.zhbr....

    最近遇到这样的一个场景:

    存在两个Hadoop集群,需要将一个集群中的hive数据传输到另一个集群的hive中。且源端hive为其他公司数据源,涉及到的一定的安全和保密性。

    现大致思路为:

    Java读取源端hive—>我司kafka—>sparkStreaming读取kafka—>目标端hive

    代码示例:

    Java获取其他公司hive表数据:
    package com.zhbr.dataImport.test;
    
    import com.alibaba.fastjson.JSON;
    import com.zhbr.dataImport.rdbms.ImportRDBMSData;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * @ClassName GW_to_Kafka_test
     * @Description TODO
     * @Autor yanni
     * @Date 2020/3/25 9:07
     * @Version 1.0
     **/
    public class GW_to_Kafka_test2 {
    
        private static String brokerList = "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092";
    
        // public static final String topic="topic-demo";
        private static String topic = "hive2kafka2";
    
        public static void main(String[] args) throws SQLException {
        	//自定义的JDBC方式读取
            Connection conn  = ImportRDBMSData.getConn();
            Statement stmt  = ImportRDBMSData.getStatement(conn);
            String querySQL = "select * from lsb_copy";
    
            //查询
            ResultSet res = stmt.executeQuery(querySQL);
    
            //创建ListBuffer集合
            ArrayList<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
    
            //获得结果集结构信息(元数据)
            ResultSetMetaData metaData = res.getMetaData();
    
            //ResultSet列数
            int columnCount = metaData.getColumnCount();
    
            //配置生产者客户端参数
            //将配置序列化
            Properties properties = new Properties();
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //内存缓冲
            properties.put("buffer.memory", 67108864);
            //批处理大小
            properties.put("batch.size", 131072);
            //发送间隔
            properties.put("linger.ms", 100);
            //消息的最大大小
            properties.put("max.request.size", 10485760);
            //失败重试
            properties.put("retries", 3);
            properties.put("retry.backoff.ms", 20000);
            //ack级别(1代表保证leader收到)
            properties.put("acks", "1");
            properties.put("bootstrap.servers", brokerList);
            //压缩
            properties.put("compression.type", "gzip");
            //创建KafkaProducer 实例
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            try {
            // ResultSet转List<Map>数据结构
            // next用于移动到ResultSet的下一行,使下一行成为当前行
            while (res.next()) {
    
                //创建map集合
                HashMap<String, Object> map = new HashMap<String, Object>();
    
                // 遍历获取对当前行的每一列的键值对,put到map中
                for (int i = 1;i<=columnCount;i++) {
                    // 获取当前行某一列字段的字段名
                    String allColumnName = metaData.getColumnName(i).toLowerCase();
    
                    // rs.getObject(i) 获得当前行某一列字段的值
                    Object columnValue = res.getObject(i);
                    map.put(allColumnName,columnValue);
                }
    
                //将数据添加到list集合
                list.add(map);
    
                //当list集合容量为5000时,发送一次
                if(list.size()==5000){
                    String str = JSON.toJSONString(list);
    
                    //构建待发送的消息
                    ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,str);
    
                    //尝试发送消息
                    kafkaProducer.send(record);
                    //打印发送成功
                    System.out.println("batchSize 5000 send success from producer");
    
                    //清空list集合
                    list.clear();
                }
    
    
            }
    
            //将剩下的不满5000条的数据发送
            if(list.size()>0){
                String str = JSON.toJSONString(list);
    
                //构建待发送的消息
                ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,str);
    
                //尝试发送消息
                kafkaProducer.send(record);
                //打印发送成功
                System.out.println("batchSize "+list.size()+" send success from producer");
    
                //清空list集合
                list.clear();
            }
    
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //关闭生产者客户端实例
                kafkaProducer.close();
                ImportRDBMSData.closeAllConn(stmt,conn);
            }
        }
    }
    

    分批次写入,避免因为性能问题导致数据丢失及服务器宕机,如此可基本保证hive表大数据量的写入工作。

    \

    sparkStreaming实时消费kafka,将数据保存到hive
    package com.zhbr.dataImport.test
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object Kafka_to_Hive {
    
        def main(args: Array[String]): Unit = {
    
          //获取sparkSession
          val spark = SparkSession.builder().appName(this.getClass.getSimpleName.filter(!_.equals('$')))
            .master("local[4]").config("spark.streaming.receiver.writeAheadLog.enable","true").getOrCreate()
    
          //获取sparkContext
          val sc = spark.sparkContext
    
          //设置日志级别
          sc.setLogLevel("WARN")
    
          val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    
          //设置检查点,通常生产环境当中,为了保证数据不丢失,将数据放到hdfs之上,hdfs的高容错,多副本特征
          ssc.checkpoint("./kafka-chk2")
    
          //设置kafkaParams
          val kafkaParams=Map("metadata.broker.list"->"node01:9092,node02:9092,node03:9092","group.id"->"group1")
    
          //设置topics
          val topics=Set("hive2kafka2")
    
          //获取数据
          val data: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    
          //获取真正的数据,数据在元组的第二位
          val realData: DStream[String] = data.map(x=>x._2)
    
          realData.map(record => record.toString).foreachRDD(rdd => {
            import spark.implicits._
            val df = spark.read.json(spark.createDataset(rdd))
    
            //存入MySQL
            df.write.mode(SaveMode.Append).format("jdbc")
                      .option(JDBCOptions.JDBC_URL,"jdbc:mysql://localhost:3306/test11")
                      .option("user","root")
                      .option("password","123")
                      .option(JDBCOptions.JDBC_TABLE_NAME,"lsb_copy")
                      .save()
    
            //存入hive
            //df.createTempView("df_tmp")
            //spark.sql("insert into table df_copy select * from df_tmp")
          })
    
          //开启流式计算
          ssc.start()
          ssc.awaitTermination()
      }
    }
    



    \

    最后:

    欢迎各位大神提出更简单、更快捷的解决思路。

    展开全文
  • 关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源前面文章介绍了SparSQL通过Hive操作HBase表。SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。...

    关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

    前面文章介绍了SparSQL通过Hive操作HBase表。

    SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

    在HBase中表如下:

    create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1}

    put 'lxw1234','lxw1234.com','f1:c1','name1'

    put 'lxw1234','lxw1234.com','f1:c2','name2'

    put 'lxw1234','lxw1234.com','f2:c1','age1'

    put 'lxw1234','lxw1234.com','f2:c2','age2'

    put 'lxw1234','lxw1234.com','f3:c1','job1'

    put 'lxw1234','lxw1234.com','f3:c2','job2'

    put 'lxw1234','lxw1234.com','f3:c3','job3'

    hbase(main):025:0* scan 'lxw1234'

    ROW COLUMN+CELL

    lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1

    lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2

    lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1

    lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2

    lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1

    lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2

    lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3

    进入spark-shell

    sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

    运行以下代码:

    import sqlContext._

    var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map(

    "sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)",

    "hbase_table_name" -> "lxw1234",

    "hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )"

    )).load()

    //sparksql_table_schema参数为sparksql中表的定义

    //hbase_table_name参数为HBase中表名

    //hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。

    scala> hbasetable.printSchema()

    root

    |-- row_key: string (nullable = false)

    |-- c1: string (nullable = false)

    |-- c2: string (nullable = false)

    |-- c3: string (nullable = false)

    hbasetable.registerTempTable("lxw1234")

    sqlContext.sql("SELECT * from lxw1234").collect

    res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3])

    sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collect

    res3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])

    源码

    HBaseRelation.scala

    package com.lxw1234.sparksql.hbase

    import java.io.Serializable

    import org.apache.hadoop.fs.Path

    import org.apache.spark.sql._

    import org.apache.spark.sql.sources.TableScan

    import scala.collection.immutable.{HashMap, Map}

    import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}

    import org.apache.spark.sql._

    import org.apache.spark.rdd.NewHadoopRDD

    import org.apache.hadoop.hbase.HBaseConfiguration

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    import scala.collection.JavaConversions._

    import scala.collection.JavaConverters._

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.sql.types.StructType

    import org.apache.spark.sql.types.DataType

    import org.apache.spark.sql.types.StructField

    import org.apache.spark.sql.types.LongType

    import org.apache.spark.sql.types.IntegerType

    import org.apache.spark.sql.types.StringType

    import org.apache.spark.sql.types.MapType

    import org.apache.spark.sql.sources.BaseRelation

    object Resolver extends Serializable {

    def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {

    val cfColArray = hbaseField.fieldName.split(":",-1)

    val cfName = cfColArray(0)

    val colName = cfColArray(1)

    var fieldRs: Any = null

    //resolve row key otherwise resolve column

    if(cfName=="" && colName=="key") {

    fieldRs = resolveRowKey(result, hbaseField.fieldType)

    } else {

    fieldRs = resolveColumn(result, cfName, colName,hbaseField.fieldType)

    }

    fieldRs

    }

    def resolveRowKey (result: Result, resultType: String): Any = {

    val rowkey = resultType match {

    case "string" =>

    result.getRow.map(_.toChar).mkString

    case "int" =>

    result .getRow.map(_.toChar).mkString.toInt

    case "long" =>

    result.getRow.map(_.toChar).mkString.toLong

    }

    rowkey

    }

    def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {

    val column = resultType match {

    case "string" =>

    result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString

    case "int" =>

    result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toInt

    case "long" =>

    result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString.toLong

    }

    column

    }

    }

    /**

    val hbaseDDL = s"""

    |CREATE TEMPORARY TABLE hbase_people

    |USING com.shengli.spark.hbase

    |OPTIONS (

    | sparksql_table_schema '(row_key string, name string, age int, job string)',

    | hbase_table_name 'people',

    | hbase_table_schema '(:key , profile:name , profile:age , career:job )'

    |)""".stripMargin

    */

    case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{

    val hbaseTableName = hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))

    val hbaseTableSchema = hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))

    val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))

    val rowRange = hbaseProps.getOrElse("row_range", "->")

    //get star row and end row

    val range = rowRange.split("->",-1)

    val startRowKey = range(0).trim

    val endRowKey = range(1).trim

    val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field

    val registerTableFields = extractRegisterSchema(registerTableSchema)

    val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)

    val hbaseTableFields = feedTypes(tempFieldRelation)

    val fieldsRelations = tableSchemaFieldMapping(hbaseTableFields,registerTableFields)

    val queryColumns = getQueryTargetCloumns(hbaseTableFields)

    def feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) : Array[HBaseSchemaField] = {

    val hbaseFields = mapping.map{

    case (k,v) =>

    val field = k.copy(fieldType=v.fieldType)

    field

    }

    hbaseFields.toArray

    }

    def isRowKey(field: HBaseSchemaField) : Boolean = {

    val cfColArray = field.fieldName.split(":",-1)

    val cfName = cfColArray(0)

    val colName = cfColArray(1)

    if(cfName=="" && colName=="key") true else false

    }

    //eg: f1:col1 f1:col2 f1:col3 f2:col1

    def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {

    var str = ArrayBuffer[String]()

    hbaseTableFields.foreach{ field=>

    if(!isRowKey(field)) {

    str += field.fieldName

    }

    }

    str.mkString(" ")

    }

    lazy val schema = {

    val fields = hbaseTableFields.map{ field=>

    val name = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName

    val relatedType = field.fieldType match {

    case "string" =>

    SchemaType(StringType,nullable = false)

    case "int" =>

    SchemaType(IntegerType,nullable = false)

    case "long" =>

    SchemaType(LongType,nullable = false)

    }

    StructField(name,relatedType.dataType,relatedType.nullable)

    }

    StructType(fields)

    }

    def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField], registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {

    if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")

    val rs = externalHBaseTable.zip(registerTable)

    rs.toMap

    }

    /**

    * spark sql schema will be register

    * registerTableSchema '(rowkey string, value string, column_a string)'

    */

    def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {

    val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)

    val fieldsArray = fieldsStr.split(",").map(_.trim)

    fieldsArray.map{ fildString =>

    val splitedField = fildString.split("\\s+", -1)

    RegisteredSchemaField(splitedField(0), splitedField(1))

    }

    }

    //externalTableSchema '(:key , f1:col1 )'

    def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {

    val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)

    val fieldsArray = fieldsStr.split(",").map(_.trim)

    fieldsArray.map(fildString => HBaseSchemaField(fildString,""))

    }

    // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.

    lazy val buildScan = {

    val hbaseConf = HBaseConfiguration.create()

    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)

    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);

    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);

    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(

    hbaseConf,

    classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],

    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

    classOf[org.apache.hadoop.hbase.client.Result]

    )

    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {

    var values = new ArrayBuffer[Any]()

    hbaseTableFields.foreach{field=>

    values += Resolver.resolve(field,result)

    }

    Row.fromSeq(values.toSeq)

    })

    rs

    }

    private case class SchemaType(dataType: DataType, nullable: Boolean)

    //

    // private def toSqlType(hbaseSchema: Schema): SchemaType = {

    // SchemaType(StringType,true)

    // }

    }

    DefaultSource.scala

    package com.lxw1234.sparksql.hbase

    import org.apache.spark.sql.SQLContext

    import org.apache.spark.sql.sources.RelationProvider

    class DefaultSource extends RelationProvider {

    def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {

    HBaseRelation(parameters)(sqlContext)

    }

    }

    package.scala

    package com.lxw1234.sparksql

    import org.apache.spark.sql.SQLContext

    import scala.collection.immutable.HashMap

    package object hbase {

    abstract class SchemaField extends Serializable

    case class RegisteredSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

    case class HBaseSchemaField(fieldName: String, fieldType: String) extends SchemaField with Serializable

    case class Parameter(name: String)

    protected val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")

    protected val HBASE_TABLE_NAME = Parameter("hbase_table_name")

    protected val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")

    protected val ROW_RANGE = Parameter("row_range")

    /**

    * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.

    */

    implicit class HBaseContext(sqlContext: SQLContext) {

    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {

    var params = new HashMap[String, String]

    params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)

    params += ( HBASE_TABLE_NAME.name -> hbaseTableName)

    params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)

    //get star row and end row

    params += ( ROW_RANGE.name -> rowRange)

    sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));

    //sqlContext.baseRelationToSchemaRDD(HBaseRelation(params)(sqlContext))

    }

    }

    // implicit class HBaseSchemaRDD(schemaRDD: SchemaRDD) {

    // def saveIntoTable(tableName: String): Unit = ???

    // }

    }

    相关配置和说明

    本来在SparkSQL中通过外部数据源建表的语法是:

    CREATE TEMPORARY TABLE hbasetable

    USING com.lxw1234.sparksql.hbase

    OPTIONS (

    sparksql_table_schema   ‘(row_key string, c1 string, c2 string, c3 string)’,

    hbase_table_name   ‘lxw1234′,

    hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

    )

    在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

    是否是因为Spark1.4包的编译了Hive的原因?

    上面源码的编译依赖HBase的相关jar包:

    hbase-client-0.96.1.1-cdh5.0.0.jar

    hbase-common-0.96.1.1-cdh5.0.0.jar

    hbase-protocol-0.96.1.1-cdh5.0.0.jar

    hbase-server-0.96.1.1-cdh5.0.0.jar

    还有HBase的集群信息:

    hbase.zookeeper.quorum

    hbase.client.scanner.caching

    我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

    此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

    https://github.com/OopsOutOfMemory/spark-sql-hbase

    此程序只做学习和测试使用,并未测试性能

    如果觉得本博客对您有帮助,请 赞助作者 。

    展开全文
  • 1.读取txt文件 scala版本 package com.kevin.scala.dataframe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /** * 读取txt文件转成DataFrame形式操作 */ ...

    1.读取txt文件

    scala版本

    package com.kevin.scala.dataframe
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
    /**
      * 读取txt文件转成DataFrame形式操作
      */
    object DataFrameTxt {
    
      def main(args: Array[String]): Unit = {
        // 1.创建sparkconf
        val conf = new SparkConf().setAppName("DataFrameTxt").setMaster("local")
        // 2.创建sc
        val sc = new SparkContext(conf)
        // 3.创建sqlcontext
        val sqlContext = new SQLContext(sc)
        val line = sc.textFile("DTSparkSql\\src\\main\\resources\\person.txt")
        import sqlContext.implicits._
        // 4.读取文件用map切分,再用map将数据装成Person类型,toDF转成DataFrame
        line.map(_.split(",")).map(p => new Person(p(0),p(1),p(2).trim.toInt)).toDF.show()
        // 5.关闭sc
        sc.stop()
    
      }
    
    }
    

    Java版本

    package com.kevin.java.dataframe;
    
    import com.kevin.java.entity.Person;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    /**
     * @author kevin
     * @version 1.0
     * @description     读取txt文件转成DataFrame形式操作
     * @createDate 2019/1/6
     */
    public class DataFrameTxt {
    
        public static void main(String[] args) {
            // 1.创建SparkConf并设置作业名称和模式
            SparkConf conf = new SparkConf().setAppName("DataFrameTxt").setMaster("local");
    
            // 2.基于sparkConf创建SparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // 3.创建SQLContext对象对sql进行分析处理
            SQLContext sqlContext = new SQLContext(sc);
    
            // 4.读取文件数据
            String file = "DTSparkSql\\src\\main\\resources\\person.txt";
            JavaRDD<String> lineRDD = sc.textFile(file);
    
            // 5.获取每一行txt数据转为person类型
            JavaRDD<Person> map = lineRDD.map(new Function<String, Person>() {
                @Override
                public Person call(String line) throws Exception {
                    String[] s = line.split(",");
                    Person p = new Person();
                    p.setId(s[0]);
                    p.setName(s[1]);
                    p.setAge(Integer.valueOf(s[2]));
                    return p;
                }
            });
    
            // 6.将person类型的数据转为DataFrame表
            DataFrame df = sqlContext.createDataFrame(map, Person.class);
    
            // 7.查看所有数据,默认前20行
            df.show();
    
            // 8.关闭sc
            sc.close();
    
        }
    }
    

    2.读取Json文件

    scala版本

    package com.kevin.scala.dataframe
    
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * 读取json格式文件转为DataFrame表形式分析处理
      *  json文件中不能嵌套json格式的内容
      *  1.读取json格式两种方式
      *  2.df.show()默认显示前20行,使用df.show(行数)显示多行
      *  3.df.javaRDD/(scala df.rdd)将DataFrame转换成RDD
      *  4.df.printSchema()显示DataFrame中Schema信息
      *  5.dataFrame自带的API操作DataFrame,一般不使用
      *  6.使用sql查询,先将DataFrame注册成临时表:df.registerTempTable("jtable"),
      *  再使用sql,怎么使用sql?sqlContext.sql("sql语句")
      *  7.不能加载嵌套的json文件
      *  8.df加载过来之后将列安装ascii排序
      */
    object DataFrameJson {
    
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf
        val conf = new SparkConf().setAppName("DataFrameJson").setMaster("local")
        // 2.创建SparkContext
        val sc = new SparkContext(conf)
        // 3.创建SQLContext对象对sql进行分析处理
        val sqlContext = new SQLContext(sc)
        val file = "DTSparkSql\\src\\main\\resources\\json"
        // 4.读取json文件并转成DataFrame表
        val df = sqlContext.read.json(file)
        // 5.获取表中所有的数据
        df.show()
        // 6.使用DataFrame自带的API操作DataFrame
        // select name from table
        df.select("name").show()
    
        // select name, age+10 as addage from table
        df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show()
    
        // select name ,age from table where age>19
        df.select(df.col("name"),df.col("age")).where(df.col("age").gt("19")).show()
    
        // select age,count(*) from table group by age
        df.groupBy(df.col("age")).count().show()
    
        // 7.将DataFrame注册成临时表使用sql语句操作
        df.registerTempTable("tempTable")
        sqlContext.sql("select age,count(*) as gg from tempTable group by age").show()
        sqlContext.sql("select name,age from tempTable").show()
    
        // 8.关闭
        sc.stop()
      }
    
    }
    

    java版本

    package com.kevin.java.dataframe;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    /**
     * @author kevin
     * @version 1.0
     * @description     读取json格式文件转为DataFrame表形式分析处理
     *
     *  json文件中不能嵌套json格式的内容
     *  1.读取json格式两种方式
     *  2.df.show()默认显示前20行,使用df.show(行数)显示多行
     *  3.df.javaRDD/(scala df.rdd)将DataFrame转换成RDD
     *  4.df.printSchema()显示DataFrame中Schema信息
     *  5.dataFrame自带的API操作DataFrame,一般不使用
     *  6.使用sql查询,先将DataFrame注册成临时表:df.registerTempTable("jtable"),
     *  再使用sql,怎么使用sql?sqlContext.sql("sql语句")
     *  7.不能加载嵌套的json文件
     *  8.df加载过来之后将列安装ascii排序
     * @createDate 2019/1/6
     */
    public class DataFrameJson {
    
        public static void main(String[] args) {
    
            // 1.创建SparkConf作业设置作业名称和模式
            SparkConf conf = new SparkConf().setAppName("DataFrameJson").setMaster("local");
    
            // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // 3.根据sc创建SQLContext对象对sql进行分析处理
            SQLContext sqlContext = new SQLContext(sc);
    
            // 4.读取文件数据
            String file = "DTSparkSql\\src\\main\\resources\\json";
    
            // 5.读取json文件并转成DataFrame表
            DataFrame df = sqlContext.read().json(file);
    
            // 6.获取表中所有数据
            df.show();
    
            // 7.使用DataFrame自带的API操作DataFrame
            // select name from table
            df.select("name").show();
    
            // select name, age+10 as addage from table
            df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();
    
            // select name ,age from table where age>19
            df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();
    
            // select age,count(*) from table group by age
            df.groupBy(df.col("age")).count().show();
    
            // 8.将DataFrame注册成临时表使用sql语句操作
            df.registerTempTable("jtable");
    
            DataFrame sql = sqlContext.sql("select age,count(*) as gg from jtable group by age");
            sql.show();
            DataFrame sql2 = sqlContext.sql("select name,age from jtable");
            sql2.show();
    
            // 关闭
            sc.close();
        }
    }
    

    3.读取Hive

    如果读取hive中数据,要使用HiveContext
    HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表
    在idea访问由SparkSQL访问hive需要将hive-site.xml复制到resources

    scala版本

    package com.kevin.scala.dataframe
    
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * 如果读取hive中数据,要使用HiveContext
      * HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表
      *     在idea访问由SparkSQL访问hive需要将hive-site.xml复制到resources
      */
    object DataFrameHive {
    
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf
        val conf = new SparkConf().setAppName("DataFrameHive").setMaster("local")
        // 2.创建SparkContext
        val sc = new SparkContext(conf)
        // 3.创建HiveContext
        val hiveContext = new HiveContext(sc)
        // 4.查看所有数据库
        hiveContext.sql("show tables").show()
        // 5.关闭sc
        sc.stop()
    
      }
    
    }
    

    java版本

    package com.kevin.java.dataframe;
    
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.hive.HiveContext;
    
    import java.util.List;
    
    /**
     * @author kevin
     * @version 1.0
     * @description     如果读取hive中数据,要使用HiveContext
     *  HiveContext.sql(sql)可以操作hive表,还可以操作虚拟表
     * @createDate 2019/1/6
     */
    public class DataFrameHive {
    
        public static void main(String[] args) {
    
            /*
             * 0.把hive里面的hive-site.xml放到spark/conf目录下
             * 1.启动Mysql
             * 2.启动HDFS
             * 3.启动Hive ./hive
             * 4.初始化HiveContext
             * 5.打包运行
             *
             * ./bin/spark-submit --master yarn-cluster --class com.kevin.java.dataframe.DataFrameHive /root/DTSparkSql.jar
             * ./bin/spark-submit --master yarn-client --class com.kevin.java.dataframe.DataFrameHive /root/DTSparkSql.jar
             */
            // 如果不设置master,则无法在本地运行,需要打包在集群运行
            SparkConf conf = new SparkConf().setAppName("DataFrameHive").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            //SparkSession
            // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext,
            // 其实也可以使用JavaSparkContext,只不过内部也是做了sc.sc()的操作
            HiveContext hiveContext = new HiveContext(sc);
            DataFrame sql = hiveContext.sql("show databases");
            sql.show();
            sc.close();
        }
    
    }
    

    4.读取Mysql数据源

    scala版本

    package com.kevin.scala.dataframe
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
    /**
      *  读取mysql数据源
      */
    object DataFrameMysqlRead {
    
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf
        val conf = new SparkConf().setAppName("DataFrameMysqlRead").setMaster("local")
        // 配置join或者聚合操作shuffle数据时分区的数量
        conf.set("spark.sql.shuffle.partitions","1")
        // 2.创建SparkContext
        val sc = new SparkContext(conf)
        // 3.创建SQLContext对象对sql进行分析处理
        val sqlContext = new SQLContext(sc)
        val options = Map("url" -> "jdbc:mysql://192.168.171.101:3306/test",
          "driver" -> "com.mysql.jdbc.Driver",
          "user" -> "root",
          "password" -> "Hadoop01!",
          "dbtable" -> "person")
        // 4.读取Mysql数据库表,加载为DataFrame
        val jdbcDF = sqlContext.read.format("jdbc").options(options).load()
        jdbcDF.show()
        // 5.注册为临时表
        jdbcDF.registerTempTable("temp_person")
        // 6.查询name=kevin
        val result = sqlContext.sql("select id,name,age from temp_person where name = 'kevin' ")
        result.show()
        // 7.关闭sc
        sc.stop()
    
      }
    
    }
    

    将数据写入Mysql

    package com.kevin.scala.dataframe
    
    import java.util.Properties
    
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      *  将数据写入mysql
      */
    object DataFrameMysqlWrite {
    
      def main(args: Array[String]): Unit = {
        // 1.创建SparkConf
        val conf = new SparkConf().setAppName("DataFrameMysqlWrite").setMaster("local")
        // 2.创建sc
        val sc = new SparkContext(conf)
        // 3.创建sqlContext
        val sqlContext = new SQLContext(sc)
        // 4.通过并行化创建RDD
        val person = sc.parallelize(Array("3 cao 23","4 zheng 20","5 mai 20")).map(_.split(" "))
        // 5.通过StructType直接指定每个字段的schema
        val schema = StructType(List(
            StructField("id",IntegerType,true),
            StructField("name",StringType,true),
            StructField("age",IntegerType,true)
        ))
        // 6.将rdd映射到rowRdd
        val row = person.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
        // 7.创建DataFrame
        val df = sqlContext.createDataFrame(row,schema)
        // 8.数据库的账号和密码
        val prop = new Properties()
        prop.put("user","root")
        prop.put("password","Hadoop01!")
        // 9.将数据插入表中,SaveMode.Overwrite覆盖表的数据
        df.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.171.101:3306/test","person",prop)
        // 10.关闭sc
        sc.stop()
    
      }
    
    }
    

    读取和写入Mysql

    java版本

    package com.kevin.java.dataframe;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.DataFrameReader;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    
    import java.util.HashMap;
    import java.util.Properties;
    
    /**
     * @author kevin
     * @version 1.0
     * @description     读取mysql数据源
     * @createDate 2019/1/8
     */
    public class DataFrameMysql {
    
        public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("DataFrameMysql").setMaster("local");
            // 配置join或者聚合操作shuffle数据时分区的数量
            conf.set("spark.sql.shuffle.partitions","1");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            // 第一种方式,读取Mysql数据库表,加载为DataFrame
            HashMap<String, String> options = new HashMap<>();
            options.put("url","jdbc:mysql://192.168.171.101:3306/sparkdb");
            options.put("driver","com.mysql.jdbc.Driver");
            options.put("user","root");
            options.put("password","Hadoop01!");
            options.put("dbtable","person");
    
            DataFrame person = sqlContext.read().format("jdbc").options(options).load();
            person.show();
            // 注册为临时表
            person.registerTempTable("person1");
    
    
            // 第二种方式,读取Mysql数据库表,加载为DataFrame
            DataFrameReader reader = sqlContext.read().format("jdbc");
            reader.option("url","jdbc:mysql://192.168.171.101:3306/sparkdb");
            reader.option("driver","com.mysql.jdbc.Driver");
            reader.option("user","root");
            reader.option("password","Hadoop01!");
            reader.option("dbtable","score");
    
            DataFrame score = reader.load();
            score.show();
            score.registerTempTable("score1");
    
            DataFrame result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score "
                    + "from person1,score1 "
                    + "where person1.name = score1.name");
            result.show();
    
            // 将DataFrame结果保存到Mysql中
            Properties properties = new Properties();
            properties.setProperty("user","root");
            properties.setProperty("password","Hadoop01!");
    
            result.write().mode(SaveMode.Overwrite)
                    .jdbc("jdbc:mysql://192.168.171.101:3306/sparkdb","result",properties);
            System.out.println("-----Finish------");
            sc.stop();
        }
    
    }
    

    5.读取json文件并保存成parquet文件和加载parquet文件

    scala版本

    package com.kevin.scala.dataframe
    
    import org.apache.spark.sql.{SQLContext, SaveMode}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * 读取json文件并保存成parquet文件和加载parquet文件
      */
    object DataFrameParquet {
    
      def main(args: Array[String]): Unit = {
        // 1.创建sparkconf
        val conf = new SparkConf().setAppName("DataFrameParquet").setMaster("local")
        // 2.创建sc
        val sc = new SparkContext(conf)
        // 3.创建sqlContext
        val sqlContext = new SQLContext(sc)
        val json = sc.textFile("DTSparkSql\\src\\main\\resources\\json")
        // 4.读取json文件将文件转成rdd
        val df = sqlContext.read.json(json)
    
        // SaveMode指定存储文件时的保存模式:Overwrite:覆盖,Append:追加,ErrorIfExists:如果存在就报错,Ignore:若果存在就忽略
        // 5.将DataFrame保存成parquet文件,保存成parquet文件只能使用Overwrite和Ignore两种方式
        df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
    
        // 6.加载parquet文件成df,加载parquet文件只能使用下面两种
        val load = sqlContext.read.parquet("./sparksql/parquet")
        // val load = sqlContext.read.format("parquet").load("./sparksql/parquet")
    
        // 7.显示表中所有的数据
        load.show()
        // 8.关闭sc
        sc.stop()
    
      }
    
    }
    

    java版本

    package com.kevin.java.dataframe;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    
    /**
     * @author kevin
     * @version 1.0
     * @description     读取json文件并保存成parquet文件和加载parquet文件
     * @createDate 2019/1/6
     */
    public class DataFrameParquet {
    
        public static void main(String[] args) {
    
            // 1.创建SparkConf作业设置作业名称和模式
            SparkConf conf = new SparkConf().setAppName("DataFrameParquet").setMaster("local");
    
            // 2.基于Sparkconf对象创建一个SparkContext上下文,它是通往集群的唯一通道,且在创建时会创建任务调度器
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // 3.根据sc创建SQLContext用于sql的分析处理
            SQLContext sqlContext = new SQLContext(sc);
    
            // 4.读取文件数据
            String file = "DTSparkSql\\src\\main\\resources\\json";
            JavaRDD<String> jsonRDD = sc.textFile(file);
    
            // 5.读取json形式的文件并转为DataFrame
            DataFrame df = sqlContext.read().json(jsonRDD);
            // DataFrame json = sqlContext.read().format("json").load("./spark/json");
            // json.show();
    
            // 6.将DataFrame保存成parquet文件
            // SaveMode指定存储文件时的保存模式:Overwrite:覆盖,Append:追加,ErrorIfExists:如果存在就报错,Ignore:若果存在就忽略
            // 保存成parquet文件有以下两种方式
            df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
            // df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet");
    
            // 7.加载parquet文件成DataFrame
            // 记载parquet文件有以下两种方式
            DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
            // DataFrame load = sqlContext.read().parquet("./sparksql/parquet");
    
            // 8.查看表中所有数据
            load.show();
            // 9.关闭
            sc.close();
        }
    }
    

     

    展开全文
  • 点击上方蓝字关注我们...本篇主要讲解的是如何基于Hive直接读取外部数据源另关注公众号,后台可获取更多关于大数据/算法/python/java等学习资料和实际应用场景解决方案Hive读取ES数据1.下载jar包并加载1#下载地址:...
    0ef3f55aed0322a4def1933bd9e91ffd.png

    点击上方蓝字关注我们

    9cf65bf2043df12f990548db763fd668.png

    导语

    数仓建设的第一步就是数据获取,目前大多数公司都是使用数据同步神器-Datax源码重构/Kettle/Sqoop/MongoDump等工具将外部数据同步到数仓。
    本篇主要讲解的是如何基于Hive直接读取外部数据源另关注公众号,后台可获取更多关于大数据/算法/python/java等学习资料和实际应用场景解决方案

    Hive读取ES数据

    1.下载jar包并加载

    1# 下载地址:https://www.elastic.co/cn/downloads/hadoop
    2ADD JAR /home/elasticsearch-hadoop-2.3.4.jar;

    2.创建映射表217edf62f1741673cea42a2c275f28c8.png

     1create external table wedw_ods.span_other_tmp(
    2  es_id string,
    3  id string,
    4  parent_id string,
    5  service_name string,
    6  kind string,
    7  name string,
    8  timestamp bigint,
    9  duration bigint,
    10  error boolean,
    11  ip string,
    12  trace_id string,
    13  reference string,
    14  tags array<struct<key:string,value:string>>,
    15  baggages array<struct<key:string,value:string>>
    16 )
    17 -- ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    18 ROW FORMAT SERDE 'org.elasticsearch.hadoop.hive.EsSerDe'
    19 -- WITH SERDEPROPERTIES ("case.insensitive" = "false","mapping.baggages_key"="baggages")
    20STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    21TBLPROPERTIES(
    22  'es.resource' = 'index:other-2020-04-08/span'--指定索引和type
    23  'es.index.auto.create' = 'false'--是否自动创建索引
    24  'es.query'='?q=_id:AXFWjbHX9yjdMoK7JOl3'--指定查询读取
    25  'es.nodes'=''--指定es节点ip
    26  'es.field.read.empty.as.null'='true'--是否读取空字段
    27  'es.port' = '9200'--指定es 节点端口
    28  'es.nodes.wan.only' = 'true',
    29  'es.nodes.discovery' = 'false'--是否开启自动探查
    30  'es.index.read.missing.as.empty'='true'
    31  'es.read.metadata'='true',
    32  'es.batch.size.entries' = '10000',  -- 每次读取的批次数
    33  'es.scroll.size' = '10000'--每次翻页数 
    34  -- 'es.output.json' = 'true', --以json格式输出
    35  'es.read.field.include' = '_id,id,parentId,serviceName,traceId,kind,name,timestamp,duration,error,ip,trace_id,tags,reference,tags.key,tags.value,baggages.key,baggages.value'--只读取指定字段
    36  'es.mapping.names'='es_id:_metadata._id,parent_id:parentId,service_name:serviceName,trace_id:traceId,tags:tags'--映射字段  hive字段:es字段
    37  'es.read.field.as.array.include' = 'tags,baggages' --以数组的方式读取指定字段
    38);

    3.遇到的问题

    1.通常在做es映射的时候,需要保持hive的字段类型和es中的mapping类型要一致。
    2.es-hadoop.jar包要和es版本和hive版本要兼容。否则会抛出“Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'”错误

    Hive映射Mongo

    1.使用直连的方式

    前提需要的jar包: mongo-hadoop-core-2.0.0.jar mongo-hadoop-hive-2.0.0.jar mongo-java-driver-3.4.2.jar

    1CREATE TALBE tmp.test
    2id  string,
    3  name string
    4)
    5STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
    6WITH SERDEPROPERTIES ('mongo.columns.mapping'='{"id":"_id"}')
    7TBLPROPERTIES ('mongo.uri' = 'mongodb://user:password@ip:port/db.collection');

    2.使用Bson文件的方式

    2.1 导出使用mongodump导出Bson文件
    1mongodump  -u userName -p Password -h ip:port  --collection CollectName --db dbName -o target_Path
    2.2 将Bson文件导入hdfs
    1hdfs dfs -put target_Path hdfs_target_path
    2.3 Hive读取Bson文件
     1-- 前提需要jar包:
    2-- mongo-hadoop-core-2.0.0.jar
    3-- mongo-hadoop-hive-2.0.0.jar
    4-- mongo-java-driver-3.4.2.jar
    5-- 建表语句:
    6create table if not exists ${table_name}
    7(field type comment "")
    8row format serd  ‘com.mongodb.hadoop.hive.BSONSerDe’
    9with serdeproperties('mongo.columns.mapping'='{hive字段与mongo字段的映射关系(如果hive字段名和mongo中的字段名一致,可省略)}')
    10stored as inputformat 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
    11outputformat 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
    12location ‘HDFS的目录’

    Hive映射HBase

     1create external table if not exists wedw_ods.log_status_from_hbase
    2(
    3m_id string
    4,order_info_id  string
    5,before_status  int
    6,after_status int
    7,status_type  int
    8,description  string
    9,gmt_created  string
    10,gmt_modified  string
    11)
    12stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    13with serdeproperties ("hbase.columns.mapping" = ":key,f:order_info_id,f:before_status,f:after_status,f:status_type,f:description,f:gmt_created,f:gmt_modified")
    14tblproperties ("hbase.table.name" = "log:log_status");

    注意:该Hive表一个外部表,所以删除该表并不会删除HBase表中的数据
    1、建表或映射表的时候如果没有指定:key则第一个列默认就是行键
    2、HBase对应的Hive表中没有时间戳概念,默认返回的就是最新版本的值
    3、由于HBase中没有数据类型信息,所以在存储数据的时候都转化为String类型

    最后说明

    以上方法是直接以hive表的形式读取外部数据,这种方式最大的特点就是简单。
    但是也有很大的弊端,那就是在读取或者写入表的时候会对外部存储系统造成负载过高。
    通常结合实际的系统负载以及数据量综合评估后,再行选择具体的同步方式。

    879e97fe1d9a458d171b459e0e21d228.gif

    ●2020年大厂面试题-数据仓库篇

    ●数据同步神器-Datax源码重构

    ●十分钟搞定分布式一致性算法

    ●zookeeper源码解读之-DataTree模型构建+Leader选举

    ●zookeeper源码解读之-服务端启动流程

    ●一文教你如何玩转zookeeper

    ●zookeeper应用场景解决方案-Leader选

    ●实战:如何实时采集上亿级别数据?

    ●Kafka深度剖析HW以及LEO

    ●Livy REST 提交Spark作业

    ●Spark集成ElasticSearch

    ●Spark数据倾斜之骚操作解决方案

    ●一道简单的算法面试题

    ●Impala介绍以及常见问题

    ●ElasticSearch无感知重构索引●ElasticSearch分页搜索以及deep paging性能揭秘

    ●ElasticSearch 一个field索引两次解决排序问题

    ●ElasticSearch Partial Update大揭秘

    ●Hive常见问题汇总

    展开全文
  • spark 读取各类数据源

    千次阅读 2018-04-15 18:56:10
    本文章主要通过代码实现spark读取各类数据源1 spark读取hive数据import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org....
  • Hbase Java API简单实践(附代码解释) 按照惯例,先上代码 (代码还有很多可以优化的地方,待正式工作了,有了更海量的需要处理的数据,更复杂的应用场景,我再回来更新此文。) 只贴出Es用scroll方式读取数据...
  • DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件
  • 文章目录分类文件格式文本文件JSON逗号分隔值与制表符分隔值csvSequenceFile对象文件Hadoop输入输出格式Spark ...spark SQL中的结构化数据:json和apache hive在内的结构化数据源 数据库与键值存储 文件格式 文本文...
  • 1 概述 ...1)Spark SQL可以从各种结构化数据源(JSON ,Hive,CSV等)中读取数据 2)支持使用SQL和HQL语句查询数据,查询结果以DataSet或DataFrame形式返回 3)Spark SQL支持SQL和常规的Python/Java/S...
  • 这个报错是生成计划树的时候报的...具体错误需要一步一步的排查,从创建数据源开始。  举个例子:用spark-sql 读取 hive 和hbase 相互映射的表,如果不一步步排查 ,那么就会抱着个错,可以选择什么都不做,直接 ...
  • spark基础 六 spark SQL

    2018-08-30 11:00:12
    可以从各种结构化数据源读取数据 JSON Hive等 不仅支持在spark内使用SQL语句进行数据查询,也支持从类似商业软件中通过标准数据库连接器连接spark SQL进行查询 在spark内部使用spark SQL时,支持SQL与常规的...
  • 2.SparpSQL特点3.Hive 和SparkSQL的对比4.SparkSQL中的两个抽象2 集群模式初体验2.1 DataFrame2.1.1读取txt文件并输出 1.SparkSQL概述 1.什么是SparkSQL? 用于处理构造化数据的spark模块 可以通过DataFrame和DataSet...
  • 第九章 Spark SQL

    2018-03-22 15:35:45
    1、可以从各种结构化数据源(例如JSON、Hive、Parquet等)中读取数据 2、Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持外部工具链接SQL进行查询。 3、支持SQL与常规python/java/scala代码高度...
  • Spark SQL 简单的说Spark SQL是spark用来操作结构化和半结构化数据的接口。本文来讲述一下它的一些基本操作。 Spark SQL的特性 ... 可以从各种结构化数据源读取数据,如(JSON、HIVE等) 可以通过JDBC或...
  • Hadoop入门

    2020-12-10 16:34:26
    数据源->数据采集->数据分析(Hdfs、ES、Hbase)->数据分析(MapReduce、Hive、Spark)->数据存储(关系型数据库)->页面读取展示 二、为什么使用Hadoop 1、为什么使用? 开源、java语言、跨平台...
  • 81.DataFrame实现多数据源数据的关联分析 82.SparkSQL读取AVRO格式数据详解 83.SparkSQL读取CSV或TSV格式数据详解 84.自定义schema的方式转换RDD为DataFrame及直接运行SQL在数据文件上 85.SparkSQL中自带函数的使用...
  • 用户可通过页面选择数据源即可创建数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发...
  • DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套...
  • 数据源数据管理底层模块,定义读取、编辑、查询数据源表数据的API datagear-util 系统常用工具集模块 datagear-web 系统web模块,定义web控制器、操作页面 依赖 Java 8+ Servlet 3.1+ 编译 (执行单元测试...
  • 47_mysql数据源连接池 48_NIO" d% v1 P# ~3 S/ L 49_NIO程序- u5 T2 a5 N" {! @8 q4 c 50_Vmware安装-client centos7机安装2 Q. l/ r7 y) ^% n8 |4 _. k 51_centos文件权限-常用命令 52_网络静态ip-NAT连接方式-YUM...
  • 不过,Cassandra 是用Java 开发的,所以若要深入分析代码,你需要对Java 语言有更坚实的理解。虽然不一定需要懂得Java,但Java 可以帮助你更好地了解异常、学会如何编译源码以及使用一些流行的客户端。本书中的很...
  • antlr4权威指南

    2017-09-30 10:47:22
    Hadoop生态系统中的Hive、Pig、数据仓库和分析系统所使用的语言都用到了ANTLR;Lex Machina将ANTLR用于分析法律文本;Oracle公司在SQL开发者IDE和迁移工具中使用了ANTLR;NetBeans公司的IDE使用ANTLR来解析C++;...
  • 深入浅出StreamingPro

    2020-12-06 05:17:56
    ).asInstanceOf[JMap[Any, Any]] else new java.util.HashMap() // 初始化策略,需要创建算法、引用、组合器 strategy.initialize(name, createAlgorithms(desc), createRefs(desc), createCompositors(desc...

空空如也

空空如也

1 2
收藏数 23
精华内容 9
关键字:

java读取hive源数据

java 订阅