2017-05-06 15:12:48 weinierzui 阅读数 3575
  • Spark专场】Deep Dive: How Spark Uses Memory

    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等新技术,企业又可以以较小的成本、更高效地挖掘出提升企业业务的数据与方法,实现云、数、智的自然融合和协力发展。

    1701 人正在学习 去看看 2017CCTC大会

1.  Spark连接hbase

//后续更多细节补充,现在还不太懂。      如有大神看到请不吝赐教

Spark连接hbase的步骤:

1.      构建sparkconf配置信息,设置spark主机位置,设置程序名称,资源数等

2.      构建sparkcontext

3.      构建Sqlcontext

4.      通过sqlcontext操作构建RDD

5.      将RDD转换为dataframe

6.      用DataFrame注册表

7.      操作表进行处理

package cn.Dalong.test.DaLong_hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
/**
  * Created by DreamBoy on 2017/5/5.
  */
object scala {
  def main(args: Array[String]): Unit = {
    //设置控制台不显示不必要的信息
  
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    //设置spark参数
   
val conf =newSparkConf().setMaster("local[2]").setAppName("HbaseTest")
    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val sc = newSparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
    val sqlContext = newSQLContext(sc)
    //配置HBase
   
hbaseConf.set("hbase.rootdir","hdfs://http://192.168.10.228/hbase")
    hbaseConf.set("hbase.zookeeper.quorum","192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
    hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
    hbaseConf.set("hbase.master","192.168.10.230")
    //定义表Hbase表的名字
   
val tableName = "deppon_test"
   
val
out_tbl="deppon_tt"
   
//设置需要在hbase中查询的表名
   
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    //创建连接
   
val connection = ConnectionFactory.createConnection(hbaseConf)
    //scan操作   hao_zl构建hadoopRDD对象
    // hao_zl
并获取访问hbase的返回集[(ImmutableBytesWritable,result)]元组
   
val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hbaseRDD.foreach(println(_))

    println("==========================================================")
    /*
    hbaseRDD.foreach{ case (_,result)=>
      val key = Bytes.toString(result.getRow)
      val cookieid = Bytes.toString(result.getValue("basicmod".getBytes, "cookieid".getBytes))
      val createtime = Bytes.toString(result.getValue("basicmod".getBytes, "createtime".getBytes))
      val pv = Bytes.toString(result.getValue("basicmod".getBytes, "pv".getBytes))
      //println("Row key:" + key + " cookieid:" + cookieid + " createtime:" + createtime+" pv:"+pv)
      (key,cookieid,createtime,pv)
    }*/
    //
获取对应到对应需要的列信息   函数=>
   
val tblrdd = hbaseRDD.map {case(_, result) =>
      val key = Bytes.toString(result.getRow)
      val cookieid = Bytes.toString(result.getValue("basicmod".getBytes,"cookieid".getBytes))
      val createtime = Bytes.toString(result.getValue("basicmod".getBytes,"createtime".getBytes))
      val pv = Bytes.toString(result.getValue("basicmod".getBytes,"pv".getBytes))
      //println("Row key:" + key + " cookieid:" + cookieid + " createtime:" + createtime+" pv:"+pv)
     
(key, cookieid, createtime, pv)
    }
    //hao_zl 隐式转换为dataframe
   
import sqlContext.implicits._
    //将返回的表中列信息封装成tbl_test类的元素值
   
val rowrdd = tblrdd.map(x=>tbl_test(x._1,x._2,x._3,x._4)).toDF()
  //注册表
   
rowrdd.registerTempTable("tbl")
    sqlContext.sql("select * from tbl").show()//.write.mode(SaveMode.Append).format("org.apache.phoenix.spark").insertInto("deppon_tt")
    //mode(SaveMode.Overwrite).options(Map("table" -> "USER_OVERVIEW", "zkUrl" -> conf.getString("Hbase.url"))).format("org.apache.phoenix.spark").save()




   
sc.stop()
  }

}
case class tbl_test(id:String,cookieid:String,createtime:String,pv:String)

 

 

2.  Spark连接hive数据

1.      本地运行需要将hive的hive-site.xml hdfs-site.xml文件配置到项目中的/main/resources文件夹下使本地程序找到集群就可以运行了

2.      服务器上运行的命令:

spark-submit --executor-memory 2g--driver-memory 200M --total-executor-cores 288 --executor-cores 2 --confspark.kryoserializer.buffer.max=256m --conf spark.kryoserializer.buffer=64m--master yarn --class cn.deppon.sparkhive.Dalong/home/appman/DL_test/original-MavenTest-1.0-SNAPSHOT.jar

object Dalong {
    def main(args: Array[String]): Unit = {
      //设置conf信息
     
val conf = new SparkConf().setAppName("Dalong").setMaster("local[2]")
      //创建spark上下文对象并加载配置信息初始化环境
     
val sparkContext =newSparkContext(conf)
      //使用spark上下文环境常见hivesql
     
val sqlContext =newHiveContext(sparkContext)
      //可以直接运行需要执行的代码并插入到hive
     
sqlContext.sql("create table dl_test.DL_in_test3 as select * from dl_test.test_e2 where country = 'china'").show()
    }
}

 


3.spark连接hbase 读写操作

代码:

package cn.DaLong_hbase


import com.deppon.spark.hbase.HbaseTool.hbaseConf
import com.deppon.spark.hbase.{HbaseTool, SparkCont}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Created by DreamBoy on 2017/5/8.
  */
object Hbase_read_write {
  def main(args: Array[String]): Unit = {


    //val sc = SparkCont.getSparkContext("Hbase_read_Write")
//构建sc
    val conf = new SparkConf().setMaster("local[2]").setAppName("Hbase_read_Write")
    // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)


    //val hbase_read = HbaseTool.HbaseTool_Read("member",sc)
//构建读取表信息的rdd
    hbaseConf.set(TableInputFormat.INPUT_TABLE,"member")
    val hbase_read = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )


