GCP 数据流 UDF 输入

Posted

技术标签:

【中文标题】GCP 数据流 UDF 输入【英文标题】:GCP Dataflow UDF input 【发布时间】:2021-10-13 00:23:58 【问题描述】:

我正在尝试使用 Dataflow 删除 Datastore Bulk 并使用 JS UDF 过滤有关 doc 的实体。但是这段代码:

function func(inJson) 
  var row = JSON.parse(inJson);
  var currentDate = new Date();
  var date = row.modifiedAt.split(' ')[0];

  return some code

原因

TypeError: Cannot read property "split" from undefined

输入应该是实体的 JSON 字符串,并且实体应该具有 modifiedAt 属性。

究竟是什么将 Dataflow 传递给 UDF,我如何在 Dataflow 控制台中记录它?

【问题讨论】:

【参考方案1】:

假设 modifiedAt 是您添加的属性,我希望 Dataflow 中的 JSON 与 Datastore rest api (https://cloud.google.com/datastore/docs/reference/data/rest/v1/Entity) 匹配。这意味着您可能想要 row.properties.modifiedAt 。您可能还想提取属性的timestampValue (https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects/runQuery#Value)。

【讨论】:

【参考方案2】:

看看这个新的published Google Cloud blog,了解如何开始使用 Dataflow UDF。

使用 Datastore Bulk Delete 数据流模板,UDF 输入是数据存储实体的 JSON 字符串。正如 Jim 所指出的,这些属性嵌套在 properties 字段下,每个 schema。

关于您关于日志记录的其他问题,您可以在 UDF 中添加以下打印语句,其输出将显示在 Dataflow 控制台中的 Worker Logs 下:

print(JSON.stringify(row, null, 2))

或者,您可以使用以下日志查询在 Cloud Logging 中查看日志:

log_id("dataflow.googleapis.com/worker")
resource.type="dataflow_step"

【讨论】:

以上是关于GCP 数据流 UDF 输入的主要内容,如果未能解决你的问题,请参考以下文章

UDF/UDAF开发总结

将整行的 Hive UDF 作为输入

将 UDF 动态应用于数据框中 N 列中的 1 到 N 列

Java UDF on Hadoop 输入参数——从 Pig on Hadoop 调用

如何将 pandas udf 应用于大型矩阵数据框

sql server udf,返回与输入表达式相同的类型