2019-10-09 20:49:40 Master_chaoAndQi 阅读数 189
  • Spark数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6057 人正在学习 去看看 CSDN讲师

一 环境准备

需求描述:创建StreamingContext,从kafka中实时消费启动日志数据,借助redis对当天的启动日志进行去重,将去重后的结果写入redis和Habse

1.1 pom文件

 <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

      
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <!--在使用phoenix时需注释掉下面的两个依赖 hbase-client 
        hbase-server 否则会因为jar包冲突,出现错误
        -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

1.2 config配置:

# Kafka配置
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.group=guochao
# Redis配置
redis.host=hadoop102
redis.port=6379
zooleeper.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

1.3 properties解析工具类

object PropertiesUtil {
  private val in: InputStream = ClassLoader.getSystemResourceAsStream("config.properties")
  private val pro = new Properties()
  pro.load(in);
  // 获取对应的value的值
  def getPropertiesValue(propertiesName:String)={
    pro.getProperty(propertiesName)
  }
}

1.4 HbaseUtil工具类

object HbaseUtil {
  //连接hbase

  def getConnection: Connection ={
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum",PropertiesUtil.getPropertiesValue("zooleeper.list"))
    ConnectionFactory.createConnection(configuration)
  }

}

1.5 kafkaUtil根据指定的topic返回对应的Dstream

/**
  * 创建Kafka 数据源 从指定的topic 消费数据 返回DStream
  */
object MyKafkaUtils {
  //从kafka 指定的topic 中消费数据
  def getDstreamFromKafka(ssc:StreamingContext, topic:String): InputDStream[(String, String)] ={
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->PropertiesUtil.getPropertiesValue("kafka.broker.list"),
      ConsumerConfig.GROUP_ID_CONFIG->PropertiesUtil.getPropertiesValue("kafka.group")
    )
    KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
      ssc,
      kafkaParams,
      Set(topic)
    )
  }
}

1.6 jedisUtils从连接池中获取Jedis连接实例

object JedisUtils {
  private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
  jedisPoolConfig.setMaxTotal(100) //最大连接数
  jedisPoolConfig.setMaxIdle(20) //最大空闲
  jedisPoolConfig.setMinIdle(20) //最小空闲
  jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
  jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
  jedisPoolConfig.setTestOnBorrow(false) //每次获得连接的进行测试
  private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "hadoop102", 6379)

  // 直接得到一个 Redis 的连接
  def getJedisClient: Jedis = {
    jedisPool.getResource
  }
}

1.7 样例类

package com.gc.bean
case class StartupLog(mid: String,
                      uid: String,
                      appId: String,
                      area: String,
                      os: String,
                      channel: String,
                      logType: String,
                      version: String,
                      ts: Long,
                      var logDate: String,
                      var logHour: String)

二 Spark直接将数据写入Hbase

package com.gc.app

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSON
import com.gc.bean.StartupLog
import com.gc.util.{ConstantParam, HbaseUtil, JedisUtils, MyKafkaUtils}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/**
  * 统计日活(启动日志)
  * 一个用户当天首次登陆算一个活跃用户
  *
  */
