Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理

Posted

技术标签:

【中文标题】Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理【英文标题】:Apache Beam - Exception caught and throwed even though program excuting continuosly. How to stop that process or handle in pipeline 【发布时间】:2018-11-09 09:33:37 【问题描述】:

我有一个管道来获取数据 mysql 并用于将数据传输到 mongo db 使用以下代码运行此管道后,从 mysql 获取数据但无法加载到 mongodb

noSqlresult.apply(MongoDbIO.write().withUri(mongoUri)
                .withDatabase(mongoDatabase)
                .withCollection(resultCollectionName));

我发现了以下异常和一些不断尝试与 mongo db 通信的日志

com.mongodb.MongoSecurityException: Exception authenticating MongoCredentialmechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties=
at com.mongodb.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:162)
at com.mongodb.connection.SaslAuthenticator.access$200(SaslAuthenticator.java:39)
at com.mongodb.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:68)
at com.mongodb.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:46)
at com.mongodb.connection.SaslAuthenticator.doAsSubject(SaslAuthenticator.java:168)
at com.mongodb.connection.SaslAuthenticator.authenticate(SaslAuthenticator.java:46)
at com.mongodb.connection.DefaultAuthenticator.authenticate(DefaultAuthenticator.java:32)
at com.mongodb.connection.InternalStreamConnectionInitializer.authenticateAll(InternalStreamConnectionInitializer.java:122)
at com.mongodb.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:52)
at com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
at com.mongodb.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:114)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is  "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" 
    at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:164)
    at com.mongodb.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:295)
    at com.mongodb.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.connection.CommandHelper.sendAndReceive(CommandHelper.java:84)
    at com.mongodb.connection.CommandHelper.executeCommand(CommandHelper.java:34)
    at com.mongodb.connection.SaslAuthenticator.sendSaslStart(SaslAuthenticator.java:119)
    at com.mongodb.connection.SaslAuthenticator.access$000(SaslAuthenticator.java:39)
    at com.mongodb.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:52)
    ... 9 more
18/11/09 12:49:29 DEBUG org.mongodb.driver.cluster: Updating cluster description to  type=UNKNOWN, servers=[address=severip:27017, type=UNKNOWN, state=CONNECTING, exception=com.mongodb.MongoSecurityException: Exception authenticating MongoCredentialmechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties=, caused by com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is  "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" ]
18/11/09 12:49:29 DEBUG org.mongodb.driver.connection: Closing connection connectionIdlocalValue:17
18/11/09 12:49:29 DEBUG org.mongodb.driver.cluster: Updating cluster description to  type=UNKNOWN, servers=[address=severip:27017, type=UNKNOWN, state=CONNECTING, exception=com.mongodb.MongoSecurityException: Exception authenticating MongoCredentialmechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties=, caused by com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is  "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" ]
18/11/09 12:49:29 DEBUG org.mongodb.driver.connection: Closing connection connectionIdlocalValue:18

如何处理这种情况 mongo 有能力在不存在的情况下创建数据库,但不能使用 mongo 客户端连接在内部创建 mongoIO。有没有办法处理

显示它的错误是由于注意问题,但现有数据库未获得异常仅获得新数据库 确切的原因是什么以及如何处理这些错误

【问题讨论】:

也需要其他配置.....??? 问题中没有足够的信息。例如,不清楚如何在 MongoDB 实例中配置身份验证以及您将哪些身份验证信息传递给 MongoIO 没有身份验证问题,但得到 mongo 身份验证异常实际上我正在尝试将我的输出写入不存在的新数据库中。我已经尝试过它接受的现有数据库没有问题。 @安东 好的,现在我得到了你的问题@Anton,mongoUri 包含用户名和密码 mongoUri = mongodb://mongoUser:*******@severip:27017/db1 它是一个有效的 uri但它不断抛出 mongo 异常,错误代码:18 【参考方案1】:

ptransform的结果没有对象存储,所以需要等待连接超时异常,之后如果声明其他连接自动关闭,则可以执行在catch块中声明的操作。

如果你是处理异常的新手,我想你会从以下链接得到答案

java_exception_handler_***

【讨论】:

是的,知道了。由于 mongo 驱动程序类是这样实现的,因此它会多次重试以建立连接。错误是没有创建新数据库的权限

以上是关于Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam 管道中的连续状态

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

apache beam ElasticSearchIO 遇到异常后job中断执行 自己定制beam IO

Apache Beam FixedWindows 之间的延迟

Apache Beam WordCount编程实战及源代码解读

如何使用 Apache BEAM 在 BigQuery 中执行快速联接