将 PANDAS 与 Apache Beam 一起使用
Posted
技术标签:
【中文标题】将 PANDAS 与 Apache Beam 一起使用【英文标题】:Using PANDAS with Apache Beam 【发布时间】:2020-01-17 15:01:10 【问题描述】:我是 Apache Beam 的新手,刚开始使用 Python SDK 进行开发。 关于 Apache Beam,我知道高级别的 Pipelines、Pcollections、Ptransforms、ParDo 和 DoFn。
在我当前的项目管道中,已经使用 PANDAS 实现了使用下面提到的语法来读取、转换和写入文件
我想了解这是否是 Apache Beam 的正确实现,因为我们仅使用 PANDAS 直接读取和写入文件,而不是逐个元素地处理文件。
步骤:
-
创建管道
创建输入文件路径的 pcollection
调用DoFn并传递文件路径
使用 PANDAS 在 DoFn 中执行所有操作(读取、转换和写入)。
示例高级代码:
import **required libraries
class ActionClass(beam.DoFn):
def process(self, file_path):
#reading file using PANDAS into dataframe
df = pandas.read_csv('file_path')
# do some transformation using pandas
#write dataframe to output file from inside DoFn only.
return
def run():
p = beam.Pipeline(options=options)
input = p | beam.io.ReadFromText('input_file_path') --reading only file path
output = input | 'PTransform' | beam.ParDo(ActionClass)
【问题讨论】:
【参考方案1】:在我看来,如果您想要使用 pandas 处理大量小型 CSV 文件,那么这可能是 Apache Beam 的有效用例。
谢谢
【讨论】:
【参考方案2】:我的看法是你没有使用光束的力量。
因为您的解决方案没有采用光束真正有用的并行过程。
我建议您使用 ReadFromText 阅读 CSV 并使用 Map 或 ParDo 对数据进行转换 在这种情况下,Beam 将读取 CSV,并可以通过您执行转换的不同工作人员分发数据。
现在根据您的尝试,您可以直接在 Beam 上使用数据框 https://beam.apache.org/documentation/dsls/dataframes/overview/
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
agg.to_csv('output')
【讨论】:
以上是关于将 PANDAS 与 Apache Beam 一起使用的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam DirectRunner 与“正常”并行进程