在 Python 中即时从 CSV 生成镶木地板
Posted
技术标签:
【中文标题】在 Python 中即时从 CSV 生成镶木地板【英文标题】:Generate parquet from CSV on the fly in Python 【发布时间】:2019-09-12 16:08:24 【问题描述】:我在 S3 存储桶上以 CSV 格式存储了多个非常大的数据集。我需要将这些 CSV 转换为 Apache Parquet 文件。
我没有(也不想要)任何 Spark 集群,如果我错了,请纠正我,但在我看来,pyspark
没有任何帮助。
基本上,从逐行流式传输 CSV 的迭代器中,我想根据模式生成 Parquet 文件。
据我了解,pyarrow
不能在输入中使用迭代器。
有人有办法解决它吗?
任何帮助表示赞赏!
【问题讨论】:
【参考方案1】:Spark 可以在本地运行,例如通过安装 pip install pyspark
并在本地 jupyter notebook 中运行代码。
spark = SparkSession.builder\
.master("local[*]").appName("csvConverter")\
.config("spark.sql.shuffle.partitions", 8)\
.getOrCreate()
以上初始化将设置 spark 以本地模式运行并使用尽可能多的内核。
通过本地运行的 spark 会话,您可以使用 spark 的并行处理功能轻松加载所有 csv - 应该比从文件中逐行流式传输它们快得多。
df = spark.read.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.schema(table_schema) \
.load("./data/*.csv")
有用于从 S3 读取数据的库。
要保存加载的数据,您可以轻松地将其写入镶木地板
df.write.parquet("output.parquet")
差不多就这些了。
如果您不想自己创建集群,请查看Databricks。在 Databricks 中,创建集群几乎是一键式练习,您可以在 Azure 或 AWS 上使用它们(Azure 提供 200 美元 free trial 供您使用)。最重要的是,Databricks 提供了一个免费的社区版,带有 6GB 内存集群- you can learn more here
【讨论】:
感谢您的帮助。问题是,csv
文件已经存储在 S3 存储桶中,我必须大约每月一次将它们转换为 parquet
。我只能运行一个 Docker 容器,恕我直言,在单个容器中启动 spark
集群可能不是一个好主意......我正在寻找一种非火花方式来做到这一点
我不明白为什么将它们存储在 S3 上是一个问题 - spark 可以轻松读取它们。如果您真的不想使用 spark,您可以使用常规 pandas 或几乎任何用于 s3 和 csv / parquets 的库。请记住,如果它们实际上非常大,在单个节点上运行 spark 仍然是有益的,如本文所述:databricks.com/blog/2018/05/03/…
@Daniel 看来问题是我们的朋友没有spark
集群设置,因此单个节点上的arrrow
是替代方案。那篇文章似乎更多地是关于基准测试和与pandas_udf
的集成。将spark
用于单机分析没有多大意义,因为它只是在浪费金钱,让集群一直只使用一台机器。总体而言,spark
是这里处理大批量数据的最佳框架,但是如果集群不可用,多线程 pyarrow
进程也可以工作,但随着时间的推移 imo 无法扩展。
@thePurplePython 请记住,Spark 有一个本地模式,可以在单台机器上完全开箱即用。如果您从 Spark 开始,您可以轻松扩展,并且您拥有来自整个生态系统的所有库和集成,其中包括在某些场景中使用 pyarrow
。 pyarrow
会让您处理大于可用内存的数据吗?
@Daniel Well local mode
推荐用于测试、桌面开发和非生产工作。此外,pyarrow
必须安装在集群中的每个节点上,才能与spark
集成一起使用。启动SparkSession
并且只使用集群中的一台机器来处理大数据用例是没有意义的。如果带有pyarrow
的单台机器具有高效的 RAM 和 CPU 量并且不是在谈论 TB 的数据,那么我希望迭代小批量过程能够很好地执行,因为它听起来就像只是将 csv 转换为 parquet 并写入在这个帖子场景中到s3
。【参考方案2】:
这是一个读取 csv 数据然后将其写入 parquet 表的代码 sn-p 示例。这是pandas
单机方法,不使用通过spark
的分布式方法。希望这会有所帮助。
本文为您的迭代批处理用例提供了一些很好的示例:
https://wesmckinney.com/blog/arrow-streaming-columnar/
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
PATH = "s3://..."
df = pd.read_csv(PATH, sep="\t")
df.head()
c1 c2 c3
0 1 2 3
1 4 5 6
2 7 8 9
parquet_table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(parquet_table, './df.parquet') # target path
! ls ./df.parquet
./df.parquet # list of parquet files
pandas_table = pq.read_table('./df.parquet') # confirm write worked
pandas_table.to_pandas() # data
c1 c2 c3
0 1 2 3
1 4 5 6
2 7 8 9
【讨论】:
以上是关于在 Python 中即时从 CSV 生成镶木地板的主要内容,如果未能解决你的问题,请参考以下文章