如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery

Posted

技术标签:

【中文标题】如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery【英文标题】:How to save hyperLogLog field to BigQuery from ApacheBeam with Data Flow runner 【发布时间】:2019-04-04 15:27:28 【问题描述】:

我需要将 HLL 草图从 ApacheBeam 保存到 BigQuery。

我为 Apache-Beam 找到了一些扩展 library:

但我找不到将草图本身保存到 BigQuery 的方法。以后可以通过一段时间滑动来使用它与合并功能和其他功能:见link

我的代码:

 .apply("hll-count",  Combine.perKey(ApproximateDistinct.ApproximateDistinctFn
                            .create(StringUtf8Coder.of())))
.apply("reify-windows", Reify.windows())
                    .apply("to-table-row", ParDo.of(new DoFn< ValueInSingleWindow<KV<GroupByData,HyperLogLogPlus>>, TableRow>() 
                        @ProcessElement
                        public void processElement(ProcessContext processContext) 
                            ValueInSingleWindow<KV<GroupByData,HyperLogLogPlus>> windowed = processContext.element();
                            KV<GroupByData, HyperLogLogPlus> keyData = windowed.getValue();
                            GroupByData key = keyData.getKey();

                            HyperLogLogPlus hyperLogLogPlus = keyData.getValue();
                            if (key != null) 

                                TableRow tableRow = new TableRow();
                                tableRow.set("country_code",key.countryCode);
                                tableRow.set("event", key.event);
                                tableRow.set("profile", key.profile);

                                 tableRow.set("occurrences", hyperLogLogPlus.cardinality());

我刚刚找到了hyperLogLogPlus.cardinality() 的操作方法,但是如何编写缓冲区本身,以便稍后在 BiGQuery 中运行合并函数。

使用hyperLogLogPlus.getBytes 也不适用于合并。

【问题讨论】:

似乎 Apache Beam 正在处理它:issuetracker.google.com/issues/123269269#comment5 和这里 issues.apache.org/jira/browse/… 更新:HyperLogLog++ 的 BigQuery 兼容实现已向github.com/google/zetasketch 开源,有关将其集成到 Apache Beam 的设计文档 (docs.google.com/document/d/…) 已发送至 dev@beam .apache.org. 【参考方案1】:

目前 Apache Beam 不支持此功能,但有人正在研究它。

具体来说: 您提到的 Apache Beam 中的 extension library 取决于 this HyperLogLog 实现。该库生成的草图与 Google Cloud BigQuery 计算的 sketches 不一致。所以在 BigQuery 中合并草图是没有意义的。

【讨论】:

【参考方案2】:

自 2019 年 4 月首次提出此问题以来,已发布了与 BigQuery 兼容的 HLL 草图实现,如此 GCP 博客文章 Using HLL++ to speed up count-distinct in massive datasets 中所述。

这篇博文有说明性代码 sn-ps,展示了如何将 HLL 草图保存到 BigQuery 以及 GCS 文件。

引用帖子的相关部分:

[HyperLogLog 的 Google 实现] 于 2017 年添加到 BigQuery,最近已开源并在 Apache Beam 2.16 版中直接可用。这意味着它可用于 Cloud Dataflow ...

注意:从 2.16 版开始,有几种近似计数算法的实现。我们建议使用 HllCount.java,尤其是在您需要草图和/或需要与 Google Cloud BigQuery 兼容的情况下。

来自帖子的第 3 部分,“在 BigQuery 中存储草图”:

BigQuery 通过 HLL_COUNT 函数支持 HLL++,而且 BigQuery 的草图与 Beam 的完全兼容,因此很容易跨两个系统与草图对象进行互操作。

在下面的示例中,我们将: 1.在Beam中将数据预聚合成草图; 2. 将 BigQuery 中的草图存储为 byte[] 列以及有关时间间隔的一些元数据; 3. 在 BigQuery 中运行汇总查询,这要归功于 Beam 中预先计算的草图,它可以以交互速度提取结果。

【讨论】:

我看不到如何在梁 sql 中使用它..(只能通过复制代码并创建我自己的转换器)

以上是关于如何使用 Data Flow runner 从 ApacheBeam 将 hyperLogLog 字段保存到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Cloud Data Flow 任务中外部化应用程序属性

如何将 Keras tf.data 与生成器( flow_from_dataframe )一起使用?形成完美的输入管道

分析 Cloud Data Flow BigQuery 吞吐量/流水线

如何使用 build_runner 从生成的文件中清理项目

如何在 Spring Cloud Data Flow 中为 Spring Batch 作业设置调度程序?

如何自动化部署 Spring Data Flow?