在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”

Posted

技术标签:

【中文标题】在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”【英文标题】:Error while using WriteToBigquery in python for Dataflow pipeline. Unicode object has no attribute 'items' 【发布时间】:2018-11-25 01:08:08 【问题描述】:

我的示例数据是 json 格式,看起来像:


  "metadata": 
    "action": "insert",
    "type": "export",
    "version": 1,
    "timestamp": "2018-11-23T09:17:59.048-08:00"
  ,
  "data": 
    "attr1": 61,
    "day": "2018-11-22",
    "pin": "2C49956",
    "CDP": 0,
    "DP": 0,
    "VD": 0,
    "seo": 0,
    "dir": 0,
    "other": 0,
    "at": 0
  

这是在一个平面文件中,目标是以批处理模式运行数据流管道以将数据插入到 bigquery 表中。在我想从元数据中获取时间戳并将其添加为数据部分中的键值对的转换之一中,我从数据流中收到错误消息,说“unicode 对象没有属性”items。

代码如下:

import collections
import json
import argparse
import logging
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, \
    WorkerOptions
from apache_beam.io.gcp import bigquery


# Creating options object
def create_options(argv):
    # pipeline options
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'something'
    google_cloud_options.job_name = datetime.now().strftime('somename')
    google_cloud_options.staging_location = 'some_loc'
    google_cloud_options.temp_location = 'another_loc'
    options.view_as(StandardOptions).runner = 'DirectRunner'
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(WorkerOptions).machine_type = 'n1-standard-1'
    return options

    class PrepareData(beam.DoFn):
        """
        ParDo function to create a dictionary of data for downstream consumption
        """

        def process(self, element):
            data = json.loads(element)
            modified_data = "action": data["metadata"]["action"], "timestamp": data["metadata"]["timestamp"], "data": data
            return [modified_data]


    class FilterInserts(beam.DoFn):
        """
        Filter data for inserts
        """

        def process(self, element):
            if element["action"] == "insert":
                element['data']['data']['timestamp'] = element['timestamp']
                # for dict in element["data"]["data"]:
                #     dict["timestamp"] = element["timestamp"]
                return element["data"]["data"]


    def run_pipe(options, argv):
        """
        Creating pipelines
        """
        p = beam.Pipeline(options=options)

        main_pipe =p | 'PREPARE_DATA' >> beam.io.ReadFromText('/home/Downloads/sample_1') | beam.ParDo(PrepareData())

        """ Separating pipes for various actions """
        insert_pipe= main_pipe | beam.ParDo(FilterInserts())

        """
        Inserts--> sinking to BQ
        """
        insert_pipe | 'INSERT' >> beam.io.WriteToBigQuery(
            project='some-data-warehouse',
            dataset='sample_data',
            table='sample',
            write_disposition='WRITE_APPEND',
            create_disposition='CREATE_IF_NEEDED')


        p.run()


    def main():
        """
        Main function to drive the run
        :return: errors if any
        """
        parser = argparse.ArgumentParser()
        args = parser.parse_args()
        try:
            # create options
            opt = create_options(argv=args)
            # run pipeline
            run_pipe(opt, argv=args)
        except Exception as e:
            logging.error('Pipeline failed with error : %s', e)
            raise Exception('Pipeline failed with error : %s', e)


    if __name__ == "__main__":
        main()

我在直接运行器上运行它以在本地测试,但即使我将运行器更改为 dataflow-runner,我也会遇到相同的错误。 错误信息是:

Exception: ('Pipeline failed with error : %s', AttributeError(u"'unicode' object has no attribute 'items' [while running 'INSERT/WriteToBigQuery']",))

谁能帮我找出问题所在以及如何解决这个问题?

【问题讨论】:

【参考方案1】:

使用下表的schema(可根据需要修改):

schema = 'VD:INTEGER,pin:STRING,timestamp:STRING,other:INTEGER,CDP:INTEGER,dir:INTEGER,attr1:INTEGER,seo:INTEGER,day:STRING,DP:INTEGER,at:INTEGER'

在您的 FilterInserts 课程上尝试以下操作:

class FilterInserts(beam.DoFn):
    """
    Filter data for inserts
    """

    def process(self, element):
        if element["action"] == "insert":
            element['data']['data']['timestamp'] = element['timestamp']

            return [
            'VD': element['data']['data']['VD'],
            'pin': element['data']['data']['pin'],
            'timestamp': element['data']['data']['timestamp'],
            'other': element['data']['data']['other'],
            'CDP': element['data']['data']['CDP'],
            'dir': element['data']['data']['dir'],
            'attr1' : element['data']['data']['attr1'],
            'seo' : element['data']['data']['seo'],
            'day' : element['data']['data']['day'],
            'DP' : element['data']['data']['DP'],
            'at' : element['data']['data']['at'],
            ]

出现问题是因为您需要向 BigQuery 发送一个键值数组,并且您发送的是一个带有 unicode 字符串的 JSON 字典。

希望对你有帮助。

【讨论】:

【参考方案2】:

创建一个列名与要插入的管道字典的键相同的表。

例如:

output2 = output|beam.io.WriteToBigQuery('gcpcloud1-254210:dataflow.dataflow')

我的输出管道是字典'age': 30, 'city': 'New York', 'name': 'John'

我想插入。所以我的表有一个架构age:INTEGER,name:STRING,city:STRING

input = (p | beam.io.ReadFromText('gs://bucketname/*.json'))
output=(input|beam.Map(lambda e : json.loads(e))
beam.io.WriteToBigQuery('projectid:dataset.table'))

【讨论】:

以上是关于在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery

我们如何在 Spring cloud dataflow kafka binder 中为 Kafka 维护租户数据隔离?

将 Pub/Sub 连接到 Dataflow Python 管道

在 GCP Dataflow 上的 python apache 光束中使用 scipy

GCP Dataflow + Apache Beam - 缓存问题

如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道