spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要

Posted

技术标签:

【中文标题】spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要【英文标题】:spark: schema change -- transform & filter a dataframe on a column if it exits; don't if it doesn't 【发布时间】:2016-08-26 16:20:23 【问题描述】:

我使用的数据发生了架构更改。对于结果数据帧,它结合了新旧数据,我想转换和过滤的列曾经不存在于旧数据中。它不是由“null”填充的。我想尽可能地对该列进行转换和过滤,即每当列退出时,我想对其进行转换和过滤;对于没有此类列的早期数据,我将保留每一行。

问题在于以下代码导致java.lang.NullPointerException,因为早期数据没有“ip”列。

val filteredData = sqlContext.sql(
s"SELECT $fieldsString FROM data $filterTerm")
.withColumn("ip",firstIp($"ip"))
.filter("`ip` not in ('30.90.30.90', '70.80.70.80')")
.filter("`ip` not like '10.%'")

上面的“firstIp”函数只是一个从数组中获取第一个IP地址的udf;它由val firstIp = udf[String, String](_.split(",")(0)) 定义。我不想按架构将数据拆分为两部分 - 具有“ip”列的部分和不包含“ip”列的部分......但是如果不以这种方式拆分数据,我的目标是否可以实现?

【问题讨论】:

【参考方案1】:

这个答案解决了旧版本的问题,它只要求在不同的架构上使用 filter。现在问题已经完全改变了,这真的没有意义

您可以简单地检查该列是否存在:

// let's create some test data
case class SchemaA(host: String)
case class SchemaB(host: String, ip: String) 

val testDataA = sc.parallelize(Seq(
    SchemaA("localhost"),
    SchemaA("other")
)).toDF()
val testDataB = sc.parallelize(Seq(
    SchemaB("localhost", "127.0.0.1"),
    SchemaB("other", "192.168.0.1")
)).toDF()

def doSomething(df: DataFrame) 
  val filtered = if (df.columns.contains("ip")) 
    df.filter("ip  in ('127.0.0.1')")
   else 
    df
  
  // do whatever you want after filtering...
  filtered.select($"host").show()


doSomething(testDataA)
doSomething(testDataB)

【讨论】:

谢谢@bluenote10!我编辑了我的问题。还不够清楚。并且可能具有误导性。 重点是,我将新旧日志合并到一个数据帧中。日志具有不同的架构。因此,旧日志没有的列现在由“null”填充。我认为也许 withColumn(udf) 导致 java.lang.NullPointerException;或者过滤是。 @MichM:这就是为什么在提问时准备一个最小示例是个好主意。那么你的问题就真的变成了处理缺失值的问题,而不是如何处理不同的模式。

以上是关于spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要的主要内容,如果未能解决你的问题,请参考以下文章

在镶木地板的地图类型列上使用 spark-sql 过滤下推

在新列上过滤 Spark DataFrame

如果数据框中存在列,则R在列上应用函数

Azure 逻辑应用 Odata 筛选查询,在两列上使用 if 语句

如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换

Spark - 地图转换