Scala:从带有列的csv读取数据具有空值

Posted

技术标签:

【中文标题】Scala:从带有列的csv读取数据具有空值【英文标题】:Scala : Reading data from csv with columns have null values 【发布时间】:2021-03-25 04:14:29 【问题描述】:

环境 - spark-3.0.1-bin-hadoop2.7、ScalaLibraryContainer 2.12.3、Scala、SparkSQL、eclipse-jee-oxygen-2-linux-gtk-x86_64

我有一个 csv 文件,它有 3 列,数据类型为:String、Long、Date。我已将 csv 文件转换为数据帧并希望显示它。 但它给出了以下错误

java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.examples.sql.SparkSQLExample5$.$anonfun$runInferSchemaExample$2(SparkSQLExample5.scala:30)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

在 Scala 代码中

.map(attributes => Person(attributes(0), attributes(1),attributes(2))).toDF();

如果后续行的值少于标题中存在的值数,则会出现错误。基本上我正在尝试使用 Scala 和 Spark 从 csv 读取数据,其中列具有空值。

行的列数不同。 如果所有行都有 3 个列值,则它运行成功。

package org.apache.spark.examples.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.sql.Date
import org.apache.spark.sql.functions._
import java.util.Calendar;

object SparkSQLExample5 

 case class Person(name: String, age: String, birthDate: String)

 def main(args: Array[String]): Unit = 
val fromDateTime=java.time.LocalDateTime.now;
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
import spark.implicits._
runInferSchemaExample(spark);
spark.stop()


private def runInferSchemaExample(spark: SparkSession): Unit = 
import spark.implicits._
println("1. Creating an RDD of 'Person' object and converting into 'Dataframe' "+ 
    " 2. Registering the DataFrame as a temporary view.")
println("1. Third column of second row is not present.Last value of second row is comma.")
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/test.csv")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1),attributes(2))).toDF();
val finalOutput=peopleDF.select("name","age","birthDate")
finalOutput.show();

csv 文件

col1,col2,col3
row21,row22,
row31,row32,

【问题讨论】:

【参考方案1】:

读取csv文件时尝试PERMISSIVE模式,它会为缺少的字段添加NULL

val df = spark.sqlContext.read.format("csv").option("mode", "PERMISSIVE") .load("examples/src/main/resources/test.csv")

您可以找到更多信息 https://docs.databricks.com/data/data-sources/read-csv.html

【讨论】:

@nagraj036...感谢您的回复!我正在使用“spark.sparkContext”,其中“.option”不可用。 你是使用 textFile 方法还是 csv 方法来加载数据?我猜想在 textFile 方法上选项不可用 感谢回复! ........我正在使用 textFile 方法,那么这个(textFile 方法)的解决方案是什么?还是我必须使用 csv 方法?【参考方案2】:

输入:csv 文件

col1,col2,col3
row21,row22,
row31,row32,

代码:

import org.apache.spark.sql.SparkSession

object ReadCsvFile 

  case class Person(name: String, age: String, birthDate: String)

  def main(args: Array[String]): Unit = 
    val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
    readCsvFileAndInferCustomSchema(spark);
    spark.stop()
  

  private def readCsvFileAndInferCustomSchema(spark: SparkSession): Unit = 
    val df = spark.read.csv("C:/Users/Ralimili/Desktop/data.csv")
    val rdd = df.rdd.mapPartitionsWithIndex  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
    val mapRdd = rdd.map(attributes => 
      Person(attributes.getString(0), attributes.getString(1),attributes.getString(2))
    )
    val finalDf = spark.createDataFrame(mapRdd)
    finalDf.show(false);
  


输出

+-----+-----+---------+
|name |age  |birthDate|
+-----+-----+---------+
|row21|row22|null     |
|row31|row32|null     |
+-----+-----+---------+

如果您想填充一些值而不是空值,请使用以下代码

 val customizedNullDf = finalDf.na.fill("No data")
 customizedNullDf.show(false);

输出

+-----+-----+---------+
|name |age  |birthDate|
+-----+-----+---------+
|row21|row22|No data  |
|row31|row32|No data  |
+-----+-----+---------+

【讨论】:

以上是关于Scala:从带有列的csv读取数据具有空值的主要内容,如果未能解决你的问题,请参考以下文章

使用熊猫读取带有时间戳列的 csv

无法从 synapse spark scala notebook 读取 csv 文件

有效地将数据从 CSV 读取到具有多个分隔符的数据框中

数据库中的空值与NULL的区别以及python中的NaN和None

数据库中的空值与NULL的区别以及python中的NaN和None

如何使用 scala spark 从没有标题且有超过 150 列的 csv 创建数据集