数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException

Posted

技术标签:

【中文标题】数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException【英文标题】:Dataflow: SocketTimeoutException when writing with BigQueryIO 【发布时间】:2016-08-30 13:16:13 【问题描述】:

我正在使用 Dataflow 通过 BigQueryIO.Write.to() 将数据写入 BigQuery。

有时,我会从 Dataflow 收到此警告:


 metadata: 
  severity: "WARNING"    
  projectId: "[...]"    
  serviceName: "dataflow.googleapis.com"    
  region: "us-east1-d"    
  labels: 
   compute.googleapis.com/resource_type: "instance"     
   compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"     
   dataflow.googleapis.com/region: "us-east1-d"     
   dataflow.googleapis.com/job_name: "[...]"     
   compute.googleapis.com/resource_id: "[...]"     
   dataflow.googleapis.com/step_id: ""     
   dataflow.googleapis.com/job_id: "[...]"     
  
  timestamp: "2016-08-30T11:32:00.591Z"    
  projectNumber: "[...]"    
 
 insertId: "[...]"   
 log: "dataflow.googleapis.com/worker"   
 structPayload: 
  message: "exception thrown while executing request"    
  work: "[...]"    
  thread: "117"    
  worker: "dataflow-[...]-08240401-e41e-harness-7dkd"    
  exception: "java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:229)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)"    
  logger: "com.google.api.client.http.HttpTransport"    
  stage: "F5"    
  job: "[...]"    
 

我在此日志之后没有看到任何“重试”日志。

我的问题是:

我会丢失数据吗?我不知道写操作是否正确完成。如果我正确理解代码,则整个写入批处理处于不确定状态。 如果是这样,我是否有办法确保将数据写入 BigQuery 一次? 如果是这样,严重性不应该是 ERROR 而不是 WARNING?

以下是我的一些使用背景:

我在流模式下使用 Dataflow,使用 KafkaIO.java 从 Kafka 读取 “有时”可以是每小时 0 到 3 次 根据工作的不同,我使用 2 到 36 名 n1-standard-4 类型的工人 根据工作的不同,我正在向 BigQuery 写入 3k 到 10k 条消息/秒 平均消息大小为 3kB Dataflow 工作器位于 us-east1-d 区域,BigQuery 数据集位置是美国

【问题讨论】:

【参考方案1】:

您会看到这些错误与来自 BigQuery 流式服务的暂时性问题有关。我的经验是,您可能会在工作的整个生命周期中看到这些问题。如果您发现这些日志出现大量突破,这通常意味着 BigQuery 流式传输服务出现故障。

Cloud Dataflow 将重试请求的行(请参阅此处的代码 BigQuery... line 290)。如果在警告后的某个时间点您没有在表中看到这些日志项或您的记录 - 还有其他问题。

在流式传输模式下,服务将无限重试。这意味着作业不会因为这个问题而失败。由于我们一直在尝试——它确实提出了一个问题,即这是一个错误还是一个警告。我们将在内部进行辩论,您也可以在Apache Beam user group 上发帖推动辩论:-)

您可以在 Cloud Logging 中针对该警告消息创建指标并对其采取措施。我们正在研究更深入的 Stackdriver 集成,这是一个很好的用例。

您不会丢失数据,而是会延迟数据到达 BigQuery。我已经建立了一些简单的固定窗口并计算 1 分钟的窗口 - 使用事件处理时间。然后,我将随时间推移的计数作为新鲜度的指标。如果我的固定窗口落后于水印,则插入有问题。

根据评论进行了编辑以进一步澄清

在此异常继承自 IOException 的情况下,路径然后调用 ApiErrorExtractor() 以测试这是否是由于速率限制问题。

在这种情况下,SocketTimeout 不是由于速率限制,因此向调用者抛出异常。调用者是finishBundle 中的BigQuery.IO 第2308 行。它调用flushRows() 捕获IOException 并抛出RuntimeException。

在蒸汽模式下,任何以这种方式失败的包都会无限次重试。注意:在批处理模式下,运行器将尝试 4 次然后失败。

在这种情况下(非速率限制情况),您将不会重试行日志。

您的数据不会丢失,而是会随着捆绑包的重试而延迟。

最坏的情况是所有工作人员都遇到此问题,因此管道无法取得进展。如果 BigQuery 流式传输服务已关闭或断开所有连接,则可能会发生这种情况。现在——一旦 BiqQuery 摄取服务稳定并且包通过,您可能会看到速率限制情况出现,但回退代码将有助于抑制这些错误。

最糟糕的情况是,您的传入管道数据速率一直徘徊在 BigQuery 流式摄取服务所控制的最大写入速率(速率限制速率)附近。因此,如果您遇到重试(暂时或其他)积压 - 您的管道可能永远赶不上。

流式数据流中有一个 Drain 功能,它将停止处理传入的数据,然后推进管道以优雅地排出所有未完成的窗口。但是,Drain 要求 finishBundle() 成功。因此,在这种情况下 (SocketTimeout) Drain 将被卡住。如果您终止了管道与流失 - 您将遇到未完成的捆绑包的数据丢失。

如果您愿意,可以覆盖 BigQuery.IO 逻辑并通过管道传输在其他地方出错的数据。你可以这样做,但我会依靠 BigQuery 流媒体服务永远不会出现终端中断。话虽如此,如果您始终以接近速率限制的速率运行并且对不可恢复的积压处理很敏感,您可能希望实施不同的缩减或分片机制以避免速率限制问题。

关于积压恢复的另一个建议是,您可以停止事件流到您的流媒体源。例如,停止写入 Pub/Sub 中的主题。您将开始使用订阅来写另一个主题。您现有的 Dataflow 管道将耗尽现有主题。您仍然需要处理如何处理新订阅中的新积压,但至少可以保证您不会丢失现有管道中的任何数据。

如果您不使用事件时间处理,这种方法可能非常有效;但是,您正在使用事件时间处理,您的窗口将有重叠的输出,这些输出都标记为 ONTIME,即使情况并非如此。

我在这里就您的用例做了很多假设,但我想分享一下,因为您的问题在考虑数据丢失时提出了其他架构概念。

希望这会有所帮助。

【讨论】:

感谢您的回答。但是,我不相信 Dataflow 会为此批次重试。由于引发了异常,BigQuery 返回的错误(如果有,我们读取它们超时)不会添加到futures(#L221) 列表中。所以allErrors(#L283)为空,没有重试。 我正在深入了解调用者如何处理抛出的异常,并将在今天晚些时候返回。 A21z - 我在回复您的评论时添加了其他信息。如果这没有帮助,请告诉我。

以上是关于数据流:使用 BigQueryIO 写入时出现 SocketTimeoutException的主要内容,如果未能解决你的问题,请参考以下文章

BigQueryIO - 为每个项目写入两个表

使用CsvHelper写入数据时出现异常

使用 pyspark 将数据帧写入 Kafka 时出现异常

在linux中使用write系统调用写入数据时出现问题

写入 BigQuery 时出现 MojoExecutionException

使用 Spark Java 在 Big Query 中写入 Date 数据类型时出现问题