我们可以在不创建模式的情况下在 spark 数据框中加载分隔的文本文件吗?

Posted

技术标签:

【中文标题】我们可以在不创建模式的情况下在 spark 数据框中加载分隔的文本文件吗?【英文标题】:Can we load delimited text file in spark data frame without creating schema? 【发布时间】:2017-10-01 08:53:15 【问题描述】:

我有格式良好的文本文件,如下所示。

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|

现在我必须将此文本文件加载到 spark 数据框中。

我可以这样做

val schema = StructType(Array(

      StructField("OrgId", StringType),
      StructField("LineItemId", StringType),
      StructField("SegmentId", StringType),
      StructField("SequenceId", StringType),
      StructField("Action", StringType)))

 val textRdd1 = sc.textFile("s3://trfsdisu/SPARK/Text1.txt")
 val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
 var df1 = sqlContext.createDataFrame(rowRdd1, schema).drop("index")

但是通过这种方式我已经创建了架构,例如,如果我有一个包含 100 列的文本文件,我必须写 100 次。

所以我需要加载像 csv 这样的文件。

val df1 = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").load("s3://sdi/SPARK/FinancialLineItem/MAIN")

但这对我不起作用,因为我有不是 csv 格式的文本文件。

那么有什么方法可以在 spark 数据框中加载 csv 样式的文本文件吗?

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion",
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"),
        when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"),
        when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
        when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
        when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
        when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
        when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
        when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"),
        when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
        when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
        when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"),
        when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"),
        when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
        when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
        when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
        when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
        when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
        when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
        when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
        when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
        when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
        when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
        when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"),
        when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
        when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction"))
        .filter(!$"FFAction".contains("D"))

【问题讨论】:

【参考方案1】:

在尝试解决您的问题时,我遇到的第一个问题是spark-csv,您只能使用字符分隔符而不是字符串分隔符。

我找到的解决方案有点棘手:

使用 | 作为分隔符从 CSV 加载数据。

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc);

val df = sqlContext.read.format("csv")
            .option("header", "true")
            .option("delimiter", "|")
            .option("inferSchema",true")
            .load("/home/robin/Bureau/Spark/csv_strange_delimiter.csv")

这将创建一个如下所示的数据框:

df.show
+----------+---+----------+---+---------+---+----------+---+------+
|     OrgId| ^1|LineItemId| ^3|SegmentId| ^5|SequenceId| ^7|Action|
+----------+---+----------+---+---------+---+----------+---+------+
|4295877341|  ^|       136|  ^|        4|  ^|         1|  ^|     I|
|4295877346|  ^|       136|  ^|        4|  ^|         1|  ^|     I|
|4295877341|  ^|       138|  ^|        2|  ^|         1|  ^|     I|
|4295877341|  ^|       141|  ^|        4|  ^|         1|  ^|     I|
|4295877341|  ^|       143|  ^|        2|  ^|         1|  ^|     I|
|4295877341|  ^|       145|  ^|       14|  ^|         1|  ^|     I|
| 123456789|  ^|       145|  ^|       14|  ^|         1|  ^|     I|
+----------+---+----------+---+---------+---+----------+---+------+

2 : 删除包含“^”的列

val column_to_keep = df.columns.filter(v => (!v.contains("^"))).toSeq
val result = df.select(column_to_keep.head, column_to_keep.tail: _*)

result.show
+----------+----------+---------+----------+------+
|     OrgId|LineItemId|SegmentId|SequenceId|Action|
+----------+----------+---------+----------+------+
|4295877341|       136|        4|         1|  I|!||
|4295877346|       136|        4|         1|  I|!||
|4295877341|       138|        2|         1|  I|!||
|4295877341|       141|        4|         1|  I|!||
|4295877341|       143|        2|         1|  I|!||
|4295877341|       145|       14|         1|  I|!||
| 123456789|       145|       14|         1|  I|!||
+----------+----------+---------+----------+------+

【讨论】:

另外你能告诉我如何添加|!|在所有记录的操作列中我更新了我的代码 你可以使用这里解释的连接函数:***.com/a/31452109/6138873 让我们continue this discussion in chat. 所以它尝试了 concat 函数,但数据框的架构发生了变化我尝试了这个 val dfMainOutputFinal=dfMainOutput.select(concat($"FFAction", lit("|!|")))跨度>

以上是关于我们可以在不创建模式的情况下在 spark 数据框中加载分隔的文本文件吗?的主要内容,如果未能解决你的问题,请参考以下文章

在不计算的情况下获取 Spark 数据框中的行数

我们可以在不使用 GraphQL 的情况下在项目中实施 AWS-Appsync 吗?

在没有 spark UI 的情况下在 AWS EMR 中监控 spark 集群

如果通过 spark-submit 提交作业,我们可以在不显式创建的情况下使用 spark 会话对象吗

如何在不传递值和 rowid 的情况下在 H base-shell 中创建表?

如何在不使用 -webkit-app-region 的情况下在 Electron 中移动无框窗口