pyspark 将列值与另一列进行比较包含值范围

Posted

技术标签:

【中文标题】pyspark 将列值与另一列进行比较包含值范围【英文标题】:pyspark compare column values with another column contains range of values 【发布时间】:2017-09-03 10:39:45 【问题描述】:

我想将一列的值与具有参考值范围的另一列进行比较。

我已尝试使用以下代码:

from pyspark.sql.functions import udf, size
from pyspark.sql.types import *
df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6,7])]).toDF(["value", "Reference_value"])
intersect = lambda type: (udf(
    lambda x, y: (
        list(set(x) & set(y)) if x is not None and y is not None else None),
    ArrayType(type)))
integer_intersect = intersect(IntegerType())

# df1.select(
#     integer_intersect("value", "Reference_value"),
#     size(integer_intersect("value", "Reference_value"))).show()
df1=df1.where(size(integer_intersect("value", "Reference_value")) > 0)
df1.show()   

如果我们创建如下数据框,上面的代码就可以工作:

因为 value 和 referencence_value 列是 array_type 和 long_type 但是如果我正在使用 csv 读取数据帧,那么我将无法转换为数组类型。这里 df1 是从 CSV 读取的

df1 is as follows df1=

category    value   Reference value

count          1        1
n_timer       n20       n40,n20
frames         54       56
timer          n8      n3,n6,n7
pdf          FALSE      TRUE
zip          FALSE      FALSE

我想将“value”列与“Reference_value”列进行比较,并导出两个新数据框,如果 value 列不在参考值集中,则一个数据框用于过滤行。

输出 df2=

  category      value      Reference value

    count          1        1
    n_timer       n20       n40,n20
    zip          FALSE      FALSE

输出 df3=

category    value   Reference value

frames         54       56
timer          n8      n3,n6,n7
pdf          FALSE      TRUE

有没有像array_contains 这样更简单的方法。我尝试了 Array_contains 但没有工作

from pyspark.sql.functions import array_contains
df.where(array_contains("Reference_value", df1["vale"]))

【问题讨论】:

【参考方案1】:
#One can copy paste the below code for direct input and outputs

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, size
from pyspark.sql.types import *
from pyspark.sql.functions import split 
sc = SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)
df1 = sc.parallelize([("count","1","1"), ("n_timer","n20","n40,n20"), ("frames","54","56"),("timer","n8","n3,n6,n7"),("pdf","FALSE","TRUE"),("zip","FALSE","FALSE")]).toDF(["category", "value","Reference_value"])
print(df1.show())
df1=df1.withColumn("Reference_value", split("Reference_value", ",\s*").cast("array<string>"))
df1=df1.withColumn("value", split("value", ",\s*").cast("array<string>"))
intersect = lambda type: (udf(
    lambda x, y: (
        list(set(x) & set(y)) if x is not None and y is not None else None),
    ArrayType(type)))
string_intersect = intersect(StringType())
df2=df1.where(size(string_intersect("value", "Reference_value")) > 0)
df3=df1.where(size(string_intersect("value", "Reference_value")) <= 0)
print(df2.show())
print(df3.show())

 input df1=
+--------+-----+---------------+
|category|value|Reference_value|
+--------+-----+---------------+
|   count|    1|              1|
| n_timer|  n20|        n40,n20|
|  frames|   54|             56|
|   timer|   n8|       n3,n6,n7|
|     pdf|FALSE|           TRUE|
|     zip|FALSE|          FALSE|
+--------+-----+---------------+

df2=
+--------+-------+---------------+
|category|  value|Reference_value|
+--------+-------+---------------+
|   count|    [1]|            [1]|
| n_timer|  [n20]|     [n40, n20]|
|     zip|[FALSE]|        [FALSE]|
+--------+-------+---------------+

df3=
+--------+-------+---------------+
|category|  value|Reference_value|
+--------+-------+---------------+
|  frames|   [54]|           [56]|
|   timer|   [n8]|   [n3, n6, n7]|
|     pdf|[FALSE]|         [TRUE]|
+--------+-------+---------------+

【讨论】:

以上是关于pyspark 将列值与另一列进行比较包含值范围的主要内容,如果未能解决你的问题,请参考以下文章

将列值与第一行进行比较并保留 R 中的原始值

返回查询的所有行,其中一列中的字符串值与另一列中的字符串值匹配

选择一列的子集,然后与另一列进行比较

使用 PySpark 连接与另一列中的两列确定的范围相匹配的数据框

SQL查询以获取与另一列的MAX值对应的列值?

在python中,我如何对一列中每个值与另一列中的值发生的次数(多少行)建立矩阵?