pyspark compare column values with another column contains range of values
Posted: 2017-09-03 10:39:45
Problem description: I want to compare the values of one column against another column that holds a range of reference values.
I have tried the following code:
from pyspark.sql.functions import udf, size
from pyspark.sql.types import *

df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6, 7])]).toDF(["value", "Reference_value"])

# UDF factory: returns the intersection of two array columns of the given element type
intersect = lambda type: (udf(
    lambda x, y: (
        list(set(x) & set(y)) if x is not None and y is not None else None),
    ArrayType(type)))
integer_intersect = intersect(IntegerType())

# df1.select(
#     integer_intersect("value", "Reference_value"),
#     size(integer_intersect("value", "Reference_value"))).show()

# keep only rows where the two arrays share at least one element
df1 = df1.where(size(integer_intersect("value", "Reference_value")) > 0)
df1.show()
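For reference (my reading of the code, not output from the original post): the filter should keep only the first row, since [1] and [1, 2, 3] share the element 1 while [2] and [4, 5, 6, 7] have no overlap, so df1.show() prints something like:
+-----+---------------+
|value|Reference_value|
+-----+---------------+
|  [1]|      [1, 2, 3]|
+-----+---------------+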
The above code works if we create the dataframe as above, because the value and Reference_value columns are then array columns of long elements. But when I read the dataframe from a CSV, I am not able to convert the columns to array type. Here df1 is read from a CSV and looks like this:
df1 =
category   value   Reference_value
count      1       1
n_timer    n20     n40,n20
frames     54      56
timer      n8      n3,n6,n7
pdf        FALSE   TRUE
zip        FALSE   FALSE
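For context, here is a minimal sketch of the CSV read (the path "data.csv" and the SparkSession variable spark are my assumptions, not from the original post). It shows why the integer UDF above fails on this input: every column comes back as a plain string, not an array:

# Hypothetical read of the data above; "data.csv" is an assumed path
df1 = spark.read.csv("data.csv", header=True)
df1.printSchema()
# root
#  |-- category: string (nullable = true)
#  |-- value: string (nullable = true)
#  |-- Reference_value: string (nullable = true)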
I want to compare the "value" column with the "Reference_value" column and derive two new dataframes: one keeping the rows where value appears in the reference value set, and one keeping the rows where it does not.
Output df2 =
category   value   Reference_value
count      1       1
n_timer    n20     n40,n20
zip        FALSE   FALSE
Output df3 =
category   value   Reference_value
frames     54      56
timer      n8      n3,n6,n7
pdf        FALSE   TRUE
Is there a simpler way, something like array_contains? I tried array_contains, but it did not work:
from pyspark.sql.functions import array_contains
df1.where(array_contains("Reference_value", df1["value"]))
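A likely reason the attempt above fails (my diagnosis, not from the original post): in Spark versions of that era, the Python array_contains helper expects a literal as its second argument, and Reference_value read from CSV is a string rather than an array. A possible workaround is to split the string column first and call array_contains inside a SQL expression, where the second argument may be another column:

from pyspark.sql.functions import expr, split

# Split the comma-separated reference string into an array column first
df1 = df1.withColumn("ref_arr", split("Reference_value", ",\\s*"))

# In a SQL expression, array_contains can compare against another column
df2 = df1.where(expr("array_contains(ref_arr, value)"))
df3 = df1.where(~expr("array_contains(ref_arr, value)"))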
Answer 1:
# One can copy-paste the code below to reproduce the input and outputs directly
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, size, split
from pyspark.sql.types import *

sc = SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)

df1 = sc.parallelize([("count", "1", "1"), ("n_timer", "n20", "n40,n20"), ("frames", "54", "56"),
                      ("timer", "n8", "n3,n6,n7"), ("pdf", "FALSE", "TRUE"),
                      ("zip", "FALSE", "FALSE")]).toDF(["category", "value", "Reference_value"])
df1.show()

# Turn the comma-separated strings into array<string> columns
df1 = df1.withColumn("Reference_value", split("Reference_value", ",\\s*").cast("array<string>"))
df1 = df1.withColumn("value", split("value", ",\\s*").cast("array<string>"))

# UDF factory: returns the intersection of two array columns of the given element type
intersect = lambda type: (udf(
    lambda x, y: (
        list(set(x) & set(y)) if x is not None and y is not None else None),
    ArrayType(type)))
string_intersect = intersect(StringType())

# df2: rows where value overlaps the reference set; df3: rows where it does not
df2 = df1.where(size(string_intersect("value", "Reference_value")) > 0)
df3 = df1.where(size(string_intersect("value", "Reference_value")) <= 0)
df2.show()
df3.show()
input df1=
+--------+-----+---------------+
|category|value|Reference_value|
+--------+-----+---------------+
| count| 1| 1|
| n_timer| n20| n40,n20|
| frames| 54| 56|
| timer| n8| n3,n6,n7|
| pdf|FALSE| TRUE|
| zip|FALSE| FALSE|
+--------+-----+---------------+
df2=
+--------+-------+---------------+
|category| value|Reference_value|
+--------+-------+---------------+
| count| [1]| [1]|
| n_timer| [n20]| [n40, n20]|
| zip|[FALSE]| [FALSE]|
+--------+-------+---------------+
df3=
+--------+-------+---------------+
|category| value|Reference_value|
+--------+-------+---------------+
| frames| [54]| [56]|
| timer| [n8]| [n3, n6, n7]|
| pdf|[FALSE]| [TRUE]|
+--------+-------+---------------+
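A side note beyond the original answer: on Spark 2.4 or later the intersection UDF can be dropped entirely, since the built-in arrays_overlap function performs the same overlap test natively (a sketch, assuming both columns are already array typed as above):

from pyspark.sql.functions import arrays_overlap

# Spark 2.4+ only; true when the two arrays share any non-null element
df2 = df1.where(arrays_overlap("value", "Reference_value"))
df3 = df1.where(~arrays_overlap("value", "Reference_value"))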