object DauApp {
  def main(args: Array[String]): Unit = {
    //创建StreamingContext 对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DauApp")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))


    // 消费kafka中的实时数据流
    val kafkaSource: InputDStream[(String, String)] = MyKafkaUtils.getDstreamFromKafka(ssc,ConstantParam.START_LOG)
    //处理数据 封装样例类
    val logDstream = kafkaSource.map({
      case (_, value) => {
        //将数据解析为json
        val log: StartupLog = JSON.parseObject(value, classOf[StartupLog])
        log.logDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
        log.logHour = new SimpleDateFormat("HH").format(new Date())
        log
      }
    })
    // 借助redis 对数据进行去重 将数据转换成RDD进行操作
    // 创建连接查询出Redis 中的数据  并将数据进行广播
    val client = JedisUtils.getJedisClient
    val set: util.Set[String] = client.smembers(s"gmall:start:dau:${new SimpleDateFormat("yyyy-MM-dd").format(new Date())}")
    client.close()
    val sscBd: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(set) // 广播变量

    val filterDstream: DStream[StartupLog] = logDstream.transform(rdd => {
      rdd.filter(log => {
        !sscBd.value.contains(log.mid) // 如果在redis中不存在则为当天首次活跃用户
      })
    })
    // 将数据写入redis 维护活跃设备Id
    filterDstream.foreachRDD(rdd=>{
      rdd.foreachPartition(it=>{
        // 创建jedis 连接
        val client = JedisUtils.getJedisClient
        // 遍历写入数据 借助set
        it.foreach(log=>{
       client.sadd(s"gmall:start:dau:${log.logDate}",log.mid) //按照设备Id进行去重
        })
        //关闭连接
        client.close()

      })
    })

    // 将去重后的结果写入到hbase中
    filterDstream.foreachRDD(rdd=>{
      rdd.foreachPartition(it=>{
        //创建hbase的连接
        var putList=ListBuffer[Put]();
        val conn = HbaseUtil.getConnection
        val table: Table = conn.getTable(TableName.valueOf("start_log_adu"))
        it.foreach(log=>{
          val put =new Put(Bytes.toBytes(log.mid))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("uid"),Bytes.toBytes(log.uid))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("appId"),Bytes.toBytes(log.appId))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("area"),Bytes.toBytes(log.area))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("os"),Bytes.toBytes(log.os))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("channel"),Bytes.toBytes(log.channel))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logType"),Bytes.toBytes(log.logType))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("version"),Bytes.toBytes(log.version))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("ts"),Bytes.toBytes(log.ts))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logDate"),Bytes.toBytes(log.logDate))
          put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("logHour"),Bytes.toBytes(log.logHour))
          putList+=put
        })
        import scala.collection.JavaConversions._
        table.put(putList);
        table.close()
        conn.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

三 Spark整合Phoenix将数据写入hbase

通过Phoenix创建表:
phoenix

create table gmall_dau(
mid varchar,
uid varchar,
appid varchar,
area varchar,
os varchar,
channel varchar,
logType varchar,
version varchar,
ts bigint,
logDate varchar,
logHour varchar
CONSTRAINT dau_pk PRIMARY KEY (mid,logDate))
column_encoded_bytes=0;

CONSTRAINT dau_pk PRIMARY KEY (mid,logDate)); 以mid和logDate作为主键,在hbase中会进行无缝拼接,拼接成Hbase中的rowkey

saveToPhoenix方法签名

def saveToPhoenix(tableName: String, cols: Seq[String],
                    conf: Configuration = new Configuration, zkUrl: Option[String] = None, tenantId: Option[String] = None)
                    : Unit = {

将原直接写入hbase的代码修改为通过Phoenix写入数据到hbase

import org.apache.phoenix.spark._ // 隐式转换
    filterDstream.foreachRDD(rdd=>{
        rdd.saveToPhoenix(
          tableName = "GMALL_DAU",       cols=Seq("MID","UID","APPID","AREA","OS","CHANNEL","LOGTYPE","VERSION","TS","LOGDATE","LOGHOUR"),
          zkUrl=Some("hadoop102,hadoop103,hadoop104:2181")
        )
    })

结果:
通过phoenix查询

四遇到的问题

问题1 :Phoenix建表语句大小写问题

写入数据问题
原因:在建表的时候,使用的是小写,phoenix会将建表语句全部改为大写,字段名不匹配,前面的0.Mid中 0 是因为在建表时未指定对应的列族,默认列族为0(如果想使用小写,则需要将表明和字段名用双引号包裹起来)

问题2:jar包冲突

在项目中同时加入了Hbase-client hbase-server phoenix-hbase导致jar包冲突,在使用Phoenix 写入数据的时候,将其它的两个注释掉即可

Caused by: com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/disruptor/WaitStrategy;)V
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2212)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
	at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:241)
	at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:147)
	at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
	at java.sql.DriverManager.getConnection(DriverManager.java:664)
	at java.sql.DriverManager.getConnection(DriverManager.java:208)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:113)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:97)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:92)
	at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:71)
	at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:306)
	at org.apache.phoenix.spark.ProductRDDFunctions$$anonfun$1.apply(ProductRDDFunctions.scala:41)
	at org.apache.phoenix.spark.ProductRDDFunctions$$anonfun$1.apply(ProductRDDFunctions.scala:37)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	... 3 more

问题3 redis 强制退出问题

MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.

强制关闭Redis快照导致不能持久化。
连接上redis客户端设置如下参数

config set stop-writes-on-bgsave-error no
2019-07-18 20:54:52 qq_40510501 阅读数 17
  • Spark数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6057 人正在学习 去看看 CSDN讲师

         kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用Spark Streaming实时计算框架实时地读取kafka中的数据然后进行计算

本次整合所需依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

集群操作:

1.启动zookeeper集群

zkServer.sh start

2.启动kafka集群