    //构建写表信息的rdd
    //val hbase_write = HbaseTool.HbaseTool_Write("member",sc)
    val hbase_write = new JobConf(hbaseConf)
    hbase_write.setOutputFormat(classOf[TableOutputFormat])
    hbase_write.set(TableOutputFormat.OUTPUT_TABLE,"member")


    val h_Read_data = hbase_read.map{
      case(_,result)=>{
      val row = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("info".getBytes(),"name".getBytes()))
      val age = Bytes.toString(result.getValue("info".getBytes(),"age".getBytes()))
        (row,name,age)
      }
    }
  //  val result = h_Read_data.collect()
  //println(result.toBuffer)




//由于数据存在部分为空的情况,所以控制台会报空指针的情况
//  \x00\x00\x00\x01                                  column=info:age, timestamp=1494219674019, value=\x00\x00\x00\x0F                                                                                 
//  \x00\x00\x00\x01                                  column=info:name, timestamp=1494219674019, value=jack  
//  这两行数据中row的值传入的本身是Int类型的但是存储的时候转换成了16进制的形式,读取出来可能无法解析导致结果的是多个空格
//但是数据(包括上面这种情况)都做了对应的处理,并已经正确写入到hbase中
    //将读出的数据加工之后写入到表中
   val rdd = h_Read_data.map{
      arr=>{
        val put = new Put(Bytes.toBytes(arr._1+20+"<<"))
        put.addColumn("info".getBytes(),"name".getBytes(),Bytes.toBytes(arr._2))
        put.addColumn("info".getBytes(),"age".getBytes(),Bytes.toBytes(arr._3+"=="))
        //构成(ImmutableBytesWritable,result)的元组的形式
        (new ImmutableBytesWritable,put)
      }
    }


    rdd.saveAsHadoopDataset(hbase_write)
    sc.stop()
  }
}





 

2017-05-24 10:16:19 oNieJianJun 阅读数 5328
  • Spark专场】Deep Dive: How Spark Uses Memory

    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等新技术,企业又可以以较小的成本、更高效地挖掘出提升企业业务的数据与方法,实现云、数、智的自然融合和协力发展。

    1701 人正在学习 去看看 2017CCTC大会

1、spark客户端登录不上,按图配置即可登录


域名为openfire中配置的服务器名


2、openfire管理端初始登录不上

a.删除ofUser表记录;

b.执行sql:INSERT INTO ofUser (username, plainPassword, name, email, creationDate, modificationDate) VALUES ('admin', 'admin', 'Administrator', 'admin@example.com', '0', '0');

c.用账号admin密码admin登录


3.openfire重新初始化设置

在~\openfire\conf\openfire.xml中删除<setup>true</setup>配置,重启openfire服务器即可


2019-10-16 02:07:36 TT15751097576 阅读数 23
  • Spark专场】Deep Dive: How Spark Uses Memory

    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等新技术,企业又可以以较小的成本、更高效地挖掘出提升企业业务的数据与方法,实现云、数、智的自然融合和协力发展。

    1701 人正在学习 去看看 2017CCTC大会

Spark 连接 KafKa

