2018-05-30 00:55:55 w5201314ws6123 阅读数 554

问题1>为什么要使用kryo序列化呢?

           主要是解决数据在内存中的占用,和网络传输的性能

问题2>为什么不使用java本身自带的序列化机制呢?与kryo有何异同

             1>java本身就自带了ObjectinputStream和ObjectOutputStream序列化机制,这种自带的系列化本身就是可以直接使用,使用起来很方便,但是这种序列化机制会使数据占用大量的内存,消耗较大的内存空间,在大数据的应用中,内存资源很宝贵,我们应该以身作则优化自己书写的代码.

            2>使用kryo序列化机制,一个是数据的提取和数据写入内存是比较快的,也会占用很少的空间内存,了解到使用这样的序列化机制,使用的内存是Java自带的系列化机制使用量的1/10,可比之下,这是怎样的优化

  问题3>使用kryo序列化机制,会生效的地方

              1>当我们使用到外部变量的时候  ,使用kryo序列化,并且使用广播变量,这使得变量传输到executor的速度更快,而且会占用更小的内存

               3>当我们将rdd的数据进行cache的时候(缓存到内存或者磁盘根据自己需求选择),优化网络性能,主要是传输和io

 问题4>我们应该怎样做呢:val conf = new SparkConf("spark.serializer","org.apache.spark.serializer.KryoSerializer")

                                                         .registerKryoClasses(new Class[]{序列化的类})

             个人格言:人这一辈子,我们总是要吃一些苦头,看尽一些美好的,而你在黑暗的时候,你永远也想不到自己也有花期,


2019-07-02 18:41:04 tyz_tyz 阅读数 136

kryo序列化测试
测试spark程序运行中对RDD进行操作,添加与不添加序列化在性能上的区别。
区别包括占用内存大小,程序运行时间等。

测试spark流程
随机生成字符串,以空格分割成行,进行多次map遍历。对结果进行持久化,并保存成文件。
case class DataCase(input: Int)  // 一个简单的case class.
val testNew2 = testRDD.flatMap(x => x.split(" "))
            .map(x => DataCase(x))
            .map(x => x)  // map * 10086

testNew2.persist()
testNew2.repartition(1)
    .saveAsTextFile("./test.file")

persist() 方法包含了多种持久化类型。
persist() 默认持久化类型为org.apache.spark.storage.StorageLevel.MEMORY_ONLY
// persist(StorageLevel.MEMORY_ONLY)

持久化类型:
MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
DISK_ONLY : 只在磁盘上缓存 RDD。
MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本

另一个持久化方法 cache()
cache实际上调用persist(MEMORY_ONLY),等同于persist(MEMORY_ONLY)


序列化方案:使用kryo进行序列化或者使用java原生java.io.Serializable序列化。
使用序列化时,persist级别要使用MOMORY_ONLY_SER或者MEMORY_AND_DISK_SER。

测试数据:
800万条随机random 13位数字。生成的长字符串。以空格分隔开。106MB。
1000万条随机random 13位数据。生成的长字符串。以空格分割开。133MB

测试方案:
1 不使用kryo序列化,持久化使用默认的MEMORY_ONLY
2 不使用kryo序列化,持久化使用MEMORY_ONLY_SER
3 kryo序列化,不注册任何类,持久化使用MEMORY_ONLY_SER
4 kryo序列化,注册所有类(包括调用map时生成的匿名类),持久化使用MEMORY_ONLY_SER
5 kryo序列化,注册除匿名类外其他类,持久化使用MEMORY_ONLY_SER

测试结果:
1
文件大小106MB input 231.9MB  运行时间14s/15s/15s  持久化占用内存694.1MB
文件大小133MB input 342.4MB  运行时间33s  持久化占用内存(超出设置内存,持久化失败。)

2
文件大小106MB  input 231.9MB  运行时间18s/19s  持久化占用内存171.1MB
文件大小133MB  input 342.4MB  运行时间 30s  持久化占用内存213.8MB  

3
文件大小106MB  input231.9MB  运行时间15s/14s/15s  持久化占用内存 228.1MB
文件大小133MB  input342.4MB  运行时间23s  持久化占用内存285.1MB

4
文件大小106MB  input231.9MB  运行时间11s/12s/12s  持久化占用内存121.2MB
文件大小133MB  input342.4MB  运行时间21s/20s/20s  持久化占用内存151.5MB

5
文件大小106MB  input231.9MB  运行时间11s/12s/11s  持久化占用内存121.2MB
文件大小133MB  input342.4MB  运行时间20s/19s/19s  持久化占用内存151.5MB


测试结论:
1.    不使用任何序列化,使用默认内存持久化占用较大内存。
2.    使用java自带的序列化,占用内存减少很多,但是运行时间最长。
3.    使用kryo序列化,不注册需要的类,占用内存比2要增加一些,运行时间和1一样。
4.    使用kryo序列化,注册所有类,占用内存最少,运行时间相比2有很大提升,相比1有部分提升。
5.    使用kryo序列化,注册主要的类,不注册匿名类,得到的结果与4基本相符。实际操作可以不注册匿名类,避免大量重复代码。

