RDD

特点

Resillient Distributed Dataset,即弹性分布式数据集
RDD的内部属性
通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些信息可以支持更复杂的算法或优化。
1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。
2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需要进行的用户自定义函数运算。
3)对父RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
4)可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce。
5)可选:每一个分片的优先计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置。(存储的是一个表,可以将处理的分区“本地化”)

分区— partitions
计算函数— computer(p,context)
依赖— dependencies()
分区策略(Pair RDD)– partitioner()
本地性策略— preferredLocations(p)

1
2
3
4
5
6
7
8
9
10
//只计算一次 
protected def getPartitions: Array[Partition]
//对一个分片进行计算,得出一个可遍历的结果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只计算一次,计算RDD对父RDD的依赖
protected def getDependencies: Seq[Dependency[_]] = deps
//可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce
@transient val partitioner: Option[Partitioner] = None
//可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

1)RDD的特点
1)创建:只能通过转换 ( transformation ,如map/filter/groupBy/join 等,区别于动作 action) 从两种数据源中创建 RDD 1 )稳定存储中的数据; 2 )其他 RDD。
2)只读:状态不可变,不能修改。
3)分区:支持使 RDD 中的元素根据那个 key 来分区 ( partitioning ) ,保存到多个结点上。还原时只会重新计算丢失分区的数据,而不会影响整个系统。
4)路径:在 RDD 中叫世族或血统 ( lineage ) ,即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的。
5)持久化:支持将会被重用的 RDD 缓存 ( 如 in-memory 或溢出到磁盘 )。
6)延迟计算: Spark 也会延迟计算 RDD ,使其能够将转换管道化 (pipeline transformation)。
7)操作:丰富的转换(transformation)和动作 ( action ) , count/reduce/collect/save 等。
执行了多少次transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。
https://blog.csdn.net/guohecang/article/details/51736572

本地性策略

一直在用spark但是没有考虑过优化的看过来。
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
Spark中的数据本地性有三种:

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;
计算数据的task由executor执行,数据在executor的BlockManager中;性能最好
NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,
就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输
NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分,比如从数据库中获取数据
RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输
ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中

如何配置Locality呢?可以统一采用spark.locality.wait来设置(例如设置5000ms)。当然可以分别设置spark.locality.wait.process、spark.locality.wait.node、spark.locality.wait.rack等;一般的具体设置是Locality优先级越高则可以设置越高的等待超时时间。

1
2
new SparkConf()
.set("spark.locality.wait", "10")

如果数据是PROCESS_LOCAL,但是此时并没有空闲的Core来运行我们的Task,此时Task就要等待,例如等待3000ms,3000ms内如果能够运行待运行的Task则直接运行,如果超过了3000ms,此时数据本地性就要退而求其次采用NODE_LOCAL的方式。同样的道理,NODE_LOCAL也会有等待的超时时间,以此类推…
对于ANY的情况,默认情况状态下性能会非常低下,此时强烈建议使用Tachyon,例如在百度云上,为了确保计算速度,就在计算集群和存储集群之间加入了Tachyon,通过Tachyon来从远程抓取数据,而Spark基于Tachyon来进行计算,这就更好的满足了数据本地性。

数据本地性任务分配的源码在 taskSetManager.scala 。
https://blog.csdn.net/oitebody/article/details/80137721
数据本地性的副作用
https://www.jianshu.com/p/a1d0824053d8

参考:
https://www.cnblogs.com/yourarebest/p/5122372.html
https://www.cnblogs.com/jackie2016/p/5643100.html
http://blog.sina.com.cn/s/blog_9ca9623b0102w8pc.html
https://bit1129.iteye.com/blog/2186084
https://www.jianshu.com/p/a1d0824053d8
https://blog.csdn.net/u013939918/article/details/60897191