Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有

Posted

技术标签:

【中文标题】Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有【英文标题】:Spark Dataframe API to Select multiple columns, map them to a fixed set, and Union ALL 【发布时间】:2021-02-26 08:44:25 【问题描述】:

我有一个定义了此架构的 CSV 源文件。

["Name", "Address", "TaxId", "SS Number", "Mobile Number", "Gender", "LastVisited"]

从此 CSV 中,我需要执行以下操作:

    选择一个列子集,一次一个,并将它们全部映射到这个固定架构:

    ["Name", "Address", "Mobile", "UniqueID", "UniqueIdentifierRefCode"]

因此,例如,在第一次迭代中,我将仅选择列的子集:

[Col("Name"), Col("Address"), Col("Mobile Number"), Col("TaxId"), Lit("TaxIdentifier")]

    在下一次迭代中,我需要选择一个不同的子集,但将它们映射到相同的固定模式:

    [Col("Name"), Col("Address"), Col("Mobile Number"), Col("SS Number"), Lit("SocialSecurityNumber")]

我可以通过运行 for 循环、选择列并最后执行 UnionAll 来完成所有这些操作。但是有没有更好的方法让 Spark 处理这个问题?

【问题讨论】:

对于某些行,您是否有 TaxID 或 SS 编号?如果该行有出租车,您想使用它,如果该行有一个 ss 编号,您想使用它吗? 我想同时导出两者。对于原始 CSV 中的每一行,我想在输出中构造 2 行。第一行将有 TaxId,下一行将有 SS Number。两行都有一个共同的模式,并且应该显示在输出中。但我不想写循环,因为我导出的行数将来也可以达到 3 或 4。 【参考方案1】:

您不需要循环,但可以使用联合,如果您将数据框过滤到所需的行,您可以使用 Union - 在我使用的过滤器中 IsNotNull() 但您可以使用任何过滤器您喜欢(如果您不确定过滤器语法,请给我们更多详细信息,我会提供帮助)。

var taxId = dataFrame.Filter(Functions.Col("TaxId").IsNotNull())
    .WithColumn("UniqueId", Functions.Col("TaxId"));

var ssId = dataFrame.Filter(Functions.Col("ss").IsNotNull())
    .WithColumn("UniqueId", Functions.Col("ss"));

var unionedDataFrame = taxId.Union(ssId);
unionedDataFrame.Show()

获得最终数据框后,您可以选择实际需要的列或删除不需要的列:

unionedDataFrame.Drop("TaxId").Show()

unionedDataFrame.Select("name, UniqueId").Show()

在 Spark 中,这与逻辑上完全相同:

dataFrame.Filter(Functions.Col("TaxId").IsNotNull())
    .WithColumn("UniqueId", Functions.Col("TaxId"))
    .Union( 
      dataFrame.Filter(Functions.Col("ss").IsNotNull())
       .WithColumn("UniqueId", Functions.Col("ss"))
     ).Show()

还要注意,当你调用一个方法时,你会得到一个新的 DataFrame,因此 dataFrame.Filter() 的结果是一个单独的 DataFrame 到 dataFrame 但需要注意的重要一点是,由于惰性评估,Spark 创建在执行查询时进行计划。

【讨论】:

我知道这可以做到,但可以通过不创建 2 个数据框来完成。我想将这两种操作合二为一。 Spark 使用惰性求值,所以只要您不执行类似 .Show() 之类的操作,这将是一个操作 - 看看 data-flair.training/blogs/apache-spark-lazy-evaluation

以上是关于Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

Scala Spark-> 从 DataFrame 中选择前 15 列

pyspark dataframe api速览

将 UDF 应用于 Spark Dataframe 中的多个列

使用 Spark Dataframe scala 将多个不同的列转换为 Map 列

如何在 spark DataFrame 中将多个浮点列连接到一个 ArrayType(FloatType()) 中?