spark 数据整合

2019-07-05 23:30:12 valada 阅读数 263

Spark 生态较为完善,已经被越来越多的互联网公司应用于生产项目,对于 ETL 开发人员而言,日常数据同步任务和临时取数任务如果有基于 Spark 封装的一个小工具,办公效率会有大幅度提升。

本场 Chat 会阐述企业现有的数据处理的痛点,以一个真实场景作为切入口,展开对需求的分析,开发一个简单且通用的工具,提升团队作战效率。

本场 Chat 您将学到如下内容:

  1. 掌握多数据源整合的方法(一条 SQL 实现 MySQL join sqlserver)
  2. 提升 Spark 技术实力和代码设计能力
  3. 工具化解决常见问题,培养架构思维(一个 Linux 命令即可完成数据迁移、处理)
  4. 开箱急用的极简代码,赋能数据团队

阅读全文: http://gitbook.cn/gitchat/activity/5d1abf619909a17298c6d067

您还可以下载 CSDN 旗下精品原创内容社区 GitChat App ,阅读更多 GitChat 专享技术内容哦。

FtooAtPSkEJwnW-9xkCLqSTRpBKX

2019-10-09 20:49:40 Master_chaoAndQi 阅读数 902

一 环境准备

需求描述:创建StreamingContext,从kafka中实时消费启动日志数据,借助redis对当天的启动日志进行去重,将去重后的结果写入redis和Habse

1.1 pom文件

 <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

      
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <!--在使用phoenix时需注释掉下面的两个依赖 hbase-client 
        hbase-server 否则会因为jar包冲突,出现错误
        -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

1.2 config配置:

# Kafka配置
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.group=guochao
# Redis配置
redis.host=hadoop102
redis.port=6379
zooleeper.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

1.3 properties解析工具类

object PropertiesUtil {
  private val in: InputStream = ClassLoader.getSystemResourceAsStream("config.properties")
  private val pro = new Properties()
  pro.load(in);
  // 获取对应的value的值
  def getPropertiesValue(propertiesName:String)={
    pro.getProperty(propertiesName)
  }
}

1.4 HbaseUtil工具类

object HbaseUtil {
  //连接hbase

  def getConnection: Connection ={
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum",PropertiesUtil.getPropertiesValue("zooleeper.list"))
    ConnectionFactory.createConnection(configuration)
  }

}

1.5 kafkaUtil根据指定的topic返回对应的Dstream

/**
  * 创建Kafka 数据源 从指定的topic 消费数据 返回DStream
  */
object MyKafkaUtils {
  //从kafka 指定的topic 中消费数据
  def getDstreamFromKafka(ssc:StreamingContext, topic:String): InputDStream[(String, String)] ={
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->PropertiesUtil.getPropertiesValue("kafka.broker.list"),
      ConsumerConfig.GROUP_ID_CONFIG->PropertiesUtil.getPropertiesValue("kafka.group")
    )
    KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
      ssc,
      kafkaParams,
      Set(topic)
    )
  }
}

1.6 jedisUtils从连接池中获取Jedis连接实例

object JedisUtils {
  private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
  jedisPoolConfig.setMaxTotal(100) //最大连接数
  jedisPoolConfig.setMaxIdle(20) //最大空闲
  jedisPoolConfig.setMinIdle(20) //最小空闲
  jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
  jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
  jedisPoolConfig.setTestOnBorrow(false) //每次获得连接的进行测试
  private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "hadoop102", 6379)

  // 直接得到一个 Redis 的连接
  def getJedisClient: Jedis = {
    jedisPool.getResource
  }
}

1.7 样例类

package com.gc.bean
case class StartupLog(mid: String,
                      uid: String,
                      appId: String,
                      area: String,
                      os: String,
                      channel: String,
                      logType: String,
                      version: String,
                      ts: Long,
                      var logDate: String,
                      var logHour: String)

二 Spark直接将数据写入Hbase

package com.gc.app

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSON
import com.gc.bean.StartupLog
import com.gc.util.{ConstantParam, HbaseUtil, JedisUtils, MyKafkaUtils}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/**
  * 统计日活(启动日志)
  * 一个用户当天首次登陆算一个活跃用户
  *
  */
