如何获取 ValueProvider 的值并将其写入 BigQuery 表?
Posted
技术标签:
【中文标题】如何获取 ValueProvider 的值并将其写入 BigQuery 表?【英文标题】:How to get the value of a ValueProvider and write it in a BigQuery table? 【发布时间】:2021-01-18 10:15:31 【问题描述】:早安,
我创建了一个 DataFlow 模板,用于读取 BigQuery 中的一些信息、应用一些转换并将结果写入一个新的 BigQuery 表中。
这个模板有两个参数:
输入查询 项目名称我想通过“WriteToBigquery”转换将项目名称写入 bigquery 表中,但不是写入用户填写的项目名称,而是返回错误..
你知道我怎样才能得到这个值并写出来吗?
感谢您的帮助!
代码:
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--query',
default='',
help='q')
parser.add_value_provider_argument(
'--projet',
default='',
help='d')
[...]
my_options = pipeline_options.view_as(BqReaderOptions).query
myProjet = pipeline_options.view_as(BqReaderOptions).projet
nb_val = (
p
| 'Readl' >> beam.io.ReadFromBigQuery(query=my_options, use_standard_sql = True)
|beam.Map(lambda elem :elem== ' 0' )
| 'countVal' >> beam.combiners.Count.PerElement()
|beam.Map(lambda elem : "Nb" : int(elem), 'projet': myProjet ))
ERROR :
default_encoder "Object of type '%s' is not JSON serializable" % type(obj).__name__) TypeError: Object of type 'RuntimeValueProvider' is not JSON serializable [while running 'writeToBigQuery1/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']
【问题讨论】:
【参考方案1】:您收到该错误是因为您正在输出 ValueProvider
作为转换的结果,并且它尝试对 JSON 进行默认编码,但失败了。但是,您的意图是将项目输出为字符串而不是原始ValueProvider
。你可以阅读details on how to use ValueProvider
in your own functions,但基本上你只需要创建一个包含ValueProvider
的DoFn 对象,并在其上使用get
方法,如下所示:
class MyFn(beam.DoFn):
def __init__(self, project): # Pass in project as a ValueProvider
self.project = project
def process(self, elem):
yield "Nb" : int(elem), "project": self.project.get()
【讨论】:
是的,它有效!非常感谢您的帮助!以上是关于如何获取 ValueProvider 的值并将其写入 BigQuery 表?的主要内容,如果未能解决你的问题,请参考以下文章
SQL 到 SQL CE:如何读取 SQL 或 Access 数据库并将其写为 SQL CE 数据库?
music21:读取 MIDI 文件的 BPM 和乐器信息并将其写回文件