kafka-server-start.sh  /opt/software/kafka/config/server.properties

3.创建topic

kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic kafka_spark

4.topic中生产数据

kafka-console-producer.sh --broker-list node01:9092 --topic  kafka_spark

编写Spark Streaming程序:

package com.nb.lpq

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

//todo:利用sparkStreaming接受kafka中的数据实现单词计数----采用receivers
object SparkStreamingKafka_Receiver_checkpoint {
  def updateFunc(a:Seq[Int], b:Option[Int]) :Option[Int] ={
    Some(a.sum+b.getOrElse(0))
  }
  def main(args: Array[String]): Unit = {
    val checkpointPath = "./kafka-receiver"

    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }
  def createFunc(checkpointPath:String): StreamingContext = {

    //todo:1、创建sparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_Receiver_checkpoint")
      .setMaster("local[4]")
      //todo:开启wal预写日志
      .set("spark.streaming.receiver.writeAheadLog.enable","true")
    //todo:2、创建sparkContext
    val sc = new SparkContext(sparkConf)

    sc.setLogLevel("WARN")

    //todo:3、创建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    ssc.checkpoint(checkpointPath)
    //todo:4、指定zkServer
    val zkServer="node02:2181,node03:2181,node04:2181"

    //todo:5、指定groupId
    val groupId="spark-kafka-receiver01"

    //todo:6、指定topics 这个可以利用一个消费者组来消费多个topic,
    //(topic_name -> numPartitions)  指定topic消费的线程数
    val topics=Map("kafka_spark"->1)

    //todo:7、并行运行更多的接收器读取kafak topic中的数据,这里设置3个
    val resultDStream: immutable.IndexedSeq[DStream[String]] = (1 to 3).map(x => {
      //todo:8、通过使用KafkaUtils的createStream接受kafka topic中的数据,生成DStream
      val kafkaDataDStream: DStream[String] = KafkaUtils.createStream(ssc, zkServer, groupId, topics).map(x => x._2)
      kafkaDataDStream
    }
    )
    //todo:利用StreamContext将所有的DStream组合在一起
    val kafkaDStream: DStream[String] = ssc.union(resultDStream)

    //todo:8、获取kafka中topic的内容

    //todo:9、切分每一行。每个单词记为1
    val wordAndOne: DStream[(String, Int)] = kafkaDStream.flatMap(_.split(" ")).map((_,1))

    //todo:10、相同单词出现的次数累加
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    //todo:打印
    result.print()
    ssc

  }
}

 运行代码,查看控制台:

本次整合遇到的问题:

     程序和kafka服务都不报错,但就是接受不到消息???

     当我们运行程序时,会产生日志文件(如下图),当日志文件过多,程序会读取日志文件里的东西(或许是文件过多堵塞了?),所以,我们可以把该文件夹删除,再次运行,就可以实时接受消息了

 

2019-07-18 21:21:52 qq_40510501 阅读数 10
  • Spark数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6057 人正在学习 去看看 CSDN讲师

1.安装flume1.6以上

2.下载依赖包:spark-streaming-flume-sink_2.11-2.0.2.jar-------->放入到flume的lib目录下

3.修改自己的scala-library版本(在lib目录下),pom里面什么版本,找到地址传到  flume/lib/ (这块很重要,scala-library的版本必须与项目里的scala的版本一致)

4.依赖:

<properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
    <spark.version>2.0.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.10</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>

5.编写flume-poll.conf配置文件,在conf文件夹下

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
# 下面的路径是数据的路径,可以自定义
a1.sources.r1.spoolDir = /root/data 
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=node01<!—自己的主节点信息-->
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

6.在服务器上的 /root/data目录下准备数据文件data.txt

7.具体代码:

package com.nb.lpq

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * sparkStreaming整合flume 拉模式Pull

  */
object SparkStreaming_Flume_Poll {
  //newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
  //runningCount 历史的所有相同key的value总和
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount =runningCount.getOrElse(0)+newValues.sum
    Some(newCount)
  }


  def main(args: Array[String]): Unit = {
    //配置sparkConf参数
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    //构建sparkContext对象
    val sc: SparkContext = new SparkContext(sparkConf)

    sc.setLogLevel("WARN")

    //构建StreamingContext对象,每个批处理的时间间隔
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    //设置checkpoint
    scc.checkpoint("./")
    //设置flume的地址,可以设置多台
    val address=Seq(new InetSocketAddress("192.168.248.123",8888))
    // 从flume中拉取数据
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK)

    //获取flume中数据,数据存在event的body中,转化为String
    val lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))
    //实现单词汇总
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }

}

