1.设置合理的消费并行度
最优的方案是:kafka分区数:broker *3/6/9
kafka分区能不能增加,能不能减少?
kafka分区数是可以增加的,但是不能减少
2.序列化
java的序列化,很沉重,会序列化好多无关的,耗时特别长
我们大数据生产中使用Kryo序列化
sparkConf.registerKryoClasses(
Array(
classOf[OrderInfo],
classOf[Opt_alliance_business],
classOf[DriverInfo],
classOf[RegisterUsers]
)
)
3.限流,压背
1.限流使用场景
当数据处理的时间⼤于batch的间隔时间,即batch processing time > batch interval时,就会导致
Executor端的Receiver的数据产⽣堆积,极端的情况下会导致OOM异常,因此,在Spark
Streaming1.5之前可以通过参数来调整每秒处理的数据量(通过SparkConf可以直接设置):
spark.streaming.receiver.maxRate
在Driect⽅式下,可以通过下⾯参数来设置:
spark.streaming.kafka.maxRatePerPartition
这个参数可以限制作业每次从kafka的分区中最多读取的记录数
从上⾯分析来看,似乎可以通过参数来解决数据流速度的问题,那么问题来了,如果我们升级集群或
者扩展机器后,集群的吞吐量提⾼了很多,我们就需要⼿动去调整参数以避免浪费资源,有没有⼀种
⾃动可以调节的⽅式呢?
在Spark Streaming1.5之后,引⼊了压背的机制,可以动态的⾃动调整数据处理的速率
sparkConf.set("spark.streaming.backpressure.initialRate","500")
sparkConf.set("spark.streaming.backpressure.enabled","true")
实现压背限流主要是依赖这三个类:
RateController:实现计算当前用什么样的速率来运行
rateEstimator:
任何实现流程序在spark源码里面都会调用这个StreamingListener特质,而这个特质StreamingListener里面给我提供了一系列的回调函数,当执行完成后调回调函数,回调函数内部实现压背.




processingEndTime:当前批次处理的结束时间
processingDelay:当前批次处理的所耗时长
schedulingDelay:processingStartTime- submissionTime,程序开始时间-提交时间=调度时间
numRecords:当前批次接收的数据
newRate:代表新的处理速率


做一个控制,当前批次处理的时间>上一个批次处理的时间,并且批次的数量大于零,当前批次处理消耗时间也是大于零的
本批次处理结束时间-上一批次处理结束时间=两个批次处理完的时间间隔

package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.internal.Logging
private[streaming] class PIDRateEstimator(
batchIntervalMillis: Long,
proportional: Double,
integral: Double,
derivative: Double,
minRate: Double
) extends RateEstimator with Logging {
private var firstRun: Boolean = true
private var latestTime: Long = -1L
private var latestRate: Double = -1D
private var latestError: Double = -1L
require(
batchIntervalMillis > 0,
s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.")
require(
proportional >= 0,
s"Proportional term $proportional in PIDRateEstimator should be >= 0.")
require(
integral >= 0,
s"Integral term $integral in PIDRateEstimator should be >= 0.")
require(
derivative >= 0,
s"Derivative term $derivative in PIDRateEstimator should be >= 0.")
require(
minRate > 0,
s"Minimum rate in PIDRateEstimator should be > 0")
logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
s"derivative = $derivative, min rate = $minRate")
def compute(
time: Long,
numElements: Long,
processingDelay: Long,
schedulingDelay: Long
): Option[Double] = {
logTrace(s"\ntime = $time, # records = $numElements, " +
s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {
if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
logTrace(s"""
| latestRate = $latestRate, error = $error
| latestError = $latestError, historicalError = $historicalError
| delaySinceUpdate = $delaySinceUpdate, dError = $dError
""".stripMargin)
latestTime = time
if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
logTrace("First run, rate estimation skipped")
None
} else {
latestRate = newRate
latestError = error
logTrace(s"New rate = $newRate")
Some(newRate)
}
} else {
logTrace("Rate estimation skipped")
None
}
}
}
}
4.cpu空转时间
流任务–>task,如果task没拿到数据,这个task他依然是运行的,这个task就是空转的,即便是空转他依旧要消耗资源,因为他要进行序列化,反序列化,压缩,等等这些操作.
spark.locality.wait=1
默认是3毫秒的cpu等待时间,我们通过修改可以变为1毫秒,减少他空转时间从而降低资源的浪费
5.不要在代码中判断这个表是否存在
当我们要插入一批数据时,表一定是存在,不存在我们差个什么意思,对吧!
当你插入数据前做一层控制判断表是否存在,这个操作是及其浪费时间的,大概需要耗时1s左右
所以,我们在插入前就要提前创建好表,在生产中我们创建表全部用命令行的方式进行创建,因为用代码进行创建极其浪费时间
命令行方式创建表:
create "order_info",{NAME=>'MM',COMPRESSION=>'SNAPPY',SPLITS=>['0000|','0001|','0002|','0003|','0004|','0005|','0006|','0007|']}
6.推测执行
流 -run()起来后会出现
task是失败状态现象---->就会造成各种重试
pedding状态的task(等待状态的task)—>task就会被阻塞
总结:因为你这一个task的失败,导致程序不断重试,其余的task被阻塞在哪里,影响效率
为了解决上述的问题这个时候我们就要开启推测执行,
sparkConf.set("spark.speculation",true)
.set("spark.sqeculation.interval","300")
.set("saprk.speculation.quantile","0.9")
为什么90%?因为一批task往往失败的只有那一两个
7.开启动态资源分配(主要是针对SparkSQL,而SparkStreaming是不能开启的,因为他会出现数据丢失现象)
1.为什么要开启动态资源分配?
⽤户提交Spark应⽤到Yarn上时,可以通过spark-submit的num-executors参数显示地指定executor
个数,随后,ApplicationMaster会为这些executor申请资源,每个executor作为⼀个Container在
Yarn上运⾏。Spark调度器会把Task按照合适的策略分配到executor上执⾏。所有任务执⾏完后,
executor被杀死,应⽤结束。在job运⾏的过程中,⽆论executor是否领取到任务,都会⼀直占有着
资源不释放。很显然,这在任务量⼩且显示指定⼤量executor的情况下会很容易造成资源浪费
你开了50个executor 但是实际运行的只有30个,就会产生20个空转的现象,但是当我们开启了动态资源分配后,你依然是提交了50个executor,运行依然是30个,这样子下来依旧是闲置了20个,但是没关系他会在15s之后判定闲置executor,对其进行回收
provider类路径