数据的流程与细节方向

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NsuvdjmY-1571162694434)(C:\Users\tt\AppData\Roaming\Typora\typora-user-images\1571158628657.png)]

前几天把数据导入kafka中,现在要把数据从kafka中使用Scala将数据导入数据仓库;

新建maven工程(基础步骤),

在开百度中打开maven工程–>找到kafka的jar

导入scala框架

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sQxLxvU4-1571162694436)(C:\Users\tt\AppData\Roaming\Typora\typora-user-images\1571138951538.png)]

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

对于消费者,kafka中有两个设置的地方:对于老的消费者,由**–zookeeper参数设置;对于新的消费者,由–bootstrap-server参数**设置

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

//Scala连接kafka的语句

object MyConsumer {
  def main(args: Array[String]): Unit = {

    val props = new Properties()
    //kafka中有两个设置的地方:对于老的消费者,有--zookeeper参数设置;对于心的消费者,由--bootstrap-server参数设置   +  虚拟机的静态地址和默认端口号
    props put("bootstrap.servers", "192.168.56.101:9092")
      //组
    props.put("group.id","users")
      //key的反序列化,
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
      //这是kafka服务器的地址,后面都需要使用(键/值)反序列化
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
      //需要kafka,后面是键值对
    val consumer = new KafkaConsumer[String,String](props)
      //取消息对列的信息,最后返回时一个集合(以前时消息队列,不方便)
    consumer.subscribe( Collections.singletonList("users"))
      //这就是怎么从kafka拿值
      //给consumer的运行时间(100毫秒)
    while (true){
      val rs = consumer.poll(100)
      print("=================================================")
        //给一个迭代器
      val it = rs.iterator()
        //判断下面有没有东西,由的话,就返回布尔值
      while (it.hasNext){
        val single = it.next()
          //键/值
        println(single.key(),".....",single.value())
      }
      print("=====================================================")
    }
      //关闭流
    consumer.close()
  }
}

打个胖包

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6tjL9qgu-1571162694437)(C:\Users\tt\AppData\Roaming\Typora\typora-user-images\1571156355467.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DACjoowV-1571162694439)(C:\Users\tt\AppData\Roaming\Typora\typora-user-images\1571156603825.png)]

到虚拟机里面启动kafka和spark和zookeeper

[root@bigdata bin]# zkServer.sh start
JMX enabled by default
Using config: /opt/bigdata/zk345/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata bin]# ./kafka-server-start.sh ../config/server.properties 

[root@bigdata ~]# cd /opt/bigdata/spark243/
[root@bigdata spark243]# cd sbin/
[root@bigdata sbin]# ./start-all.sh 
org.apache.spark.deploy.master.Master running as process 7192.  Stop it first.
bigdata: org.apache.spark.deploy.worker.Worker running as process 7261.  Stop it first.
[root@bigdata sbin]# jps
3377 ResourceManager
7905 Kafka
7877 QuorumPeerMain
3224 SecondaryNameNode
7192 Master
8233 Jps
3034 DataNode
6106 SparkSubmit
2940 NameNode
7261 Worker
[root@bigdata sbin]#
[root@bigdata bin]# ./kafka-streams-application-reset.sh --zookeeper 127.0.0.1:2181 --application-id user_friends --input-topics user_friends

[root@bigdata bin]# cd /opt/bigdata/spark243/bin/
[root@bigdata bin]# ./spark-submit /opt/mykafka.jar --class com.njbdqn.mykafka.MyConsumer
19/10/15 12:32:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
=================================================1org.apache.kafka.clients.consumer.ConsumerRecords@5b7ea70d
=====================================================2=================================================1org.apac
=====================================================2=================================================1org.apa

2019-12-01 17:04:11 weixin_40040107 阅读数 33
  • Spark专场】Deep Dive: How Spark Uses Memory

    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等新技术,企业又可以以较小的成本、更高效地挖掘出提升企业业务的数据与方法,实现云、数、智的自然融合和协力发展。

    1701 人正在学习 去看看 2017CCTC大会

一. SparkRDD 与 HBase的交互

1.1 依赖配置以及注意事项

1.1.1 特别注意

建议参考 2.3 添加数据 - put的使用里面的处理方法

1.1.2 POM 文件

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <hadoop.version>2.7.5</hadoop.version>
        <spark.version>2.3.4</spark.version>
        <scala.version>2.11.12</scala.version>
        <junit.version>4.12</junit.version>
        <netty.version>4.1.42.Final</netty.version>
    </properties>

    <dependencies>
        <!-- spark 核心依赖包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>1.0.0</version>
         </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!--
                java.lang.NoClassDefFoundError: org/apache/spark/streaming/dstream/DStream
            -->
           <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 编译Scala 的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
            </plugin>
            <!-- 编译Java 的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

