从 csv 文件将数据添加到现有的 apache spark 数据帧

Posted

技术标签:

【中文标题】从 csv 文件将数据添加到现有的 apache spark 数据帧【英文标题】:Adding data to an existing apache spark dataframe from a csv file 【发布时间】:2016-09-16 14:40:36 【问题描述】:

我有一个包含两列的 spark 数据框:名称、年龄如下:

[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

数据框是使用

创建的
sqlContext.createDataFrame()

接下来我需要做的是从外部“csv”文件中添加第三列“UserId”。外部文件有几列,但我只需要包含第一列,即“UserId”:

两个数据源中的记录数相同。我在 Windows 操作系统上使用独立的 pyspark 版本。最终结果应该是一个包含三列的新数据框:UserId、Name、Age。

有什么建议吗?

【问题讨论】:

【参考方案1】:

我使用 pandas 来完成这项工作。它允许以多种不同的方式连接数据帧。

1) 我们需要首先只导入那个额外的列(在我们删除标题之后,虽然这也可以在导入之后完成)并将其转换为 RDD

from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
userid_rdd = sc.textFile("C:……/userid.csv").map(lambda line: line.split(","))

2) 将 'userid' RDD 转换为 spark 数据帧

userid_df = userid_rdd.toDF(['userid'])
userid_df.show()

3) 将 'userid' 数据框转换为 pandas 数据框

userid_toPandas = userid_df.toPandas()
userid_toPandas

4) 将“预测”数据帧(现有数据帧)转换为 pandas 数据帧

predictions_toPandas = predictions.toPandas() 
predictions_toPandas

5) 使用“concat”将两个 pandas 数据帧合并为一个新的数据帧

import pandas as pd
result = pd.concat([userid_toPandas, predictions_toPandas], axis = 1, ignore_index = True)
result

【讨论】:

【参考方案2】:

您可以通过连接两个数据框来完成此操作,但为此您需要在展位表中包含 id 或其他键。如果行的位置相同,我建议将其复制到 Excel 文件中,否则您没有足够的信息来合并它们。

【讨论】:

【参考方案3】:

您可以从 csv 创建一个新的数据框。

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    # Import the csv file to the SparkSQL table.

    df = sqlContext.read.csv("abc.csv")
    df.createOrReplaceTempView(table_a)

    # Create a new dataframe with only the columns required. In your case only user id
     df_1 = spark.sql("select userid from table_a")

    #Now do a join with the existing dataframe which has the original data. ( [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] )
    # Lets call the original alice-bob dataframe as df_ori. So,

    df_result = df_ori.join(df_1, how=inner, on= (any column cols if there are any or index row)

【讨论】:

以上是关于从 csv 文件将数据添加到现有的 apache spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 fast-csv npm 将新行或新行的数据(新行)附加到现有的 csv 文件

在python中将行添加到现有的csv文件

通过 Spark 将 csv 文件加载到现有的 HIVE 故事中

使用 pandas 将不同位置的行附加到现有的 csv 文件

将 Apache NiFi 添加到现有的 Hortonworks HDP 集群

从命令行将图标添加到现有的 EXE 文件 [关闭]