如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN

Posted

技术标签:

【中文标题】如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN【英文标题】:How to implement NOT IN for two DataFrames with different structure in Apache Spark 【发布时间】:2015-11-11 13:52:19 【问题描述】:

我在我的 Java 应用程序中使用 Apache Spark。 我有两个DataFrames:df1df2df1 包含 Rows 和 emailfirstNamelastNamedf2 包含 Rows 和 email

我想创建一个DataFrame: df3,其中包含df1 中的所有行,而df2 中不存在该电子邮件。

有没有办法用 Apache Spark 做到这一点?我尝试从df1df2 创建JavaRDD<String>,方法是将它们转换为toJavaRDD() 并过滤df1 以包含所有电子邮件,然后使用subtract,但我不知道如何映射新的@ 987654342@ 到ds1 并获得DataFrame

基本上我需要df1 中的所有行,其电子邮件不在df2 中。

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
                            "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
                        .map(row -> row.getString(0))
                        .subtract(customersBoughtEmail).collect();

【问题讨论】:

【参考方案1】:

Spark 2.0.0+

您可以直接使用NOT IN

火花

可以用外连接和过滤器来表示。

val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
 .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

原始 SQL 等效项:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

【讨论】:

谢谢。第一个例子对我有用。这是 Java 版本 DataFrame customersWhoHaventOrderedTheProduct = customers .join(customersWhoOrderedTheProduct.select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter") .where(customersWhoOrderedTheProduct.col("email").isNull()).drop(customersWhoOrderedTheProduct.col("email")); 我尝试了等效的 SQL 但发生了这种情况 scala.MatchError: UUIDType (of class org.apache.spark.sql.cassandra.types.UUIDType$) 很高兴能帮上忙。 我使用Cassandra 并且我有一个UUID 作为主键。也许 Scala 无法匹配类型。 可以尝试导入com.datastax.spark.connector.types.UUIDType吗? 我在我的 IDE 中运行它。我正在使用setJars 将所有依赖项添加到 Spark。我应该在哪里导入这个?我没有在查询中使用任何UUID【参考方案2】:

我在python 中做到了,此外我建议您使用整数作为键而不是字符串。

from pyspark.sql.types import *

samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"), ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"), ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

keys = sc.parallelize(
    [("abonsanto@fakemail.com",), ("mbonsanto@fakemail.com",), ("dbonsanto@fakemail.com",)]
)

complex_schema = StructType([
    StructField("email", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

simple_schema = StructType([
    StructField("email", StringType(), True)
])

df1 = sqlContext.createDataFrame(samples, complex_schema)
df2 = sqlContext.createDataFrame(keys, simple_schema)

df1.show()
df2.show()

df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull()).show()

【讨论】:

谢谢。我正在使用Cassandra,所以我的很多主键都包含UUID

以上是关于如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN的主要内容,如果未能解决你的问题,请参考以下文章

在 YARN 中为 Apache zeppelin 分配 Spark 内存

在 Apache Spark 中为每行迭代添加范围变量

如何在 Spark on YARN 中为 Spark UI 创建安全过滤器

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

如何在 MLLIB / ApacheSpark 中为 RandomForrest 模型上的特征分配标签

Apache Spark 和 Hudi:大量的输出文件