Finding duplicate data in a Dataflow job - Python

I want to create a workflow based on this example:

https://github.com/GoogleCloudPlatform/professional-services/tree/master/examples/cloud-composer-examples/composer_dataflow_examples

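I want to do exactly the same thing, and I have already created all the scripts. However, I need to modify the Dataflow job so that it checks whether the CSV I want to ingest into BigQuery contains any duplicate values.

This is the Dataflow code: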

    """dataflow_job.py is a Dataflow pipeline which reads a delimited file,
    adds some additional metadata fields and loads the contents to a BigQuery table."""


    import argparse
    import logging
    import ntpath
    import re

    import apache_beam as beam
    from apache_beam.options import pipeline_options


    class RowTransformer(object):
        """A helper class that contains utility methods to parse the delimited file
        and convert every record into a format acceptable to BigQuery. It also contains
        utility methods to add a load_dt and a filename fields as demonstration of
        how records can be enriched as part of the load process."""

        def __init__(self, delimiter, header, filename, load_dt):
            self.delimiter = delimiter

            # Extract the field name keys from the comma separated input.
            self.keys = re.split(',', header)

            self.filename = filename
            self.load_dt = load_dt

        def parse(self, row):
            """This method translates a single delimited record into a dictionary
            which can be loaded into BigQuery. It also adds filename and load_dt
            fields to the dictionary."""

            # Strip out the return characters and quote characters.
            values = re.split(self.delimiter, re.sub(r'[\r\n"]', '', row))

            row = dict(list(zip(self.keys, values)))

            # Add an additional filename field.
            row['filename'] = self.filename

            # Add an additional load_dt field.
            row['load_dt'] = self.load_dt

            return row


    def run(argv=None):
        """The main function which creates the pipeline and runs it."""
        parser = argparse.ArgumentParser()

        # Add the arguments needed for this specific Dataflow job.
        parser.add_argument(
            '--input', dest='input', required=True,
            help='Input file to read.  This can be a local file or '
                'a file in a Google Storage Bucket.')

        parser.add_argument('--output', dest='output', required=True,
                            help='Output BQ table to write results to.')

        parser.add_argument('--delimiter', dest='delimiter', required=False,
                            help='Delimiter to split input records.',
                            default=',')

        parser.add_argument('--fields', dest='fields', required=True,
                            help='Comma separated list of field names.')

        parser.add_argument('--load_dt', dest='load_dt', required=True,
                            help='Load date in YYYY-MM-DD format.')

        known_args, pipeline_args = parser.parse_known_args(argv)
        row_transformer = RowTransformer(delimiter=known_args.delimiter,
                                        header=known_args.fields,
                                        filename=ntpath.basename(known_args.input),
                                        load_dt=known_args.load_dt)

        p_opts = pipeline_options.PipelineOptions(pipeline_args)

        # Initiate the pipeline using the pipeline arguments passed in from the
        # command line.  This includes information including where Dataflow should
        # store temp files, and what the project id is.
        with beam.Pipeline(options=p_opts) as pipeline:
            # Read the file.  This is the source of the pipeline.  All further
            # processing starts with lines read from the file.  We use the input
            # argument from the command line.
            rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input)

            # This stage of the pipeline translates from a delimited single row
            # input to a dictionary object consumable by BigQuery.
            # It refers to a function we have written.  This function will
            # be run in parallel on different workers using input from the
            # previous stage of the pipeline.
            dict_records = rows | "Convert to BigQuery row" >> beam.Map(
                lambda r: row_transformer.parse(r))

            # This stage of the pipeline writes the dictionary records into
            # an existing BigQuery table (CREATE_NEVER). WRITE_APPEND adds the
            # new records to the table without truncating existing ones.
            dict_records | "Write to BigQuery" >> beam.io.Write(
                beam.io.BigQuerySink(known_args.output,
                                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()

My experience with Apache Beam is very limited. How can I add a function that checks whether the dictionaries created by my RowTransformer contain duplicate values?

Answer

You can use the Distinct transform to remove duplicates. See the Apache Beam documentation for Distinct for details.

    # The parentheses let the pipe expression continue on the next line.
    rows = (pipeline
            | "Read from text file" >> beam.io.ReadFromText(known_args.input)
            | "Remove Duplicates" >> beam.Distinct())

    dict_records = rows | "Convert to BigQuery row" >> beam.Map(
        lambda r: row_transformer.parse(r))

...
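Applied to the raw lines, Distinct only removes lines that are identical character for character. If a duplicate is instead defined by a subset of columns, one option is to key the parsed dictionaries by those columns and keep a single record per key. The following is a minimal sketch under that assumption, not part of the original answer; the names dedup_key, keep_first and drop_duplicate_rows, and the 'id' field in the usage note, are hypothetical.

```python
def dedup_key(row, key_fields):
    """Build a hashable key from the fields that define a duplicate."""
    return tuple(row.get(field) for field in key_fields)


def keep_first(keyed_group):
    """Pick one record from a (key, records) pair produced by GroupByKey."""
    _, records = keyed_group
    return next(iter(records))


def drop_duplicate_rows(dict_records, key_fields):
    """Return a PCollection holding one dictionary per distinct key."""
    # Imported lazily so the helpers above can be tested without Beam installed.
    import apache_beam as beam

    return (
        dict_records
        | "Key by fields" >> beam.Map(
            lambda row: (dedup_key(row, key_fields), row))
        | "Group duplicates" >> beam.GroupByKey()
        | "Keep one per key" >> beam.Map(keep_first)
    )
```

In the pipeline this would sit between the "Convert to BigQuery row" and "Write to BigQuery" steps, for example dict_records = drop_duplicate_rows(dict_records, ['id']). GroupByKey does not guarantee any ordering within a group, so which of the duplicate records survives is arbitrary.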
Another answer

Beam provides a method for dropping duplicates, just as other frameworks do (Spark, pandas).

So you can do:

    # The parentheses let the pipe expression continue on the next line.
    rows = (pipeline
            | "Read from text file" >> beam.io.ReadFromText(known_args.input)
            | "Remove Duplicates" >> beam.RemoveDuplicates())
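Both answers drop duplicates silently. If the goal is to find and report them before loading, one possible approach is to count how often each line occurs and keep only the lines seen more than once. The sketch below assumes that exact-line matching is what counts as a duplicate; is_repeated, find_duplicate_lines and find_duplicate_lines_local are hypothetical names, and the Counter-based function is only a plain-Python equivalent for checking a small sample file.

```python
from collections import Counter


def is_repeated(element_and_count):
    """True for (element, count) pairs whose element occurs more than once."""
    _, count = element_and_count
    return count > 1


def find_duplicate_lines(rows):
    """Return a PCollection of (line, count) for lines seen more than once."""
    # Imported lazily so the helpers here can be tested without Beam installed.
    import apache_beam as beam

    return (
        rows
        | "Count each line" >> beam.combiners.Count.PerElement()
        | "Keep repeated lines" >> beam.Filter(is_repeated)
    )


def find_duplicate_lines_local(lines):
    """Plain-Python equivalent of the Beam steps, for small inputs only."""
    return {line: n for line, n in Counter(lines).items() if n > 1}
```

The resulting (line, count) pairs could be logged or written to a separate output so the load can be inspected or rejected. As far as I know, RemoveDuplicates is the older name for the same operation and has been deprecated in favor of Distinct, so Distinct is the safer choice in new pipelines.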
