Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api
Posted
技术标签:
【中文标题】Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api【英文标题】:Spark-Scala: Parse Fixed width line into Dataframe Api with exception handling 【发布时间】:2019-07-05 07:53:55 【问题描述】:我是初学者使用 scala 学习 spark .pardon 为我的英语不好...我需要编写一个程序来使用 spark-scala Dataframe Api 将分隔和固定宽度的文件解析为 Dataframe。此外,如果输入数据已损坏,则程序必须按以下给定方式处理:
A:ignoring the input data
B:investigate the error in input
C:stop on error
为了实现上述目标,我已经使用 DataFrame Api 选项成功完成了对分隔文件的异常处理解析。但我不知道如何对固定宽度文件应用相同的技术。我使用的是 Spark 2.4.3 版本。
// predefined schema used in program
val schema = new StructType()
.add("empno",IntegerType,true)
.add("ename",StringType,true)
.add("designation",StringType,true)
.add("manager",StringType,true)
.add("hire_date",StringType,true)
.add("salary",DoubleType,true)
.add("deptno",IntegerType,true)
.add("_corrupt_record", StringType, true)
// parse csv file into DataFrame Api
// option("mode","PERMISSIVE") used to handle corrupt record
val textDF =sqlContext.read.format("csv").option("header", "true").schema(schema).option("mode", "PERMISSIVE").load("empdata.csv")
textDF.show
// program for fixed width line
// created lsplit method to split line into list of tokens based on width input / string
def lsplit(pos: List[Int], str: String): List[String] =
val (rest, result) = pos.foldLeft((str, List[String]()))
case ((s, res),curr) =>
if(s.length()<=curr)
val split=s.substring(0).trim()
val rest=""
(rest, split :: res)
else if(s.length()>curr)
val split=s.substring(0, curr).trim()
val rest=s.substring(curr)
(rest, split :: res)
else
val split=""
val rest=""
(rest, split :: res)
// list is reversed
result.reverse
// create case class to hold parsed data
case class EMP(empno:Int,ename:String,designation:String,manager:String,hire_dt:String,salary:Double,deptno:Int)
// create variable to hold width length
val sizeOfColumn=List(4,4,5,4,10,8,2);
// code to transform string to case class record
val ttRdd=textDF.map
x =>
val row=lsplit(sizeOfColumn,x.mkString)
EMP(row(0).toInt,row(1),row(2),row(3),row(4).toDouble,row(5).toInt)
Code works fine for proper data but fails if incorrect data comes in file.
for e.g: "empno" column has some non-integer data..program throws exception NumberFormatException..
The program must handle if actual data in file does not match the specified schema as handled in delimited file.
请在这里帮助我。我需要对固定宽度文件使用与分隔文件相同的方法。
【问题讨论】:
那么如果长度不正确一定会产生错误? B是什么意思? 我的意思是如果输入文件中的实际数据与模式不匹配......我们可以看到我正在将一个字段的数据转换为双精度,但如果输入非数字字符则程序失败......同样是Dataframe Api 使用 mode="PERMISSIVE" 在分隔的情况下处理......但我找不到固定宽度文件的相同...... B 意味着损坏的记录被复制到 _corrupt_column ......所以我可以调查为什么首先发生错误...... 但这应该会发生,有趣 【参考方案1】:这很明显。
您正在将自己的方法与 API“许可”选项相结合。
permissive 会拾取错误数据类型等错误。然后你自己的进程 lsplit 仍然执行并且可以得到一个空异常。例如。如果我输入 empnum "YYY" 这显然是可以观察到的。
如果数据类型OK,长度错误,大部分情况下你处理正确,但是字段是乱码。
您的 lsplit 需要更加健壮,并且您需要检查其中是否存在错误或在调用不调用之前是否存在错误。
第一种情况
+-----+-----+---------------+
|empno|ename|_corrupt_record|
+-----+-----+---------------+
| null| null| YYY,Gerry|
| 5555|Wayne| null|
+-----+-----+---------------+
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 30, localhost, executor driver): java.lang.NumberFormatException: For input string: "null"
第二种情况
+------+-----+---------------+
| empno|ename|_corrupt_record|
+------+-----+---------------+
|444444|Gerry| null|
| 5555|Wayne| null|
+------+-----+---------------+
res37: Array[EMP] = Array(EMP(4444,44Ger), EMP(5555,Wayne))
简而言之,有些工作要做,实际上不需要标题。
【讨论】:
我需要问我是否可以在固定宽度文件的情况下使用 API“permissive”选项。如果可以,我们该怎么做?因为我没有找到任何 API 文档来使用各种选项...感谢您的回复。 我会尝试让 lsplit 更加健壮,如果我找到一些解决方案..会在这里发布。 是的,但是您需要检查 null,如所示示例所示。也不需要 csv,但如果你这样做也没问题。肯定更健壮。请接受答案。 解决方案成功。以上是关于Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api的主要内容,如果未能解决你的问题,请参考以下文章