如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较
Posted
技术标签:
【中文标题】如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较【英文标题】:How to compare values in a pyspark dataframe column with another dataframe in pyspark 【发布时间】:2019-07-29 13:21:38 【问题描述】:我有一个 pyspark dataframe(df1)
,其第一行如下:
[Row(_c0='"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]', _c1='0')]
我想将“值”列表与下面dataframe(df2)
值的第一列进行比较,如下所示:
0 0.57581 1.25461 0.68694 0.974580 1.54789 0.23646
1 0.98745 0.23655 2.58970 4.587580 0.89756 1.25678
2 0.45780 5.78940 0.65986 2.125400 0.98745 1.23658
3 2.56834 0.25698 4.26587 0.569872 0.36987 0.68975
4 0.25678 1.23654 5.68320 0.986230 0.87563 2.58975
同样,我在df1
中有很多行,我必须查看df1
“值”列表中的哪些值大于df2
中的相应列。我需要找到满足上述条件的那些索引并且将其作为列表存储在另一列中到df1
。
例如1.172737 > 0.98745
,所以它的索引是1
。因此我将在df1 named(indices)
中有另一列,其中包含value1,如果出现另一个值,它必须附加相同的列。
比较是在各个列和行之间。上面显示的df1行是第1行,所以它必须与df2中的第一列进行比较。
如果我没有强调某事,请在 cmets 中告诉我。
【问题讨论】:
我认为你的 json 没有正确加载到你的 dataframe1 中,如果你解析它以获得正确的结构会更好。其次,如果两个数据框没有要连接的列,则无法比较它们。即使我认为它是 df1._c1 和 df2 的第一列。最后,您在 df1 中有 5 个值,在 df2 中有 6 个值:应该如何比较? 如我所说,比较的是第一行第一列(不是df2的第一行) 【参考方案1】:此代码适用于 Python 2.7 和 Spark 2.3.2:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType
# Create test dataframes
df1 = spark.createDataFrame([
['"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]', '0'],
['"type":"Fi","values":[0.6, 0.8, 0.5, 2.1, 0.4]', '0']
],['_c0','_c1'])
df2 = spark.createDataFrame([
[0, 0.57581, 1.25461, 0.68694, 0.974580, 1.54789, 0.23646],
[1, 0.98745, 0.23655, 2.58970, 4.587580, 0.89756, 1.25678],
[2, 0.45780, 5.78940, 0.65986, 2.125400, 0.98745, 1.23658],
[3, 2.56834, 0.25698, 4.26587, 0.569872, 0.36987, 0.68975],
[4, 0.25678, 1.23654, 5.68320, 0.986230, 0.87563, 2.58975]
],['id','v1', 'v2', 'v3', 'v4', 'v5', 'v6'])
# Get schema and load json correctly
json_schema = spark.read.json(df1.rdd.map(lambda row: row._c0)).schema
df1 = df1.withColumn('json', F.from_json('_c0', json_schema))
# Get column 1 values to compare
values = [row['v1'] for row in df2.select('v1').collect()]
# Define udf to compare values
def cmp_values(lst):
list_cmp = map(lambda t: t[0] > t[1], zip(lst, values)) # Boolean list
return [idx for idx, cond in enumerate(list_cmp) if cond] # Indices of satisfying elements
udf_cmp_values = F.udf(cmp_values, ArrayType(IntegerType()))
# Apply udf on array
df1 = df1.withColumn('indices', udf_cmp_values(df1.json['values']))
df1.show()
+--------------------+---+--------------------+---------+
| _c0|_c1| json| indices|
+--------------------+---+--------------------+---------+
|"type":"Fi","val...| 0|[Fi, [0.201009944...| [1]|
|"type":"Fi","val...| 0|[Fi, [0.6, 0.8, 0...|[0, 2, 4]|
+--------------------+---+--------------------+---------+
【讨论】:
您好,我收到错误 zip#1 must support iteration! 您是否尝试过复制/粘贴此代码并且有效(它适用于我)?如果此示例有效,但不适用于您的完整数据帧,则意味着您的最小示例不完整。我猜你在所有行中没有相同数量的值,这可能会导致这个错误。 我有一个后续问题,删除链接,在此先感谢***.com/questions/61823544/…@Pierre Gourseaud以上是关于如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串