使用 Spark 处理 txt 文件
Posted
技术标签:
【中文标题】使用 Spark 处理 txt 文件【英文标题】:Process txt file with Spark 【发布时间】:2019-06-08 09:13:44 【问题描述】:我需要将文本文件读入 Spark 中的数据集[T]。该文件格式不正确,因为它有一些空白字段,并且很难定义参数来分割字符串。我一直在尝试将数据读入 RDD,然后将其转换为案例类类型,但是,并非所有字段都被正确解析,并且出现错误:
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
如何正确处理此文件? 我的 .txt 文件看起来像这样(匿名随机数据但格式相同):
NEW50752085 84.0485 -76.3851 85.1 THE NAME OF AN OBJECT
DEM00752631 51.9581 -85.3315 98.5 THE NAME OF AN OBJECT
KI004867205 40.8518 15.9351 276.5 THE NAME OF AN OBJECT FHG 41196
我尝试过这样处理:
val dataRdd = spark.sparkContext
.textFile("file.txt")
val dataArray = dataRdd
.map(_.split(" "))
case class caseClass(
c1: String,
c2: Double,
c3: Double,
c4: Double,
c5: String,
c6: String,
c7: String
)
val df = dataArray
.map(record => (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
.mapcase (c1, c2, c3, c4, c5, c6, c7) => CaseClass(c1, c2, c3, c4, c5, c6, c7)
.toDF()
【问题讨论】:
我认为你有标签作为分隔符。尝试改用spark.read.format("csv").option("delimiter", "\t")
。
【参考方案1】:
我将在此答案中做出一些可能不正确的假设,但根据您提供的数据和提供的错误,我相信它们是正确的。
假设 1:您的数据由几个空格分隔。我根据您提供的空字符串的 NumberFormatException 得出了这个假设。如果您的文件由制表符分隔,我们就不会遇到这种情况。 假设 2(这是出于我自己的考虑,但可能不是真的):每个数据元素由相同数量的空格分隔。对于这个答案的其余部分,我将假设空格数为四个。如果这个假设不成立,这将成为一个更加困难的问题。 假设 3:7 个数据元素中只有最后 2 个是可选的,有时不会出现。您的 NumberFormatException 是由您拆分一个空格引起的。假设以下行由空格分隔:
NEW50752085 84.0485 -76.3851 85.1 THE NAME OF AN OBJECT
当你在一个空格上分割时,这一行会被转换成下面的数组:
Array(NEW50752085, "", "", "", 84.0485, "", "", "", -76.3851, "", "", "", 85.1, "", "", "", THE, NAME, OF, AN, OBJECT)
这个数组的第二个元素是一个空字符串,是你试图转换成一个 Double 的元素。这就是在空字符串上为您提供 NumberFormatException 的原因。
.map(_.split(" "))
当您将其更改为分成 4 个空格时(根据我的假设,这可能是也可能不是真的),您会得到以下结果:
Array(NEW50752085, 84.0485, -76.3851, 85.1, THE NAME OF AN OBJECT)
但是现在我们遇到了另一个问题——它只有五个元素!我们要七个。
我们可以通过修改您以后的代码来改变这一点:
val df = dataArray.map(record =>
(record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4),
if(record.size > 5) record(5) else "",
if(record.size > 6) record(6) else "")
).mapcase (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7).toDF
df.show
+-----------+-------+--------+----+--------------------+---+-----+
| c1| c2| c3| c4| c5| c6| c7|
+-----------+-------+--------+----+--------------------+---+-----+
|NEW50752085|84.0485|-76.3851|85.1|THE NAME OF AN OB...| | |
|DEM00752631|51.9581|-85.3315|98.5|THE NAME OF AN OB...| | |
|KI004867205|40.8518| 15.9351|76.5|THE NAME OF AN OB...|FHG|41196|
+-----------+-------+--------+----+--------------------+---+-----+
同样,这种方法只有在所有元素都由相同数量的空格分隔时才有效。
【讨论】:
谢谢!这将是一个很好的解决方案。但是,还有一个假设 - 我根本不需要最后 2 列(它们是可选的),所以我将从数据框中删除它们,或者我可以在映射到案例类时对其进行处理(没有 2列)。并且对第一个假设进行了修正,空格的数量是随机的。不幸的是,数据的格式设置为每列之间有一个或多个空格,因此并不一致 我已经重新格式化了第 2 列和第 3 列,因此输入格式会更加明显。抱歉我之前没有明确定义 “对象名称”在该属性中是否有空格?如果没有,我可以修改我的答案来解决你的问题并被接受。 有,按一个空格分割是个问题 啊,在手机上我看不到这个。当我有能力并会更新我的答案时,我会为此工作。我对如何解决这个问题有一个不错的想法。【参考方案2】:如果您的数据没有 spark 可读的封闭格式,您唯一的选择是使用 FileInputFormat
创建您自己的自定义阅读器
通过这种方式,您将能够为数据的每一行定义一个解析流,以确定如何拆分和处理边缘情况。
深入研究它的最佳方式是通过示例。这是一个非常可靠的: https://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/
【讨论】:
以上是关于使用 Spark 处理 txt 文件的主要内容,如果未能解决你的问题,请参考以下文章
记一次使用Memory Analyzer工具分析堆内存溢出问题