如何将火花数据输出到具有单独列的 csv 文件?

Posted

技术标签:

【中文标题】如何将火花数据输出到具有单独列的 csv 文件?【英文标题】:How to output spark data to a csv file with separate columns? 【发布时间】:2016-09-01 11:40:17 【问题描述】:

我的代码 1st 使用正则表达式提取数据并将该数据写入文本文件(字符串格式)。 然后我尝试从文本文件中的内容创建一个数据框,以便我可以拥有导致错误的单独列。 (将其写入 csv 文件会将整个内容写入一列)。

with open("C:\\Sample logs\\dataframe.txt",'a') as f:
    f.write(str(time))
    f.write(" ")
    f.write(qtype)
    f.write(" ")
    f.write(rtype)
    f.write(" ")
    f.write(domain)
    f.write("\n")
 new = sc.textFile("C:\\Sample logs\\dataframe.txt").cache() # cause df requires an rdd
 lines1 = new.map(lambda x: (x, ))
 df = sqlContext.createDataFrame(lines1)

但我收到以下错误:

TypeError: Can not infer schema for type: type 'unicode'

我尝试了其他一些方法,但没有帮助。我要做的就是在执行写操作后,我想创建一个具有单独列的数据框,以便使用 groupBy()。

文本文件中的输入:

1472128348.0 HTTP - tr.vwt.gsf.asfh
1472237494.63 HTTP - tr.sdf.sff.sdfg
1473297794.26 HTTP - tr.asfr.gdfg.sdf
1474589345.0 HTTP - tr.sdgf.gdfg.gdfg
1472038475.0 HTTP - tr.sdf.csgn.sdf

csv 格式的预期输出:

与上面相同,但分成几列,以便我可以执行 分组操作。

【问题讨论】:

你能不能lines1.take(1) 请举例说明输入数据和预期的数据框结构 saveAsTextFile 类也有 DataFrame 方法吗?你的 spark 版本是什么? @AlbertoBonsanto:它返回输入的第一行 @Yaron:我已将其添加到问题中 【参考方案1】:

为了将“空格分隔的单词”替换为您需要替换的单词列表:

lines1 = new.map(lambda x: (x, ))

 lines1 = new.map(lambda line: line.split(' '))

我在我的机器上试了一下,在执行以下操作后

df = sqlContext.createDataFrame(lines1)

创建了一个新的 DF:

df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)

df.show()
+-------------+----+---+-----------------+
|           _1|  _2| _3|               _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

你可以执行groupBy:

>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>> 

为了使用架构,您首先需要定义它: 见:https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

可以在下面找到架构示例(您需要添加字段、更新名称、键入以便将其应用于您的案例)

from pyspark.sql.types import *
schema = StructType([
    StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True),
    StructField("F4", StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

之后,您将能够使用架构运行它:

df = sqlContext.createDataFrame(lines1,schema)

现在,您将拥有字段的名称:

df.show()
+-------------+----+---+-----------------+
|           F1|  F2| F3|               F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

要将其保存为 CSV,您需要使用 "to_pandas()" 和 "to_csv()" (python pandas的一部分)

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html

df.toPandas().to_csv('mycsv.csv')

csv 文件的内容:

cat mycsv.csv

,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf

请注意,您可以使用“.cast()”转换列,例如将 F1 转换为 float 类型 - 添加一个 float 类型的新列,并删除旧列)

df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")

【讨论】:

非常感谢!这行得通!顺便说一句,当我使用架构时,对于 StructField "FloatType" 只写入空值。 StringType 没有问题。但是你知道为什么 FloatType 将 null 复制到我的 excel 表吗? 当我更新架构以在 F1 上使用“FloatType”时,出现以下错误:“TypeError:FloatType 无法接受类型为 的对象”。我不确定为什么它被识别为 unicode。请参阅我关于“.cast()”的回答中的更新 当我进行铸造时,没有添加新列,但删除了旧的 F1。 尝试以下操作:在转换之前:df.show(),转换:df = df.withColumn("F1float", df["F1"].cast("float")) ,之后演员表:df.show() - 让我知道是否添加了新列 嗨..我试过了。现在添加了一个新列。但是新列只有 1 个重复值,这与 F1 不同,F1 具有多种值。似乎发生了一些转换,并且只添加了一个值(F1 中不存在)。

以上是关于如何将火花数据输出到具有单独列的 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?

以递归方式附加到 .csv 文件

如何用python把多个csv文件数据处理后汇总到新csv文件

如何将火花日志文件转换为一个 CSV 文件

求Python大神指导,一个csv文件,把其中每一列的数据提取出来单独保存为一个csv文件

如何将具有 1 列文件的 CSV 导入具有 2 列的 DB 表?