使用python将csv转换为镶木地板文件

Posted

技术标签:

【中文标题】使用python将csv转换为镶木地板文件【英文标题】:Convert csv to parquet file using python 【发布时间】:2018-11-09 06:38:06 【问题描述】:

我正在尝试将 .csv 文件转换为 .parquet 文件。 csv 文件 (Temp.csv) 具有以下格式

1,Jon,Doe,Denver

我正在使用以下python代码将其转换为镶木地板

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType([
            StructField("col1", IntegerType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True)])
    dirname = os.path.dirname(os.path.abspath(__file__))
    csvfilename = os.path.join(dirname,'Temp.csv')    
    rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
    df = sqlContext.createDataFrame(rdd, schema)
    parquetfilename = os.path.join(dirname,'output.parquet')    
    df.write.mode('overwrite').parquet(parquetfilename)

结果只是一个名为 output.parquet 的文件夹,而不是我正在寻找的 parquet 文件,随后控制台上出现以下错误。

我也尝试运行以下代码来面对类似的问题。

from pyspark.sql import SparkSession
import os

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')    
df = spark.read.csv(csvfilename)

# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')    
df.write.mode('overwrite').parquet(parquetfilename)

如何做到最好?使用windows,python 2.7。

【问题讨论】:

Similar question? @lwileczek 这是一个不同的问题,因为链接的问题明确要求 Spark,这只是一般使用 Python。 【参考方案1】:

使用包 pyarrowpandas,您可以将 CSV 转换为 Parquet,而无需在后台使用 JVM:

import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

您将运行的一个限制是pyarrow 仅适用于 Windows 上的 Python 3.5+。使用 Linux/OSX 将代码作为 Python 2 运行,或者将您的 Windows 设置升级到 Python 3.6。

【讨论】:

感谢您的回答。没有办法在 Windows 上使用 Python 2.7 吗? 这是一种将单个文件转换为 parquet 文件的非常简单的方法,但是如果我们有多个 csv 文件,并且想将其 parquet 为单个 parquet 文件怎么办? @Zombraz 您可以遍历文件并将每个文件转换为镶木地板,如果您正在寻找 python 之外的任何东西,AWS EMR 上的 hive 非常适合将 csv 转换为镶木地板 @Zombraz - 您可以使用 Dask 或 PySpark 将多个 CSV 文件转换为单个 Parquet 文件(或多个 Parquet 文件)。有关详细信息,请参阅我的答案。【参考方案2】:

您可以仅使用 pyarrow 将 csv 转换为镶木地板 - 无需 pandas。 当您需要最小化代码依赖关系(例如使用 AWS Lambda)时,它可能会很有用。

import pyarrow.csv as pv
import pyarrow.parquet as pq

table = pv.read_csv(filename)
pq.write_table(table, filename.replace('csv', 'parquet'))

请参阅 pyarrow 文档以微调 read_csvwrite_table 函数。

【讨论】:

【参考方案3】:
import boto3
import pandas as pd
import pyarrow as pa
from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = boto3.client('s3',region_name='us-east-2')
obj = s3.get_object(Bucket='ssiworkoutput', Key='file_Folder/File_Name.csv')
df = pd.read_csv(obj['Body'])

table = pa.Table.from_pandas(df)

output_file = "s3://ssiworkoutput/file/output.parquet"  # S3 Path need to mention
s3 = S3FileSystem()

pq.write_to_dataset(table=table,
                    root_path=output_file,partition_cols=['Year','Month'],
                    filesystem=s3)

print("File converted from CSV to parquet completed")

【讨论】:

这是从 AWS S3 路径读取 CSV 文件的代码,以 Parquet 格式存储它,并在 AWS S3 路径中分区。 确保运行以下命令,pip3 install boto3 pip3 install pandas pip3 install pyarrow pip3 install fs-s3fs pip3 install s3fs 你是如何在 aws 上安装 pyarrow 而没有包大小问题的? @Haha 最简单的方法是使用已经包含 pyarrow 的awswrangler 层【参考方案4】:

有几种不同的方法可以使用 Python 将 CSV 文件转换为 Parquet。

Uwe L. Korn 的 Pandas 方法效果很好。

如果您想将多个 CSV 文件转换为多个 Parquet / 单个 Parquet 文件,请使用 Dask。这会将多个 CSV 文件转换为两个 Parquet 文件:

import dask.dataframe as dd

df = dd.read_csv('./data/people/*.csv')
df = df.repartition(npartitions=4)
df.to_parquet('./tmp/people_parquet4')

如果您只想输出一个 Parquet 文件,也可以使用 df.repartition(npartitions=1)。有关使用 Dask [此处][1] 将 CSV 转换为 Parquet 的更多信息。

这是一个在 Spark 环境中工作的 PySpark sn-p:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("parquet_example") \
  .getOrCreate()

df = spark.read.csv('data/us_presidents.csv', header = True)
df.repartition(1).write.mode('overwrite').parquet('tmp/pyspark_us_presidents')

您也可以在 Spark 环境中使用Koalas:

import databricks.koalas as ks

df = ks.read_csv('data/us_presidents.csv')
df.to_parquet('tmp/koala_us_presidents')

【讨论】:

【参考方案5】:

您可以使用 spark 编写为 PARQUET FILE:

spark = SparkSession.builder.appName("Test_Parquet").master("local[*]").getOrCreate()

parquetDF = spark.read.csv("data.csv")

parquetDF.coalesce(1).write.mode("overwrite").parquet("Parquet")

希望对你有帮助

【讨论】:

【参考方案6】:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys

sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')

【讨论】:

请添加一些解释为什么这回答了这个问题。 使用 pyspark 将 csv 转换为镶木地板,这对我有用,希望对您有所帮助 这种方法有效,但比使用 spark csv reader 慢几倍

以上是关于使用python将csv转换为镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章

转换为镶木地板的 csv 文件将“e0”添加到值的末尾

Hive/Bigsql Pandas 将浮点数转换为整数,使用 pyarrow 将空值转换为镶木地板文件

将 avro 转换为镶木地板(也许使用 hive?)

如果 csv 列标题包含空格,则在 spark 中将 csv 转换为 parquet 会出错

Spark SQL Java GenericRowWithSchema无法强制转换为java.lang.String

使用胶水保存为镶木地板文件时会修改数据帧标头