2015-02-22 19:25:29 axxbc123

When you serialize objects to a file with the RDD's saveAsObjectFile method, or deserialize them back with SparkContext's objectFile method, Spark by default uses Java's serialization and deserialization mechanism, which is usually quite inefficient. Spark also supports Kryo for object serialization and deserialization; it is faster than Java serialization, but be aware that Kryo still has some bugs.

  • By default Spark uses Java's ObjectOutputStream framework, which works with any class that implements java.io.Serializable; for finer control over performance you can implement java.io.Externalizable instead (a sketch follows this list). This format is bulky and slow.
  • Spark also supports Kryo serialization, which is faster and more compact than Java serialization, but it does not handle every Serializable type out of the box, and the classes you use have to be registered in your program. Registration must happen before the SparkContext is instantiated.
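For reference, here is a minimal sketch of the java.io.Externalizable route mentioned above, in which you hand-code exactly which fields get written instead of letting ObjectOutputStream walk the object graph reflectively (the Point class and its fields are made up for illustration):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

class Point(var x: Int, var y: Int) extends Externalizable {
  // Externalizable requires a public no-arg constructor for deserialization
  def this() = this(0, 0)

  // Write only the fields we choose, in a fixed order
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(x)
    out.writeInt(y)
  }

  // Read them back in the same order
  override def readExternal(in: ObjectInput): Unit = {
    x = in.readInt()
    y = in.readInt()
  }
}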
When Spark is transferring data over the network or spilling data to disk, it needs to
serialize objects into a binary format. This comes into play during shuffle operations,
where potentially large amounts of data are transferred. By default Spark will use
Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization
library that improves on Java’s serialization by offering both faster serialization
times and a more compact binary representation, but cannot serialize all types of
objects “out of the box.” Almost all applications will benefit from shifting to Kryo for
serialization.

 

 

Code example:

package spark.examples.kryo

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.KryoRegistrator

// Two fields, name and age; the class must also implement java.io.Serializable
class MyClass1(val name: String, val age: Int) extends java.io.Serializable

// Two fields, name and age; the class must also implement java.io.Serializable
class MyClass2(val name: String, val age: Int) extends java.io.Serializable

// Registers the classes to serialize with Kryo; MyClass1 and MyClass2 must
// implement java.io.Serializable (the question below sheds light on why)
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

object SparkKryo {
  def main(args: Array[String]) {
    // Set the serializer to KryoSerializer and point Spark at our registrator;
    // these can also be set in spark-defaults.conf
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
    conf.setAppName("SparkKryo")
    conf.setMaster("local[3]")

    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(new MyClass1("Tom", 31), new MyClass1("Jack", 23), new MyClass1("Mary", 19)))
    val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
    // Save the RDD's objects, serialized, under the fileDir directory
    rdd.saveAsObjectFile(fileDir)
    // Read the data in part-00000 back, deserializing it into the object collection rdd1
    val rdd1 = sc.objectFile[MyClass1](fileDir + "/" + "part-00000")


    rdd1.foreachPartition(iter => {
      while (iter.hasNext) {
        val objOfMyClass1 = iter.next()
        println(objOfMyClass1.name)
      }
    })

    sc.stop
  }
}

 

Inspecting the saved file shows that it is binary data:

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      蓑_xi??蛔?z汲   i       e ur [Lspark.examples.kryo.MyClass1;?
独#v?  xp   sr spark.examples.kryo.MyClass1z 峌#   xp

 

 

Question:

When plain characters, numbers, or strings are written to an object file, is that also a serialization step? After explicitly registering Int with Kryo, the saved file is indeed binary. Removing the Int registration changes nothing: the serialized output is byte-for-byte identical in both cases:

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      F脗?庻籡陭姯&?  '       # ur [IM篳&v瓴?  xp         
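A likely answer, judging from the Spark source of that era (worth re-checking against your own version): RDD.saveAsObjectFile never consults spark.serializer at all. It batches elements into arrays and serializes each batch with Utils.serialize, which is plain Java serialization, then writes the result out as a SequenceFile. Paraphrased:

// Paraphrased from Spark's RDD.scala; Utils.serialize uses Java's
// ObjectOutputStream regardless of the spark.serializer setting, which explains
// both the SEQ/NullWritable/BytesWritable header seen above and why the output
// is byte-for-byte identical with or without Kryo registration
def saveAsObjectFile(path: String): Unit = {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

So writing Ints to an object file is indeed a serialization step, but it is Java serialization either way; the spark.serializer setting matters for paths such as shuffle data and RDDs persisted at a *_SER storage level, not for saveAsObjectFile.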

 

 

 

package spark.examples.kryo

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.KryoRegistrator

// Registers Int with Kryo (this registrator replaces the one in the first example)
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Int])
  }
}

object SparkKryoPrimitiveType {
  def main(args: Array[String]) {
    // Set the serializer to KryoSerializer and point Spark at our registrator;
    // these can also be set in spark-defaults.conf
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
    conf.setAppName("SparkKryoPrimitiveType")
    conf.setMaster("local[3]")

    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1, 3, 7, 9, 11, 22))
    val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
    // Save the RDD's elements, serialized, under the fileDir directory
    rdd.saveAsObjectFile(fileDir)
    // Read the data in part-00000 back, deserializing it into the collection rdd1
    val rdd1 = sc.objectFile[Int](fileDir + "/" + "part-00000")

    rdd1.foreachPartition(iter => {
      while (iter.hasNext) {
        println(iter.next())
      }
    })

    sc.stop
  }
}

 

Other notes:

The Kryo serializer, together with the classes to register, can also be specified as follows:

val conf = new SparkConf()
// This line is redundant: registerKryoClasses below already sets the serializer
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration:
// force Spark to fail if a class to be serialized has no Kryo registration
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

 

  /**
   * Use Kryo serialization and register the given set of classes with Kryo.
   * If called multiple times, this will append the classes from all calls together.
   */
  def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
    val allClassNames = new LinkedHashSet[String]()
    allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
    allClassNames ++= classes.map(_.getName)

    set("spark.kryo.classesToRegister", allClassNames.mkString(","))
    set("spark.serializer", classOf[KryoSerializer].getName)
    this
  }
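As the quoted source shows, registerKryoClasses simply appends the class names to the spark.kryo.classesToRegister property and sets spark.serializer, so the equivalent spark-defaults.conf entries would be (using the hypothetical MyClass and MyOtherClass from the snippet above):

spark.serializer              org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister  MyClass,MyOtherClass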

 

 

2016-11-23 17:36:59 houzhizhen

Test results first (the test code is attached below). As the numbers show, switching to Kryo serialization saves a lot of space.

                            JavaSerial (bytes)   KryoSerial (bytes)
Int                                 81                    2
empty string                         7                    3
string with 1 character              8                    4
string with 2 characters             9                    4
string with 10 characters           17                   12
string with 20 characters           27                   22

Test code (note that the SerObject class used at the end was not shown in the original post; a minimal assumed definition is included):

import org.apache.spark._
import java.nio.ByteBuffer
import org.apache.spark.serializer._

// Assumed definition: SerObject was not shown in the original post
case class SerObject(id: Int, name: String, num: Int)

object TestDataSerialize {
  def main(args: Array[String]) = {
    testJavaSerial()
    testKryoSerial()
  }
  
  // serialize() returns a ByteBuffer; limit() is the serialized size in bytes
  def testSerial(serialInst: SerializerInstance) = {

    println("test Int")
    println(serialInst.serialize(1).limit())

    println("test empty string")
    println(serialInst.serialize("").limit())

    println("test string with 1 character")
    println(serialInst.serialize("a").limit())

    println("test string with 2 characters")
    println(serialInst.serialize("ab").limit())

    println("test string with 10 characters")
    println(serialInst.serialize("abcdefghij").limit())

    println("test string with 20 characters")
    println(serialInst.serialize("abcdefghijabcdefghij").limit())

    println("test SerObject")
    println(serialInst.serialize(SerObject(1, "testname", 2)).limit())

    println("\n\n")
  }
  
  def testKryoSerial() = {
    println("testKryoSerial")
    val conf = new SparkConf().setAppName("spark util test").setMaster("local[2]")

    val kryoSerializer = new KryoSerializer(conf)
    val serialInst = kryoSerializer.newInstance()
    testSerial(serialInst)
  }

  def testJavaSerial() = {
    println("testJavaSerial")
    val conf = new SparkConf().setAppName("spark util test").setMaster("local[2]")

    val javaSerializer = new org.apache.spark.serializer.JavaSerializer(conf)
    val serialInst = javaSerializer.newInstance()
    testSerial(serialInst)
  }
}



2018-08-24 12:43:00 xiligey1

Spark 2.0.2, double[]: speeding things up with Kryo serialization and manual class-name registration

Kryo is typically up to 10x faster than the native Java default implementation, so using it is recommended.

How to enable Kryo

Enabling it is simple: set the spark.serializer parameter. There are three ways:

  • In code:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val spark = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()
  • As a spark-submit argument:
    spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
  • As a global default, in conf/spark-defaults.conf:
    spark.serializer org.apache.spark.serializer.KryoSerializer

After enabling it, register your class names

Why register class names

By default Kryo prefixes every serialized instance with its class name, such as java.lang.Double; the longer the class name, the larger the extra storage overhead. To solve this, Kryo lets you register class names in a mapping table and replace them with assigned numeric IDs, so that, say, java.lang.Double is written as the number 0 instead. This saves space, at the price of having to register every performance-relevant class name by hand. A small standalone sketch of the effect follows.
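Here is that sketch, using the Kryo API directly outside of Spark (the Record class is made up; the exact byte counts depend on the Kryo version):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import java.io.ByteArrayOutputStream

class Record(var value: Double)

object ClassNameOverhead {
  // Serialize one object, including its class information, and return the size
  def sizeOf(kryo: Kryo, obj: AnyRef): Int = {
    val baos = new ByteArrayOutputStream()
    val out = new Output(baos)
    kryo.writeClassAndObject(out, obj)
    out.close()
    baos.size()
  }

  def main(args: Array[String]): Unit = {
    val unregistered = new Kryo() // unknown classes are written with their full name
    val registered = new Kryo()
    registered.register(classOf[Record]) // class name replaced by a small numeric ID

    println(sizeOf(unregistered, new Record(1.0))) // larger: payload + "Record" class name
    println(sizeOf(registered, new Record(1.0)))   // smaller: payload + registration ID
  }
}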

Spark uses Twitter chill to register the common Scala classes, and it registers its own frequently used classes as well; see KryoSerializer.scala for details. Unfortunately, in practice a large number of class names are still not covered and must be registered manually.

How to register class names

  1. Find the unregistered class names:

conf.set("spark.kryo.registrationRequired", "true")

With spark.kryo.registrationRequired=true, Kryo throws an exception whenever it hits an unregistered class name. You register it, re-run, and repeat until the job finally goes through: pure manual labor.

  2. How do you register private classes?
    In code, the Spark guide uses classOf to reference a class:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

The problem with this is that you cannot import private classes from other packages. The workaround is Class.forName, for example:

conf.registerKryoClasses(Array(Class.forName("org.apache.spark.SomePrivateClass")))

Since KryoSerializer itself uses Class.forName to resolve the spark.kryo.classesToRegister entries, specifying the fully qualified class name directly is all that is needed.

  3. How do you register Java primitive types and arrays?
    Use Class.forName with the name encoding documented for Class.getName().

A few examples:

Element Type   Encoding
double[]       "[D"
double[][]     "[[D"
Object()       "java.lang.Object"
new Object[3]  "[Ljava.lang.Object;"
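When in doubt about the exact string, it is safer to ask the JVM than to memorize the encoding; a quick sketch:

// classOf[...].getName yields exactly the string Class.forName expects
println(classOf[Array[Double]].getName)        // "[D"
println(classOf[Array[Array[Double]]].getName) // "[[D"
println(classOf[Array[AnyRef]].getName)        // "[Ljava.lang.Object;"
println(Class.forName("[D"))                   // resolves back to class [D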
  4. A full example
    Finally, here is a hand-registered class-name list for logistic regression and XGBoost in spark-ml; it covers all of the usages above:
spark.kryo.classesToRegister      org.apache.spark.mllib.stat.MultivariateOnlineSummarizer,[D,[I,[F,org.apache.spark.ml.classification.MultiClassSummarizer,org.apache.spark.ml.classification.LogisticAggregator,ml.dmlc.xgboost4j.scala.Booster,ml.dmlc.xgboost4j.java.Booster,[Lml.dmlc.xgboost4j.scala.Booster;,org.apache.spark.ml.feature.LabeledPoint,org.apache.spark.ml.linalg.SparseVector,org.apache.spark.mllib.evaluation.binary.BinaryLabelCounter,scala.reflect.ClassTag$$anon$1,java.lang.Class,[Lorg.apache.spark.mllib.evaluation.binary.BinaryLabelCounter;,scala.collection.mutable.WrappedArray$ofRef,[Ljava.lang.String;,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation,org.apache.spark.sql.execution.columnar.CachedBatch,org.apache.spark.ml.feature.Instance,[[B,org.apache.spark.sql.catalyst.expressions.GenericInternalRow,[Ljava.lang.Object;

2019-05-21 17:37:06 qq_24363849

One of the tuning points called out in the official Spark docs is data serialization.

1. Data serialization
1) Java serialization
2) Kryo serialization (quick and compact); classes must be registered, otherwise performance can actually be worse.
There are three ways to enable Kryo:
1) In code: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
2) In spark-defaults.conf
3) At submit time with --conf key=value, which takes priority over the config file

Once the serializer is set, register the classes to serialize: conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

Register every class you use; here MyClass1 and MyClass2 are the classes being registered.

The following cases were tested (cached RDD sizes as shown in the web UI):
1. No serialization: 34.3 MB
2. Java serialization: 25.1 MB
3. Kryo serialization without registering the class: 40.2 MB
4. Kryo serialization with the class registered: 21.1 MB

In summary, Kryo needs the classes involved to be registered. Without registration it is the worst of the lot, even worse than no serialization at all; apart from that case, serialized storage beats unserialized storage overall.
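One caveat when reading numbers like these: the unserialized MEMORY_ONLY figure measures the Java object graph on the heap, while the _SER figures measure the serializer's output. To estimate the heap footprint of a sample object without the web UI, Spark's SizeEstimator can help; a sketch, reusing the Info case class from the test code below:

import org.apache.spark.util.SizeEstimator

// Estimates the in-memory (deserialized) size of an object graph in bytes;
// this is the cost MEMORY_ONLY caching pays per element
println(SizeEstimator.estimate(Info("lsw", 30, "male", "beijing")))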

The test code follows, adapted from https://blog.csdn.net/lsshlsw/article/details/50856842

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
import scala.collection.mutable.ArrayBuffer

case class Info(name: String ,age: Int,gender: String,addr: String)

object SerializeCompare {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Info]))
    val sc = new SparkContext(conf)

    val arr = new ArrayBuffer[Info]()

    val nameArr = Array[String]("lsw","yyy","lss")
    val genderArr = Array[String]("male","female")
    val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")

    for(i <- 1 to 1000000){
      val name = nameArr(Random.nextInt(3))
      val age = Random.nextInt(100)
      val gender = genderArr(Random.nextInt(2))
      val address = addressArr(Random.nextInt(5))
      arr += Info(name, age, gender, address)
    }

    val rdd = sc.parallelize(arr)
    
    // Cache the RDD in memory in serialized form
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    rdd.count()
  }
}
2019-02-19 14:37:45 huonan_123
  • Default, no serialization (StorageLevel.MEMORY_ONLY). Note: the Student class is not shown in the original post; a definition along the lines of case class Student(id: Int, name: String, age: Int) is assumed throughout.
def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SequenceFileApp")
     // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // .registerKryoClasses(Array(classOf[Student]))

    val sc = new SparkContext(sparkConf)
    val students = ListBuffer[Student]()

    for (i <- 0 to 1000000) {
      students.append(Student(i, "student" + i, 39))
    }

    val studentRDD = sc.parallelize(students)
    studentRDD.persist(StorageLevel.MEMORY_ONLY)
    studentRDD.count()

    Thread.sleep(1000 * 20)
    sc.stop()
  }
  • Size in the Storage UI: 95.4 MB
  • Java serialization (the default serializer), with StorageLevel.MEMORY_ONLY_SER
 def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SequenceFileApp")
    //  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // .registerKryoClasses(Array(classOf[Student]))

    val sc = new SparkContext(sparkConf)
    val students = ListBuffer[Student]()

    for (i <- 0 to 1000000) {
      students.append(Student(i, "student" + i, 39))
    }

    val studentRDD = sc.parallelize(students)
    studentRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    studentRDD.count()

    Thread.sleep(1000 * 20)
    sc.stop()
  }
  • Size in the Storage UI: 29.4 MB

  • Kryo serialization, without registering the class

 def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SequenceFileApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // .registerKryoClasses(Array(classOf[Student]))

    val sc = new SparkContext(sparkConf)
    val students = ListBuffer[Student]()

    for (i <- 0 to 1000000) {
      students.append(Student(i, "student" + i, 39))
    }

    val studentRDD = sc.parallelize(students)
    studentRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    studentRDD.count()

    Thread.sleep(1000 * 20)
    sc.stop()
  }
  • Size in the Storage UI: 61.9 MB
  • Kryo serialization with the class registered
 def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SequenceFileApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Student]))

    val sc = new SparkContext(sparkConf)
    val students = ListBuffer[Student]()

    for (i <- 0 to 1000000) {
      students.append(Student(i, "student" + i, 39))
    }

    val studentRDD = sc.parallelize(students)
    studentRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    studentRDD.count()

    Thread.sleep(1000 * 20)
    sc.stop()
  }
  • Size in the Storage UI: 19.0 MB

Conclusion: after switching to Kryo serialization with registration, the cached data is much smaller.
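Rather than eyeballing the Storage tab for each run, the cached sizes can also be printed programmatically. A sketch using SparkContext.getRDDStorageInfo, which is a developer API and may differ between Spark versions:

// Print each cached RDD's in-memory size in MB, matching the Storage tab
sc.getRDDStorageInfo.foreach { info =>
  println(f"${info.name}: ${info.memSize / 1024.0 / 1024.0}%.1f MB")
}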