object DauApp {
  def main(args: Array[String]): Unit = {
    //创建StreamingContext 对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DauApp")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))


    // 消费kafka中的实时数据流
    val kafkaSource: InputDStream[(String, String)] = MyKafkaUtils.getDstreamFromKafka(ssc,ConstantParam.START_LOG)
    //处理数据 封装样例类
    val logDstream = kafkaSource.map({
      case (_, value) => {
        //将数据解析为json
        val log: StartupLog = JSON.parseObject(value, classOf[StartupLog])
        log.logDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
        log.logHour = new SimpleDateFormat("HH").format(new Date())
        log
      }
    })
    // 借助redis 对数据进行去重 将数据转换成RDD进行操作
    // 创建连接查询出Redis 中的数据  并将数据进行广播
    val client = JedisUtils.getJedisClient
    val set: util.Set[String] = client.smembers(s"gmall:start:dau:${new SimpleDateFormat("yyyy-MM-dd").format(new Date())}")
    client.close()
    val sscBd: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(set) // 广播变量

    val filterDstream: DStream[StartupLog] = logDstream.transform(rdd => {
      rdd.filter(log => {
        !sscBd.value.contains(log.mid) // 如果在redis中不存在则为当天首次活跃用户
      })
    })
    // 将数据写入redis 维护活跃设备Id
    filterDstream.foreachRDD(rdd=>{
      rdd.foreachPartition(it=>{
        // 创建jedis 连接
        val client = JedisUtils.getJedisClient
        // 遍历写入数据 借助set
        it.foreach(log=>{
       client.sadd(s"gmall:start:dau:${log.logDate}",log.mid) //按照设备Id进行去重
        })
        //关闭连接
        client.close()

      })
    })

    // 将去重后的结果写入到hbase中
    filterDstream.foreachRDD(rdd=>{
      rdd.foreachPartition(it=>{
        //创建hbase的连接
        var putList=ListBuffer[Put]();
        val conn = HbaseUtil.getConnection
        val table: Table = conn.getTable(TableName.valueOf("start_log_adu"))
        it.foreach(log=>{
          val put =new Put(Bytes.toBytes(log.mid))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("uid"),Bytes.toBytes(log.uid))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("appId"),Bytes.toBytes(log.appId))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("area"),Bytes.toBytes(log.area))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("os"),Bytes.toBytes(log.os))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("channel"),Bytes.toBytes(log.channel))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logType"),Bytes.toBytes(log.logType))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("version"),Bytes.toBytes(log.version))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("ts"),Bytes.toBytes(log.ts))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logDate"),Bytes.toBytes(log.logDate))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logHour"),Bytes.toBytes(log.logHour))
          putList+=put
        })
        import scala.collection.JavaConversions._
        table.put(putList);
        table.close()
        conn.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

三 Spark整合Phoenix将数据写入hbase

通过Phoenix创建表:
phoenix

create table gmall_dau(
mid varchar,
uid varchar,
appid varchar,
area varchar,
os varchar,
channel varchar,
logType varchar,
version varchar,
ts bigint,
logDate varchar,
logHour varchar
CONSTRAINT dau_pk PRIMARY KEY (mid,logDate))
column_encoded_bytes=0;

CONSTRAINT dau_pk PRIMARY KEY (mid,logDate)); 以mid和logDate作为主键,在hbase中会进行无缝拼接,拼接成Hbase中的rowkey

saveToPhoenix方法签名

