spark优化

设置合适的资源参数

spark程序跑在yarn集群上

1
2
3
4
--queue:集群队列
--num-executors:executor数量,默认2
--executor-memory:executor内存,默认512M
--executor-cores:每个executor的并发数,默认1

executor的数量可以根据任务的并发量进行估算,例如我有1000个任务,每个任务耗时1分钟,若10个并发则耗时100分钟,100个并发耗时10分钟,根据自己对并发需求进行调整即可。默认每个executor内有一个并发执行任务,一般够用,也可适当增加,当然内存的使用也会有所增加

对于yarn-client模式,整个application所申请的资源为:

1
2
total vores = executor-cores * num-executors + spark.yarn.am.cores
total memory= (executor-memory + spark.yarn.executor.memoryOverhead) * num-executors + (spark.yarn.am.memory + spark.yarn.am.memoryOverhead)

当申请的资源超出所指定的队列的min cores和min memory时,executor就有被yarn kill掉的风险。而spark的每个stage是有状态的,如果被kill掉,对性能影响比较大。例如,本例中的baseRDD被cache,如果某个executor被kill掉,会导致其上的cache的parition失效,需要重新计算,对性能影响极大

executor-memory

这里还有一点需要注意,executor-memory设置的是executor jvm启动的最大堆内存,java内存除了堆内存外,还有栈内存、堆外内存等,所以spark使用spark.yarn.executor.memoryOverhead对非堆内存进行限制,也就是说executor-memory + spark.yarn.executor.memoryOverhead是所能使用的内存的上线,如果超过此上线,就会被yarn kill掉。本次优化,堆外内存的优化起到了至关重要的作用,我们后续会看到。

spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead默认是executor-memory * 0.1,最小是384M。比如,我们的executor-memory设置为1G,spark.yarn.executor.memoryOverhead是默认的384M,则我们向yarn申请使用的最大内存为1408M,但由于yarn的限制为倍数(不知道是不是只是我们的集群是这样),实际上yarn运行我们运行的最大内存为2G。这样感觉浪费申请的内存,申请的堆内存为1G,实际上却给我们分配了2G,如果对spark.yarn.executor.memoryOverhead要求不高的话,可以对executor-memory再精细化,比如申请executor-memory为640M,加上最小384M的spark.yarn.executor.memoryOverhead,正好一共是1G。

spark.yarn.am.memory

除了启动executor外,spark还会启动一个am,可以使用spark.yarn.am.memory设置am的内存大小,默认是512M,

spark.yarn.am.memoryOverhead

spark.yarn.am.memoryOverhead默认也是最小384M。有时am会出现OOM的情况,可以适当调大spark.yarn.am.memory。

spark.executor.extraJavaOptions

executor默认的永久代内存是64K,可以看到永久代使用率长时间为99%,通过设置spark.executor.extraJavaOptions适当增大永久代内存,例如:–conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=64m”

driver-memory

driver端在yarn-client模式下运行在本地,也可以对相关参数进行配置,如–driver-memory等。

Direct Memory

我们使用的spark版本是1.5.2(更准确的说是1.5.3-shapshot),shuffle过程中block的传输使用netty(spark.shuffle.blockTransferService)。基于netty的shuffle,使用direct memory存进行buffer(spark.shuffle.io.preferDirectBufs),所以在大数据量shuffle时,堆外内存使用较多。当然,也可以使用传统的nio方式处理shuffle,但是此方式在spark 1.5版本设置为deprecated,并将会在1.6版本彻底移除,所以我最终还是采用了netty的shuffle。

jvm关于堆外内存的配置相对较少,通过-XX:MaxDirectMemorySize可以指定最大的direct memory。默认如果不设置,则与最大堆内存相同。

Direct Memory是受GC控制的,例如ByteBuffer bb = ByteBuffer.allocateDirect(1024),这段代码的执行会在堆外占用1k的内存,Java堆内只会占用一个对象的指针引用的大小,堆外的这1k的空间只有当bb对象被回收时,才会被回收,这里会发现一个明显的不对称现象,就是堆外可能占用了很多,而堆内没占用多少,导致还没触发GC。加上-XX:MaxDirectMemorySize这个大小限制后,那么只要Direct Memory使用到达了这个大小,就会强制触发GC,这个大小如果设置的不够用,那么在日志中会看到java.lang.OutOfMemoryError: Direct buffer memory。

例如,在我们的例子中,发现堆外内存飙升的比较快,很容易被yarn kill掉,所以应适当调小-XX:MaxDirectMemorySize(也不能过小,否则会报Direct buffer memory异常)。当然你也可以调大spark.yarn.executor.memoryOverhead,加大yarn对我们使用内存的宽容度,但是这样比较浪费资源了。

GC优化

GC优化前,最好是把gc日志打出来,便于我们进行调试。

1
--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

通过看gc日志,我们发现一个case,特定时间段内,堆内存其实很闲,堆内存使用率也就5%左右,长时间不进行父gc,导致Direct Memory一直不进行回收,一直在飙升。所以,我们的目标是让父gc更频繁些,多触发一些Direct Memory回收。

第一,可以减少整个堆内存的大小,当然也不能太小,否则堆内存也会报OOM。这里,我配置了1G的最大堆内存。

第二,可以让年轻代的对象尽快进入年老代,增加年老代的内存。这里我使用了-Xmn100m,将年轻代大小设置为100M。另外,年轻代的对象默认会在young gc 15次后进入年老代,这会造成年轻代使用率比较大,young gc比较多,但是年老代使用率低,full gc比较少,通过配置-XX:MaxTenuringThreshold=1,年轻代的对象经过一次young gc后就进入年老代,加快年老代fullgc的频率

第三,可以让年老代更频繁的进行fullgc。一般年老代gc策略我们主要有-XX:+UseParallelOldGC和-XX:+UseConcMarkSweepGC这两种,ParallelOldGC吞吐率较大,ConcMarkSweepGC延迟较低。我们希望fullgc频繁些,对吞吐率要求较低,而且ConcMarkSweepGC可以设置-XX:CMSInitiatingOccupancyFraction,即年老代内存使用率达到什么比例时触发CMS。我们决定使用CMS,并设置-XX:CMSInitiatingOccupancyFraction=10,即年老代使用率10%时触发fullgc

通过对GC策略的配置,我们发现fullgc进行的频率加快了,带来好处就是Direct Memory能够尽快进行回收,当然也有坏处,就是gc时间增加了,cpu使用率也有所增加。

最终我们对executor的配置如下:

1
--executor-memory 1G --num-executors 160 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=64m -XX:+CMSClassUnloadingEnabled -XX:MaxDirectMemorySize=1536m -Xmn100m -XX:MaxTenuringThreshold=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10 -XX:+UseCompressedOops -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError"