将 PANDAS 与 Apache Beam 一起使用

Posted

技术标签:

【中文标题】将 PANDAS 与 Apache Beam 一起使用【英文标题】:Using PANDAS with Apache Beam 【发布时间】:2020-01-17 15:01:10 【问题描述】:

我是 Apache Beam 的新手,刚开始使用 Python SDK 进行开发。 关于 Apache Beam,我知道高级别的 Pipelines、Pcollections、Ptransforms、ParDo 和 DoFn。

在我当前的项目管道中,已经使用 PANDAS 实现了使用下面提到的语法来读取、转换和写入文件

我想了解这是否是 Apache Beam 的正确实现,因为我们仅使用 PANDAS 直接读取和写入文件,而不是逐个元素地处理文件。

步骤:

    创建管道 创建输入文件路径的 pcollection 调用DoFn并传递文件路径 使用 PANDAS 在 DoFn 中执行所有操作(读取、转换和写入)。

示例高级代码:

import **required libraries

class ActionClass(beam.DoFn):

    def process(self, file_path):
        #reading file using PANDAS into dataframe 
        df = pandas.read_csv('file_path')
        # do some transformation using pandas
        #write dataframe to output file from inside DoFn only.
        return

def run():

    p = beam.Pipeline(options=options)

    input = p | beam.io.ReadFromText('input_file_path') --reading only file path

    output = input | 'PTransform' | beam.ParDo(ActionClass)

【问题讨论】:

【参考方案1】:

在我看来,如果您想要使用 pandas 处理大量小型 CSV 文件,那么这可能是 Apache Beam 的有效用例。

谢谢

【讨论】:

【参考方案2】:

我的看法是你没有使用光束的力量。

因为您的解决方案没有采用光束真正有用的并行过程。

我建议您使用 ReadFromText 阅读 CSV 并使用 Map 或 ParDo 对数据进行转换 在这种情况下,Beam 将读取 CSV,并可以通过您执行转换的不同工作人员分发数据。

现在根据您的尝试,您可以直接在 Beam 上使用数据框 https://beam.apache.org/documentation/dsls/dataframes/overview/

  from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
  agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
  agg.to_csv('output')

【讨论】:

以上是关于将 PANDAS 与 Apache Beam 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam DirectRunner 与“正常”并行进程

Apache Beam:DoFn 与 PTransform

Apache Beam 剖析

Python + Beam + Flink

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误

Python 上的 Apache Beam 将 beam.Map 调用相乘