1.1.2 注意事项

在下载依赖的时候很有可能出现下载不下来的问题, 会卡到一个地方,一直下载
在这里插入图片描述
解决办法 : 尝试删掉这个文件, 或许可以

1.2 获取数据 - Get 的使用

特别注意 : 千万不要忘了导 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

package com.wangt.hbase.spark

import org.apache.hadoop.hbase.client.{Get, Result}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, TableName}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用 RDD 作为数据源, 将RDD中的数据写入到HBase
 * 特别注意 : 一定要导入 HBase 的隐式方法org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
 *
 * @author 王天赐
 * @create 2019-11-29 19:35
 */
object HBaseBulkGetExampleByRDD extends App{

	// 1.创建SparkConf 以及 SparkContext, 设置本地运行模式
	val conf = new SparkConf()
		.setMaster("local[*]")
		.setAppName("HBase")
	val sc = new SparkContext(conf)
	// 设置日志输出等级为 WARN
	sc.setLogLevel("WARN")

	try {
		// 2. 创建HBaseConfiguration对象设置连接参数
		val hbaseConf = HBaseConfiguration.create()
		// 设置连接参数
		hbaseConf.set("hbase.zookeeper.quorum", "222.22.91.81")
		hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

		// 3.创建HBaseContext
		val hc = new HBaseContext(sc, hbaseConf)

		// 4. 将需要获取的数据的 Rowkey 字段等信息封装到 RDD中
		val rowKeyAndQualifier = sc.parallelize(Array(
			Array(Bytes.toBytes("B1001"), Bytes.toBytes("name")),
			Array(Bytes.toBytes("B1002"), Bytes.toBytes("name")),
			Array(Bytes.toBytes("B1003"), Bytes.toBytes("name"))
		))

		// 5. 获取指定RowKey 以及指定字段的信息
		val result = rowKeyAndQualifier.hbaseBulkGet(hc, TableName.valueOf("Student"), 2,
			(info) => {
				val rowkey = info(0)
				// 字段名
				val qualify = info(1)
				val get = new Get(rowkey)
				get
			}
		)
		// 6. 遍历结果
		result.foreach(data => {
			// 注意 Data是 Tuple 类型
			val result: Result = data._2
			// 获取 Cell数组对象
			val cells: Array[Cell] = result.rawCells()
			// 遍历
			for (cell <- cells) {
				// 获取对应的值
				val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
				val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
				val value = Bytes.toString(CellUtil.cloneValue(cell))
				// 打印输出结果
				println("[ " + rowKey + " , " + qualifier + " , " + value + " ]")
			}
		})

	} finally {
		sc.stop()
	}

}

1.3 添加数据 - put 的使用

package com.wangt.hbase.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.client.{Get, Put, Result}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * @author 王天赐
 * @create 2019-11-29 9:28
 */
object HBaseBulkPutExample extends App {

	val tableName = "Student"

	val sparkConf = new SparkConf()
		.setAppName("HBaseBulkGetExample " + tableName)
    	.setMaster("local[*]")
	val sc = new SparkContext(sparkConf)

	try {

		//[(Array[Byte])]
		val rdd = sc.parallelize(Array(
			Array(Bytes.toBytes("B1001"),Bytes.toBytes("name"),Bytes.toBytes("张飞")),
			Array(Bytes.toBytes("B1002"),Bytes.toBytes("name"),Bytes.toBytes("李白")),
			Array(Bytes.toBytes("B1003"),Bytes.toBytes("name"),Bytes.toBytes("韩信"))))

		val conf = HBaseConfiguration.create()
		conf.set("hbase.zookeeper.quorum", "222.22.91.81")
		conf.set("hbase.zookeeper.property.clientPort", "2181")

		val hbaseContext = new HBaseContext(sc, conf)

		val getRdd = rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("Student"),
			record => {
				val put = new Put(record(0))
				put.addColumn(Bytes.toBytes("info"), record(1), record(2));
				put
			}
		)

	} finally {
		sc.stop()
	}
}

1.4 删除数据 - delete 的使用

package com.wangt.hbase.spark

import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

/**
 * @author 王天赐
 * @create 2019-11-29 22:01
 */
object HBaseBulkDeleteExample extends App{

	// 1.创建SparkConf 以及 SparkContext, 设置本地运行模式
	val conf = new SparkConf()
		.setMaster("local[*]")
		.setAppName("HBase")
	val sc = new SparkContext(conf)
	// 设置日志输出等级为 WARN
	sc.setLogLevel("WARN")

