从 csv 文件将数据添加到现有的 apache spark 数据帧
Posted
技术标签:
【中文标题】从 csv 文件将数据添加到现有的 apache spark 数据帧【英文标题】:Adding data to an existing apache spark dataframe from a csv file 【发布时间】:2016-09-16 14:40:36 【问题描述】:我有一个包含两列的 spark 数据框:名称、年龄如下:
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
数据框是使用
创建的sqlContext.createDataFrame()
接下来我需要做的是从外部“csv”文件中添加第三列“UserId”。外部文件有几列,但我只需要包含第一列,即“UserId”:
两个数据源中的记录数相同。我在 Windows 操作系统上使用独立的 pyspark 版本。最终结果应该是一个包含三列的新数据框:UserId、Name、Age。
有什么建议吗?
【问题讨论】:
【参考方案1】:我使用 pandas 来完成这项工作。它允许以多种不同的方式连接数据帧。
1) 我们需要首先只导入那个额外的列(在我们删除标题之后,虽然这也可以在导入之后完成)并将其转换为 RDD
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
userid_rdd = sc.textFile("C:……/userid.csv").map(lambda line: line.split(","))
2) 将 'userid' RDD 转换为 spark 数据帧
userid_df = userid_rdd.toDF(['userid'])
userid_df.show()
3) 将 'userid' 数据框转换为 pandas 数据框
userid_toPandas = userid_df.toPandas()
userid_toPandas
4) 将“预测”数据帧(现有数据帧)转换为 pandas 数据帧
predictions_toPandas = predictions.toPandas()
predictions_toPandas
5) 使用“concat”将两个 pandas 数据帧合并为一个新的数据帧
import pandas as pd
result = pd.concat([userid_toPandas, predictions_toPandas], axis = 1, ignore_index = True)
result
【讨论】:
【参考方案2】:您可以通过连接两个数据框来完成此操作,但为此您需要在展位表中包含 id 或其他键。如果行的位置相同,我建议将其复制到 Excel 文件中,否则您没有足够的信息来合并它们。
【讨论】:
【参考方案3】:您可以从 csv 创建一个新的数据框。
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# Import the csv file to the SparkSQL table.
df = sqlContext.read.csv("abc.csv")
df.createOrReplaceTempView(table_a)
# Create a new dataframe with only the columns required. In your case only user id
df_1 = spark.sql("select userid from table_a")
#Now do a join with the existing dataframe which has the original data. ( [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] )
# Lets call the original alice-bob dataframe as df_ori. So,
df_result = df_ori.join(df_1, how=inner, on= (any column cols if there are any or index row)
【讨论】:
以上是关于从 csv 文件将数据添加到现有的 apache spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 fast-csv npm 将新行或新行的数据(新行)附加到现有的 csv 文件
通过 Spark 将 csv 文件加载到现有的 HIVE 故事中
使用 pandas 将不同位置的行附加到现有的 csv 文件