在 BigQuery 接收器中进行一次性处理的情况下,重新洗牌是啥意思?

Posted

技术标签:

【中文标题】在 BigQuery 接收器中进行一次性处理的情况下,重新洗牌是啥意思?【英文标题】:What does reshuffling, in the context of exactly-once processing in BigQuery sink, mean?在 BigQuery 接收器中进行一次性处理的情况下,重新洗牌是什么意思? 【发布时间】:2019-03-02 09:38:59 【问题描述】:

我正在阅读 article 上关于某些 Dataflow 源和接收器实现的一次性处理的内容,但我无法理解 BigQuery 接收器上的示例。 来自文章

生成随机 UUID 是一项非确定性操作,因此我们必须在插入 BigQuery 之前添加重新洗牌。完成此操作后,Cloud Dataflow 的任何重试都将始终使用已洗牌的相同 UUID。重复插入 BigQuery 的尝试将始终具有相同的插入 ID,因此 BigQuery 能够过滤它们

// Apply a unique identifier to each record
c
 .apply(new DoFn<> 
  @ProcessElement
  public void processElement(ProcessContext context) 
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 
)
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> 
   @ProcessElement
   public void processElement(ProcessContext context) 
     insertIntoBigQuery(context.element().record(), context.element.id());
   
 );

reshuffle 是什么意思,如何防止在后续重试时为同一插入生成不同的 UUID?

【问题讨论】:

【参考方案1】:

Reshuffle 以不同的方式对数据进行分组。但是,这里使用它是因为它的副作用:检查点和重复数据删除。

如果不重新洗牌,如果同一个任务生成 UUID 并将数据插入 BigQuery,则存在重启工作器的风险,新工作器会生成新的 UUID 并将不同的行发送到 BigQuery,从而导致重复行。

Reshuffle 操作将 UUID 生成和 BigQuery 插入分为两个步骤,并在它们之间插入检查点和重复数据删除。

    首先,生成 UUID 并将其发送到 reshuffle。如果 UUID 生成 worker 重新启动,则没关系,因为 reshuffle 会删除重复行,从而消除失败/重新启动的 worker 中的数据。 生成的 UUID 由 shuffle 操作设置检查点。 BigQuery 插入工作线程使用检查点 UUID,因此即使重新启动它,它也会向 BigQuery 发送完全相同的数据。 BigQuery 使用这些 UUID 删除重复数据,因此在 BigQuery 中消除了来自重新启动的插入工作线程的重复数据。

【讨论】:

【参考方案2】:

我认为这篇文章很好地解释了为什么“改组”有助于从“至少一次”转变为“恰好一次”:

具体来说,窗口可能会尝试使用元素 e0、e1、e2 触发,但工作程序在提交窗口处理之前崩溃(但不会在这些元素作为副作用发送之前)。当工作人员重新启动时,窗口将再次触发,但现在出现了一个后期元素 e3。由于这个元素在窗口提交之前就出现了,它不被算作迟到的数据,所以 DoFn 用元素 e0、e1、e2、e3 再次调用。然后将这些发送到副作用操作。幂等性在这里没有帮助,因为每次发送不同的逻辑记录集。

还有其他方法可以引入非确定性。解决此风险的标准方法是依靠 Cloud Dataflow 当前保证只有一个版本的 DoFn 输出可以使其超过 shuffle 边界

您也可以查看 Reshuffle 的文档:

https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/transforms/Reshuffle.html

那里有一条关于弃用此类的说明,因此BigQueryIO 的未来实现可能会有所不同。

【讨论】:

以上是关于在 BigQuery 接收器中进行一次性处理的情况下,重新洗牌是啥意思?的主要内容,如果未能解决你的问题,请参考以下文章

使用自定义目标接收器将日志导出到 BigQuery(表分区)

在 Beam 管道中以编程方式生成 BigQuery 架构

在 python 中使用 BigQuery 接收器流式传输管道

BigQuery 流式插入使用模板表数据可用性问题

为啥需要在 BigQuery 中设置“asia-northeast1”的处理位置?

Bigquery 流式处理 API 超时错误