apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

Posted

技术标签:

【中文标题】apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python【英文标题】:apache-beam reading multiple files from multiple folders of GCS buckets and load it biquery python 【发布时间】:2020-12-23 13:12:14 【问题描述】:

我想每小时设置一个管道来解析 GCS 存储桶不同文件夹中的 2000 个原始 protobuf 格式文件,并将数据加载到大查询中。到目前为止,我能够成功解析 proto 数据。

我知道读取文件夹中所有文件的通配符方法,但我现在不想这样做,因为我有来自不同文件夹的数据,我想像并行一样更快地运行它,而不是按顺序运行

如下图

for x,filename enumerate(file_separted_comma):
    --read data from prto
    --load data to bigquery 

现在我想知道以下方法是否是从 apache 梁中的不同文件夹解析多个文件并将数据加载到大查询中的最佳或推荐方法。

还有一件事,从proto解析后的每条记录,我都把它变成JSON记录来加载到大查询中,不知道这也是将数据加载到大查询而不是直接加载反序列化的好方法(解析)原始数据。

我正在从 Hadoop 作业转移到数据流,以通过设置此管道来降低成本。

我是 apache-beam 的新手,不知道什么是优缺点,因此有人可以看看代码并在这里帮助我制定更好的生产方法

import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems


def get_deserialized_log(serialized_log):
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log


def print_row(message):
    message=message[3]
    message = message.replace('_', '/');
    message = message.replace('*', '=');
    message = message.replace('-', '+');
    #finalbunary=base64.b64decode(message.decode('UTF-8'))
    finalbunary=base64.b64decode(message)
    msg=get_deserialized_log(finalbunary)

    jsonObj = MessageToDict(msg)
    #jsonObj = MessageToJson(msg)
    return jsonObj

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line



def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser. parse_known_args()

    with beam.Pipeline(options=PipelineOptions()) as p:
        input_list=app_args.input
        file_list = input_list.split(",")
        res_list = ["/home/file_-00000-of-00001.json".format(i) for i in range(len(file_list))]

        for i,file in enumerate(file_list):
            onesec=p | "Read Text ".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile=onesec | 'Parse file'.format(i) >> beam.Map(parse_file)
            printFileConetent=parsingProtoFile | 'Print output '.format(i) >>beam.Map(print_row)
        
            #i want to load to bigquery here
            ##LOAD DATA TO BIGQUERY

            #secondsec=printFileConetent | "Write TExt ".format(i) >> ##beam.io.WriteToText("/home/file_".format(i),file_name_suffix=".json", 
###num_shards=1 , 
##append_trailing_newlines = True)
        

if __name__ == '__main__':
    run()

在本地运行下面的代码

python3 another_main.py --input=tracker_one.gz,tracker_two.gz

我没有提到输出路径,因为我不想将数据保存到 gcs,因为我将把它加载到 bigquery 中

就像下面在 dataflowrunner 中运行的一样

python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing

注意到两个作业将针对同一个作业名称中的单个输入文件运行,并且不知道为什么会发生这种情况以及相同 的 PFA 屏幕截图

【问题讨论】:

【参考方案1】:

这种读取文件的方法很好(只要输入文件的数量不太大)。但是,如果您可以将要读取的文件集表示为通配符表达式(可以匹配多个文件夹),则性能可能会更好,并且 Dataflow 将并行读取与该模式匹配的所有文件。

要写入 BigQuery,最好使用 built-in BigQuery sink。默认行为是以 JSON 格式创建临时文件,然后将其加载到 BigQuery 中,但您也可以改用 Avro,这样会更有效。您还可以使用 Flatten 将所有输入合并到一个 PCollection 中,这样您的管道中只需要一个 BigQuery 接收器。

【讨论】:

嗨@danieim,我将从表中获取要处理的文件列表,并且我不知道如何使用通配符匹配它们,因为这将是动态的。输入 gcs bucker 将像这样 gs://bucket/YYYY/MM/DD/HH/MIN/,MIN 将为 00 或 30,因为来自组件的数据将被拆分为两个半小时文件夹 你也可以指导我为什么两个工作运行相同的工作名称 对于一组动态文件,您可以使用github.com/apache/beam/blob/master/sdks/python/apache_beam/io/… 转换,它将文件名的 PCollection 作为其输入。对于多个工作的问题:很可能您在代码中的某处对 p.run() 进行了两次调用 是的,你是对的,我是通过 ReadAllFromText 完成的,现在工作正常,对于多个工作,两次调用 p.run() 是原因,现在也已修复并且工作正常。非常感谢

以上是关于apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python的主要内容,如果未能解决你的问题,请参考以下文章

模拟访问公共 GCS 存储桶的结果

将数据从 gs 存储桶移动到 s3 亚马逊存储桶的 GCP dataproc 集群 hadoop 作业失败 [控制台]

将多个 .gz 文件从一个 GCS 存储桶复制到 Java 中的另一个存储桶

数据流:storage.Client() 导入错误或如何列出 GCP 存储桶的所有 blob

所有存储桶的 Firebase 存储安全规则

如何使用多个工作人员加快批量导入谷歌云数据存储的速度?