pyspark:数据框在另一个数据框的列中按ID选择行

Posted

技术标签:

【中文标题】pyspark:数据框在另一个数据框的列中按ID选择行【英文标题】:pyspark: dataframe select row by id in another dataframe's column 【发布时间】:2019-04-18 09:29:14 【问题描述】:

我想要

    time_create==last_timestamp过滤df1, 通过从 df1 中选择的 store_product_id 过滤 df2

这里我只以df1为例,

按 time_create 选择很好:

df1[df1.time_create==last_timestamp].show()

但是,使用选定的store_product_id,过滤原始数据框df1 给了我很多行。

df1[df1.store_product_id.isin(df1[df1.time_create==last_timestamp].store_product_id)].show()

我还尝试收集与time_create==last_timestamp 匹配的store_product_id 列表。

ids = df1[df1.time_create==last_timestamp].select('store_product_id').collect()
df1[df1.store_product_id.isin(ids)].show()

但出现错误:

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [01e8f3c0-3ad5-4b69-b46d-f5feb3cadd5f]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
    at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
    at org.apache.spark.sql.functions$.lit(functions.scala:110)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

正确的方法是什么?

【问题讨论】:

.select('store_product_id') 替换为.select(['store_product_id']) ? @ma3oun .select('store_product_id') 正在工作。错误来自 df1[df1.store_product_id.isin(ids)] 。似乎 isin 只接受 python list 或 tuple 。但是前面的代码竟然没有失败,很奇怪。 收集到的ids大概是列表的形式 Row数据结构,我猜你需要一个值列表,所以使用toPandas而不是collect,然后提取值列表 【参考方案1】:

您要查找的函数是join。这是一个基于您的数据的简单示例:

import pyspark as sp
from pyspark.sql import SparkSession

samples = ['store_product_id':1,'time_create':2,'last_timestamp':3,'store_product_id':2,'time_create':2,'last_timestamp':2,'store_product_id':3,'time_create':4,'last_timestamp':4,'store_product_id':4,'time_create':2,'last_timestamp':5]

spark = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

df1 = spark.createDataFrame(samples)
df1.show()

这会产生:

+--------------+----------------+-----------+
|last_timestamp|store_product_id|time_create|
+--------------+----------------+-----------+
|             3|               1|          2|
|             2|               2|          2|
|             4|               3|          4|
|             5|               4|          2|
+--------------+----------------+-----------+

让我们按时间过滤并从中创建另一个数据框:

df2 = df1.filter(df1.time_create==df1.last_timestamp)
ids = df2.select('store_product_id').show()

+----------------+
|store_product_id|
+----------------+
|               2|
|               3|
+----------------+

这是我们在 store_product_id 上加入两个数据集的地方:

df3 = df1.join(df2,'store_product_id','inner').show()

+----------------+--------------+-----------+--------------+-----------+
|store_product_id|last_timestamp|time_create|last_timestamp|time_create|
+----------------+--------------+-----------+--------------+-----------+
|               3|             4|          4|             4|          4|
|               2|             2|          2|             2|          2|
+----------------+--------------+-----------+--------------+-----------+

内连接根据 store_product_id

产生 df1 和 df2 的交集

【讨论】:

【参考方案2】:

正如@ags29 所说,

df1[df1.time_create==last_timestamp].select(['store_product_id']).collect() 的结果是 Row 列表:

[Row(store_product_id=u'01e8f3c0-3ad5-4b69-b46d-f5feb3cadd5f')]

我需要将行转换为字符串,正确的方法是:

ids = df1[df1.time_create==last_timestamp].select('store_product_id').collect()
ids = map(lambda x: x.store_product_id, ids)
df1[df1.store_product_id.isin(ids)].show()

这与 pandas 完全不同。

【讨论】:

以上是关于pyspark:数据框在另一个数据框的列中按ID选择行的主要内容,如果未能解决你的问题,请参考以下文章

我正在尝试在另一个数据框的列中查找数据框中的列的元素,但 index() 对我不起作用

在pyspark数据框的列中使用正则表达式捕获两个字符串之间的第一次出现的字符串

如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况

如何检查来自不同数据框的列值?

将pyspark数据框的列转换为小写

数据框在多列上连接,pyspark中的列有一些条件[重复]