8.启动flume

flume-ng agent -n a1 -c /opt/software/apache-flume-1.7.0-bin/conf -f /opt/software/apache-flume-1.7.0-bin/conf/flume-poll.conf -Dflume.root.logger=INFO,console

9.启动程序,查看控制台

遇到的问题:

      两边运行都没有问题,但就是不输出东西???

     

      文件夹里的数据数据文件只会被加载一次,一次过后,文件名会被更改为xxx.COMPLETED;

      把data下的data.txt数据文件改个名字,例如mv data.txt.COMPLETED data.txt,此时,就会加载该文件

2015-02-21 10:58:18 axxbc123 阅读数 18
  • Spark数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6057 人正在学习 去看看 CSDN讲师

在http://bit1129.iteye.com/blog/2184467一文中对Spark Streaming整合Flume-NG进行了基本的配置,并且Spark Streaming能够监听到来自于Flume的数据输出(通过Sink),不过代码很简单同时也是在单机上(Master和Worker在同一台机器上)进行试验的,因而还有有几个问题没有解决,本文继续Spark Streaming整合Flume-NG

 

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SparkFlumeNGWordCount {
  def main(args : Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    //9999端口是由这个Spark负责开启并监听的,Spark Streaming采集写到这个端口的数据
    //问题:如果这个代码运行在集群中,那么localhost指的是driver所在的IP,还是每个worker所在的IP
    //即每个worker都会启动这个端口?这很重要因为它将影响Flume的配置(Flume的Sink就是9999端口)
    ///答案:只是一个Worker的IP,那么问题是如何知道Receiver在哪个Worker Node上启动?做法时先启动Spark Streaming,然后确定Receiver在哪个Node上启动
    val lines = FlumeUtils.createStream(ssc,"localhost",9999)

    lines.cache();

    //lines是DStream对象,它的每个元素是SparkFlumeEvent对象,可以将它转换为字符窜(evt.event.getBody.array())
    // 打印显示前10行的字符串
    lines.map(evt => {
      val str = new String(evt.event.getBody.array())
      //打印到控制的文本内容
      ("string received: "+ str)
      //print方法是action,也就是让map的转换操作运行,必须调用action
    }).print()

    //保存到磁盘文件

    lines.map(evt => {
      val str = new String(evt.event.getBody.array())
      //保存到磁盘文件的内容
     ("string received: "+ str)
    }).saveAsTextFiles("file:///home/hadoop/flumeoutput", "suff")

     lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis() ).print()
    ssc.start()
    ssc.awaitTermination();
  }
}

例子中,Spark Streaming对接收到来自于Flume的输入(通过9999端口)一方面打印到控制台(通过print算子,print只打印前10行),另一方面通过saveAsTextFiles写到磁盘文件中,这验证了一个问题,进行Flume输出的数据进行了包装,包装有headers,有body的JSON串,但是通过SparkFlumeEnvent.event.getBody.array()还是很容易的获取到真正的数据,获取到用户数据后可以对数据进行操作

 

有几个问题需要解决:

1. FlumeUtils的createStream方法有两个参数,host和port,那么这个host和port只会在Driver所在机器上开启监听,还是在所有Workers上也会监听。这个问题实质上是要回答这样一个问题,如下代码是在哪里执行

 

val lines = FlumeUtils.createStream(ssc,"localhost",9999)

 

因为这是main函数,即DriverProgram,我认为上面的代码应该在Driver上运行,  因为Driver的目的,一是构造RDD以及相应的DAG,然后提交作业,作业中的Task是例子中的print()和saveAsTextFiles触发Job提交,然后再划分Stage形成TaskSet创建的,这些Task是与val lines = FlumeUtils.createStream(ssc,"localhost",9999)代码无关的,所以,这个application只会在Driver上开启9999端口

答:这个认为是错误的,即9999是在Worker上监听的,也就是数据直接流向Worer节点了,而输出是在Driver上。这也就可以理解,Spark RDD的数据本地性了,所有的数据都在本地计算。

同时发现了一个现象:

三台虚机(一主两从),提交application的时候,指定的参数--total-executor-cores 为2,当集群启动后,发现Master分配了两个core,两个Slave也分配两个,但是由于物理机只有4个core,因此,两个Slave真正的core个数是每个Slave1个,

之前提到过1个core无法运行spark streaming程序,因为关掉一个虚机,采用一主一从每个分配2个core的方式运行。