	try {
		// 2. 创建HBaseConfiguration对象设置连接参数
		val hbaseConf = HBaseConfiguration.create()
		// 设置连接参数
		hbaseConf.set("hbase.zookeeper.quorum", "222.22.91.81")
		hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

		// 3.创建HBaseContext
		val hc = new HBaseContext(sc, hbaseConf)

		// 4. 将需要删除的数据的 Rowkey 字段等信息封装到 RDD中
		val deletedRowkeyAndQualifier = sc.parallelize(Array(
			Array(Bytes.toBytes("B1001"), Bytes.toBytes("name"))
		))

		// 5. 删除数据
		deletedRowkeyAndQualifier.hbaseBulkDelete(hc, TableName.valueOf("Student"),
			(record) => {

				val rowkey = record(0)
				val qualifier = record(1)
				val delete = new Delete(rowkey)
				delete.addColumn(Bytes.toBytes("info"), qualifier)
				// 最后需要返回一个 Delete 对象
				delete
			},
			2 // 批处理的大小
		)
	} finally {
		sc.stop()
	}
}

1.5 自定义操作 - hbaseMapPartitions 的使用

  1. hbaseMapPartitions 相当于是针对每个RDD的分区的数据进行操作

  2. 强烈建议 : hbaseMapPartitions 只作为封装 Get 对象或者 Put 对象不要直接在里面 put数据

  3. Get 数据可以, 但是不要直接在 hbaseMapPartitions 方法里面就直接把数据提交删除,具体参考下面的代码

1.5.1 HBaseMapPartitionPut 操作

package com.wangt.hbase.spark

import java.util

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

/**
 * @author 王天赐
 * @create 2019-11-29 22:10
 */
object HBaseMapPartitionPutExample extends App {

	// 1.创建SparkConf 以及 SparkContext, 设置本地运行模式
	val conf = new SparkConf()
		.setMaster("local[*]")
		.setAppName("HBase")
	val sc = new SparkContext(conf)
	// 设置日志输出等级为 WARN
	sc.setLogLevel("WARN")

	try {
		// 2. 创建HBaseConfiguration对象设置连接参数
		val hbaseConf = HBaseConfiguration.create()
		// 设置连接参数
		hbaseConf.set("hbase.zookeeper.quorum", "222.22.91.81")
		hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

		// 3.创建HBaseContext
		val hc = new HBaseContext(sc, hbaseConf)

		// 4. 将 Rowkey 字段等信息封装到 RDD中
		val rdd = sc.parallelize(Array(
			Array(Bytes.toBytes("B1004"), Bytes.toBytes("name"),Bytes.toBytes("貂蝉"))
		))

		// 5.使用 HBaseMapPartition
		val putsRdd = rdd.hbaseMapPartitions[Put](hc, (rddData, connection) => {

			val puts = rddData.map(r => {
				// 取出对应的数据
				val rowkey = r(0)
				val qualifier = r(1)
				val value = r(2)

				// 注意 : 这个时候 我们有 HBase的Connection对象. 有 Table 对象, 我们可以做关于HBase的任何事
				// 包括但不限于 创建表 或者删除 只需要获取一个Admin对象即可 等等, 下面只是举一个例子
				val put = new Put(rowkey)
				// 将数据添加进去
				put.addColumn(Bytes.toBytes("info"), qualifier, value)
				put
			})
			// 最后再把 puts 返回即可, 它会自动把数据添加进RDD中
			puts
		})
		// 强烈建议 : ! 在 hbaseMapPartitions 方法中将RDD的数据封装成 put类型
		// 然后 在 hbaseBulkPut 去添加, 直接在 hbaseMapPartitions 添加, 虽然有 Connection对象, 但是真的不好用,
		// 参考 我在 HBaseMapPartitionGetExample 类里面写的, 可以直接分开
		// hbaseMapPartitions 封装get, hbaseBulkGet 获取数据, 然后 RDD 遍历 
		putsRdd.hbaseBulkPut(hc, TableName.valueOf("Student"), (put) => (put))

	}finally {
		sc.stop()
	}
}

1.5.2 HBaseMapPartitionGet 操作

注意 : HBaseMapPartition 和上面的方法的区别是, 就像map和 mapParatition的区别

它会针对每个分区统一执行一次map方法, 而不是针对每一条数据执行一次 推荐使用

package com.wangt.hbase.spark

import java.util

import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, Put, Result}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 王天赐
 * @create 2019-11-30 10:14
 */
