使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe

Posted

技术标签:

【中文标题】使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe【英文标题】:Cleaning CSV/Dataframe of size ~40GB using Spark and Scala 【发布时间】:2020-01-13 22:18:03 【问题描述】:

我是大数据世界的新手。我有一个初始 CSV,它的数据大小约为 40GB,但顺序不同。我的意思是,如果您看到初始 CSV,对于 Jenny 来说,没有年龄,因此性别列的值会转移到年龄,而剩余的列值会一直转移到行中的最后一个元素。

我想在 Scala 中使用带有 Spark 的数据帧来清理/处理这个 CVS。我使用 withColumn() API 尝试了很多解决方案,但没有任何对我有用。

如果有人可以建议我某种可用的逻辑或 API,它们可以以更简洁的方式解决这个问题。我可能不需要适当的解决方案,但指针也可以。非常感谢您的帮助!

初始 CSV/数据帧

必需的 CSV/数据框

编辑:

这就是我读取数据的方式:

val spark = SparkSession .builder .appName("SparkSQL")
  .master("local[*]") .config("spark.sql.warehouse.dir", "file:///C:/temp") 
  .getOrCreate()

import spark.implicits._
val df = spark.read.option("header", true").csv("path/to/csv.csv")

【问题讨论】:

只有在数据出现问题时才会出现这种情况。如果您使用 Spark API 读取 CSV 文件,它会根据逗号(,)截断列。行中的多余逗号很少,这就是它无法正常工作的原因。 @Nasruddin 尝试将其读取为 val df = spark.read.format("csv").option("header", true").load("path/to/csv.csv" ) 如果仍然无法正常工作,则数据有问题。数据格式不正确。在这种情况下,您必须检查是否有多余的逗号,然后需要替换。 @Nasruddin 在读取文件时使用 option("escape",",") 如果数据以逗号分隔,它将帮助您解决问题 【参考方案1】:

这看起来很像数据有缺陷。为了解决这个问题,我建议将 csv 文件的每一行作为单个字符串读取,并应用 map() 函数来处理数据

case class myClass(name: String, age: Integer, sex: String, siblings: Integer)

val myNewDf = myDf.map(row => 
  val myRow: String = row.getAs[String]("MY_SINGLE_COLUMN")
  val myRowValues = myRow.split(",")
  if (4 == myRowValues.size()) 
      //everything as expected 
      return myClass(myRowValues[0], myRowValues[1], myRowValues[2], myRowValues[3]) 
   else 
      //do foo to guess missing values
  

【讨论】:

【参考方案2】:

在您的情况下,数据格式不正确。要处理第一个数据,必须清理,即 CSV 的所有行都应该具有相同的架构或相同的分隔符/列数。

在 spark 中执行此操作的基本方法可能是:

    以文本形式加载数据 在加载的 DF/DS 上应用地图操作来清理它 手动创建架构 在清理后的 DF/DS 上应用架构

示例代码

//Sample CSV
John,28,M,3
Jenny,M,3
//Sample Code
val schema = StructType(
  List(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("sex", StringType, nullable = true),
    StructField("sib", IntegerType, nullable = true)
  )
)

import spark.implicits._
val rawdf = spark.read.text("test.csv")
rawdf.show(10)
val rdd = rawdf.map(row => 
  val raw = row.getAs[String]("value")
  //TODO: Data cleansing has to be done.
  val values = raw.split(",")
  if (values.length != 4) 
    s"$values(0),,$values(1),$values(2)"
   else 
    raw
  
)
val df = spark.read.schema(schema).csv(rdd)
df.show(10)

【讨论】:

【参考方案3】:

您可以尝试使用 age 的可选字段定义案例类,并将带有架构的 csv 直接加载到数据集中。

类似的东西:

import org.apache.spark.sql.Encoders
import sparkSession.implicits._

  case class Person(name: String, age: Option[Int], sex: String, siblings: Int)

  val schema = Encoders.product[Person].schema

  val dfInput = sparkSession.read
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .load("path/to/csv.csv")
    .as[Person]

【讨论】:

以上是关于使用 Spark 和 Scala 清理大小约为 40GB 的 CSV/Dataframe的主要内容,如果未能解决你的问题,请参考以下文章

清理 Jenkins 主目录

scala为什么要清理闭包

在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]

在 Spark 中读取大的 gz 文件

使用 Scala/Spark 列出目录中的文件(包括文件信息)

Spark 中用 Scala 和 java 开发有啥区别