如何将 Pyspark 数据框标头设置为另一行?
Posted
技术标签:
【中文标题】如何将 Pyspark 数据框标头设置为另一行?【英文标题】:How to Set Pyspark Dataframe Headers to another Row? 【发布时间】:2018-05-29 21:36:58 【问题描述】:我有一个如下所示的数据框:
# +----+------+---------+
# |col1| col2 | col3 |
# +----+------+---------+
# | id| name | val |
# | 1 | a01 | X |
# | 2 | a02 | Y |
# +---+-------+---------+
我需要从中创建一个新的数据框,使用 row[1] 作为新的列标题并忽略或删除 col1、col2 等行。新表应如下所示:
# +----+------+---------+
# | id | name | val |
# +----+------+---------+
# | 1 | a01 | X |
# | 2 | a02 | Y |
# +---+-------+---------+
列可以是可变的,因此我不能使用名称在新数据框中显式设置它们。这不是使用 pandas df 的。
【问题讨论】:
第一个 DF 的原始来源是什么?为什么不一步一步创造“第二”呢? 来源来自 AWS Glue ETL 工作流程中的数据源。原始来源来自 S3 中的 csv 文件,在工作流程中添加了时髦的列标题名称。获取 df 的调用如下所示:glueContext.create_dynamic_frame.from_catalog(...) 然后我调用 datasource0.toDF() 来获取数据帧。 你试过使用read csv吗?sqlContext.read.format('csv').option('header', True).load(<S3 path>)
胶水作业必须使用确切的命令提取数据才能从胶水数据源中提取数据,而不仅仅是从 S3 中提取数据,而是从提供的源中读取数据。有一种方法可以用 pandas 做到这一点: df.columns = df.iloc[1] 然后 df.reindex(df.index.drop(1)) - 目标是复制这样的东西,但本机。
【参考方案1】:
假设只有一行 id
在 col1,name
在 col2 和 val
在 col3 em>,您可以使用以下逻辑(为了清晰和解释而注释)
#select the row with the header name
header = df.filter((df['col1'] == 'id') & (df['col2'] == 'name') & (df['col3'] == 'val'))
#selecting the rest of the rows except the first one
restDF = df.subtract(header)
#converting the header row into Row
headerColumn = header.first()
#looping columns for renaming
for column in restDF.columns:
restDF = restDF.withColumnRenamed(column, headerColumn[column])
restDF.show(truncate=False)
这应该给你
+---+----+---+
|id |name|val|
+---+----+---+
|1 |a01 |X |
|2 |a02 |Y |
+---+----+---+
但最好的选择是在使用 sqlContext 从源中读取 dataframe 时读取它 header 选项设置为 true
【讨论】:
【参考方案2】:你试过了吗? header=True
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
df = spark.read.csv("TSCAINV_062020.csv",header=True)
如果标题未设置为 True,Pyspark 将列名称设置为 _c0、_c1、_c2,并将列向下推一行。
【讨论】:
以上是关于如何将 Pyspark 数据框标头设置为另一行?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较
如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?