精华内容
下载资源
问答
  • 1.foreach val list = new ArrayBuffer() myRdd.foreach(record => { list += record }) 2.foreachPartition val list = new ArrayBuffer rdd.foreachPartition(it ...

    1.foreach

        val list = new ArrayBuffer()
        myRdd.foreach(record => {
          list += record
        })

    2.foreachPartition

        val list = new ArrayBuffer
        rdd.foreachPartition(it => {
          It.foreach(r => {
            list += r
          })
        })

    说明:

    foreachPartition属于算子操作,可以提高模型效率。比如在使用foreach时,将RDD中所有数据写Mongo中,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

    参考官网的说明:

    https://spark.apache.org/docs/latest/streaming-programming-guide.html

     

    转载于:https://www.cnblogs.com/shaozhiqi/p/11599748.html

    展开全文
  • <div><p>Hi <p>I have a data JAVARDD ...Is there a possibility to write from foreachPartition or write parallely? <p>Thanks, Yukti</p><p>该提问来源于开源项目:elastic/elasticsearch-hadoop</p></div>
  • spark foreachPartition思考

    2019-03-07 14:02:13
    对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not ...

    case:Spark向kafka中写入数据

     对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接

    datas.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer,
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 

    方式二

    利用广播变量的形式,将KafkaProducer广播到每一个executor

    参考 https://blog.csdn.net/u013013024/article/details/77877570

        import java.util.concurrent.Future
        import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
        class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
          /* This is the key idea that allows us to work around running into
             NotSerializableExceptions. */
          lazy val producer = createProducer()
          def send(topic: String, key: K, value: V): Future[RecordMetadata] =
            producer.send(new ProducerRecord[K, V](topic, key, value))
          def send(topic: String, value: V): Future[RecordMetadata] =
            producer.send(new ProducerRecord[K, V](topic, value))
        }
         
        object KafkaSink {
          import scala.collection.JavaConversions._
          def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
            val createProducerFunc = () => {
              val producer = new KafkaProducer[K, V](config)
              sys.addShutdownHook {
                // Ensure that, on executor JVM shutdown, the Kafka producer sends
                // any buffered messages to Kafka before shutting down.
                producer.close()
              }
              producer
            }
            new KafkaSink(createProducerFunc)
          }
          def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
        }
        // 广播KafkaSink
        val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
          val kafkaProducerConfig = {
            val p = new Properties()
            p.setProperty("bootstrap.servers", Conf.brokers)
            p.setProperty("key.serializer", classOf[StringSerializer].getName)
            p.setProperty("value.serializer", classOf[StringSerializer].getName)
            p
          }
          log.warn("kafka producer init done!")
          ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
        }
        //输出到kafka
        segmentedStream.foreachRDD(rdd => {
          if (!rdd.isEmpty) {
            rdd.foreach(record => {
              kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
              // do something else
            })
          }
        })

     

    展开全文
  • foreach与foreachPartition

    千次阅读 2018-07-09 10:47:11
    概述RDD.foreachPartition/foreach这两个action的操作: 这两个action主要用于对每个partition中的iterator实行迭代的处理。通过用户传入的function对iterator进行内容的处理。foreach的操作在foreach中,传入一个...

    概述

    RDD.foreachPartition/foreach这两个action的操作:
    这两个action主要用于对每个partition中的iterator实行迭代的处理。通过用户传入的function对iterator进行内容的处理。


    foreach的操作

    在foreach中,传入一个function,这个函数的传入参数就是每个partition中,每次的foreach得到的一个rdd的kv实例,也就是具体的内容,这种处理你并不知道这个iterator的foreach什么时候结束,只能是foreach的过程中,你得到一条数据,就处理一条数据。

    由下面的源码中,foreach操作是直接调迭代rdd中每一条数据进行function操作。

      /**
       * Applies a function f to all elements of this RDD.
       * 将函数应用在RDD的所有元素;
       */
      def foreach(f: T => Unit): Unit = withScope {
        val cleanF = sc.clean(f)
        //runJob job的运行
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    示例说明:

    val list = new ArrayBuffer()
    Rdd.foreach(record => {
      list += record
      If (list.size >= 10000) {
        list.flush....
      }
    })
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上面这段示例代码中,如果这么使用就会存在一个问题:
    迭代的最后,list的结果可能还没有达到10000条,这个时候,你在内部的处理的flush部分就不会执行,也就是迭代的最后如果没有达到10000的数据就会丢失。
    所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})。


    foreachPartition操作

    这个函数也是根据传入的function进行处理,但不同处在于,这里function的传入参数是一个partition对应数据的iterator,而不是直接使用iterator的foreach。

      /**
       * Applies a function f to each partition of this RDD.
       * 将函数f应用于该RDD的每个分区。
       */
      def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
        val cleanF = sc.clean(f)
        sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    示例代码:
    这种情况下,如果是上面foreach的示例代码中list这个片段在这个action中就能够正常的去处理。

    Val list = new ArrayBuffer
    rdd.foreachPartition(it => {
      It.foreach(r => {
    List += r
    If (list.size > 10000) flush
      })
      If (list.size > 0) flush
    })
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    总结

    最后说下这两个action的区别:

    Foreach与foreachPartition都是在每个partition中对iterator进行操作,不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用;

    而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理。

    在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

      /**
       * Applies a function f to all elements of this RDD.
       * 将函数应用在RDD的所有元素;
       */
      def foreach(f: T => Unit): Unit = withScope {
        val cleanF = sc.clean(f)
        //runJob job的运行
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    示例说明:


    展开全文
  • 算子优化 foreachPartition

    千次阅读 2017-03-08 20:38:08
    算子优化 foreachPartition foreach的写库原理 默认的foreach的性能缺陷在哪里? 首先,对于每条数据,都要单独去调用一次function,task为每个数据,都要去执行一次function函数。 如果100万条数据...
    算子优化 foreachPartition


    foreach的写库原理


    默认的foreach的性能缺陷在哪里?


    首先,对于每条数据,都要单独去调用一次function,task为每个数据,都要去执行一次function函数。
    如果100万条数据,(一个partition),调用100万次。性能比较差。


    另外一个非常非常重要的一点
    如果每个数据,你都去创建一个数据库连接的话,那么你就得创建100万次数据库连接。
    但是要注意的是,数据库连接的创建和销毁,都是非常非常消耗性能的。虽然我们之前已经用了
    数据库连接池,只是创建了固定数量的数据库连接。


    你还是得多次通过数据库连接,往数据库(MySQL)发送一条SQL语句,然后MySQL需要去执行这条SQL语句。
    如果有100万条数据,那么就是100万次发送SQL语句。


    以上两点(数据库连接,多次发送SQL语句),都是非常消耗性能的。


    foreachPartition,在生产环境中,通常来说,都使用foreachPartition来写数据库的


    使用批处理操作(一条SQL和多组参数)
    发送一条SQL语句,发送一次
    一下子就批量插入100万条数据。


    用了foreachPartition算子之后,好处在哪里?


    1、对于我们写的function函数,就调用一次,一次传入一个partition所有的数据
    2、主要创建或者获取一个数据库连接就可以
    3、只要向数据库发送一次SQL语句和多组参数即可


    在实际生产环境中,清一色,都是使用foreachPartition操作;但是有个问题,跟mapPartitions操作一样,
    如果一个partition的数量真的特别特别大,比如真的是100万,那基本上就不太靠谱了。


    一下子进来,很有可能会发生OOM,内存溢出的问题。


    一组数据的对比:生产环境


    一个partition大概是1千条左右
    用foreach,跟用foreachPartition,性能的提升达到了2~3分钟。


    实际项目操作:
    首先JDBCHelper里面已经封装好了一次批量插入操作!
    1343行
    批量插入session detail


    唯一不一样的是我们需要ISessionDetailDAO里面去实现一个批量插入
    List<SessionDetail> sessionDetails
    展开全文
  • spark foreach与foreachPartition 详解

    万次阅读 2018-11-28 17:55:45
    spark foreach与foreachPartition 每个partition中iterator时行迭代的处理,通过用户传入的function对iterator进行内容的处理 一:foreach的操作: Foreach中,传入一个function,这个函数的传入参数就是每个...
  • Spark中foreachRDD、foreachPartition和foreach解读

    万次阅读 多人点赞 2018-11-13 18:00:02
    foreachRDD、foreachPartition和foreach的不同之处主要在于它们的作用范围不同,foreachRDD作用于DStream中每一个时间间隔的RDD,foreachPartition作用于每一个时间间隔的RDD中的每一个partition,foreach作用于每一...
  • foreach和foreachPartition的区别 一,基本使用 1,RDD分布式数据集的五大特性 1),A list of partitions(一系列的分区) 2),A function for computing eachsplit(计算每个分片的方法) 3),A list of ...
  • Spark中foreachRDD、foreachPartition和foreach解读 https://blog.csdn.net/Scapel/article/details/84030362 另外Dframe 与RDD对应关系是 一个RDD对应个计算间隔内的Dframe ... ......
  • val drive_param = "xxx" val dfs = Seq("xx").toDF("user") dfs.repartition(100).foreachPartition(par => { //drive_param 无法使用, foreachPartition 会把每个rdd 放到executor 上执行,无法读取drive...
  • 2.foreachPartition 与 mapPartitions 3. 1.Transformation与Action spark的运算操作有两种类型:分别是Transformation和Action,区别如下: Transformation:代表的是转化操作就是我们的计算流程,返回是RDD[T],...
  • Spark Streaming之妙用foreachRDD和foreachPartition
  • 在应用场景上区别是mapPartitions可以获取返回值,继续在返回RDD上做其他的操作,而foreachPartition因为没有返回值并且是action操作,所以使用它一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,或者...
  • object foreach&foreachPartition{ def main(args: Array[String]): Unit = { val config = new SparkConf().setAppName("demo").setMaster("local[*]") val sc = new SparkContext(config) sc.se...
  • foreachRDD、foreachPartition和foreach的不同之处主要在于它们的作用范围不同, foreachRDD作用于DStream中每一个时间间隔的RDD, foreachPartition作用于每一个时间间隔的RDD中的每一个partition, foreach...
  • RDD之foreach和foreachPartition方法

    千次阅读 2019-03-19 17:13:34
    而当我们不想要生成新的RDD时,我们要使用foreach或者foreachPartition方法 foreach 当执行完foreach,发现并没有打印出来结果。 这个foreach方法是一个Action方法,而且任务执行的时候是在executor端执行的,所以...
  • foreach(f)将方法用于RDD的所以元素&... sc.parallelize([1, 2, 3, 4, 5]).foreach(f)foreachPartition(f)[source]将方法用于RDD的每个分区&gt;&gt;&gt; def f(iterator):... for x in itera...
  • spark使用java代码实现foreachPartition

    千次阅读 2019-02-22 16:47:00
    如果数据太大直接用dataframe转list内存会不够,所以可以通过foreachPartition遍历读取 System.setProperty("hadoop.home.dir","h:\\hadoop2.3.7"); string mastor="local" string name="wordcount"+system....
  • 主题:RDD的foreachPartition/foreach的操作   说明:这两个action主要用于对每个partition中的iterator时行迭代的处理.通过用户传入的function对iterator进行内容的处理. 一、foreach的操作 foreach中,传入一...
  • 一、首先说一下foreach和foreachPartition的区别,类似于map和mapPartition的区别。 (一)map和mapPartition的区别 map是对RDD的每一个元素进行操作,mapPartition是对每个partition的迭代器进行操作。 ...
  • scala中的foreachPartition和mapPartition

    千次阅读 2018-11-21 22:35:48
    在最近项目中,因为由于数据量不是特别大并且内存充足,所以采用了foreachPartition代替了foreach,使用了mapPartition代替了map。下面给大家讲解一下,关于他们之间的区别。 map是spark中非常强大的一个算子,...
  • SparkStreaming-foreachrdd foreachpartition foreachrdd foreachpartition_百度搜索SparkStreaming之foreachRDD - legotime的博客 - CSDN博客spark RDD中foreachPartition和foreach说明 - balabalabala - CSDN博客...
  • RDD.foreachPartition/foreach这两个action的操作:  这两个action主要用于对每个partition中的iterator实行迭代的处理。通过用户传入的function对iterator进行内容的处理。 foreach的操作 由下面的源码中,...
  • 在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。 如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,...
  • RDD 中foreach与foreachPartition区别

    千次阅读 2017-12-14 13:44:45
    参考: http://blog.csdn.net/u013939918/article/details/60881711 ... RDD.foreachPartition/foreach的操作 在这个action的操作中: 这两个action主要用于对每一个partition中的iterat

空空如也

空空如也

1 2 3 4 5 ... 16
收藏数 309
精华内容 123
关键字:

foreachpartition