def saveToPhoenix(tableName: String, cols: Seq[String],
                    conf: Configuration = new Configuration, zkUrl: Option[String] = None, tenantId: Option[String] = None)
                    : Unit = {

将原直接写入hbase的代码修改为通过Phoenix写入数据到hbase

import org.apache.phoenix.spark._ // 隐式转换
    filterDstream.foreachRDD(rdd=>{
        rdd.saveToPhoenix(
          tableName = "GMALL_DAU",       cols=Seq("MID","UID","APPID","AREA","OS","CHANNEL","LOGTYPE","VERSION","TS","LOGDATE","LOGHOUR"),
          zkUrl=Some("hadoop102,hadoop103,hadoop104:2181")
        )
    })

结果:
通过phoenix查询

四遇到的问题

问题1 :Phoenix建表语句大小写问题

写入数据问题
原因:在建表的时候,使用的是小写,phoenix会将建表语句全部改为大写,字段名不匹配,前面的0.Mid中 0 是因为在建表时未指定对应的列族,默认列族为0(如果想使用小写,则需要将表明和字段名用双引号包裹起来)

问题2:jar包冲突

在项目中同时加入了Hbase-client hbase-server phoenix-hbase导致jar包冲突,在使用Phoenix 写入数据的时候,将其它的两个注释掉即可

Caused by: com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/disruptor/WaitStrategy;)V
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2212)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
	at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:241)
	at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:147)
	at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
	at java.sql.DriverManager.getConnection(DriverManager.java:664)
	at java.sql.DriverManager.getConnection(DriverManager.java:208)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:113)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:97)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:92)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:71)
	at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:306)
	at org.apache.phoenix.spark.ProductRDDFunctions$$anonfun$1.apply(ProductRDDFunctions.scala:41)
	at org.apache.phoenix.spark.ProductRDDFunctions$$anonfun$1.apply(ProductRDDFunctions.scala:37)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	... 3 more

问题3 redis 强制退出问题

MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.

强制关闭Redis快照导致不能持久化。
连接上redis客户端设置如下参数

config set stop-writes-on-bgsave-error no
2019-01-15 18:10:15 yangyuguang 阅读数 377
  1. 确保spark 集群和hadoop集群 都能正确运行
  2. 修改spark的配置文件 spark-env.sh
    加上 这句话:
    export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
    让spark知道hadoop配置文件的路径
  3. 将spark-env.sh 分发到每台服务器上
2017-09-06 16:27:51 bluejoe2000 阅读数 1576

spark数据流(data flow)的合并可以通过union来实现。

先测试一下批量数据(batching data)的union:

scala> Seq("1","2","3","4").toDS.union(Seq("a","b","c","d").toDS).show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    a|
|    b|
|    c|
|    d|
+-----+

再来测试一下流数据(streaming data)的union:

val lines1 = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val lines2 = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load()
val words = lines3.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start
query.awaitTermination()

分别启动netcat:

nc -lk 9999
nc -lk 8888

测试结果如下:

-------------------------------------------
Batch: 11
-------------------------------------------
+-----+-----+                                                                   
|value|count|
+-----+-----+
|   9b|    2|
| 8888|    1|
|   8b|    2|
|   8c|    2|
|   9a|    2|
|   8a|    2|
|   9c|    2|
| 9999|    1|
+-----+-----+

再来研究分支,如下代码可以理解成分支:

val ds = Seq(1,2,3,4).toDS
val (ds1, ds2) = (ds.filter(_ % 2 == 0), ds.filter(_ % 2 == 1))
ds1.show
ds2.show

这个分支是个假的分支,ds其实被遍历了2次。。。
要实现真的分支,估计只能挂接多个Sink来实现了

2019-04-02 12:40:32 weixin_43866709 阅读数 461

spark整合hive就是让hive运行在spark上面,其实跟hive没有太大的关系,就是使用了hive的标准(HQL,元数据库,UDF,序列化,反序列化机制)

hive原来的计算模型是MR,将计算结果写入到HDFS中,有点慢,而spark整合hive是让hive运行在spark集群上面,使用spark中的RDD(DataFrame),这样速度很快。

下面来说一下这个元数据:
真正要计算的数据是保存在HDFS中的,hive使用MySQL当作元数据库,MySQL这个元数据库,保存的是hive表的描述信息,描述了有哪些database,table,以及表有多少列,每一列是什么类型,还有描述表的数据保存在HDFS的什么位置。
hive的元数据库建立了一种映射关系,执行HQL时,先到MySQL元数据库中查找描述信息,然后根据描述信息生成任务,然后将任务下发到spark集群中执行。

那么hive和MySQL有什么区别呢?
hive是一个数据仓库(存储数据并分析数据,分析数据仓库中的数据量很大,一般要分析很长的时间)
mysql是一个关系型数据库(关系型数据的增删改查(低延迟))

根据上面的介绍,我们知道首先要安装上MySQL。

1.安装MySQL

在集群上的一台机器上安装上MySQL,详细的安装过程请参考博客:
https://blog.csdn.net/weixin_43866709/article/details/88956929

