Dataflow/Beam streaming inserts in BigQuery causing SSL errors

Posted: 2021-08-17 07:14:27

Question:

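Our Dataflow pipeline, which streams data into BigQuery using dynamic destinations, shows many exceptions in the job logs. All of them are javax.net.ssl.SSLException: Connection reset exceptions. I have included a stack trace below, but I would like to know whether this causes data loss.

There are many of these errors (hundreds per day), and they seem to coincide with worker log entries like the following: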

Execution of work for computation 'P6' on key '-REDACTED-' failed with uncaught 
exception. Work will be retried locally.

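Can I conclude that the work is effectively retried locally on the worker and that no data is lost, even though the exception still appears in the job logs?

I find it hard to tell, just from looking at the different logs, which errors actually cause data loss.

Example stack trace: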

Error message from worker: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69) 
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271) 

Caused by: javax.net.ssl.SSLException: Connection reset
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127)
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293) 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288) 
java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1581) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:979) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834) 

Caused by: java.net.SocketException: Connection reset
java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
java.base/java.net.SocketInputStream.read(SocketInputStream.java:140) 
java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476) 
java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470) 
java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70) 
java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354) 
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963) 
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252) 
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292) 
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351) 
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552) 
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609) 
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133) 
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107) 
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274) 
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40) 
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232) 
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137) 
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252) 
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369) 
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:363) 
com.google.api.client.json.JsonParser.parse(JsonParser.java:335) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79) 
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73) 
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456) 
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878) 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342) 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)

Answer 1:

These errors should not cause data loss, because the bundle is retried when an exception is thrown.

Comments:

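Thanks! Is that the case for all errors? I set up BigQuery with .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()); could that affect this behavior?

Yes, steps are always retried on failure (indefinitely for streaming pipelines; for batch pipelines, the same work item is retried up to 4 times, at which point it gives up and fails the whole pipeline).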
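
To make the retry behavior described above more concrete, here is a minimal plain-Java sketch. It is not Beam or Dataflow code: BundleRetrySketch, BundleWriter, FlakySink and processBundle are hypothetical names invented for this illustration, and the sink only simulates a call that fails with SSLException a few times before succeeding. The maxAttempts parameter is an assumption used to model the two cases from the comment (0 for unlimited streaming retries, 4 for batch).

```java
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLException;

public class BundleRetrySketch {

    /** Hypothetical stand-in for a sink call; may throw on a transient failure. */
    interface BundleWriter {
        void write(List<String> rows) throws Exception;
    }

    /** Fake sink: fails the first `failures` calls with an SSLException, then stores rows. */
    static final class FlakySink implements BundleWriter {
        private int remainingFailures;
        final List<String> stored = new ArrayList<>();

        FlakySink(int failures) {
            this.remainingFailures = failures;
        }

        @Override
        public void write(List<String> rows) throws Exception {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new SSLException("Connection reset");
            }
            stored.addAll(rows);
        }
    }

    /**
     * Retries the whole bundle until the write succeeds or maxAttempts is reached.
     * maxAttempts <= 0 means "retry without limit" (the streaming case in this sketch).
     * Returns the number of attempts that were needed.
     */
    static int processBundle(BundleWriter writer, List<String> bundle, int maxAttempts) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                writer.write(bundle);
                return attempt;
            } catch (Exception e) {
                if (maxAttempts > 0 && attempt >= maxAttempts) {
                    // Batch-style behavior: give up and surface the failure.
                    throw new RuntimeException("giving up after " + attempt + " attempts", e);
                }
                // Streaming-style behavior: the exception is only logged, then the bundle is retried.
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> bundle = List.of("row-1", "row-2", "row-3");

        // Streaming-style: unlimited retries, so the rows are eventually stored.
        FlakySink streamingSink = new FlakySink(5);
        int attempts = processBundle(streamingSink, bundle, 0);
        System.out.println("streaming: attempts=" + attempts
                + ", stored=" + streamingSink.stored.size());

        // Batch-style: at most 4 attempts, so 5 consecutive failures abort the work.
        FlakySink batchSink = new FlakySink(5);
        try {
            processBundle(batchSink, bundle, 4);
        } catch (RuntimeException e) {
            System.out.println("batch: " + e.getMessage()
                    + ", stored=" + batchSink.stored.size());
        }
    }
}
```

In the streaming case every failed attempt is logged, which would match the many exceptions seen in the job logs, yet the rows still end up stored once an attempt succeeds. The sketch does not model partial writes or duplicates; it only shows why a logged exception followed by a retry does not by itself mean data was lost.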
