如何使用 Apache Beam Python 将输出写入动态路径

Posted

技术标签:

【中文标题】如何使用 Apache Beam Python 将输出写入动态路径【英文标题】:How to write output to a dynamic path using Apache Beam Python 【发布时间】:2020-12-16 07:23:37 【问题描述】:

我是 apache 梁的新手。我的场景如下,

我有多个 json 格式的事件。在每个事件中,event_time 列表示该事件的创建时间,我正在使用 event_time 计算它们的创建日期。 我想将这些事件分别写在他们的日期分区下。我的代码是这样的

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time


class EventFormatter(beam.DoFn):

  def process(self, element, *args, **kwargs):
    tmp_dict = 
    for i in range(len(element['properties'])):
        tmp_dict['messageid'] = element['messageid']
        tmp_dict['userid'] = element['userid']
        tmp_dict['event_time'] = element['event_time']
        tmp_dict['productid'] = element['properties'][i]['productid']

        yield tmp_dict


class DateParser(beam.DoFn):

    def process(self, element, *args, **kwargs):
        key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
        print(key, element)
        yield TaggedOutput(time.strftime('%Y-%m-%d', time.localtime(element.get('event_time'))), element)


with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          "messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": ["productid": "product-173"],
          "messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": ["productid": "product-143","productid": "product-144"]
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )


  output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date=/'.format(....))

我无法找到我应该如何完成格式块。当我运行代码打印结果时,它给出了

('2020-08-27', 'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1')
('2020-08-25', 'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686')
('2020-08-25', 'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686')

例如。我想在 date=2020-08-25 文件夹下写 2 个事件,另外一个 date=2020-08-27。

在一天结束的时候,我想把每个事件写在他们的创建日期文件夹下。

我该怎么做?

感谢您的帮助,

奥古兹。

【问题讨论】:

【参考方案1】:

在您的代码中,您使用了多个输出。 这是为了将一个 DoFn (a ParDo) 的输出连接到另一个 DoFn,这对于整个管道来说是静态

如果你想根据你所拥有的内容将数据转储到不同的文件中,你必须实现一个DoFn只是为了写入。

类似这样的:

class WriteByKey(apache_beam.DoFn):
   def process(self, kv):
        key, value = kv
        with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/key.extension', 'a') as fp:
            fp.write(value)

     

您应该更改 DataParser DoFn 以生成元组(日期、值)而不是 TaggedOut,并将管道更改为如下所示:

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          "messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": ["productid": "product-173"],
          "messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": ["productid": "product-143","productid": "product-144"]
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser()) | beam.ParDo(WriteByKey())
  )

【讨论】:

感谢您的回答 Iñigo,但我对您的解决方案有疑问。在 WriteByKey 类中,beam.io.gcp.gcsio.GcsIO().open 中没有附加模式,因此我在写入时覆盖了我的事件。我可以为每个事件生成多个文件,但这是非生产性的方法。你能建议另一种解决方案来处理这个问题吗? 是的,它会抛出一个异常 id thw 文件已经存在。最好的选择是在管道中添加另一个步骤 beam.GroupByKey() see example【参考方案2】:

具体来说,要为每个键写几个元素,你可以这样做

class WriteByKey(apache_beam.DoFn):
    def process(self, kvs):
         # All values with the same key will come in at once.
         key, values = kvs
         with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/key.extension', 'w') as fp:
             for value in values:
                 fp.write(value)
                 fp.write('\n')

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | ...
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )
  output = events | beam.GroupByKey() | beam.ParDo(WriteByKey())

请注意,跑步者可能需要在失败时重试元素,因此与其直接写入输出,更安全的方法是写入临时文件,然后在成功时自动重命名,例如

class WriteByKey(apache_beam.DoFn):
    def process(self, kvs):
         # All values with the same key will come in at once.
         key, values = kvs
         nonce = random.randint(1, 1e9)
         path = f'gs://bucket/path/key.extension'
         temp_path = f'path-nonce'
         with beam.io.gcp.gcsio.GcsIO().open(temp_path, 'w') as fp:
             for value in values:
                 fp.write(value)
                 fp.write('\n')
         beam.io.gcp.gcsio.GcsIO().rename(temp_path, path)

【讨论】:

以上是关于如何使用 Apache Beam Python 将输出写入动态路径的主要内容,如果未能解决你的问题,请参考以下文章

Python 上的 Apache Beam 将 beam.Map 调用相乘

如何从 PCollection Apache Beam Python 创建 N 个元素组

使用Apache-beam在Python中删除字典中的第一项[重复]

Apache Beam 数据流 BigQuery

如何运行 Apache Beam 集成测试?

我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?