/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
/* * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),//会将分区索引index传入进来 preservesPartitioning)
/** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] private var cur: Iterator[B] = empty def hasNext: Boolean = cur.hasNext || self.hasNext && cur = f(self.next).toIterator; hasNext def next(): B = (if (hasNext) cur else empty).next()
21.filter
/** * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: T => Boolean): RDD[T] = withScope val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, (context, pid, iter) => iter.filter(cleanF), preservesPartitioning = true)
filter是针对RDD的每个元素利用函数f过滤,返回过滤后的结果:
/** Returns an iterator over all the elements of this iterator that satisfy the predicate `p`. * The order of the elements is preserved. * * @param p the predicate used to test values. * @return an iterator which produces those values of this iterator which satisfy the predicate `p`. * @note Reuse: $consumesAndProducesIterator */ def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] private var hd: A = _ private var hdDefined: Boolean = false
def hasNext: Boolean = hdDefined || //利用函数p过滤记录,直到找到符合条件的记录为止 do if (!self.hasNext) return false hd = self.next() while (!p(hd)) hdDefined = true true
def next() = if (hasNext) hdDefined = false; hd else empty.next()