如何使用 python 将字典写入 Dataflow 中的 Bigquery

Posted

技术标签:

【中文标题】如何使用 python 将字典写入 Dataflow 中的 Bigquery【英文标题】:How to write dictionaries to Bigquery in Dataflow using python 【发布时间】:2018-05-02 21:57:57 【问题描述】:

我正在尝试从 GCP 存储中读取 csv,将其转换为字典,然后写入 Bigquery 表,如下所示:

p | ReadFromText("gs://bucket/file.csv") 
  | (beam.ParDo(BuildAdsRecordFn()))
  | WriteToBigQuery('ads_table',dataset='dds',project='doubleclick-2',schema=ads_schema)

其中:'doubleclick-2' 和 'dds' 是现有项目和数据集,ads_schema 定义如下:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

BuildAdsRecordFn() 定义如下:

class AdsRecord:
  dict = 

  def __init__(self, line):
    record = line.split(",")
    self.dict['Advertiser_ID'] = record[0]
    self.dict['Campaign_ID'] = record[1]
    self.dict['Ad_ID'] = record[2]
    self.dict['Ad_Name'] = record[3]
    self.dict['Click_through_URL'] = record[4]
    self.dict['Ad_Type'] = record[5]


class BuildAdsRecordFn(beam.DoFn):
  def __init__(self):
    super(BuildAdsRecordFn, self).__init__()

  def process(self, element):
    text_line = element.strip()
    ads_record = AdsRecord(text_line).dict
    return ads_record

但是,当我运行管道时,出现以下错误:

"dataflow_job_18146703755411620105-B" failed., (6c011965a92e74fa): BigQuery job "dataflow_job_18146703755411620105-B" in project "doubleclick-2" finished with error(s): errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0: Value encountered without start of object

这是我使用的样本测试数据:

100001,1000011,10000111,ut,https://bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral
100001,1000011,10000112,eu,http://weebly.com/sed/vel/enim/sit.jsp,Dynamic Click

我对 Dataflow 和 python 都是新手,所以无法弄清楚上面的代码有什么问题。非常感谢任何帮助!

【问题讨论】:

你用 pytho 解决了这个问题吗? 【参考方案1】:

我刚刚实现了您的代码,但它并不能正常工作,但我收到了不同的消息错误(例如“您不能将 dict 作为ParDo 的结果返回”)。

这段代码对我来说正常工作,注意不仅我没有使用类属性dict,而且现在返回了一个列表:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

class BuildAdsRecordFn(beam.DoFn):
    def __init__(self):
      super(BuildAdsRecordFn, self).__init__()

    def process(self, element):
      text_line = element.strip()
      ads_record = self.process_row(element)      
      return ads_record

    def process_row(self, row):
        dict_ = 

        record = row.split(",")
        dict_['Advertiser_ID'] = int(record[0]) if record[0] else None
        dict_['Campaign_ID'] = int(record[1]) if record[1] else None
        dict_['Ad_ID'] = int(record[2]) if record[2] else None
        dict_['Ad_Name'] = record[3]
        dict_['Click_through_URL'] = record[4]
        dict_['Ad_Type'] = record[5]
        return [dict_]

with beam.Pipeline() as p:

    (p | ReadFromText("gs://bucket/file.csv")
       | beam.Filter(lambda x: x[0] != 'A')
       | (beam.ParDo(BuildAdsRecordFn()))
       | WriteToBigQuery('ads_table', dataset='dds',
           project='doubleclick-2', schema=ads_schema))
      #| WriteToText('test.csv'))

这是我模拟的数据:

Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type
1,1,1,name of ad,www.url.com,sales
1,1,2,name of ad2,www.url2.com,sales with sales

我还过滤掉了我在文件中创建的标题行(在Filter 操作中),如果您没有标题,那么这不是必需的

【讨论】:

感谢您的快速回复!我试过了,但仍然遇到同样的错误。这是我在 csv 文件中的测试数据: 100001,1000011,10000111,ut,bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral 100001,1000011,10000112,eu,weebly.com/sed/vel/enim/sit.jsp,Dynamic Click 您的评论中未发布 csv 数据。另外,仅用于测试,例如,当您尝试使用 gcloud bq 时它是否有效?也许数据本身有问题(我建议将您的数据样本放在问题中,因为它更容易阅读) 抱歉输入太早了...在上面添加了测试数据。是的,我可以直接使用 bq 将数据加载到 BigQuery 中,没有任何问题。 我刚刚编辑了我的答案,看看它现在是否适合你。我尝试运行您的代码,但它确实不起作用,但出于不同的原因。然后我决定实现一个稍微不同的版本,它对我有用。

以上是关于如何使用 python 将字典写入 Dataflow 中的 Bigquery的主要内容,如果未能解决你的问题,请参考以下文章

如何在python中读取和写入字典到外部文件? [复制]

如何将嵌套字典写入json

将python字典写入CSV列:第一列的键,第二列的值

python关于字典如何格式化地写入文件之中

如何将带有嵌套字典的列表写入 csv 文件?

Python:将嵌套字典写入 CSV