是否可以认为--total-executor-cores参数的意义是给集群中的每个节点分配这么core(如果某个节点core不够,那么就有几个分配几个)

 

关于hostname和9999的详细说明:

1. Spark will listen on the given port for Flume to push data into it.
2. When in local mode, it will listen on localhost:9999
3. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and list on port 9999, for receiving data from Flume. So only the configured machine will listen on port 9999.

 

 

2. 上例中,saveAsTextFiles,是DStream的一个方法。 注意Files是复数形式,即会产生多个文件目录,多个文件是指DStream中的每个RDD都会调用其saveAsFile方法,也就是每个RDD都会产生一个文件目录

 

  //prefix:
  //suffix:
  def saveAsTextFiles(prefix: String, suffix: String = "") {
    ///saveFunc方法,time入参是rdd构造时的时间,因此每个RDD都不同
    val saveFunc = (rdd: RDD[T], time: Time) => {
      ///根据前缀,后缀以及RDD的标识创建RDD的名字
      val file = rddToFileName(prefix, suffix, time)
      ///每个RDD都保存在不同的文件目录中
      rdd.saveAsTextFile(file)
    }
    //对于DStream中的每个RDD,调用saveFunc函数
    this.foreachRDD(saveFunc)
  }

格式是prefix-时间戳.suff,如下所示

 

[hadoop@hadoop ~]$ pwd
/home/hadoop
[hadoop@hadoop ~]$ ls -l
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:15 flumeoutput-1424484950000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424484960000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424484970000.suff
drwxrwxr-x   2 hadoop hadoop       80 Feb 20 21:16 flumeoutput-1424484980000.suff
drwxrwxr-x   2 hadoop hadoop       80 Feb 20 21:16 flumeoutput-1424484990000.suff
drwxrwxr-x   2 hadoop hadoop      119 Feb 20 21:16 flumeoutput-1424485000000.suff
drwxrwxr-x   2 hadoop hadoop       41 Feb 20 21:16 flumeoutput-1424485010000.suff

 因此在定义prefix时,最后定义为多级目录,不要写成/home/hadoop/flumeoutput,应该写成/home/hadoop/flumeoutput/appname/flume

 

关于Spark Streaming接收到FlumeNG发送来的Avro数据的处理:

 

val events = FlumeUtils.createStream(ssc, receiverHostname, receiverPort)
val lines = events.map{e => new String(e.event.getBody().array(), "UTF-8")} 

 

 events是一个DStream,其中的每个元素是SparkFlumeEvents对象,SparkFlumeEvents.event获取到AvroFlumeEvent对象,AvroFlumeEvent的getBody方法获取到数据Body,array()方法转换为字节数组

 

 

2019-10-06 19:01:39 jingmin_heijie 阅读数 27
  • Spark数据源处理

    Spark多数据源处理教程,该课程主要介绍如何通过Spark的DataSource API来读写外部数据源中的数据,并结合一些具体场景来分析和解释使用DataSource API的好处以及需要注意的问题。

    6057 人正在学习 去看看 CSDN讲师

整合hive元数据metadata

  • MetaStore, 元数据存储

    SparkSQL 内置的有一个 MetaStore, 通过嵌入式数据库 Derby 保存元信息, 但是对于生产环境来说, 还是应该使用 Hive 的 MetaStore, 一是更成熟, 功能更强, 二是可以使用 Hive 的元信息

  • 查询引擎

    SparkSQL 内置了 HiveSQL 的支持, 所以无需整合

Hive 的 MetaStore 是一个 Hive 的组件

在这里插入图片描述

由上图可知道, 其实 Hive 中主要的组件就三个, HiveServer2 负责接受外部系统的查询请求, 例如 JDBC, HiveServer2 接收到查询请求后, 交给 Driver 处理, Driver 会首先去询问 MetaStore 表在哪存, 后 Driver 程序通过 MR 程序来访问 HDFS 从而获取结果返回给查询请求者

而 Hive 的 MetaStore 对 SparkSQL 的意义非常重大, 如果 SparkSQL 可以直接访问 Hive 的 MetaStore, 则理论上可以做到和 Hive 一样的事情, 例如通过 Hive 表查询数据

