使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe
Posted
技术标签:
【中文标题】使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe【英文标题】:Cleaning CSV/Dataframe of size ~40GB using Spark and Scala 【发布时间】:2020-01-13 22:18:03 【问题描述】:我是大数据世界的新手。我有一个初始 CSV,它的数据大小约为 40GB,但顺序不同。我的意思是,如果您看到初始 CSV,对于 Jenny 来说,没有年龄,因此性别列的值会转移到年龄,而剩余的列值会一直转移到行中的最后一个元素。
我想在 Scala 中使用带有 Spark 的数据帧来清理/处理这个 CVS。我使用 withColumn() API 尝试了很多解决方案,但没有任何对我有用。
如果有人可以建议我某种可用的逻辑或 API,它们可以以更简洁的方式解决这个问题。我可能不需要适当的解决方案,但指针也可以。非常感谢您的帮助!
初始 CSV/数据帧
必需的 CSV/数据框
编辑:
这就是我读取数据的方式:
val spark = SparkSession .builder .appName("SparkSQL")
.master("local[*]") .config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
import spark.implicits._
val df = spark.read.option("header", true").csv("path/to/csv.csv")
【问题讨论】:
只有在数据出现问题时才会出现这种情况。如果您使用 Spark API 读取 CSV 文件,它会根据逗号(,)截断列。行中的多余逗号很少,这就是它无法正常工作的原因。 @Nasruddin 尝试将其读取为 val df = spark.read.format("csv").option("header", true").load("path/to/csv.csv" ) 如果仍然无法正常工作,则数据有问题。数据格式不正确。在这种情况下,您必须检查是否有多余的逗号,然后需要替换。 @Nasruddin 在读取文件时使用 option("escape",",") 如果数据以逗号分隔,它将帮助您解决问题 【参考方案1】:这看起来很像数据有缺陷。为了解决这个问题,我建议将 csv 文件的每一行作为单个字符串读取,并应用 map() 函数来处理数据
case class myClass(name: String, age: Integer, sex: String, siblings: Integer)
val myNewDf = myDf.map(row =>
val myRow: String = row.getAs[String]("MY_SINGLE_COLUMN")
val myRowValues = myRow.split(",")
if (4 == myRowValues.size())
//everything as expected
return myClass(myRowValues[0], myRowValues[1], myRowValues[2], myRowValues[3])
else
//do foo to guess missing values
【讨论】:
【参考方案2】:在您的情况下,数据格式不正确。要处理第一个数据,必须清理,即 CSV 的所有行都应该具有相同的架构或相同的分隔符/列数。
在 spark 中执行此操作的基本方法可能是:
-
以文本形式加载数据
在加载的 DF/DS 上应用地图操作来清理它
手动创建架构
在清理后的 DF/DS 上应用架构
示例代码
//Sample CSV
John,28,M,3
Jenny,M,3
//Sample Code
val schema = StructType(
List(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("sex", StringType, nullable = true),
StructField("sib", IntegerType, nullable = true)
)
)
import spark.implicits._
val rawdf = spark.read.text("test.csv")
rawdf.show(10)
val rdd = rawdf.map(row =>
val raw = row.getAs[String]("value")
//TODO: Data cleansing has to be done.
val values = raw.split(",")
if (values.length != 4)
s"$values(0),,$values(1),$values(2)"
else
raw
)
val df = spark.read.schema(schema).csv(rdd)
df.show(10)
【讨论】:
【参考方案3】:您可以尝试使用 age
的可选字段定义案例类,并将带有架构的 csv 直接加载到数据集中。
类似的东西:
import org.apache.spark.sql.Encoders
import sparkSession.implicits._
case class Person(name: String, age: Option[Int], sex: String, siblings: Int)
val schema = Encoders.product[Person].schema
val dfInput = sparkSession.read
.format("csv")
.schema(schema)
.option("header", "true")
.load("path/to/csv.csv")
.as[Person]
【讨论】:
以上是关于使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe的主要内容,如果未能解决你的问题,请参考以下文章
在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]