rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值

Posted

技术标签:

【中文标题】rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值【英文标题】:rdd.mapPartitions to return a Boolean from udf in Spark Scala 【发布时间】:2019-03-28 17:58:41 【问题描述】:

我将 Scala 2.11 与 Spark 2.1 一起使用

我有一个 MutableList[String] 定义为变量 objectKeys

我正在尝试按如下方式使用 Spark 并行化:

val numPartitioning = 10
val rdd = sc.parallelize(objectKeys, numPartitioning);
val x = rdd.mapPartitions(read_files_from_list(objectKeys))


def read_files_from_list(keys:MutableList[String]): Boolean = 
  // my logic to iterate over keys
  if success
     return true;
  else 
     return false;

但是我收到错误类型不匹配; found : Boolean required: Iterator[String] ⇒ Iterator[?] 在涉及默认参数的应用程序中发生错误。

要让我的 udf 'read_files_from_list' 接受 MutableList[String] 并返回布尔值,我需要做哪些更改

【问题讨论】:

Scaladoc mapPartitions 需要一个从Iterable[T]Iterable[U] 的函数(其中T 是输入元素的类型RDD U 是输出元素的类型 RDD) - 鉴于此,您希望如何在那里使用您的函数?,您真正想要做什么? (这似乎是一个 XY 问题). 我将在 objectKeys 变量中有 s3 文件路径。我想一一阅读并转换为orc文件格式。我愿意更改我的 UDF-read_files_from_list 的返回类型 这里有很多东西,为什么列表必须是可变的?为什么返回布尔值? (完成执行的功能应该足够了)。看来您只需要foreach 即可对RDD 的每个元素执行转换函数。但是,仅使用 Spark 来并行化一个简单的 foreach 到一个足够小以适合本地内存的列表,这对我来说似乎有点矫枉过正。也许,您真的想从所有 S3 路径创建一个大的 RDD 并将该 RDD 保存为分区的 ORC 文件?还是逐个文件?这会更有意义,但您可以改为查看 Glue 我需要逐个文件迭代它们并生成一个orc文件。我可以在 UDF 中编写我的逻辑,但是如何解决上述错误? 请仔细阅读我之前的评论。如果您需要逐个文件执行此操作,并且 List / RDD 的每个元素都是一个路径。然后,只需使用您的函数在 RDD 上调用 foreach。如果您不是真正映射分区,为什么要调用mapPartitions。但同样,使用 Spark 在小型数据集上并行化简单的foreach 是没有意义的。如果您使用 Spark 进行转换,那是可以的,但只需在初始列表上调用 foreach,如果您想并行执行多个 Spark 任务,已经有回答了这个问题。 【参考方案1】:

mapPartitions 需要一个迭代器到迭代器的转换。您返回一个常量值 true/false 作为布尔值。

这里怎么写函数

def read_files_from_list(keys:Iterator[String]): Iterator[Boolean] = keys.map( key => 
  // my logic to iterate over keys
  if success
     return true;
  else 
     return false;
)

【讨论】:

上述解决方案中唯一缺少的是返回类型是 Iterator[Boolean] 而我们返回的是布尔值。我对此进行了相应的更改,并且似乎正在工作。谢谢@deo!

以上是关于rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值的主要内容,如果未能解决你的问题,请参考以下文章

将rdd转换为数据框时,pyspark对mapPartitions使用一项任务

如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row

spark中的广播变量broadcast

Spark 将任务分发给多个执行器

使用toLocalIterator后似乎为空。

toLocalIterator 使用后似乎为空