过滤 pyspark 数据框中的行并创建一个包含结果的新列
Posted
技术标签:
【中文标题】过滤 pyspark 数据框中的行并创建一个包含结果的新列【英文标题】:Filtering rows in pyspark dataframe and creating a new column that contains the result 【发布时间】:2020-06-29 22:37:59 【问题描述】:所以我试图确定周日发生在旧金山市中心边界内的犯罪活动。我的想法是首先编写一个 UDF 来标记每个犯罪是否在我确定为市区的区域内,如果它发生在该区域内,那么它将具有“1”和“0”的标签,否则。之后,我试图创建一个新列来存储这些结果。我尽力写了我能写的所有东西,但由于某种原因它不起作用。这是我写的代码:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
def filter_dt(x,y):
if (((x < -122.4213) & (x > -122.4313)) & ((y > 37.7540) & (y < 37.7740))):
return '1'
else:
return '0'
schema = StructType([StructField("isDT", BooleanType(), False)])
filter_dt_boolean = udf(lambda row: filter_dt(row[0], row[1]), schema)
#First, pick out the crime cases that happens on Sunday BooleanType()
q3_sunday = spark.sql("SELECT * FROM sf_crime WHERE DayOfWeek='Sunday'")
#Then, we add a new column for us to filter out(identify) if the crime is in DT
q3_final = q3_result.withColumn("isDT", filter_dt(q3_sunday.select('X'),q3_sunday.select('Y')))
我得到的错误是:Picture for the error message
我的猜测是我现在拥有的 udf 不支持将整个列作为要比较的输入,但我不知道如何修复它以使其工作。请帮忙!谢谢!
【问题讨论】:
【参考方案1】:尝试如下更改最后一行-
from pyspark.sql.functions import col
q3_final = q3_result.withColumn("isDT", filter_dt(col('X'),col('Y')))
【讨论】:
【参考方案2】:样本数据会有所帮助。现在我假设您的数据如下所示:
+----+---+---+
|val1| x| y|
+----+---+---+
| 10| 7| 14|
| 5| 1| 4|
| 9| 8| 10|
| 2| 6| 90|
| 7| 2| 30|
| 3| 5| 11|
+----+---+---+
那么你不需要 udf,因为你可以使用 when() 函数进行评估
import pyspark.sql.functions as F
tst= sqlContext.createDataFrame([(10,7,14),(5,1,4),(9,8,10),(2,6,90),(7,2,30),(3,5,11)],schema=['val1','x','y'])
tst_res = tst.withColumn("isdt",F.when(((tst.x.between(4,10))&(tst.y.between(11,20))),1).otherwise(0))This will give the result
tst_res.show()
+----+---+---+----+
|val1| x| y|isdt|
+----+---+---+----+
| 10| 7| 14| 1|
| 5| 1| 4| 0|
| 9| 8| 10| 0|
| 2| 6| 90| 0|
| 7| 2| 30| 0|
| 3| 5| 11| 1|
+----+---+---+----+
如果我的数据有误,但仍需要将多个值传递给 udf,则必须将其作为数组或结构传递。我更喜欢结构体
from pyspark.sql.functions import udf
from pyspark.sql.types import *
@udf(IntegerType())
def check_data(row):
if((row.x in range(4,5))&(row.y in range(1,20))):
return(1)
else:
return(0)
tst_res1 = tst.withColumn("isdt",check_data(F.struct('x','y')))
结果是一样的。但最好避免 UDF 并使用 spark 内置函数,因为 spark 催化剂无法理解 udf 内部的逻辑并且无法对其进行优化。
【讨论】:
以上是关于过滤 pyspark 数据框中的行并创建一个包含结果的新列的主要内容,如果未能解决你的问题,请参考以下文章
遍历 pandas 数据框中的行并匹配列表中的元组并创建一个新的 df 列
遍历 pyspark 数据框中的列,而不为单个列创建不同的数据框