object HBaseMapPartitionGetExample extends App{

	// 1.创建SparkConf 以及 SparkContext, 设置本地运行模式
	val conf = new SparkConf()
		.setMaster("local[*]")
		.setAppName("HBase")
	val sc = new SparkContext(conf)
	// 设置日志输出等级为 WARN
	sc.setLogLevel("WARN")

	try {
		// 2. 创建HBaseConfiguration对象设置连接参数
		val hbaseConf = HBaseConfiguration.create()
		// 设置连接参数
		hbaseConf.set("hbase.zookeeper.quorum", "222.22.91.81")
		hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

		// 3.创建HBaseContext
		val hc = new HBaseContext(sc, hbaseConf)

		// 4. 将 Rowkey 字段等信息封装到 RDD中
		val rdd = sc.parallelize(Array(
			Array(Bytes.toBytes("B1002"), Bytes.toBytes("name")),
			Array(Bytes.toBytes("B1003"), Bytes.toBytes("name"))
		))

		// 5.使用 HBaseMapPartition
		val results = rdd.hbaseMapPartitions[String](hc, (rddData, connection) => {

			// (1). 获取Table对象
			val table = connection.getTable(TableName.valueOf("Student"))
			// (2). 注意 rddData 是 iterator 类型
			// 可以使用下面的方式获取数据, 但是不推荐
			/*
			while(rddData.hasNext){
				val info = rddData.next()
			}
			*/
			// 官方的例子. 注意 : 这个map 不是RDD 算子, 而是scala自带的函数
			// 最后返回的是 一个 iterator 类型 比如下面的例子是 返回的 iterator[Put]
			val infos = rddData.map(r => {
				/**
				 * 获取指定 RowKey 和指定字段的值
				 */
				// 取出对应的数据
				val rowKey = r(0)
				val qualifier = r(1)
				// 创建 Get 对象, 并添加相应的对象以及rowkey信息
				val get = new Get(rowKey)
				get.addColumn(Bytes.toBytes("info"), qualifier)
				// 获取Result对象
				val result : Result = table.get(get)
				// 获取Cells ,类型是我加上为了 知道这个数据是什么类型的, 可以选择不加
				val cells : util.Iterator[Cell] = result.listCells().iterator()
				// 遍历 Cells
				val sb = new StringBuilder()
				while (cells.hasNext){
					val cell = cells.next()
					// 获取cell中的数据
					val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
					val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
					val value = Bytes.toString(CellUtil.cloneValue(cell))
					sb.append("[ " + rowKey + " , " + qualifier + " , " + value + " ]" )
				}
				// 将得到信息返回
				sb.toString()
			}
			)
			// 最后再把 infos 返回即可 ,它会把info中的信息封装到 RDD中
			infos
		})

		// 6.遍历结果
		results.foreach(println(_))

	}finally {
		sc.stop()
	}
}


二. SparkDStream 与 HBase的交互

2.1 依赖配置以及注意事项

Get 时如果你传入的rowKey是空的话, 后面获取Result的时候会报空指针, 解决方法参考我的代码

2.2 获取数据 - Get的使用

注意事项 : 我用的是netcat作为数据源, 需要先开netcat 再启动程序

参考 :

  1. 开启netcat => nc -lk cm5 8989 (端口和主机改成你自己的)

  2. 输入数据 :

B1001 name

B1002 name

注意 : 第一个是rowKey , 第二个是 字段 (建议输入你HBase中有的RowKey和字段)

package com.wangt.hbase.sparkstreaming

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author 王天赐
 * @create 2019-11-30 20:36
 */
object HBaseBulkGetDStreamExample extends App {

	// 1.创建SparkConf 以及 SparkContext, 设置本地运行模式
	val sc = new SparkContext("local[*]", "HBase")
	val ssc = new StreamingContext(sc, Seconds(5))
	// 设置日志输出等级为 WARN
	sc.setLogLevel("WARN")

