在 PySpark 数据框中拆分和计算列值

Posted

技术标签:

【中文标题】在 PySpark 数据框中拆分和计算列值【英文标题】:Split and count column values in PySpark dataframe 【发布时间】:2019-09-03 06:45:49 【问题描述】:

我在 hdfs 位置有一个 csv 文件,并已转换为 dataframe,而我的 dataframe 如下所示...

column1,column2,column3
Node1,  block1, 1,4,5
Node1,  block1, null
Node1,  block2, 3,6,7
Node1,  block2, null
Node1,  block1, null

我想解析这个dataframe,我的输出dataframe 应该在下面。

column1,column2,column3
Node1,  block1, counter0:1,counter1:4,counter2:5
Node1,  block1, null
Node1,  block2, counter0:3,counter1:6,counter2:7
Node1,  block2, null
Node1,  block1, null

我遇到了下面提到的一些错误,所以任何人都可以帮助我解决这个错误,或者可以帮助我正确/修改代码吗?谢谢。

import pyspark
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.types as T
from pyspark.sql.functions import udf

start_value = 2
schema_name = 2
start_key = 0

df = spark.read.csv("hdfs://path/Ccounters/test.csv",header=True)

def dict(x):
    split_col = x.split(",")
    col_nm = df.schema.names[schema_name]
    convert = map(lambda x :col_nm + str(start_key) +":"+str(x) ,split_col)
    con_str = ','.join(convert)
    return con_str
udf_dict = udf(dict, StringType())

df1 =df.withColumn('distance', udf_dict(df.column3))
df1.show()

getting error below:

 File "/opt/data/data11/yarn/local/usercache/cdap/appcache/application_1555606923440_67815/container_e48_1555606923440_67815_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 160, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o58.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

【参考方案1】:

我发现您不能在 UDF 中使用 spark 对象(例如“map”函数),这是有道理的 (https://***.com/a/57230637)。执行所需操作的替代方法是在 UDF 中使用 for 循环。


第一次编辑

添加了一个可以轻松将此UDF应用于多个列的部分,基于此问题的答案:how to get the name of column with maximum value in pyspark dataframe

df = spark.createDataFrame([('Node1', 'block1', '1,4,5', None), ('Node1', 'block1', None, '1,2,3'), ('Node1', 'block2', '3,6,7', None), ('Node1', 'block2', None, '4,5,6'), ('Node1', 'block1', None, '7,8,9')], ['column1', 'column2', 'column3', 'column4'])

#     df.show()
#     +-------+-------+-------+-------+
#     |column1|column2|column3|column4|
#     +-------+-------+-------+-------+
#     |  Node1| block1|  1,4,5|   null|
#     |  Node1| block1|   null|  1,2,3|
#     |  Node1| block2|  3,6,7|   null|
#     |  Node1| block2|   null|  4,5,6|
#     |  Node1| block1|   null|  7,8,9|
#     +-------+-------+-------+-------+

def columnfill(x):
# if x is empty, return x
if x == None: 
    return x
else:
    split = x.split(',')
    y = []
    z = 0
    for i in split:
        y.append('counter'+str(z)+':'+str(i))
        z += 1
    return ','.join(y)

udf_columnfill = udf(columnfill, StringType())

### Apply UDF to a single column:
# df_result1 = df.withColumn('distance', udf_columnfill(df.column3))

### Code for applying UDF to multiple columns

# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ")).drop('"+ str(c) +"')" for c in (columnnames)])

#     # Print condition to see which transformations are executed
#     print(cond)
#     df.withColumn('column3_new', udf_columnfill(df.column3)).drop('column3').withColumn('column4_new', udf_columnfill(df.column4)).drop('column4')   

# Create the new dataframe that evaluates the defined condition
df_result2 = eval(cond)

#     df_result2.show()
#     +-------+-------+--------------------------------+--------------------------------+
#     |column1|column2|column3_new                     |column4_new                     |
#     +-------+-------+--------------------------------+--------------------------------+
#     |Node1  |block1 |counter0:1,counter1:4,counter2:5|null                            |
#     |Node1  |block1 |null                            |counter0:1,counter1:2,counter2:3|
#     |Node1  |block2 |counter0:3,counter1:6,counter2:7|null                            |
#     |Node1  |block2 |null                            |counter0:4,counter1:5,counter2:6|
#     |Node1  |block1 |null                            |counter0:7,counter1:8,counter2:9|
#     +-------+-------+--------------------------------+--------------------------------+   

第二次编辑

在插入列名的位置添加了一个额外的 UDF 输入值,作为列值的前缀:

# Updated UDF
def columnfill(cinput, cname):
    # if x is empty, return x
    if cinput == None: 
        return cinput

    else:
        values = cinput.split(',')
        output = []
        count = 0
        for value in values:
            output.append(str(cname)+str(count)+":"+str(value))
            count += 1
        return ','.join(output)

udf_columnfill = udf(columnfill, StringType())

# Define columns that should be transformed
columnnames = ['column3', 'column4']
# Create a condition that joins multiple string parts, containing column operations
cond2 = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ", f.lit('" + str(c) + "_new'))).drop('"+ str(c) +"')" for c in (columnnames)])

df_result3 = eval(cond2)
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |column1|column2|column3_new                                 |column4_new                                 |
# +-------+-------+--------------------------------------------+--------------------------------------------+
# |Node1  |block1 |column3_new0:1,column3_new1:4,column3_new2:5|null                                        |
# |Node1  |block1 |null                                        |column4_new0:1,column4_new1:2,column4_new2:3|
# |Node1  |block2 |column3_new0:3,column3_new1:6,column3_new2:7|null                                        |
# |Node1  |block2 |null                                        |column4_new0:4,column4_new1:5,column4_new2:6|
# |Node1  |block1 |null                                        |column4_new0:7,column4_new1:8,column4_new2:9|
# +-------+-------+--------------------------------------------+--------------------------------------------+

print(cond)
# df.withColumn('column3_new', udf_columnfill(df.column3, f.lit('column3_new'))).drop('column3').withColumn('column4_new', udf_columnfill(df.column4, f.lit('column4_new'))).drop('column4')

【讨论】:

非常感谢。这行得通。然而,这仅适用于一列。假设我们在 df 中有多个列,我们需要对其进行转换,然后我们可以使用 for 循环它将如何创建许多需要再次加入的 dfs?我试图避免加入 dfs 以重新洗牌 df 中的数据,这可能导致最后不正确的 df 所以想知道其他方法吗? 我认为使用 UDF 一次评估多个列或连接多个数据框并不是一个理想的解决方案。我在答案中添加了一些新代码,它将 UDF 应用于必须转换的每一列。为了保持简洁,在应用后它也会删除此列,但如果愿意,可以跳过这部分。 是的,我按照你上面提到的方法做了。非常感谢您的帮助。 很好用。您能否接受答案以表明它为您的问题提供了解决方案? @RajeshMeher 我认为您的问题现在变得过于具体,无法在其他情况下进行一般使用。我的建议是坚持您的第一个问题,并为您添加的每个附加项提出一个新问题(即“添加列名作为列值的前缀”。)这样,共享知识也更适用于其他用户。尽管如此,我在解决方案中添加了一个编辑,该解决方案包含一个额外的 UDF 输入,可以在其中定义列值前缀。希望它能按预期工作。

以上是关于在 PySpark 数据框中拆分和计算列值的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 数据框将 json 列值拆分为***多列

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

Pyspark 数据帧拆分并将分隔列值填充到 N 索引数组中

在 PySpark 数据框中拆分字符串

PySpark数据框显示错误的值

将列表的列拆分为同一 PySpark 数据框中的多列