This article assumes the reader has read Matei Zaharia's paper http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf, is familiar with Spark terminology, and has some hands-on experience with Spark.
An RDD (Resilient Distributed Dataset) is a distributed, read-only collection of data. It is Spark's abstraction over a dataset, and Spark defines a series of operations on it (map, filter, join, and so on).
The following walks through the more important fields and methods of the RDD class in the Spark source code. The logical operations (map, reduce, join, etc.) will be analyzed in the next article.
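As a quick refresher before diving into the source, here is a minimal usage sketch (the app name, master URL, and data are arbitrary): transformations such as map and filter build new read-only RDDs, and only an action such as collect() actually runs the computation.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

val nums = sc.parallelize(1 to 10)          // a distributed, read-only dataset
val squares = nums.map(n => n * n)          // transformation: builds a new RDD
val evens = squares.filter(_ % 2 == 0)      // transformation: builds another RDD
println(evens.collect().mkString(", "))     // action: 4, 16, 36, 64, 100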
Constructor: sc is the SparkContext this RDD runs in, and deps are the RDD's dependencies; both are explained in detail below.
abstract class RDD[T: ClassTag](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging { ... }
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
sc: the SparkContext, i.e. the Spark environment in which this RDD runs.
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
id: a unique identifier for this RDD within its SparkContext.
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
partitions: a partition is the unit that holds one slice of an RDD's data. An RDD consists of multiple partitions, and together these partitions make up all of the RDD's data. checkpoint is explained further below.
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getPartitions: Array[Partition]
@transient private var partitions_ : Array[Partition] = null
/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_
  }
}
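To make this concrete, a small sketch (assuming the SparkContext sc from above; the partition count is arbitrary): the number of partitions is fixed when the RDD is created, and each Partition object carries its index.
val rdd = sc.parallelize(1 to 100, 4)                 // explicitly split into 4 partitions
println(rdd.partitions.length)                        // 4
println(rdd.partitions.map(_.index).mkString(", "))   // 0, 1, 2, 3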
partitioner: an RDD needs a well-defined mapping to spread its data across partitions, and the partitioner describes that mapping. There are two built-in Partitioners: HashPartitioner (the default) and RangePartitioner.
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
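A short sketch of the two built-in partitioners (again assuming sc; the key/value data is made up): partitionBy installs a HashPartitioner, sortByKey builds its output with a RangePartitioner, and an RDD that has not been partitioned by key reports None.
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair RDD operations (needed on older Spark versions)

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
println(pairs.partitioner)                    // None: no partitioner set yet

val hashed = pairs.partitionBy(new HashPartitioner(4))
println(hashed.partitioner)                   // now Some(HashPartitioner): keys hashed into 4 partitions

val ranged = pairs.sortByKey()
println(ranged.partitioner)                   // now Some(RangePartitioner): keys split into sorted ranges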
dependencies: an RDD is usually derived from one or more other RDDs, which establishes a mapping from the parent RDDs' partitions to this RDD's partitions. In Spark, dependencies is the sequence of Dependency objects that records this relationship to the parent RDDs.
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
private var dependencies_ : Seq[Dependency[_]] = null
/**
* Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}
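The difference between narrow and shuffle dependencies can be observed directly (a sketch assuming sc): map keeps a one-to-one mapping onto its parent's partitions, whereas reduceByKey must regroup data by key across partitions.
import org.apache.spark.SparkContext._   // pair RDD operations (needed on older Spark versions)

val words = sc.parallelize(Seq("a", "b", "a"))
val pairs = words.map(w => (w, 1))
val counts = pairs.reduceByKey(_ + _)

println(pairs.dependencies.head.getClass.getSimpleName)    // OneToOneDependency (narrow)
println(counts.dependencies.head.getClass.getSimpleName)   // ShuffleDependency (wide)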
storageLevel: how this RDD is persisted: not stored at all, in memory, on disk, or a combination of memory and disk. By default the data is not stored. persist() sets the RDD's persistence level, cache() persists the RDD in memory (MEMORY_ONLY), and unpersist() removes the RDD from storage.
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
*
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
*/
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
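Typical usage looks like the following sketch (the HDFS path is hypothetical): the first action both computes and stores the filtered data, later actions read it back from the chosen storage level, and unpersist() releases the blocks.
import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("hdfs:///path/to/logs")            // hypothetical input path
val errors = logs.filter(_.contains("ERROR"))

errors.persist(StorageLevel.MEMORY_AND_DISK)              // or errors.cache() for MEMORY_ONLY
println(errors.count())                                   // computed and stored on the first action
println(errors.count())                                   // served from the cache afterwards

errors.unpersist()                                        // drop the blocks; storageLevel is NONE again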
checkpoint: by calling checkpoint(), an RDD can have its data saved to a node's local disk or to HDFS to guard against data loss. Once the RDD has been written to stable storage it no longer needs to be recomputed when it is needed again, so its dependencies on its parent RDDs are removed.
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with SparkContext.setCheckpointDir() and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint() {
  if (context.checkpointDir.isEmpty) {
    throw new Exception("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new RDDCheckpointData(this))
    checkpointData.get.markForCheckpoint()
  }
}
/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
/**
* Return whether this RDD has been checkpointed or not
*/
def isCheckpointed: Boolean = {
  checkpointData.map(_.isCheckpointed).getOrElse(false)
}
/**
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile: Option[String] = {
  checkpointData.flatMap(_.getCheckpointFile)
}
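A sketch of the checkpointing workflow (the checkpoint directory is hypothetical): checkpoint() only marks the RDD, and the data is actually written when the next job on it finishes; caching the RDD first avoids computing it twice.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // hypothetical directory

val data = sc.parallelize(1 to 1000).map(_ * 2)
data.cache()                       // recommended, so the checkpoint write does not recompute the RDD
data.checkpoint()                  // must be called before any job has run on this RDD

data.count()                       // runs the job and writes the checkpoint files
println(data.isCheckpointed)       // true
println(data.getCheckpointFile)    // Some(<path under the checkpoint directory>)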
compute: returns an iterator over the data of one partition of this RDD; the split argument identifies that partition.
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
getPreferredLocations: returns the preferred locations of a given partition. For example, for an RDD backed by HDFS blocks, this returns the locations where the corresponding block is stored.
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
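Putting the pieces together, here is a sketch of a custom RDD subclass (DoubledRDD is a made-up name, not part of Spark): it relies on the one-parent auxiliary constructor shown at the top of this article, so it gets a single OneToOneDependency for free, and it only has to describe its partitions, how to compute one of them, and where it prefers to run.
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class DoubledRDD(prev: RDD[Int]) extends RDD[Int](prev) {

  // Reuse the parent's partitioning one-to-one.
  override protected def getPartitions: Array[Partition] = firstParent[Int].partitions

  // Ask the parent for the same partition's iterator and transform it lazily.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    firstParent[Int].iterator(split, context).map(_ * 2)

  // Prefer running each task where the parent's data for this partition lives.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[Int].preferredLocations(split)
}
Transformations such as map are implemented in essentially this shape in the Spark source (see MapPartitionsRDD), which is the topic of the next article.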