kyro序列化 spark_spark序列化kyro - CSDN
精华内容
参与话题
  • Spark的Kryo序列化注册

    千次阅读 2017-01-13 10:37:25
    Spark序列化可以将RDD序列化来减少内存占用。 对于优化网络性能极为重要 spark.serializer=org.apache.spark.serializer.JavaSerializationSpark默认 使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何...

    Spark的Kryo序列化注册
    Spark序列化可以将RDD序列化来减少内存占用。 对于优化网络性能极为重要
    1、java序列化:

    spark.serializer=org.apache.spark.serializer.JavaSerialization
    Spark默认 使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。
    Java序列化很灵活但性能差速度很慢,同时序列化后占用的字节数也较多。
    2、Kryo序列化:
    spark.serializer=org.apache.spark.serializer.KryoSerialization
    KryoSerialization速度快,产生的结果更为紧凑(通常能提高10倍)。
    但Kryo不支持所有实现了java.io.Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。


    在 SparkConf 初始化的时候调用
    conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 
    使用 Kryo。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。
    需要在使用时注册需要序列化的类型,建议在对网络敏感的应用场景下使用Kryo。
    一个例子:
    val sparkConf = new SparkConf()
          /*.setMaster("local")*/
          .setAppName("Test")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          /*.set("spark.kryo.registrationRequired", "true")*/
          .registerKryoClasses(Array(
          classOf[Array[String]],
          classOf[util.HashMap[String, String]],
          classOf[util.ArrayList[String]],
    	  classOf[MyTest]
        ))
    

    在集群跑的时候把/*.set("spark.kryo.registrationRequired", "true")*/注释掉,否则会报如下错误:

    Class is not registered: scala.reflect.ClassTag$$anon$1或者Class is not registered: scala.reflect.ManifestFactory$$anon$9

    最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这很浪费没必要的空间。


    展开全文
  • spark2.0 Java序列化和Kyro序列化测试

    千次阅读 2016-11-23 17:39:03
    先列出测试结果,后面附有测试代码,可以发现,改成Kyro序列化之后,可以节约大量空间。   JavaSerial KyroSerial Int 81 2 empty string 7 3 string with 1 character 8 4 ...

    先列出测试结果,后面附有测试代码,可以发现,改成Kyro序列化之后,可以节约大量空间。

      JavaSerial KyroSerial
    Int 81 2
    empty string 7 3
    string with 1 character 8 4
    string with 2 character 9 4
    string with 10 character 17 12
    string with 20 character 27 22
         

    测试代码

    import org.apache.spark._
    import java.nio.ByteBuffer
    import org.apache.spark.serializer._;
    
    object TestDataSerialize {
      def main(args: Array[String]) = {
        // testKyro();
        testJavaSerial()
        testKyroSerial()
      }
      
      def testSerial(serialInst: SerializerInstance) = {
    
        println("test IntObject")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize(i)
          println(byteBuf.limit())
        }
        println("test empty string")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize("")
          println(byteBuf.limit())
        }
        println("test  string with 1 character")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize("a")
          println(byteBuf.limit())
        }
        println("test  string with 2 character")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize("ab")
          println(byteBuf.limit())
        }
        println("test  string with 10 character")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize("abcdefghij")
          println(byteBuf.limit())
        }
        println("test  string with 20 character")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize("abcdefghijabcdefghij")
          println(byteBuf.limit())
        }
        println("test SerObject")
        for (i <- 1 to 1) {
          val byteBuf = serialInst.serialize(new SerObject(i, "testname", i + 1))
          println(byteBuf.limit())
        }
        println("\n\n")
      }
      
      def testKyroSerial() = {
        println("testKyroSerial")
        val conf = new SparkConf().setAppName("spark util test").setMaster("local[2]");
    
        val kryoSerializer = new KryoSerializer(conf)
        val serialInst = kryoSerializer.newInstance()
        testSerial(serialInst)
      }
    
      def testJavaSerial() = {
        println("testJavaSerial")
        val conf = new SparkConf().setAppName("spark util test").setMaster("local[2]");
    
        val javaSerializer = new org.apache.spark.serializer.JavaSerializer(conf)
        val serialInst = javaSerializer.newInstance()
        testSerial(serialInst)
      }
    }



    展开全文
  • 以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java...

    当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候,Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快,但是使用Kyro时要注意,Kyro目前还是有些bug。

    • Spark默认是使用Java的ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。
    • Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化SparkContext之前进行注册
    When Spark is transferring data over the network or spilling data to disk, it needs to
    serialize objects into a binary format. This comes into play during shuffle operations,
    where potentially large amounts of data are transferred. By default Spark will use
    Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization
    library that improves on Java’s serialization by offering both faster serialization
    times and a more compact binary representation, but cannot serialize all types of
    objects “out of the box.” Almost all applications will benefit from shifting to Kryo for
    serialization.

     

     

    代码示例:

    package spark.examples.kryo
    
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.serializer.KryoRegistrator
    
    //两个成员变量name和age,同时必须实现java.io.Serializable接口
    class MyClass1(val name: String, val age: Int) extends java.io.Serializable {
    }
    
    //两个成员变量name和age,同时必须实现java.io.Serializable接口
    class MyClass2(val name: String, val age: Int) extends java.io.Serializable {
    
    }
    
    //注册使用Kryo序列化的类,要求MyClass1和MyClass2必须实现java.io.Serializable
    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[MyClass1]);
        kryo.register(classOf[MyClass2]);
      }
    }
    
    object SparkKryo {
      def main(args: Array[String]) {
        //设置序列化器为KryoSerializer,也可以在配置文件中进行配置
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
        val conf = new SparkConf()
        conf.setAppName("SparkKryo")
        conf.setMaster("local[3]")
    
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(List(new MyClass1("Tom", 31), new MyClass1("Jack", 23), new MyClass1("Mary", 19)))
        val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
        //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中
        rdd.saveAsObjectFile(fileDir)
        //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1
        val rdd1 = sc.objectFile[MyClass1](fileDir + "/" + "part-00000")
    
    
        rdd1.foreachPartition(iter => {
          while (iter.hasNext) {
            val objOfMyClass1 = iter.next();
            println(objOfMyClass1.name)
          }
        })
    
        sc.stop
      }
    }
    

     

    查看保存到文件中的内容,是个二进制数据:

    SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      蓑_xi??蛔?z汲   i       e ur [Lspark.examples.kryo.MyClass1;?
    独#v?  xp   sr spark.examples.kryo.MyClass1z 峌#   xp

     

     

    问题:

    对于普通字符,数字,字符串写入到object文件,是否也是序列化的过程?明确指定使用kvro序列化Int之后,保存的文件确实是二进制的。去掉对Int的注册之后,结果还是一样,序列化的结果完全一样,结果都是:

    SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable      F脗?庻籡陭姯&?  '       # ur [IM篳&v瓴?  xp         

     

     

     

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.serializer.KryoRegistrator
    
    //注册使用Kryo序列化的类,对Int进行序列化
    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Int]);
      }
    }
    
    object SparkKryoPrimitiveType {
      def main(args: Array[String]) {
        //设置序列化器为KryoSerializer,也可以在配置文件中进行配置
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "spark.examples.kryo.MyKryoRegistrator")
        val conf = new SparkConf()
        conf.setAppName("SparkKryoPrimitiveType")
        conf.setMaster("local[3]")
    
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(List(1, 3, 7, 9, 11, 22))
        val fileDir = "file:///d:/wordcount" + System.currentTimeMillis()
        //将rdd中的对象通过kyro进行序列化,保存到fileDir目录中
        rdd.saveAsObjectFile(fileDir)
        //读取part-00000文件中的数据,对它进行反序列化,,得到对象集合rdd1
        val rdd1 = sc.objectFile[Int](fileDir + "/" + "part-00000")
    
        rdd1.foreachPartition(iter => {
          while (iter.hasNext) {
            println(iter.next())
          }
        })
    
        sc.stop
      }
    }

     

     其它:

    指定使用Kyro序列化,以及注册Kyro序列化类,可可以使用如下方式

    val conf = new SparkConf()
    //这句是多于的,调用conf的registerKryoClasses时,已经设置了序列化方法
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Be strict about class registration
    ///如果一个要序列化的类没有进行Kryo注册,则强制Spark报错
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

     

      /**
       * Use Kryo serialization and register the given set of classes with Kryo.
       * If called multiple times, this will append the classes from all calls together.
       */
      def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
        val allClassNames = new LinkedHashSet[String]()
        allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
        allClassNames ++= classes.map(_.getName)
    
        set("spark.kryo.classesToRegister", allClassNames.mkString(","))
        set("spark.serializer", classOf[KryoSerializer].getName)
        this
      }

     

     

    展开全文
  • Spark中使用kyro序列化

    千次阅读 2018-01-08 10:42:46
    序列化在分布式系统中扮演着重要的角色,优化Spark程序时,首当其冲的就是对序列化方式的优化。Spark为使用者提供两种序列化方式: Java serialization: 默认的序列化方式。 Kryo serialization: 相较于 Java ...
    序列化在分布式系统中扮演着重要的角色,优化Spark程序时,首当其冲的就是对序列化方式的优化。Spark为使用者提供两种序列化方式:
    
    Java serialization: 默认的序列化方式。
    
    Kryo serialization: 相较于 Java serialization 的方式,速度更快,空间占用更小,但并不支持所有的序列化格式,同时使用的时候需要注册class。spark-sql中默认使用的是kyro的序列化方式。
    
    下文将会讲解kryo的使用方式并对比性能。

    配置

    可以在spark-default.conf设置全局参数,也可以代码中初始化时对SparkConf设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") ,该参数会同时作用于机器之间数据的shuffle操作以及序列化rdd到磁盘,内存。

    Spark不将Kyro设置成默认的序列化方式是因为它需要对类进行注册,官方强烈建议在一些网络数据传输很大的应用中使用kyro序列化。

    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
    val sc = new SparkContext(conf)
    如果你要序列化的对象比较大,可以增加参数spark.kryoserializer.buffer所设置的值。

    如果你没有注册需要序列化的class,Kyro依然可以照常工作,但会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。

    可以设置 spark.kryo.registrationRequired 参数为 true,使用kyro时如果在应用中有类没有进行注册则会报错:

    java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
    Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
    	at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488)
    	at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
    	at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517)
    	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622)
    	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:207)
    	at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(ParallelCollectionRDD.scala:65)
    	at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(ParallelCollectionRDD.scala:65)
    	at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:184)
    	at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65)
    	at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    	at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    	at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    	at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:246)
    	at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:452)
    	at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:432)
    	at scala.Option.map(Option.scala:146)
    	at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:432)
    	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:264)
    	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    	at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:259)
    	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:333)
    	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:331)
    	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:331)
    	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:328)
    	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:328)
    	at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    2018-01-08 10:40:41  [ dispatcher-event-loop-2:29860 ] - [ ERROR ]  Failed to serialize task 0, not attempting to retry it.

    如上这个错误需要添加

    sparkConf.registerKryoClasses(
        Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
        classOf[MyClass]))
    下面的 demo 将会演示不同方式的序列化对空间占用的情况。

    Demo

    case class Info(name: String ,age: Int,gender: String,addr: String)
    
    object KyroTest {
      def main(args: Array[String]) {
    
      val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.set("spark.kryo.registrationRequired", "true")
     conf.registerKryoClasses(Array(classOf[Info], classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
      val sc = new SparkContext(conf)
    
      val arr = new ArrayBuffer[Info]()
    
      val nameArr = Array[String]("lsw","yyy","lss")
      val genderArr = Array[String]("male","female")
      val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")
    
      for(i <- 1 to 1000000){
        val name = nameArr(Random.nextInt(3))
        val age = Random.nextInt(100)
        val gender = genderArr(Random.nextInt(2))
        val address = addressArr(Random.nextInt(5))
        arr.+=(Info(name,age,gender,address))
        }
    
      val rdd = sc.parallelize(arr)
    
      //序列化的方式将rdd存到内存
      rdd.persist(StorageLevel.MEMORY_ONLY_SER)
      rdd.count()
      }
    }

    可以在web ui中看到缓存的rdd大小:


    序列化方式 是否注册 空间占用
    kyro 21.1 MB
    kyro 38.3 MB
    Java 25.1 MB

    转载自:http://blog.csdn.net/lsshlsw/article/details/50856842

    展开全文
  • Spark kyro序列化测试

    2019-05-26 22:09:01
    spark官网给出的几种调优点其中有一条是数据序列化 1.数据序列化,data serialization 1)java serialization(slow and large) 2)kyro serialization(qucikly compact) 注册使用,不注册性能相反 使用kryo的三...
  • spark之kryo 序列化

    千次阅读 2018-05-26 00:12:19
    几乎所有的资料都显示kryo 序列化方式优于java自带的序列化方式,而且在spark2.*版本中都是默认采用kryo 序列化。因此本文将做kryo 做一个测试以验证其性能。1.先给出定义: 把对象转换为字节序列的过程称为对象的...
  • 一直都在说kyro序列化,这一块对于spark在序列化这块性能提升比较大,今天研究下 spark的序列化机制 spark提供2个序列化库 1、Java serialization java 序列化库很灵活但是很慢,序列化对象占用的空间大。 2、...
  • Spark 使用 Kyro 序列化

    2020-08-19 22:54:48
    序列化 解决Driver端创建的对象 在Execute端传输问题 ...方式二: Kyro 第三方序列化Spark支持】 优点:序列化后的size 大概是Serializable 十分之一 val conf = new SparkConf().setAppName("Demo
  • kyro序列化

    千次阅读 2018-11-15 08:39:12
    一、序列化对比 在分布式系统中扮演着重要的角色,优化Spark程序时,...Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化 Kryo Serialization:...
  • Spark优化之Kyro序列化

    2018-09-06 11:00:50
    Spark 2.0.2, double[], 使用Kyro序列化加速,和手动注册类名 Kyro通常比原生的Java默认实现快10倍,所以建议使用Kyro来加速。 如何开启Kyro 开启的方法很简单,就是设参数spark.serializer。有三种方式: 程序内:...
  • 参考文章: 1.Spark 配置Kryo序列化机制 https://www.jianshu.com/p/68970d1674fa 2.Spark kyro Serialization ... 3.Spark中使用kyro序列化 ...4.【Spark七十八】Spark Kyro序列化 https://www.ite
  • 目录spark的序列化关于序列化的原理Kyro序列化(建议使用)总结 spark的序列化 进行 Spark 进行编程的时候, 初始化工作是在 driver端完成的, 而实际的运行程序是在executor端进行的. 所以就涉及到了进程间的通讯, ...
  • 默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化 这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,...
  • spark-kryo序列化方式

    2019-08-10 10:00:13
    Spark中,主要有三个地方涉及到了序列化: 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。 将自定义的类型作为RDD的泛型类型时(比如JavaRDD<SXT>,SXT是自定义类型),所有自定义类型...
  • 序列化 序列化:Java序列化 Kryo序列化 官网:http://spark.apache.org/docs/latest/tuning.html 如果对象比较大,需要添加一个参数spark.kryoserializer.buffer(默认64k)(有点小需要调大一点) spark....
  • 前言:kryo是个高效的java序列化/反序列化库,目前Twitter、yahoo、Apache、strom等等在使用该技术,比如Apache的spark、hive等大数据领域用的较多。为什么使用kryo而不是其他?因为性能足够好。比kyro更高效的序列...
  • Kryo与Spark内部默认序列化机制优缺点 默认优点 默认情况下,Spark内部使用java的序列化机制,ObjectOutputStream/ObjectInputStream 对象输入输出流机制,来进行序列化,这种默认序列化机制的好处在于,处理起来...
  • Spark Application中,经常会使用到一个共享变量,众所周知的,Spark是一个并行计算框架,对于这个变量,每一个executor的task在访问它的时候,都会去拷贝一份副本去使用。如下图所示: 对于这种默认方式,它会...
  • Kryo序列化与Java序列化

    千次阅读 2018-06-28 12:52:17
    1.序列化Spark中的用处在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。...
  • 在 Apache Spark 中,对于大数据应用程序,建议使用 Kryo 序列化而不是 java 序列化。与 java 序列化相比,当您移动和缓存大量数据时,与 java 序列化相比,Kryo 占用的内存更少。 虽然 kryo 支持 RDD 缓存 和 ...
1 2 3 4 5 ... 20
收藏数 452
精华内容 180
热门标签
关键字:

kyro序列化 spark