将一个数据框列值传递给另一个数据框过滤条件表达式 + Spark 1.5

Posted

技术标签:

【中文标题】将一个数据框列值传递给另一个数据框过滤条件表达式 + Spark 1.5【英文标题】:Pass one dataframe column values to another dataframe filter condition expression + Spark 1.5 【发布时间】:2016-02-05 07:38:15 【问题描述】:

我有两个输入数据集 第一个输入数据集如下:

year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt

第二个输入数据集:

TagId,condition
1997_cars,year = 1997 and model = 'E350'
2012_cars,year=2012 and model ='S'
2015_cars ,year=2015 and model = 'Volt'

现在我的要求是读取第一个数据集,并且基于第二个数据集中的过滤条件需要通过向第一个输入数据集引入一个新列 TagId 来标记第一个输入数据集的行 所以预期应该是这样的:

year,make,model,comment,blank,TagId
"2012","Tesla","S","No comment",2012_cars
1997,Ford,E350,"Go get one now they are going fast",1997_cars
2015,Chevy,Volt, ,2015_cars

我试过了:

val sqlContext = new SQLContext(sc)
val carsSchema = StructType(Seq(
    StructField("year", IntegerType, true),
    StructField("make", StringType, true),
    StructField("model", StringType, true),
    StructField("comment", StringType, true),
    StructField("blank", StringType, true)))

val carTagsSchema = StructType(Seq(
    StructField("TagId", StringType, true),
    StructField("condition", StringType, true)))


val dfcars = sqlContext.read.format("com.databricks.spark.csv").option("header", "true") .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
val dftags = sqlContext.read.format("com.databricks.spark.csv").option("header", "true") .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")

val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
val cdtnval = dftags.select("condition")
val df2=dfcars.filter(cdtnval)
<console>:35: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame)
       val df2=dfcars.filter(cdtnval)

另一种方式:

val col = dftags.col("TagId")
val finaldf = dfcars.withColumn("TagId", col)
org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")

如果有人给我指点如何将过滤条件传递给数据框的过滤功能,我将不胜感激。 或另一种解决方案。 对于这样一个幼稚的问题,我很抱歉,因为我是 scala 和 Spark 的新手

谢谢

【问题讨论】:

【参考方案1】:

对此没有简单的解决方案。我认为您可以采用两个大方向:

将条件 (dftags) 收集到本地列表。然后一个一个地检查它,在汽车(dfcars)上执行每个作为filter。使用结果来获得所需的输出。

将条件 (dftags) 收集到本地列表。自己为它们实现解析和评估代码。遍历汽车 (dfcars) 一次,评估 map 中每一行的规则集。

如果你有大量的条件(所以你无法收集它们)和大量的汽车,那么情况就非常糟糕。您需要根据每种情况检查每辆车,因此这将非常低效。在这种情况下,您需要首先优化规则集,以便更有效地对其进行评估。 (决策树可能是一个不错的解决方案。)

【讨论】:

如何将 dftags 转换为 list ?你能分享例子吗。我是 Spark 和 Scala 的新手。 我的意思只是dftags.collect。它将分布式 RDD 变成了一个简单的数组。

以上是关于将一个数据框列值传递给另一个数据框过滤条件表达式 + Spark 1.5的主要内容,如果未能解决你的问题,请参考以下文章

如何在 df.groupby 之后将数据框列值作为窗口大小传递?

使用python正则表达式用字符串的小数部分替换数据框列值

如果数据框列值匹配字典键,检查不同的列是不是匹配字典值

熊猫:考虑多种条件正确过滤数据框列

将附加信息(数据)附加到数据框列值

(Python)如何修复数据框列值中的数值表示错误