Exceptions in Google Cloud Dataflow pipelines to Cloud Bigtable

Posted: 2015-09-09 18:05:09

Question:

We occasionally see these exceptions while executing a Dataflow pipeline. Is there anything we can do about them? We have a very simple flow that reads from a file in GCS and creates one record per line of the input file - there are roughly a million lines in the input file.

Also, what happens to the data inside the pipeline? Is it reprocessed, or is it lost in transit to Bigtable?

(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:428)
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299)
... 4 more

What can we do to harden our code?

The pipeline itself is also quite simple:

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setMaxNumWorkers(20);

Pipeline p = Pipeline.create(options);

CloudBigtableIO.initializeForWrite(p)
            .apply(TextIO.Read.from(options.getInputFile()))
            .apply(ParDo.of(new HBaseBigtableWriter(options)));
p.run();

And the ParDo looks like:

public class HBaseBigtableWriter extends DoFn<String, Void> {
    private Connection conn;
    private BufferedMutator mutator;
    private final CloudBigtableTableConfiguration btConfig;

    public HBaseBigtableWriter(CloudBigtableOptions options) {
        this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options);
    }

    @Override
    public void startBundle(DoFn<String, Void>.Context c) throws Exception {
        super.startBundle(c);
        conn = new BigtableConnection(btConfig.toHBaseConfig());
        mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId()));
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c) {
        Put put = new Put(....);
        // ...some of it based on the input line; no sideInputs or anything
        put.addImmutable(...);
        mutator.mutate(put); // the line mentioned in the stack trace
    }

    @Override
    public void finishBundle(DoFn<String, Void>.Context c) throws Exception {
        try {
            mutator.close();
        } catch (RetriesExhaustedWithDetailsException e) {
            // retriesExceptionAggregator is an Aggregator field, not shown in this snippet
            retriesExceptionAggregator.addValue(1);
            List<Throwable> causes = e.getCauses();
            if (causes.size() == 1) {
                throw (Exception) causes.get(0);
            } else {
                throw e;
            }
        } finally {
            conn.close();
            super.finishBundle(c);
        }
    }
}

This one also shows up from time to time:

java.util.concurrent.RejectedExecutionException: Task io.grpc.SerializingExecutor$TaskRunner@5a497f63 rejected from java.util.concurrent.ThreadPoolExecutor@49e90a5c[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112)
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398)
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256)
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230)
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180)
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121)
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253)
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168)
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285)
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175)
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162)
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110)
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179)
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69)
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424)
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316)

Additionally, the same thing seems to be happening with the Google SDK classes as well - especially under load - e.g. in Dataflow job 2015-09-10_10_26_26-7782438171725519247:

(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times,
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408)
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613)

Any suggestions about these exceptions? Thanks!

Comments:

Ended up using CloudBigtableIO.writeToTable() - have not seen this issue again so far.

Answer 1:

Closing a connection and then performing a mutation could cause the stack trace you are seeing (my guess is that this happens when you stop workers while buffered mutations are still in progress).
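
To make that failure mode concrete, here is a hypothetical sequence along the lines the answer describes, reusing the conn/mutator/put names from the question's DoFn (an illustration, not a claim about the actual code path):

conn.close();        // tears down the connection and, with it, the gRPC channel's executor
mutator.mutate(put); // a late mutation, e.g. from a bundle still in flight
// The shutting-down executor rejects the pending work, which surfaces as the
// RejectedExecutionException / StatusRuntimeException(UNKNOWN) in the traces above.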

Could you open a bug on our GitHub issue tracker? I think that is probably the most efficient way to diagnose this: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues

If I am reading the stack trace correctly, it looks like you are not taking advantage of the CloudBigtableIO.writeToTable() method and are writing the data with a custom ParDo instead. If so, the answer to your question really depends on what you do in that custom ParDo and on the dynamics of "stopping workers".
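
For reference, a minimal sketch of what a writeToTable()-based version of the pipeline might look like (per the comments, this is the approach eventually adopted). The column family "cf", the qualifier "col", and the use of the input line as the row key are hypothetical stand-ins for the question's elided Put construction:

CloudBigtableTableConfiguration btConfig =
        CloudBigtableTableConfiguration.fromCBTOptions(options);

Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p)
        .apply(TextIO.Read.from(options.getInputFile()))
        .apply(ParDo.of(new DoFn<String, Mutation>() {
            @Override
            public void processElement(ProcessContext c) {
                String line = c.element();
                // Hypothetical row key and cell built from the input line.
                Put put = new Put(Bytes.toBytes(line));
                put.addImmutable(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(line));
                c.output(put); // emit the mutation instead of writing it yourself
            }
        }))
        .apply(CloudBigtableIO.writeToTable(btConfig)); // the connector owns connections, batching and retries
p.run();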

Comments:

Updated the question. Are we doing something conceptually wrong? In the meantime, we implemented a pipeline using CloudBigtableIO.writeToTable() and are running some tests to see whether the exceptions go away.

Had to remove the accepted-answer mark, since we are seeing the errors again with the Google SDK classes as well. See the updated question.

Following up in github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/478