	try {
		// 3.创建StreamingContext 设置将5s内的数据一块处理

		// 2.获取 HBase 配置对象以及HBaseContext
		val hBaseConf = HBaseConfiguration.create()
		hBaseConf.set("hbase.zookeeper.quorum", "222.22.91.81")
		hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")
		val hBaseContext = new HBaseContext(sc, hBaseConf)

		// 4.获取指定端口的数据
		val dStream = ssc.socketTextStream("cm5", 8989)
		dStream.print()

		// 将数据封装到Get对象, 然后将数据转换为 DStream
		val getsDStream = dStream.hbaseMapPartitions[Get](hBaseContext, (record, connection) => {
			val gets: Iterator[Get] = record.map(r => {
				// 读取的单条数据 : B1001 name
				val arr: Array[String] = r.split(" ")
				// 默认值
				var rowKey: Array[Byte] = Bytes.toBytes("-")
				var qualifier: Array[Byte] = Bytes.toBytes("0")
				// 这里其实应该过滤下, 过滤掉不符合的数据, 但是这里只是作为Demo, 假设数据格式是规范的...
				if (arr.length == 2) {
					rowKey = Bytes.toBytes(arr(0))
					qualifier = Bytes.toBytes(arr(1))
				}
				// 将数据封装成 get 对象
				val get = new Get(rowKey)
				get.addColumn(Bytes.toBytes("info"), qualifier)
				// 注意需要将get对象返回
				get
			})
			// 最后将gets 返回
			gets
		})
		// 根据 Get 获取对应的Result 以及结果
		val data = getsDStream.hbaseBulkGet(hBaseContext, TableName.valueOf("Student"),
			2, (get) => (get), result => {
				// 特别注意 : 这个判断的必须加上, 否则, 一旦你输入的rowkey是错误的, 获取不到数据
				// 会立马报错 ,程序就会停止 最好使用 rawCells().size , 其他的属性我都试过., 没有用, 比如 getExist的...
				if (result.rawCells().size > 0 ) {
					// 获取 Cells
					// 下面这种方法也是可以的
					//val cells = result.rawCells()
					val cells = result.listCells().iterator()
					var sb: StringBuilder = null
					while (cells.hasNext) {
						var cell = cells.next()
						// 获取指定的数据
						val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
						val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
						val value = Bytes.toString(CellUtil.cloneValue(cell))
						// 使用StringBuilder拼接数据
						sb = new StringBuilder()
						sb.append("[ " + rowKey + " , " + qualifier + " , " + value + " ]")
					}
					sb.toString()
				}
			})
		// 打印结果
		data.print()

		ssc.start()
		ssc.awaitTermination()
		ssc.stop()
	}

}

2.3 添加数据 - Put的使用

  1. 注意事项 : put中我增加了过滤数据的步骤, 建议在使用时都增加过滤数据, 否则如果是不规则的数据容易报错

  2. 推荐 :

(1) 尽量先对数据进行过滤, 拿到你想要格式的数据

(2) 使用 hbaseMapPartitions 对数据进行封装成 Get / Put 对象

(3) 使用 hbaseBulkGet / hbaseBulkPut 对数据进行处理

package com.wangt.hbase.sparkstreaming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author 王天赐
 * @create 2019-12-01 9:15
 */
object HBaseBulkPutDStreamExample extends App {

	// 1.创建SparkContext 以及StreamingContext 设置本地运行模式以及数据间隔时间
	// (1) 补充: 如果创建了SparkContext, 那么在创建StreamingContext的时候直接将 sc 作为参数传入即可
	val sc = new SparkContext("local[*]", "HBase-Spark")
	sc.setLogLevel("WARN")
	val ssc = new StreamingContext(sc, Seconds(2))

	try {
		// 2.创建HBaseConfiguration对象以及HBaseContext对象
		val hBaseConf: Configuration = HBaseConfiguration.create()
		hBaseConf.set("hbase.zookeeper.quorum", "222.22.91.81:2181")
		val hBaseContext = new HBaseContext(sc, hBaseConf)

		// 3.获取DStream流
		val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("cm5", 8989)

		// 4.对不符合数据规则的数据进行过滤
		// 规则 : 要求 => B1001 name 张飞 (数据切分后长度为3)
		val filterDStream: DStream[String] = dStream.filter(line => {
			val data = line.split(" ")
			if (data.length == 3) {
				true
			} else {
				false
			}
		})

		// 5.将数据封装成 Put对象
		val putsDStream: DStream[Put] = filterDStream.hbaseMapPartitions(hBaseContext, (record, connection) => {
			val puts = record.map(r => {
				//(1) 切分数据
				val data = r.split(" ")
				//(2) 获取对应的数据
				val rowKey = Bytes.toBytes(data(0))
				val qualifier = Bytes.toBytes(data(1))
				val value = Bytes.toBytes(data(2))
				//(3) 封装数据
				val put = new Put(rowKey)
				put.addColumn(Bytes.toBytes("info"), qualifier, value)
				put
			})
			// 返回Puts
			puts
		})
		// 5.将put 对象中的数据写入到 HBase
		putsDStream.hbaseBulkPut(hBaseContext, TableName.valueOf("Student"), (put) => (put))

		// 6.开启StreamingContext
		ssc.start()
		ssc.awaitTermination()
		ssc.stop()
	}finally {
		sc.stop()
	}
}

