在 DataFrame 的每个分区内读取、转换和写入数据
Posted
技术标签:
【中文标题】在 DataFrame 的每个分区内读取、转换和写入数据【英文标题】:Read, transform and write data within each partition in the DataFrame 【发布时间】:2019-05-10 10:30:36 【问题描述】:语言 - Scala
Spark 版本 - 2.4
我是 Scala 和 Spark 的新手。 (我是python背景,所以整个JVM生态对我来说还是很陌生的)
我想编写一个火花程序来并行化以下步骤:
从数据帧中的 S3 读取数据
转换此数据帧的每一行
将更新后的数据帧写回 S3 的新位置
假设我有 3 个项目,A、B 和 C。对于这些项目中的每一个,我想要执行上述 3 个步骤。
我想对所有这 3 个项目并行执行此操作。
我尝试创建一个有 3 个分区的 RDD,每个分区有一个项目,分别是 A、B 和 C。
然后我尝试使用mapPartition
方法为每个分区编写我的逻辑(上面提到的3个步骤)。
我收到Task not serializable
错误。虽然明白这个错误的意思,但是不知道怎么解决。
val items = Array[String]("A", "B", "C")
val rdd = sc.parallelize(items, 3)
rdd.mapPartitions(
partition =>
val item = partition.next()
val filePath = new ListBuffer[String]()
filePath += s"$basePath/item=$item/*"
val df = spark.read.format("parquet").option("basePath",s"$basePath").schema(schema).load(filePaths: _*)
//Transform this dataframe
val newDF = df.rdd.mapPartitions(partition => partition.map(row =>methodToTransformAndReturnRow(row)))
newDf.write.mode(SaveMode.Overwrite).parquet(path)
)
我的用例是,对于每个项目,从 S3 读取数据,对其进行转换(我将新列直接添加到我们的用例的每一行),然后将每个项目的最终结果写回 S3。
注意 - 我可以读取整个数据,按项目重新分区,转换并将其写回,但重新分区会导致随机播放,我试图避免这种情况,我尝试的方式是,读取 executor 本身中每个项目的数据,以便它可以处理它获取的任何数据,并且不需要 shuffle。
【问题讨论】:
Spark 并不是一个真正的开箱即用的框架。我建议您花一些时间阅读一些教程,以基本了解如何使用 Spark,因为 - 如下面的答案所述 - 您正在做的事情非常不是如何做事在 Spark 中并提供完整的教程有点超出 *** 的范围...... @GlennieHellesSindholt 我明白你的意思,我对 Spark 也有相当多的了解。如果您能分享您对如何解决此问题的想法,我们将不胜感激。我知道我可以提取所有数据,按项目重新分区,然后编写转换并将最终结果写回。但这里的问题是,重新分区会导致洗牌,这是我想避免的。因此我的想法是,如果我可以在执行程序中读取分区本身,那么就不需要洗牌。 恐怕您在这里建议的解决方案表明对 Spark 的理解非常差。在 Spark 中嵌套数据帧很简单不可能。时期。另外,如果您担心性能,为什么还要考虑 IO 密集型选项? IO在性能方面永远是最差的选择。当然,如果仅通过转换无法实现目标,则应仅使用 shuffle 步骤,但 Spark 旨在有效地处理 shuffle。 感谢@GlennieHellesSindholt 的澄清。我知道我对 Spark 的理解很差,这就是我在这里询问解决方案的原因:)。嵌套是不可能的,那么我认为我的想法本身是不正确的。那我就用简单的方法继续前进。再次感谢! 【参考方案1】:我不确定你想用你展示的方法实现什么,但我觉得你可能会以困难的方式。除非有充分的理由这样做,否则最好让 Spark(尤其是 spark 2.0+)让它自己做。在这种情况下,只需使用单个操作处理所有三个分区。 Spark 通常会很好地管理您的数据集。它还可能会自动引入您没有想到的优化,或者如果您尝试过多地控制过程,它就无法进行的优化。话虽如此,如果它不能很好地管理流程,那么您可以通过尝试获得更多控制权并更多手动执行操作来开始与它争论。至少这是我目前的经验。
例如,我曾经进行过一系列复杂的转换,为每个步骤/DataFrame 添加了更多逻辑。如果我强迫 spark 评估每个中间数据帧(例如在中间数据帧上运行计数或显示),我最终会因为不足而无法评估一个数据帧(即它无法进行计数)资源。但是,如果我忽略了这一点并添加了更多转换,Spark 能够将一些优化推送到较早的步骤(来自较晚的步骤)。这意味着可以正确评估后续的 DataFrame(以及重要的是我的最终 DataFrame)。 尽管中间 DataFrame 本身无法评估,但仍处于整个过程中,但最终评估是可能的。
考虑以下问题。我使用 CSV,但对于镶木地板也一样。
这是我的输入:
data
├── tag=A
│ └── data.csv
├── tag=B
│ └── data.csv
└── tag=C
└── data.csv
这是其中一个数据文件的示例 (tag=A/data.csv)
id,name,amount
1,Fred,100
2,Jane,200
这是一个识别此结构中的分区的脚本(即标签是列之一)。
scala> val inDataDF = spark.read.option("header","true").option("inferSchema","true").csv("data")
inDataDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> inDataDF.show
+---+-------+------+---+
| id| name|amount|tag|
+---+-------+------+---+
| 31| Scott| 3100| C|
| 32|Barnaby| 3200| C|
| 20| Bill| 2000| B|
| 21| Julia| 2100| B|
| 1| Fred| 100| A|
| 2| Jane| 200| A|
+---+-------+------+---+
scala> inDataDF.printSchema
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- amount: integer (nullable = true)
|-- tag: string (nullable = true)
scala> inDataDF.write.partitionBy("tag").csv("outData")
scala>
我再次使用 csv 而不是 parquet,因此您可以省去读取标题和推断架构的选项(parquet 会自动执行此操作)。但除此之外,它的工作方式相同。
以上产生以下目录结构:
outData/
├── _SUCCESS
├── tag=A
│ └── part-00002-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
├── tag=B
│ └── part-00001-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
└── tag=C
└── part-00000-9e13ec13-7c63-4cda-b5af-e2d69cb76278.c000.csv
如果您想操作内容,请务必在读取和写入之间添加任何映射操作、连接、过滤或其他任何您需要的内容。
例如,金额加500:
scala> val outDataDF = inDataDF.withColumn("amount", $"amount" + 500)
outDataDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> outDataDF.show(false)
+---+-------+------+---+
|id |name |amount|tag|
+---+-------+------+---+
|31 |Scott |3600 |C |
|32 |Barnaby|3700 |C |
|20 |Bill |2500 |B |
|21 |Julia |2600 |B |
|1 |Fred |600 |A |
|2 |Jane |700 |A |
+---+-------+------+---+
然后简单地写 outDataDF 而不是 inDataDF。
【讨论】:
困难的方式当然是无关紧要的。这种方法是不可行的。 @GMc 我知道您正在尝试这样做,我自己也尝试过。这是最简单的解决方案,读取、重新分区、转换、写回。但是这里的问题是,当我读取所有项目的数据然后我进行重新分区时,它会导致洗牌,这是非常昂贵的。为了避免这个问题,我的想法是,如果我可以从 executor 本身读取每个分区,那么就不需要重新分区,shuffle 就消失了。简单来说,每个 executor 读取一些数据,对其进行转换并将其写回,根本没有任何 shuffle。这在您提供的简单解决方案中是不可能的。 我没有看到随机播放。我看到一个 FileScan 和 2 个 MapPartitions(一个在读,一个在写)。 当我读取所有项目的数据并在 Spark 中按项目进行重新分区时,它会导致随机播放,对吗? 我想答案取决于它。就我而言,我没有看到它。虽然洗牌可能是一个相对昂贵的步骤,但不要像瘟疫一样试图避免它。可能需要随机播放来启用或优化后续步骤。 Spark 不会(不应该?)进行洗牌,除非它认为有充分的理由这样做。连接是一个很好的例子——在这种情况下,它应该打乱较小的数据集以与较大的数据集对齐。在这种情况下,您基本上是在读取和写入相同的数据集。所以除非它严重歪斜,否则不应该有洗牌的理由。以上是关于在 DataFrame 的每个分区内读取、转换和写入数据的主要内容,如果未能解决你的问题,请参考以下文章
使用 dask.dataframe 从 CSV 文件中按分区读取尾部
在分区 Spark DataFrame 中使用多列是不是会使读取速度变慢?