2.启动MySQL,并创建一个普通用户,并且授权

启动MySQL:

mysql -uroot -p密码

创建一个普通用户并且授权(经过授权其他机器才可以连接上MySQL数据库):

> CREATE USER '用户名'@'%' IDENTIFIED BY '密码';   //创建用户
> GRANT ALL PRIVILEGES ON hivedb.* TO '用户名'@'%' IDENTIFIED BY '密码' WITH GRANT OPTION;
> FLUSH PRIVILEGES;

3.在spark的conf目录下创建一个hive的配置文件,hive-site.xml(在没有安装hive的情况下)

如果安装了hive,直接将hive中的hive-site.xml拷贝到spark中就可以了,

文件内容如下:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <!--MySQL在哪台机器上-->
    <value>jdbc:mysql://node-6:3306/hivedb?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

   <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <!--MySQL连接驱动-->
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <!--MySQL用户名-->
    <value>bigdata</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <!--填写密码-->
    <value>密码</value>
    <description>password to use against metastore database</description>
  </property>

</configuration>

4.上传一个MySQL连接驱动(sparkSubmit也要连接MySQL,获取元数据信息)

MySQL驱动下载地址:http://www.codedocs.net/maven2/mysql/mysql-connector-java/5.1.6

5.启动spark-sql

cd spark/bin

./spark-sql --master spark://L1:7077,L2:7077 --driver-class-path /home/hadoop/local-jars/mysql-connector-java-5.1.6.jar

说明:
我安装的集群是两个master,所以这里指定了两个master节点
–driver-class-path是指定MySQL连接驱动所在位置

启动完成之后,打开MySQL图形化界面管理工具,如Nacicat,连接上MySQL,或者使用MySQL命令行,可以看到有一个hivedb的数据库,里面一共有29张表,这些表就是用来记录hive表的元数据信息的。
在这里插入图片描述

6.创建一个table

创建命令:

create table t_boy(id bigint,name string,fv double) row format delimited fields terminated by ',';

创建成功之后,就可以看到这个表的信息了:

需要手动改一下DBS表中的DB_LOCATION_UIR改成hdfs的地址:(现在默认是本地的文件路径)
在这里插入图片描述

7.要在/etc/profile中配置一个环境变量(让sparkSQL知道hdfs在哪里,其实就是namenode在哪里)

export HADOOP_CONF_DIR=Hadoop下conf目录的路径

然后执行source /etc/profile使之生效.

8.重新启动SparkSQL的命令行

此时准备工作就完成了,可以上传一个文件到hive中:

load data local inpath '/home/hadoop/local-file/person.txt' into table t_boy;

上传之后,可以书写sql语句了:

select * from t_boy;

然后可以看到spark-sql会将sql语句转换成RDD提交到集群上运行。

附加

1.在启动spark-sql时,也可以直接指定sql命令,例如:

  ./spark-sql --master spark://L1:7077,L2:7077 --driver-class-path /home/hadoop/local-jars/mysql-connector-java-5.1.6.jar -e 'show tables;'

还可以将hive的sql语句写在一个文件中执行(用-f这个参数):

 ./spark-sql --master spark://L1:7077,L2:7077 --driver-class-path /home/hadoop/local-jars/mysql-connector-java-5.1.6.jar -f hive-sqls.sql

-f后面跟的是存放sql语句的文件名。

2.在IDEA中开发,整合hive:

首先要整合hive,必须加入hive的支持,在pom.xml文件中加入下面内容:

<!-- spark如果想整合Hive,必须加入hive的支持 -->
	<dependency>
	    <groupId>org.apache.spark</groupId>
	    <artifactId>spark-hive_2.11</artifactId>
	    <version>${spark.version}</version>
	</dependency>

然后将hive-site.xml,Hadoop中的core-site.xml,hadoop-site.xml文件拷贝到maven项目中的sources文件夹下:
然后再创建SparkSession时也要开启对hive的支持:

//如果想让hive运行在spark上,一定要开启spark对hive的支持
    val spark = SparkSession.builder()
      .appName("HiveOnSpark")
      .master("local[*]")
      .enableHiveSupport()//启用spark对hive的支持(可以兼容hive的语法了)
      .getOrCreate()

Spark的架构原理

阅读数 234