在 pyspark Dataframe 上创建新的模式或列名

Posted

技术标签:

【中文标题】在 pyspark Dataframe 上创建新的模式或列名【英文标题】:Create new schema or column names on pyspark Dataframe 【发布时间】:2017-08-31 15:33:12 【问题描述】:

我看到了this post,它有点帮助,只是我需要使用列表更改数据帧的标题,因为它很长并且随着我输入的每个数据集而变化,所以我无法真正写出/很难-新列名中的代码。

例如:

df = sqlContext.read.load("./assets/"+filename, 
                          format='com.databricks.spark.csv', 
                          header='false', 
                          inferSchema='false')
devices = df.first()
metrics = df.take(2)[1]
# Adding the two header rows together as one as a way of later searching through and sorting rows
# delimiter is "..." since it doesn't occur anywhere in the data and we don't have to wory about multiple splits
header = [str(devices[i]) +"..."+ str(metrics[i]) for i in range(len(devices))]

df2 = df.toDF(header)

那么我当然会得到这个错误:

IllegalArgumentException: u"requirement failed: 列数不匹配。\n旧列名 (278):

header 的长度 = 278 和列数是一样的。 所以,真正的问题是,当我有一个新名称列表时,如何对数据帧中的标头进行非硬编码重命名?

我怀疑我必须不以实际列表对象的形式输入,但是如何在不遍历每一列的情况下做到这一点(使用 selectexpr 或别名并使用一个创建几个新的 dfs(不可变)一次更新新的列?(糟糕)

【问题讨论】:

len(devices)会返回什么? 【参考方案1】:

我尝试了不同的方法。因为我想模拟硬编码列表(而不是实际的列表对象),所以我使用了 exec() 语句和一个由所有链接标题创建的字符串。

注意:这限制为 255 列。所以如果你想要更多,你就必须打破它

for i in range(len(header)):
    # For the first of the column names, need to initiate the string header_str
    if i == 0:
        header_str = "'" + str(header[i])+"',"
    # For the last of the names, need a different string to close it without a comma
    elif i == len(header)-1:
        header_str = header_str + "'" + header[i] + "'"
    #For everything in the middle: just add it all together the same way
    else:
        header_str = header_str + "'" + header[i] + "',"

exec("df2 = df.toDF("+ header_str +")")

【讨论】:

【参考方案2】:

您可以遍历旧列名并将新列名作为别名。一个很好的方法是在 python 中使用函数zip

首先让我们创建列名列表:

old_cols = df.columns
new_cols = [str(d) + "..." + str(m) for d, m in zip(devices, metrics)]

虽然我假设“...”指的是另一个 python 对象,因为“...”在列名中不是一个好的字符序列。

最后:

df2 = df.select([df[oc].alias(nc) for oc, nc in zip(old_cols, new_cols)])

【讨论】:

当我运行最后一部分时 (df2 = df.select....) 我收到此错误:“TypeError: 'Column' object is not callable”。我尝试将每个转换为列表对象,但它给出了相同的错误,所以它必须与原始 df 列一起使用? 对不起,我忘记了括号。迭代不带括号的列表[f(c) for c in list] 是行不通的。我修改了答案

以上是关于在 pyspark Dataframe 上创建新的模式或列名的主要内容,如果未能解决你的问题,请参考以下文章

PySpark .groupBy() 和 .count() 在相​​对较小的 Dataframe 上运行缓慢

如何为现有 DataFrame 创建新行?在 PySpark 或 Scala 中

PySpark DataFrame基础操作

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

向 pyspark Dataframe 添加新行

创建结构 PySpark 的 DataFrame