spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要
Posted
技术标签:
【中文标题】spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要【英文标题】:spark: schema change -- transform & filter a dataframe on a column if it exits; don't if it doesn't 【发布时间】:2016-08-26 16:20:23 【问题描述】:我使用的数据发生了架构更改。对于结果数据帧,它结合了新旧数据,我想转换和过滤的列曾经不存在于旧数据中。它不是由“null”填充的。我想尽可能地对该列进行转换和过滤,即每当列退出时,我想对其进行转换和过滤;对于没有此类列的早期数据,我将保留每一行。
问题在于以下代码导致java.lang.NullPointerException
,因为早期数据没有“ip”列。
val filteredData = sqlContext.sql(
s"SELECT $fieldsString FROM data $filterTerm")
.withColumn("ip",firstIp($"ip"))
.filter("`ip` not in ('30.90.30.90', '70.80.70.80')")
.filter("`ip` not like '10.%'")
上面的“firstIp”函数只是一个从数组中获取第一个IP地址的udf;它由val firstIp = udf[String, String](_.split(",")(0))
定义。我不想按架构将数据拆分为两部分 - 具有“ip”列的部分和不包含“ip”列的部分......但是如果不以这种方式拆分数据,我的目标是否可以实现?
【问题讨论】:
【参考方案1】:这个答案解决了旧版本的问题,它只要求在不同的架构上使用 filter
。现在问题已经完全改变了,这真的没有意义。
您可以简单地检查该列是否存在:
// let's create some test data
case class SchemaA(host: String)
case class SchemaB(host: String, ip: String)
val testDataA = sc.parallelize(Seq(
SchemaA("localhost"),
SchemaA("other")
)).toDF()
val testDataB = sc.parallelize(Seq(
SchemaB("localhost", "127.0.0.1"),
SchemaB("other", "192.168.0.1")
)).toDF()
def doSomething(df: DataFrame)
val filtered = if (df.columns.contains("ip"))
df.filter("ip in ('127.0.0.1')")
else
df
// do whatever you want after filtering...
filtered.select($"host").show()
doSomething(testDataA)
doSomething(testDataB)
【讨论】:
谢谢@bluenote10!我编辑了我的问题。还不够清楚。并且可能具有误导性。 重点是,我将新旧日志合并到一个数据帧中。日志具有不同的架构。因此,旧日志没有的列现在由“null”填充。我认为也许 withColumn(udf) 导致 java.lang.NullPointerException;或者过滤是。 @MichM:这就是为什么在提问时准备一个最小示例是个好主意。那么你的问题就真的变成了处理缺失值的问题,而不是如何处理不同的模式。以上是关于spark:模式更改——如果存在,则转换和过滤列上的数据框;如果没有就不要的主要内容,如果未能解决你的问题,请参考以下文章
Azure 逻辑应用 Odata 筛选查询,在两列上使用 if 语句