在 Python 中即时从 CSV 生成镶木地板

Posted

技术标签:

【中文标题】在 Python 中即时从 CSV 生成镶木地板【英文标题】:Generate parquet from CSV on the fly in Python 【发布时间】:2019-09-12 16:08:24 【问题描述】:

我在 S3 存储桶上以 CSV 格式存储了多个非常大的数据集。我需要将这些 CSV 转换为 Apache Parquet 文件。

我没有(也不想要)任何 Spark 集群,如果我错了,请纠正我,但在我看来,pyspark 没有任何帮助。

基本上,从逐行流式传输 CSV 的迭代器中,我想根据模式生成 Parquet 文件。 据我了解,pyarrow 不能在输入中使用迭代器。

有人有办法解决它吗?

任何帮助表示赞赏!

【问题讨论】:

【参考方案1】:

Spark 可以在本地运行,例如通过安装 pip install pyspark 并在本地 jupyter notebook 中运行代码。

spark = SparkSession.builder\
    .master("local[*]").appName("csvConverter")\ 
    .config("spark.sql.shuffle.partitions", 8)\
    .getOrCreate()

以上初始化将设置 spark 以本地模式运行并使用尽可能多的内核。

通过本地运行的 spark 会话,您可以使用 spark 的并行处理功能轻松加载所有 csv - 应该比从文件中逐行流式传输它们快得多。

df = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .schema(table_schema) \
  .load("./data/*.csv")

有用于从 S3 读取数据的库。

要保存加载的数据,您可以轻松地将其写入镶木地板

df.write.parquet("output.parquet")

差不多就这些了。

如果您不想自己创建集群,请查看Databricks。在 Databricks 中,创建集群几乎是一键式练习,您可以在 Azure 或 AWS 上使用它们(Azure 提供 200 美元 free trial 供您使用)。最重要的是,Databricks 提供了一个免费的社区版,带有 6GB 内存集群- you can learn more here

【讨论】:

感谢您的帮助。问题是,csv 文件已经存储在 S3 存储桶中,我必须大约每月一次将它们转换为 parquet。我只能运行一个 Docker 容器,恕我直言,在单个容器中启动 spark 集群可能不是一个好主意......我正在寻找一种非火花方式来做到这一点 我不明白为什么将它们存储在 S3 上是一个问题 - spark 可以轻松读取它们。如果您真的不想使用 spark,您可以使用常规 pandas 或几乎任何用于 s3 和 csv / parquets 的库。请记住,如果它们实际上非常大,在单个节点上运行 spark 仍然是有益的,如本文所述:databricks.com/blog/2018/05/03/… @Daniel 看来问题是我们的朋友没有spark 集群设置,因此单个节点上的arrrow 是替代方案。那篇文章似乎更多地是关于基准测试和与pandas_udf 的集成。将spark 用于单机分析没有多大意义,因为它只是在浪费金钱,让集群一直只使用一台机器。总体而言,spark 是这里处理大批量数据的最佳框架,但是如果集群不可用,多线程 pyarrow 进程也可以工作,但随着时间的推移 imo 无法扩展。 @thePurplePython 请记住,Spark 有一个本地模式,可以在单台机器上完全开箱即用。如果您从 Spark 开始,您可以轻松扩展,并且您拥有来自整个生态系统的所有库和集成,其中包括在某些场景中使用 pyarrowpyarrow 会让您处理大于可用内存的数据吗? @Daniel Well local mode 推荐用于测试、桌面开发和非生产工作。此外,pyarrow 必须安装在集群中的每个节点上,才能与spark 集成一起使用。启动SparkSession 并且只使用集群中的一台机器来处理大数据用例是没有意义的。如果带有pyarrow 的单台机器具有高效的 RAM 和 CPU 量并且不是在谈论 TB 的数据,那么我希望迭代小批量过程能够很好地执行,因为它听起来就像只是将 csv 转换为 parquet 并写入在这个帖子场景中到s3【参考方案2】:

这是一个读取 csv 数据然后将其写入 parquet 表的代码 sn-p 示例。这是pandas 单机方法,不使用通过spark 的分布式方法。希望这会有所帮助。

本文为您的迭代批处理用例提供了一些很好的示例:

https://wesmckinney.com/blog/arrow-streaming-columnar/

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

PATH = "s3://..."

df = pd.read_csv(PATH, sep="\t")

df.head()
c1  c2  c3
0   1   2   3
1   4   5   6
2   7   8   9

parquet_table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(parquet_table, './df.parquet') # target path

! ls ./df.parquet
./df.parquet # list of parquet files

pandas_table = pq.read_table('./df.parquet') # confirm write worked

pandas_table.to_pandas() # data
    c1  c2  c3
0   1   2   3
1   4   5   6
2   7   8   9

【讨论】:

以上是关于在 Python 中即时从 CSV 生成镶木地板的主要内容,如果未能解决你的问题,请参考以下文章

在python中使用s3 select解析多个镶木地板文件?

如何将 csv 文件转换为镶木地板

转换为镶木地板的 csv 文件将“e0”添加到值的末尾

是否可以直接从文件加载镶木地板?

Python:将熊猫数据框保存到镶木地板文件

生成镶木地板文件 - R 和 Python 之间的差异