如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery
Posted
技术标签:
【中文标题】如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery【英文标题】:How to save hyperLogLog field to BigQuery from ApacheBeam with Data Flow runner 【发布时间】:2019-04-04 15:27:28 【问题描述】:我需要将 HLL 草图从 ApacheBeam 保存到 BigQuery。
我为 Apache-Beam 找到了一些扩展 library:
但我找不到将草图本身保存到 BigQuery 的方法。以后可以通过一段时间滑动来使用它与合并功能和其他功能:见link
我的代码:
.apply("hll-count", Combine.perKey(ApproximateDistinct.ApproximateDistinctFn
.create(StringUtf8Coder.of())))
.apply("reify-windows", Reify.windows())
.apply("to-table-row", ParDo.of(new DoFn< ValueInSingleWindow<KV<GroupByData,HyperLogLogPlus>>, TableRow>()
@ProcessElement
public void processElement(ProcessContext processContext)
ValueInSingleWindow<KV<GroupByData,HyperLogLogPlus>> windowed = processContext.element();
KV<GroupByData, HyperLogLogPlus> keyData = windowed.getValue();
GroupByData key = keyData.getKey();
HyperLogLogPlus hyperLogLogPlus = keyData.getValue();
if (key != null)
TableRow tableRow = new TableRow();
tableRow.set("country_code",key.countryCode);
tableRow.set("event", key.event);
tableRow.set("profile", key.profile);
tableRow.set("occurrences", hyperLogLogPlus.cardinality());
我刚刚找到了hyperLogLogPlus.cardinality()
的操作方法,但是如何编写缓冲区本身,以便稍后在 BiGQuery 中运行合并函数。
使用hyperLogLogPlus.getBytes
也不适用于合并。
【问题讨论】:
似乎 Apache Beam 正在处理它:issuetracker.google.com/issues/123269269#comment5 和这里 issues.apache.org/jira/browse/… 更新:HyperLogLog++ 的 BigQuery 兼容实现已向github.com/google/zetasketch 开源,有关将其集成到 Apache Beam 的设计文档 (docs.google.com/document/d/…) 已发送至 dev@beam .apache.org. 【参考方案1】:目前 Apache Beam 不支持此功能,但有人正在研究它。
具体来说: 您提到的 Apache Beam 中的 extension library 取决于 this HyperLogLog 实现。该库生成的草图与 Google Cloud BigQuery 计算的 sketches 不一致。所以在 BigQuery 中合并草图是没有意义的。
【讨论】:
【参考方案2】:自 2019 年 4 月首次提出此问题以来,已发布了与 BigQuery 兼容的 HLL 草图实现,如此 GCP 博客文章 Using HLL++ to speed up count-distinct in massive datasets 中所述。
这篇博文有说明性代码 sn-ps,展示了如何将 HLL 草图保存到 BigQuery 以及 GCS 文件。
引用帖子的相关部分:
[HyperLogLog 的 Google 实现] 于 2017 年添加到 BigQuery,最近已开源并在 Apache Beam 2.16 版中直接可用。这意味着它可用于 Cloud Dataflow ...
注意:从 2.16 版开始,有几种近似计数算法的实现。我们建议使用 HllCount.java,尤其是在您需要草图和/或需要与 Google Cloud BigQuery 兼容的情况下。
来自帖子的第 3 部分,“在 BigQuery 中存储草图”:
BigQuery 通过 HLL_COUNT 函数支持 HLL++,而且 BigQuery 的草图与 Beam 的完全兼容,因此很容易跨两个系统与草图对象进行互操作。
在下面的示例中,我们将: 1.在Beam中将数据预聚合成草图; 2. 将 BigQuery 中的草图存储为 byte[] 列以及有关时间间隔的一些元数据; 3. 在 BigQuery 中运行汇总查询,这要归功于 Beam 中预先计算的草图,它可以以交互速度提取结果。
【讨论】:
我看不到如何在梁 sql 中使用它..(只能通过复制代码并创建我自己的转换器)以上是关于如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Cloud Data Flow 任务中外部化应用程序属性
如何将 Keras tf.data 与生成器( flow_from_dataframe )一起使用?形成完美的输入管道
分析 Cloud Data Flow BigQuery 吞吐量/流水线