This article assumes that the reader has read Matei Zaharia's paper http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf, is familiar with Spark terminology, and has some hands-on experience with Spark.

An RDD (Resilient Distributed Dataset) is a distributed, read-only collection of data. It is Spark's abstraction over datasets, and Spark defines a series of operations on it (map, filter, join, and so on).
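
Before diving into the source, a minimal usage sketch (assuming a spark-shell session, where sc is already defined) illustrates what these operations look like:

val nums = sc.parallelize(1 to 10, 4)          // an RDD with 4 partitions
val result = nums.map(_ * 2).filter(_ > 10)    // transformations build new, read-only RDDs
println(result.collect().mkString(", "))       // action: prints 12, 14, 16, 18, 20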

The sections below walk through the more important fields and methods of the RDD class in the Spark source code. The logical operations (map, reduce, join, etc.) will be analyzed in the next article.


Constructor: sc is the Spark execution environment (the SparkContext) and deps is the list of dependencies of this RDD; both are explained in detail below.

abstract class RDD[T: ClassTag](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging { ... }

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))
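
As an illustration of the second constructor, here is a hypothetical pass-through subclass (not part of the Spark source) that declares a single one-to-one dependency on its parent:

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical example: built with the one-parent constructor, so deps
// defaults to List(new OneToOneDependency(parent)).
class IdentityRDD[T: ClassTag](parent: RDD[T]) extends RDD[T](parent) {
    override protected def getPartitions: Array[Partition] = parent.partitions
    override def compute(split: Partition, context: TaskContext): Iterator[T] =
        parent.iterator(split, context)
}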

sc: short for SparkContext, the Spark environment in which this RDD runs.

/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc

id: the unique identifier of this RDD within its SparkContext.

/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()

partitions: a partition is the unit that holds one portion of an RDD's data. An RDD consists of multiple partitions, and together these partitions make up all of the RDD's data. Checkpointing is explained later in this article.

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]
    
@transient private var partitions_ : Array[Partition] = null
/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
        if (partitions_ == null) {
            partitions_ = getPartitions
        }
        partitions_
    }
}
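
A short sketch (spark-shell assumed) of inspecting an RDD's partitions:

val rdd = sc.parallelize(1 to 100, 8)
println(rdd.partitions.length)                       // 8
println(rdd.partitions.map(_.index).mkString(", "))  // 0, 1, 2, 3, 4, 5, 6, 7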

partitioner: an RDD needs to spread its data across partitions according to some mapping, and the partitioner describes that mapping. Spark provides two built-in Partitioners: HashPartitioner (the default) and RangePartitioner. Only key-value RDDs carry a partitioner; as the code below shows, the default is None.

/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
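
A sketch of how the partitioner shows up in practice (spark-shell assumed; partitionBy comes from PairRDDFunctions via the usual implicit conversion):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // PairRDDFunctions implicits (needed on older Spark versions)

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
println(pairs.partitioner)                  // None
val hashed = pairs.partitionBy(new HashPartitioner(4))
println(hashed.partitioner)                 // Some(org.apache.spark.HashPartitioner@...)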

dependencies: an RDD is usually derived from one or more other RDDs, which creates a mapping from the new RDD back to its parents. In Spark, dependencies is the sequence of Dependency objects that describes this mapping (for example, OneToOneDependency for a narrow dependency and ShuffleDependency for a wide one).

/**
 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getDependencies: Seq[Dependency[_]] = deps

private var dependencies_ : Seq[Dependency[_]] = null

/**
 * Get the list of dependencies of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
        if (dependencies_ == null) {
            dependencies_ = getDependencies
        }
        dependencies_
    }
}
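
A sketch of how narrow and wide dependencies look from the user side (spark-shell assumed):

import org.apache.spark.SparkContext._   // PairRDDFunctions implicits (needed on older Spark versions)

val words  = sc.parallelize(Seq("a", "b", "a"))
val pairs  = words.map(word => (word, 1))   // narrow: OneToOneDependency on words
val counts = pairs.reduceByKey(_ + _)       // wide: ShuffleDependency on pairs
println(pairs.dependencies.map(_.getClass.getSimpleName))    // List(OneToOneDependency)
println(counts.dependencies.map(_.getClass.getSimpleName))   // List(ShuffleDependency)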

storageLevel: the persistence level of this RDD; the options are no storage, memory only, disk only, and a combination of memory and disk. By default the data is not persisted. persist() sets the RDD's storage level, cache() persists the RDD in memory (it is simply persist(StorageLevel.MEMORY_ONLY)), and unpersist() removes the RDD from persistent storage.

private var storageLevel: StorageLevel = StorageLevel.NONE

/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet.
 */
def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
        throw new UnsupportedOperationException(
                "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()


/**
 * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
 *
 * @param blocking Whether to block until all blocks are deleted.
 * @return This RDD.
 */
def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
}
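
A usage sketch (spark-shell assumed):

import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(1 to 1000000).map(x => x.toLong * x)
squares.persist(StorageLevel.MEMORY_AND_DISK)   // or squares.cache() for MEMORY_ONLY
squares.count()      // first action: partitions are computed and stored
squares.count()      // second action: served from the cached blocks
squares.unpersist()  // remove the blocks; the storage level goes back to NONE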

checkpoint: by calling checkpoint(), an RDD can have its data saved to a node's local disk or to HDFS (the checkpoint directory) to guard against data loss. Note that checkpoint() only marks the RDD; the data is actually written out after the next job that uses this RDD completes. Once the RDD has been materialized to stable storage it no longer needs to be recomputed, so its dependencies (the references to its parent RDDs) are removed.

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with SparkContext.setCheckpointDir() and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint() {
    if (context.checkpointDir.isEmpty) {
        throw new Exception("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
        checkpointData = Some(new RDDCheckpointData(this))
        checkpointData.get.markForCheckpoint()
    }
}

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)

/**
 * Return whether this RDD has been checkpointed or not
 */
def isCheckpointed: Boolean = {
    checkpointData.map(_.isCheckpointed).getOrElse(false)
}

/**
 * Gets the name of the file to which this RDD was checkpointed
 */
def getCheckpointFile: Option[String] = {
    checkpointData.flatMap(_.getCheckpointFile)
}
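
A usage sketch (spark-shell assumed; the checkpoint directory below is just an example path):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // must be set before checkpoint() is called

val derived = sc.parallelize(1 to 1000).map(_ + 1).filter(_ % 2 == 0)
derived.cache()        // recommended, so writing the checkpoint does not recompute the RDD
derived.checkpoint()   // only marks the RDD; nothing is written yet
derived.count()        // running a job materializes the checkpoint

println(derived.isCheckpointed)     // true
println(derived.getCheckpointFile)  // Some(hdfs://...)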

compute: returns an iterator over the data of one partition of this RDD; split identifies the partition. Callers obtain the data through iterator(), which first consults the cache (if a storage level is set) and the checkpoint before falling back to compute().

/**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
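
To make compute and getPartitions concrete, here is a hypothetical source RDD (not part of Spark) that has no parent and generates one sub-range of integers per partition:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: each one remembers the sub-range it covers.
class RangePartition(val index: Int, val start: Int, val end: Int) extends Partition

class SimpleRangeRDD(sc: SparkContext, max: Int, numSlices: Int)
    extends RDD[Int](sc, Nil) {   // a source RDD: no parent, so deps is Nil

    override protected def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numSlices) { i =>
            new RangePartition(i, i * max / numSlices, (i + 1) * max / numSlices)
        }

    override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
        val p = split.asInstanceOf[RangePartition]
        (p.start until p.end).iterator
    }
}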

getPreferredLocations: returns the preferred locations at which to compute a given partition. For example, for an RDD that reads HDFS blocks, this operation returns the locations where the blocks are stored, so tasks can be scheduled close to the data.

/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
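
A hypothetical sketch of how a subclass could override this (the host names in the usage line are made up for illustration):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical example: pin each partition of the parent to a preferred host.
class HostAwareRDD[T: ClassTag](parent: RDD[T], hosts: Seq[String])
    extends RDD[T](parent) {

    override protected def getPartitions: Array[Partition] = parent.partitions

    override def compute(split: Partition, context: TaskContext): Iterator[T] =
        parent.iterator(split, context)

    // Prefer to schedule partition i on hosts(i % hosts.size); the scheduler
    // treats this as a hint, not a hard constraint.
    override protected def getPreferredLocations(split: Partition): Seq[String] =
        Seq(hosts(split.index % hosts.size))
}

// e.g. new HostAwareRDD(someRdd, Seq("host1", "host2"))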