使用数据流的 GCS 文件流式传输(apachebeam python)

Posted

技术标签:

【中文标题】使用数据流的 GCS 文件流式传输(apachebeam python)【英文标题】:GCS file streaming using dataflow(apachebeam python) 【发布时间】:2019-07-29 10:06:45 【问题描述】:

我有一个 GCS,我每分钟都会在其中获取文件。我使用 apache beam python sdk 创建了一个流式数据流。我为输入 gcs 存储桶和输出 gcs 存储桶创建了 pub/sub 主题。我的数据流正在流式传输,但我的输出是未存储在输出存储桶中。这是我的以下代码,

from __future__ import absolute_import

    import os
    import logging
    import argparse
    from google.cloud import language
    from google.cloud.language import enums
    from google.cloud.language import types
    from datetime import datetime
    import apache_beam as beam 
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.io.textio import ReadFromText, WriteToText

    #dataflow_options = ['--project=****','--job_name=*****','--temp_location=gs://*****','--setup_file=./setup.py']
    #dataflow_options.append('--staging_location=gs://*****')
    #dataflow_options.append('--requirements_file ./requirements.txt')
    #options=PipelineOptions(dataflow_options)
    #gcloud_options=options.view_as(GoogleCloudOptions)


    # Dataflow runner
    #options.view_as(StandardOptions).runner = 'DataflowRunner'
    #options.view_as(SetupOptions).save_main_session = True

    def run(argv=None):
        """Build and run the pipeline."""
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--output_topic', required=True,
            help=('Output PubSub topic of the form '
                '"projects/***********".'))
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument(
            '--input_topic',
            help=('Input PubSub topic of the form '
                '"projects/************".'))
        group.add_argument(
            '--input_subscription',
            help=('Input PubSub subscription of the form '
                '"projects/***********."'))
        known_args, pipeline_args = parser.parse_known_args(argv)

      # We use the save_main_session option because one or more DoFn's in this
      # workflow rely on global context (e.g., a module imported at module level).
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        pipeline_options.view_as(StandardOptions).streaming = True
        p = beam.Pipeline(options=pipeline_options)


        # Read from PubSub into a PCollection.
        if known_args.input_subscription:
            messages = (p
                        | beam.io.ReadFromPubSub(
                            subscription=known_args.input_subscription)
                        .with_output_types(bytes))
        else:
            messages = (p
                        | beam.io.ReadFromPubSub(topic=known_args.input_topic)
                        .with_output_types(bytes))

        lines = messages | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))

        class Split(beam.DoFn):
            def process(self,element):
                element = element.rstrip("\n").encode('utf-8')
                text = element.split(',') 
                result = []
                for i in range(len(text)):
                    dat = text[i]
                    #print(dat)
                    client = language.LanguageServiceClient()
                    document = types.Document(content=dat,type=enums.Document.Type.PLAIN_TEXT)
                    sent_analysis = client.analyze_sentiment(document=document)
                    sentiment = sent_analysis.document_sentiment
                    data = [
                    (dat,sentiment.score)
                    ] 
                    result.append(data)
                return result

        class WriteToCSV(beam.DoFn):
            def process(self, element):
                return [
                    ",".format(
                        element[0][0],
                        element[0][1]
                    )
                ]

        Transform = (lines
                    | 'split' >> beam.ParDo(Split())
                    | beam.io.WriteToPubSub(known_args.output_topic)
        )
        result = p.run()
        result.wait_until_finish()

    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()

我做错了什么请有人给我解释一下。

【问题讨论】:

【参考方案1】:

WriteToPubSub 将数据写入 PubSub 主题,而不是 GCS 存储桶。您想要做的可能是使用 WriteToText,或使用 apache_beam.io.filesystems 将数据写入存储桶的 DoFn。

额外注意的是,您的 WriteToCsv 转换似乎没有在任何地方使用。

【讨论】:

感谢您的反馈,但我在想的是我创建了一个桶的主题,我在其中获取传入的文件。所以当我使用 ReadFromPubSub 时它到底做了什么?输出是文件名桶?如果是,那么我可以使用 pubsub 的输出并将输入作为“gs://bucketname/outputof pubsub”吗?或者 readfrompubsub 直接一个一个地流式传输新文件,我不需要提供任何输入文件名?请帮助先生 1) 我使用apache_beam.io.WriteToText 将流数据(来自 ReadFromPubSub)写入 GCS.. 但流消息只保留在临时文件夹中(在目标存储桶位置内)。直到我排干管道,然后我才能看到实际数据出现在所需目的地的分片数量。有任何已知问题吗? 2)另外我想澄清一下,它只是写入GCS的窗口流吗?如果我要将每个已发布的消息流(非窗口)写入 GCS,预期的行为是什么?每条消息创建一个文件?

以上是关于使用数据流的 GCS 文件流式传输(apachebeam python)的主要内容,如果未能解决你的问题,请参考以下文章

每 5 分钟上传到 GCS 的文本文件如何将它们上传到 BigQuery?

使用 gsutil 将数据从 aws s3 传输到 gcs - SSL 认证错误

使用 Python 向 Google Cloud Storage 写入流式传输

从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery

BigQuery:如何使用传输 API 将文件从 GCS 加载到现有表?

将数据从 GCS 传输到 Bigquery 表失败