= filterDStream.hbaseMapPartitions(hBaseContext, (record, connection) => {
val puts = record.map(r => {
//(1) 切分数据
val data = r.split(" ")
//(2) 获取对应的数据
val rowKey = Bytes.toBytes(data(0))
val qualifier = Bytes.toBytes(data(1))
val value = Bytes.toBytes(data(2))
//(3) 封装数据
val put = new Put(rowKey)
put.addColumn(Bytes.toBytes(“info”), qualifier, value)
put
})
// 返回Puts
puts
})
// 5.将put 对象中的数据写入到 HBase
putsDStream.hbaseBulkPut(hBaseContext, TableName.valueOf(“Student”), (put) => (put))

	// 6.开启StreamingContext
	ssc.start()
	ssc.awaitTermination()
	ssc.stop()
}finally {
	sc.stop()
}

}




2017-03-14 16:02:36 shenshendeai 阅读数 871
  • Spark专场】Deep Dive: How Spark Uses Memory

    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等新技术,企业又可以以较小的成本、更高效地挖掘出提升企业业务的数据与方法,实现云、数、智的自然融合和协力发展。

    1701 人正在学习 去看看 2017CCTC大会

引入mysql的jar包


package com.agm.database



import java.sql.DriverManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import java.util.Properties
import org.apache.log4j.{Level, Logger}
object SparkOnMysql {


  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sparkConf = new SparkConf().setMaster("local").setAppName("spark sql test");
    val sc = new SparkContext(sparkConf);
    val sqlContext = new SQLContext(sc);
    
    //1. 不指定查询条件
    //这个方式链接MySql的函数原型是:
    //我们只需要提供Driver的url,需要查询的表名,以及连接表相关属性properties。下面是具体例子:
    val url = "jdbc:mysql://localhost:3306/zykcmp?user=root&password=123456";
    val prop = new Properties();
    val df = sqlContext.read.jdbc(url, "user", prop);
    println("第一种方法输出:"+df.count());
    println("1.------------->" + df.count());
    println("1.------------->" + df.rdd.partitions.size);
    
    //2.指定数据库字段的范围
    //这种方式就是通过指定数据库中某个字段的范围,但是遗憾的是,这个字段必须是数字,来看看这个函数的函数原型:
    /* def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame*/
    //前两个字段的含义和方法一类似。columnName就是需要分区的字段,这个字段在数据库中的类型必须是数字;
    //lowerBound就是分区的下界;upperBound就是分区的上界;numPartitions是分区的个数。同样,我们也来看看如何使用:
    val lowerBound = 1;
    val upperBound = 6;
    val numPartitions = 2;
    val url1 = "jdbc:mysql://localhost:3306/zykcmp?user=root&password=123456";
    val prop1 = new Properties();
    val df1 = sqlContext.read.jdbc(url1, "user", "pic", lowerBound, upperBound, numPartitions, prop1);
    println("第二种方法输出:" + df1.rdd.partitions.size);
    df1.collect().foreach(println)
    
     /*这个方法可以将iteblog表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想情况下,每个分区处理相同数量的数据,我们在使用的时候不建议将这个值设置的比较大,因为这可能导致数据库挂掉!但是根据前面介绍,这个函数的缺点就是只能使用整形数据字段作为分区关键字。
这个函数在极端情况下,也就是设置将numPartitions设置为1,其含义和第一种方式一致。*/
    
    //3.根据任意字段进行分区
    //基于前面两种方法的限制, Spark 还提供了根据任意字段进行分区的方法,函数原型如下:
    /*def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame*/
    //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:
    //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:
    val predicates = Array[String]("password <= 5", "password > 5 ")
    val url2 = "jdbc:mysql://localhost:3306/zykcmp?user=root&password=123456"
    val prop2 = new Properties()
    val df2 = sqlContext.read.jdbc(url, "user", predicates, prop2)
    println("第三种方法输出:"+df2.rdd.partitions.size+","+predicates.length);
    df2.collect().foreach(println)
    //最后rdd的分区数量就等于predicates.length。
   
    
    //4.通过load获取
    //Spark还提供通过load的方式来读取数据。
    val url3 = "jdbc:mysql://localhost:3306/zykcmp?user=root&password=123456"
    val df3 = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "user").load()
    println("第四种方法输出:"+df3.rdd.partitions.size);
    df.collect().foreach(println)


    sc.stop()
  }
}
没有更多推荐了,返回首页