Exceptions in Google Cloud Dataflow pipelines to Cloud Bigtable

Posted: 2015-09-09 18:05:09

We occasionally see these exceptions while executing a Dataflow pipeline. Is there anything we can do about them? We have a very simple flow that reads from a file in GCS and creates one record per line of the input file; the input file has about 1 million lines.

Also, what happens to the data inside the pipeline? Is it reprocessed, or is it lost in transit to Bigtable?
(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:428)
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284)
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202)
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123)
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110)
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100)
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178)
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166)
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256)
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262)
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335)
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932)
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93)
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61)
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232)
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166)
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302)
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more
What can we do to harden our code?

The Dataflow pipeline itself is also very simple:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setMaxNumWorkers(20);
Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p)
.apply(TextIO.Read.from(options.getInputFile()))
.apply(ParDo.of(new HBaseBigtableWriter(options)));
p.run();
The ParDo looks like:
public class HBaseBigtableWriter extends DoFn<String, Void> {
    private Connection conn;
    private BufferedMutator mutator;
    private final CloudBigtableTableConfiguration btConfig;

    public HBaseBigtableWriter(CloudBigtableOptions options) {
        this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options);
    }

    @Override
    public void startBundle(DoFn<String, Void>.Context c) throws Exception {
        super.startBundle(c);
        conn = new BigtableConnection(btConfig.toHBaseConfig());
        mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId()));
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c) {
        Put put = new Put(....);
        // built from the input line; no sideInputs or anything
        put.addImmutable(...);
        mutator.mutate(put); // line mentioned in the stack trace
    }

    @Override
    public void finishBundle(DoFn<String, Void>.Context c) throws Exception {
        try {
            mutator.close();
        } catch (RetriesExhaustedWithDetailsException e) {
            retriesExceptionAggregator.addValue(1);
            List<Throwable> causes = e.getCauses();
            if (causes.size() == 1) {
                throw (Exception) causes.get(0);
            } else {
                throw e;
            }
        } finally {
            conn.close();
            super.finishBundle(c);
        }
    }
}
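The cause-unwrapping policy in finishBundle above can be exercised in isolation. This is a minimal sketch using plain Java: AggregateException and pickRethrow are stand-ins for RetriesExhaustedWithDetailsException and the catch-block logic, not the real HBase classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnwrapDemo {
    // Stand-in for RetriesExhaustedWithDetailsException: an aggregate
    // exception carrying the individual per-row causes.
    static class AggregateException extends Exception {
        private final List<Throwable> causes;
        AggregateException(List<Throwable> causes) {
            super("Failed " + causes.size() + " actions");
            this.causes = causes;
        }
        List<Throwable> getCauses() { return causes; }
    }

    // Same policy as the finishBundle catch block: surface a lone
    // underlying cause directly, otherwise keep the aggregate.
    static Throwable pickRethrow(AggregateException e) {
        List<Throwable> causes = e.getCauses();
        return causes.size() == 1 ? causes.get(0) : e;
    }

    public static void main(String[] args) {
        AggregateException one = new AggregateException(
            Collections.singletonList(new java.net.SocketTimeoutException("connect timed out")));
        System.out.println(pickRethrow(one).getClass().getSimpleName());

        AggregateException two = new AggregateException(
            Arrays.asList(new RuntimeException("a"), new RuntimeException("b")));
        System.out.println(pickRethrow(two).getClass().getSimpleName());
    }
}
```

Rethrowing the single cause makes the worker logs show the actual network failure (e.g. SocketTimeoutException) instead of the generic aggregate message.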
This one also shows up from time to time:
java.util.concurrent.RejectedExecutionException: Task io.grpc.SerializingExecutor$TaskRunner@5a497f63 rejected from java.util.concurrent.ThreadPoolExecutor@49e90a5c[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112)
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398)
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256)
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230)
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180)
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121)
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253)
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168)
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285)
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175)
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162)
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110)
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179)
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69)
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424)
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316)
The same thing also seems to happen with the Google SDK classes, particularly under load, e.g. in Dataflow job 2015-09-10_10_26_26-7782438171725519247:
(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times,
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258)
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181)
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613)
Any suggestions on these exceptions? Thanks!
Comments:
Ended up using CloudBigtableIO.writeToTable() - haven't seen this problem again so far.
Answer 1:
Closing a connection and then issuing a mutation could cause the stack trace you are seeing (my guess is that this happens when you stop workers while buffered mutations are still in flight).
Could you open a bug on our GitHub issue tracker? I think that would be the most efficient way to diagnose this: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues
If I am reading the stack trace correctly, it looks like you are not taking advantage of the CloudBigtableIO.writeToTable() method and are instead using a custom ParDo to write the data. If so, the answer to your question really depends on what you are doing in that custom ParDo and on the dynamics of "stopping workers".
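For reference, a writeToTable()-based version of the question's pipeline might look roughly like this. It is a sketch against the 1.x-era cloud-bigtable-dataflow API; the Options type and configuration call are carried over from the question's code, and the Put construction is elided just as it is there:

```java
// Same table configuration the question's DoFn builds.
CloudBigtableTableConfiguration config =
    CloudBigtableTableConfiguration.fromCBTOptions(options);

Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p)
    .apply(TextIO.Read.from(options.getInputFile()))
    // The DoFn now only builds Mutations; it no longer owns a
    // Connection or BufferedMutator.
    .apply(ParDo.of(new DoFn<String, Mutation>() {
      @Override
      public void processElement(ProcessContext c) {
        Put put = new Put(....); // built from the input line, as before
        put.addImmutable(...);
        c.output(put);
      }
    }))
    // The connector owns connection lifecycle, batching, and flushing.
    .apply(CloudBigtableIO.writeToTable(config));
p.run();
```

The point of this shape is that connection setup and the BufferedMutator flush-on-close move out of user code and into the connector, so bundle teardown and mutation flushing are coordinated by the SDK rather than by startBundle/finishBundle.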
Discussion:
Updated the question. Are we doing something conceptually wrong? In the meantime, implemented a pipeline using CloudBigtableIO.writeToTable() and am running some tests to see whether the exceptions go away.
Had to remove the accepted-answer tag, since we are seeing the errors again even when using the Google SDK classes. See the updated question.
Following up at github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/478