使用 Spark 处理 txt 文件

Posted

技术标签:

【中文标题】使用 Spark 处理 txt 文件【英文标题】:Process txt file with Spark 【发布时间】:2019-06-08 09:13:44 【问题描述】:

我需要将文本文件读入 Spark 中的数据集[T]。该文件格式不正确,因为它有一些空白字段,并且很难定义参数来分割字符串。我一直在尝试将数据读入 RDD,然后将其转换为案例类类型,但是,并非所有字段都被正确解析,并且出现错误:

java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
        at java.lang.Double.parseDouble(Double.java:538)
        at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
        at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
        at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
        at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
        at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

如何正确处理此文件? 我的 .txt 文件看起来像这样(匿名随机数据但格式相同):

NEW50752085  84.0485 -76.3851  85.1   THE NAME OF AN OBJECT                       
DEM00752631  51.9581 -85.3315  98.5   THE NAME OF AN OBJECT                                  
KI004867205  40.8518  15.9351 276.5   THE NAME OF AN OBJECT           FHG   41196

我尝试过这样处理:

    val dataRdd = spark.sparkContext
      .textFile("file.txt")

    val dataArray = dataRdd
      .map(_.split(" "))

  case class caseClass(
    c1: String,
    c2: Double,
    c3: Double,
    c4: Double,
    c5: String,
    c6: String,
    c7: String
  )

    val df = dataArray
      .map(record => (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
      .mapcase (c1, c2, c3, c4, c5, c6, c7) => CaseClass(c1, c2, c3, c4, c5, c6, c7)
      .toDF()

【问题讨论】:

我认为你有标签作为分隔符。尝试改用spark.read.format("csv").option("delimiter", "\t") 【参考方案1】:

我将在此答案中做出一些可能不正确的假设,但根据您提供的数据和提供的错误,我相信它们是正确的。

假设 1:您的数据由几个空格分隔。我根据您提供的空字符串的 NumberFormatException 得出了这个假设。如果您的文件由制表符分隔,我们就不会遇到这种情况。 假设 2(这是出于我自己的考虑,但可能不是真的):每个数据元素由相同数量的空格分隔。对于这个答案的其余部分,我将假设空格数为四个。如果这个假设不成立,这将成为一个更加困难的问题。 假设 3:7 个数据元素中只有最后 2 个是可选的,有时不会出现。

您的 NumberFormatException 是由您拆分一个空格引起的。假设以下行由空格分隔:

NEW50752085    84.0485    -76.3851    85.1    THE NAME OF AN OBJECT 

当你在一个空格上分割时,这一行会被转换成下面的数组:

Array(NEW50752085, "", "", "", 84.0485, "", "", "", -76.3851, "", "", "", 85.1, "", "", "", THE, NAME, OF, AN, OBJECT)

这个数组的第二个元素是一个空字符串,是你试图转换成一个 Double 的元素。这就是在空字符串上为您提供 NumberFormatException 的原因。

.map(_.split("    "))

当您将其更改为分成 4 个空格时(根据我的假设,这可能是也可能不是真的),您会得到以下结果:

Array(NEW50752085, 84.0485, -76.3851, 85.1, THE NAME OF AN OBJECT)

但是现在我们遇到了另一个问题——它只有五个元素!我们要七个。

我们可以通过修改您以后的代码来改变这一点:

val df = dataArray.map(record => 
  (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), 
  if(record.size > 5) record(5) else "",
  if(record.size > 6) record(6) else "")
).mapcase (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7).toDF
df.show
+-----------+-------+--------+----+--------------------+---+-----+
|         c1|     c2|      c3|  c4|                  c5| c6|   c7|
+-----------+-------+--------+----+--------------------+---+-----+
|NEW50752085|84.0485|-76.3851|85.1|THE NAME OF AN OB...|   |     |
|DEM00752631|51.9581|-85.3315|98.5|THE NAME OF AN OB...|   |     |
|KI004867205|40.8518| 15.9351|76.5|THE NAME OF AN OB...|FHG|41196|
+-----------+-------+--------+----+--------------------+---+-----+

同样,这种方法只有在所有元素都由相同数量的空格分隔时才有效。

【讨论】:

谢谢!这将是一个很好的解决方案。但是,还有一个假设 - 我根本不需要最后 2 列(它们是可选的),所以我将从数据框中删除它们,或者我可以在映射到案例类时对其进行处理(没有 2列)。并且对第一个假设进行了修正,空格的数量是随机的。不幸的是,数据的格式设置为每列之间有一个或多个空格,因此并不一致 我已经重新格式化了第 2 列和第 3 列,因此输入格式会更加明显。抱歉我之前没有明确定义 “对象名称”在该属性中是否有空格?如果没有,我可以修改我的答案来解决你的问题并被接受。 有,按一个空格分割是个问题 啊,在手机上我看不到这个。当我有能力并会更新我的答案时,我会为此工作。我对如何解决这个问题有一个不错的想法。【参考方案2】:

如果您的数据没有 spark 可读的封闭格式,您唯一的选择是使用 FileInputFormat 创建您自己的自定义阅读器

通过这种方式,您将能够为数据的每一行定义一个解析流,以确定如何拆分和处理边缘情况。

深入研究它的最佳方式是通过示例。这是一个非常可靠的: https://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/

【讨论】:

以上是关于使用 Spark 处理 txt 文件的主要内容,如果未能解决你的问题,请参考以下文章

大数据处理技术实验8

记一次使用Memory Analyzer工具分析堆内存溢出问题

apache spark - 检查文件是不是存在

[spark程序]统计人口平均年龄(本地文件)(详细过程)

[spark程序]统计人口平均年龄(HDFS文件)(详细过程)

Spark 处理非结构化文件