在 PySpark 数据框中拆分和计算列值
Posted
技术标签:
【中文标题】在 PySpark 数据框中拆分和计算列值【英文标题】:Split and count column values in PySpark dataframe 【发布时间】:2019-09-03 06:45:49 【问题描述】:我在 hdfs
位置有一个 csv 文件,并已转换为 dataframe
,而我的 dataframe
如下所示...
column1,column2,column3
Node1, block1, 1,4,5
Node1, block1, null
Node1, block2, 3,6,7
Node1, block2, null
Node1, block1, null
我想解析这个dataframe
,我的输出dataframe
应该在下面。
column1,column2,column3
Node1, block1, counter0:1,counter1:4,counter2:5
Node1, block1, null
Node1, block2, counter0:3,counter1:6,counter2:7
Node1, block2, null
Node1, block1, null
我遇到了下面提到的一些错误,所以任何人都可以帮助我解决这个错误,或者可以帮助我正确/修改代码吗?谢谢。
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.types as T
from pyspark.sql.functions import udf
start_value = 2
schema_name = 2
start_key = 0
df = spark.read.csv("hdfs://path/Ccounters/test.csv",header=True)
def dict(x):
split_col = x.split(",")
col_nm = df.schema.names[schema_name]
convert = map(lambda x :col_nm + str(start_key) +":"+str(x) ,split_col)
con_str = ','.join(convert)
return con_str
udf_dict = udf(dict, StringType())
df1 =df.withColumn('distance', udf_dict(df.column3))
df1.show()
getting error below:
File "/opt/data/data11/yarn/local/usercache/cdap/appcache/application_1555606923440_67815/container_e48_1555606923440_67815_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 160, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o58.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
【参考方案1】:我发现您不能在 UDF 中使用 spark 对象(例如“map”函数),这是有道理的 (https://***.com/a/57230637)。执行所需操作的替代方法是在 UDF 中使用 for 循环。
第一次编辑
添加了一个可以轻松将此UDF应用于多个列的部分,基于此问题的答案:how to get the name of column with maximum value in pyspark dataframe
df = spark.createDataFrame([('Node1', 'block1', '1,4,5', None), ('Node1', 'block1', None, '1,2,3'), ('Node1', 'block2', '3,6,7', None), ('Node1', 'block2', None, '4,5,6'), ('Node1', 'block1', None, '7,8,9')], ['column1', 'column2', 'column3', 'column4'])
# df.show()
# +-------+-------+-------+-------+
# |column1|column2|column3|column4|
# +-------+-------+-------+-------+
# | Node1| block1| 1,4,5| null|
# | Node1| block1| null| 1,2,3|
# | Node1| block2| 3,6,7| null|
# | Node1| block2| null| 4,5,6|
# | Node1| block1| null| 7,8,9|
# +-------+-------+-------+-------+
def columnfill(x):
# if x is empty, return x
if x == None:
return x
else:
split = x.split(',')
y = []
z = 0
for i in split:
y.append('counter'+str(z)+':'+str(i))
z += 1
return ','.join(y)
udf_columnfill = udf(columnfill, StringType())
### Apply UDF to a single column:
# df_result1 = df.withColumn('distance', udf_columnfill(df.column3))
### Code for applying UDF to multiple columns
# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ")).drop('"+ str(c) +"')" for c in (columnnames)])
# # Print condition to see which transformations are executed
# print(cond)
# df.withColumn('column3_new', udf_columnfill(df.column3)).drop('column3').withColumn('column4_new', udf_columnfill(df.column4)).drop('column4')
# Create the new dataframe that evaluates the defined condition
df_result2 = eval(cond)
# df_result2.show()
# +-------+-------+--------------------------------+--------------------------------+
# |column1|column2|column3_new |column4_new |
# +-------+-------+--------------------------------+--------------------------------+
# |Node1 |block1 |counter0:1,counter1:4,counter2:5|null |
# |Node1 |block1 |null |counter0:1,counter1:2,counter2:3|
# |Node1 |block2 |counter0:3,counter1:6,counter2:7|null |
# |Node1 |block2 |null |counter0:4,counter1:5,counter2:6|
# |Node1 |block1 |null |counter0:7,counter1:8,counter2:9|
# +-------+-------+--------------------------------+--------------------------------+
第二次编辑
在插入列名的位置添加了一个额外的 UDF 输入值,作为列值的前缀:
# Updated UDF
def columnfill(cinput, cname):
# if x is empty, return x
if cinput == None:
return cinput
else:
values = cinput.split(',')
output = []
count = 0
for value in values:
output.append(str(cname)+str(count)+":"+str(value))
count += 1
return ','.join(output)
udf_columnfill = udf(columnfill, StringType())
# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond2 = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ", f.lit('" + str(c) + "_new'))).drop('"+ str(c) +"')" for c in (columnnames)])
df_result3 = eval(cond2)
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |column1|column2|column3_new |column4_new |
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |Node1 |block1 |column3_new0:1,column3_new1:4,column3_new2:5|null |
# |Node1 |block1 |null |column4_new0:1,column4_new1:2,column4_new2:3|
# |Node1 |block2 |column3_new0:3,column3_new1:6,column3_new2:7|null |
# |Node1 |block2 |null |column4_new0:4,column4_new1:5,column4_new2:6|
# |Node1 |block1 |null |column4_new0:7,column4_new1:8,column4_new2:9|
# +-------+-------+--------------------------------------------+--------------------------------------------+
print(cond)
# df.withColumn('column3_new', udf_columnfill(df.column3, f.lit('column3_new'))).drop('column3').withColumn('column4_new', udf_columnfill(df.column4, f.lit('column4_new'))).drop('column4')
【讨论】:
非常感谢。这行得通。然而,这仅适用于一列。假设我们在 df 中有多个列,我们需要对其进行转换,然后我们可以使用 for 循环它将如何创建许多需要再次加入的 dfs?我试图避免加入 dfs 以重新洗牌 df 中的数据,这可能导致最后不正确的 df 所以想知道其他方法吗? 我认为使用 UDF 一次评估多个列或连接多个数据框并不是一个理想的解决方案。我在答案中添加了一些新代码,它将 UDF 应用于必须转换的每一列。为了保持简洁,在应用后它也会删除此列,但如果愿意,可以跳过这部分。 是的,我按照你上面提到的方法做了。非常感谢您的帮助。 很好用。您能否接受答案以表明它为您的问题提供了解决方案? @RajeshMeher 我认为您的问题现在变得过于具体,无法在其他情况下进行一般使用。我的建议是坚持您的第一个问题,并为您添加的每个附加项提出一个新问题(即“添加列名作为列值的前缀”。)这样,共享知识也更适用于其他用户。尽管如此,我在解决方案中添加了一个编辑,该解决方案包含一个额外的 UDF 输入,可以在其中定义列值前缀。希望它能按预期工作。以上是关于在 PySpark 数据框中拆分和计算列值的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回