如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN
Posted
技术标签:
【中文标题】如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN【英文标题】:How to implement NOT IN for two DataFrames with different structure in Apache Spark 【发布时间】:2015-11-11 13:52:19 【问题描述】:我在我的 Java 应用程序中使用 Apache Spark。
我有两个DataFrame
s:df1
和 df2
。 df1
包含 Row
s 和 email
、firstName
和 lastName
。 df2
包含 Row
s 和 email
。
我想创建一个DataFrame
: df3
,其中包含df1
中的所有行,而df2
中不存在该电子邮件。
有没有办法用 Apache Spark 做到这一点?我尝试从df1
和df2
创建JavaRDD<String>
,方法是将它们转换为toJavaRDD()
并过滤df1
以包含所有电子邮件,然后使用subtract
,但我不知道如何映射新的@ 987654342@ 到ds1
并获得DataFrame
。
基本上我需要df1
中的所有行,其电子邮件不在df2
中。
DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");
DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
"WHERE product_id = '" + productId + "'");
JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));
List<String> notBoughtEmails = customers.javaRDD()
.map(row -> row.getString(0))
.subtract(customersBoughtEmail).collect();
【问题讨论】:
【参考方案1】:Spark 2.0.0+
您可以直接使用NOT IN
。
火花
可以用外连接和过滤器来表示。
val customers = sc.parallelize(Seq(
("john@example.com", "John", "Doe"),
("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")
val customersWhoOrderedTheProduct = sc.parallelize(Seq(
Tuple1("jane@example.com")
)).toDF("email")
val customersWhoHaventOrderedTheProduct = customers.join(
customersWhoOrderedTheProduct.select($"email".alias("email_")),
$"email" === $"email_", "leftouter")
.where($"email_".isNull).drop("email_")
customersWhoHaventOrderedTheProduct.show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
原始 SQL 等效项:
customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
"customersWhoOrderedTheProduct")
val query = """SELECT c.* FROM customers c LEFT OUTER JOIN
customersWhoOrderedTheProduct o
ON c.email = o.email
WHERE o.email IS NULL"""
sqlContext.sql(query).show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com| John| Doe|
// +----------------+----------+---------+
【讨论】:
谢谢。第一个例子对我有用。这是 Java 版本DataFrame customersWhoHaventOrderedTheProduct = customers .join(customersWhoOrderedTheProduct.select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter") .where(customersWhoOrderedTheProduct.col("email").isNull()).drop(customersWhoOrderedTheProduct.col("email"));
我尝试了等效的 SQL 但发生了这种情况 scala.MatchError: UUIDType (of class org.apache.spark.sql.cassandra.types.UUIDType$)
很高兴能帮上忙。
我使用Cassandra
并且我有一个UUID
作为主键。也许 Scala 无法匹配类型。
可以尝试导入com.datastax.spark.connector.types.UUIDType
吗?
我在我的 IDE 中运行它。我正在使用setJars
将所有依赖项添加到 Spark。我应该在哪里导入这个?我没有在查询中使用任何UUID
。【参考方案2】:
我在python
中做到了,此外我建议您使用整数作为键而不是字符串。
from pyspark.sql.types import *
samples = sc.parallelize([
("abonsanto@fakemail.com", "Alberto", "Bonsanto"), ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
("stranger@fakemail.com", "Stranger", "Weirdo"), ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])
keys = sc.parallelize(
[("abonsanto@fakemail.com",), ("mbonsanto@fakemail.com",), ("dbonsanto@fakemail.com",)]
)
complex_schema = StructType([
StructField("email", StringType(), True),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True)
])
simple_schema = StructType([
StructField("email", StringType(), True)
])
df1 = sqlContext.createDataFrame(samples, complex_schema)
df2 = sqlContext.createDataFrame(keys, simple_schema)
df1.show()
df2.show()
df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull()).show()
【讨论】:
谢谢。我正在使用Cassandra
,所以我的很多主键都包含UUID
。以上是关于如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN的主要内容,如果未能解决你的问题,请参考以下文章
在 YARN 中为 Apache zeppelin 分配 Spark 内存
如何在 Spark on YARN 中为 Spark UI 创建安全过滤器
使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集