注册匿名类方法:
conf.registerKryoClasses(
    Array(
        Class.forName("test.TestRDD$$anonfun$1")
    )
)
val temp = testRDD.flatMap(x => x.split(" "))

注册泛型类方法:
conf.registerKryoClasses(
    Array(
        classOf[test.TestClass[_]]
    )
)
class TestClass[Int]() {...}
 

2019-11-30 09:28:18 weidajiangjiang 阅读数 14

默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

Kryo序列化注册方式的实例代码如代码清单2-3所示:

public class MyKryoRegistrator implements KryoRegistrator
{
  @Override
  public void registerClasses(Kryo kryo)
  {
    kryo.register(StartupReportLogs.class);
  }
}

配置Kryo序列化方式的实例代码如代码清单2-4所示:

代码清单2-4 Kryo序列化机制配置代码


//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator"); 
2019-02-21 16:59:58 bb23417274 阅读数 66

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。
Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

转自原则八

2018-12-17 09:59:50 chixushuchu 阅读数 129

Kryo与Spark内部默认序列化机制优缺点

默认优点
默认情况下,Spark内部使用java的序列化机制,ObjectOutputStream/ObjectInputStream
对象输入输出流机制,来进行序列化,这种默认序列化机制的好处在于,处理起来比较方便,不需要我们自己手动做什么事情,只是,在算子里使用到的变量,必须是实现Serializable接口的,可序列化即可。

默认缺点
默认序列机制的效率不高,序列化的速度相对比较慢,序列化以后的数据,占用的内存空间还是比较大
kryo机制
spark支持使用kryo序列化机制,Kryo序列化机制,比默认的java序列化机制,速度要快,序列化后的数据要更小,大概是java序列化机制的1/10

kryo序列化如何使用

// 构建Spark上下文
		SparkConf conf = new SparkConf()
				.setAppName(Constants.SPARK_APP_NAME_SESSION)
				.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
				.registerKryoClasses(new Class[]{
						CategorySortKey.class,
						IntList.class});

首先第一步,在SparkConf中设置一个属性,spark.serializer, org.apache.spark.serializer.kryoSerializer类
Kryo之所以没有被作为默认的序列化类库的原因,主要因为kryo要求,如果达到最佳性能的话,一定要注册自定义类(比如,算子函数中使用到了外部自定义类型的对象变量这时,就必须注册类,否则kryo达不到最佳性能)

第二步,注册使用到的,需要通过kryo序列化的,一些自定义类,SparkConf.registerKryoClassed()

哪些地方需要使用Kryo

有三个地方

  1. 算子函数中使用到外部变量
    算子函数使用外部变量,使用kryo以后,优化网络传输性能,可以优化集群中内存占用和消耗
  2. 持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
    优化内存占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC.
  3. shuffle
    优化网络传输性能

在这里插入图片描述

什么样序列化错误?

client模式提交spark作业,观察本地打印的log,如果出现了类似于Serializable、Serialize等等相关的错误log,那应该就是序列化错误。

  1. 第一种,算子函数里用到了外部自定义类型的变量,那么此时自定义类型必须实现Implements Serializable接口 不然会出错
  2. 自定义类型,作为rdd类型的元素,那么自定义类型也必须可序列化 不然会出错

以上两种情况 比如 自定义排序对象就需要实现序列化

//第四步,自定义二次排序key
		//第五步,将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序
		JavaPairRDD<CategorySortKey, String> sortKey2countRDD = categoryid2countRDD.mapToPair(new PairFunction<Tuple2<Long, String>, CategorySortKey, String>() {

			private static final long serialVersionUID = -8624622546415913486L;

			@Override
			public Tuple2<CategorySortKey, String> call(Tuple2<Long, String> longStringTuple2) throws Exception {
				String countInfo = longStringTuple2._2();
				long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
						countInfo, "\\|", Constants.FIELD_CLICK_COUNT));
				long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
						countInfo, "\\|", Constants.FIELD_ORDER_COUNT));
				long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
						countInfo, "\\|", Constants.FIELD_PAY_COUNT));
				CategorySortKey sortKey = new CategorySortKey(clickCount,
						orderCount, payCount);

				return new Tuple2<CategorySortKey, String>(sortKey, countInfo);
			}
		});

		JavaPairRDD<CategorySortKey, String> sortedCategoryCountRDD =
				sortKey2countRDD.sortByKey(false);//降序 拿取topn

  1. 不满足以上两种请情况,去使用第三方的,不支持序化的类型
Connection conn = 

studentsRDD.foreach(new VoidFunction() {
 
public void call(Row row) throws Exception {
  conn.....
}

});

Connection是不支持序列化的

spark之kryo 序列化

阅读数 3813

没有更多推荐了,返回首页