而 Hive 的 MetaStore 的运行模式有三种

  • 内嵌 Derby 数据库模式
    这种模式不必说了, 自然是在测试的时候使用, 生产环境不太可能使用嵌入式数据库, 一是不稳定, 二是这个 Derby 是单连接的, 不支持并发
  • Local 模式
    Local 和 Remote 都是访问 MySQL 数据库作为存储元数据的地方, 但是 Local 模式的 MetaStore 没有独立进程, 依附于 HiveServer2 的进程
  • Remote 模式
    和 Loca 模式一样, 访问 MySQL 数据库存放元数据, 但是 Remote 的 MetaStore 运行在独立的进程中

Hive开启元数据服务

[root@node01 apache-hive-2.1.1-bin]# cat conf/hive-site.xml 
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
  </property>
  <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>123456</value>
  </property>
  <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
  </property>
  <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
  </property>
  <property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
 </property>

 <property>
	<name>hive.cli.print.current.db</name>
	<value>true</value>
</property>

<property>
	<name>hive.cli.print.header</name>
	<value>true</value>
</property>	

 <property>
		<name>hive.server2.thrift.bind.host</name>
		<value>node03</value>
   </property>
   
   <property>
	<name>hive.metastore.uris</name>
        <value>thrift://node03:9083</value>
   </property>
   
   <property>
	<name>hive.metastore.client.socket.timeout</name>
        <value>3600</value>
   </property>

<!--这一段配置是重点,将元数据访问配置为remote模式-->
<property>
  <name>hive.metastore.local</name>
  <value>false</value>
</property>

</configuration>
如果没有添加下面这一段,启动spark会报错
<!--这一段配置是重点,将元数据访问配置为remote模式-->
<property>
  <name>hive.metastore.local</name>
  <value>false</value>
</property>

错误如下:

19/10/06 18:25:19 WARN hive.metastore: Failed to connect to the MetaStore Server...
19/10/06 18:25:20 WARN hive.metastore: Failed to connect to the MetaStore Server...
19/10/06 18:25:21 WARN hive.metastore: Failed to connect to the MetaStore Server...
19/10/06 18:25:22 WARN metadata.Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

Caused by: java.lang.reflect.InvocationTargetException: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)

启动hive元数据服务

  • 前提是启动了mysql服务(存储元数据)
	service mysqld start
	nohup bin/hive --service metastore &
	nohup bin/hive --service hiveserver2 &

同时需要加载其他配置,包括HDFS

即使不去整合 MetaStore, Spark 也有一个内置的 MateStore, 使用 Derby 嵌入式数据库保存数据, 但是这种方式不适合生产环境, 因为这种模式同一时间只能有一个 SparkSession 使用, 所以生产环境更推荐使用 Hive 的 MetaStore

SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse, 这些配置信息一般存在于 Hadoop 和 HDFS 的配置文件中, 所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录

cd /export/servers/hadoop/etc/hadoop
cp hive-site.xml core-site.xml hdfs-site.xml /export/servers/spark/conf/   

scp -r /export/servers/spark/conf node02:/export/servers/spark/conf
scp -r /export/servers/spark/conf node03:/export/servers/spark/conf

hive-site.xml : 要读取 Hive 的配置信息, 主要是元数据仓库的位置等信息
core-site.xml : 读取安全有关的配置
hdfs-site.xml: 有可能需要在 HDFS 中放置表文件, 所以需要 HDFS 的配置

测试

# 在hdfs中创建文件夹
hdfs dfs -mkdir -p /dataset

# 测试数据,要将该数据加载到创建的hive表中
vim stu
 xjm     22      xjm
 hhj     22      hhj

# 上传本地文件
hdfs dfs -put stu /dataset/

# 进入hive客户端创建hive表
bin/hive
CREATE DATABASE IF NOT EXISTS spark_integrition;

USE spark_integrition;

CREATE EXTERNAL TABLE student
(
  name  STRING,
  age   INT,
  gpa   string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/dataset/hive';

LOAD DATA INPATH '/dataset/stu' OVERWRITE INTO TABLE student;

# 启动spark,进入spark中sbin目录
./start-all.sh
# 开启一个客户端,进入bin目录
./spark-shell

在这里插入图片描述

...
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql
sql   sqlContext

scala> spark.sql
   def sql(sqlText: String): org.apache.spark.sql.DataFrame

scala> spark.sql("use spark_integrition")
res0: org.apache.spark.sql.DataFrame = []

scala> val resDF = spark.sql("select * from student limit 10")
resDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> resDF.show()
+----+---+---+
|name|age|gpa|
+----+---+---+
| xjm| 22|xjm|
| hhj| 22|hhj|
+----+---+---+

Spark SQL整合hive

阅读数 1439

没有更多推荐了,返回首页