宕机一台机器,结果一百多个 Flink 作业挂了

Posted zhisheng_blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了宕机一台机器,结果一百多个 Flink 作业挂了相关的知识,希望对你有一定的参考价值。

背景

因宕机了一台物理机器,实时集群不少作业发生 failover,其中大部分作业都能 failover 成功,某个部门的部分作业一直在 failover,始终未成功,到 WebUI 查看作业异常日志如下:

2021-11-09 16:01:11
java.util.concurrent.CompletionException: java.lang.reflect.UndeclaredThrowableException
 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.UndeclaredThrowableException
 at com.sun.proxy.$Proxy54.submitTask(Unknown Source)
 at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:72)
 at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$10(Execution.java:756)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more
Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.
 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
 ... 11 more

解决异常过程

从上面的异常日志中我们提取到关键信息:

Caused by: java.io.IOException: The rpc invocation size 56424326 exceeds the maximum akka framesize.

看起来是 RPC 的消息大小超过了默认的 akka framesize 的最大值了,所以我们来了解一下这个值的默认值,从 官网 我们可以看的到该值的默认大小为 "10485760b",并且该参数的描述为:

Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.

翻译过来的意思就是:这个参数是 JobManager 和 TaskManagers 之间通信允许的最大消息大小,如果 Flink 作业因为通信消息大小超过了该值,你可以通过增加该值的大小来解决,该参数需要指定一个单位。

分析原因

Flink 使用 Akka 作为组件(JobManager/TaskManager/ResourceManager)之间的 RPC 框架,在 JobManager 和 TaskManagers 之间发送的消息的最大大小默认为 10485760b,如果消息超过这个限制就会失败,报错。这个可以看下抛出异常处的源码:

protected RpcInvocation createRpcInvocationMessage(String methodName, Class<?>[] parameterTypes, Object[] args) throws IOException {
    Object rpcInvocation;
    if (this.isLocal) {
        rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
    } else {
        try {
            RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(methodName, parameterTypes, args);
            if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
                // 异常所在位置
                throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
            }

            rpcInvocation = remoteRpcInvocation;
        } catch (IOException var6) {
            LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", var6);
            throw var6;
        }
    }

    return (RpcInvocation)rpcInvocation;
}

至于为什么 JobManager 和 TaskManager 之间的 RPC 消息大小会如此之大,初步的解释是在 task 出现异常之后,它需要调用 updateTaskExecutionState(TaskExecutionState,taskExecutionState) 这个 RPC 接口去通知 Flink Jobmanager 去改变对应 task 的状态并且重启 task。但是呢,taskExecutionState 这个参数里面有个 error 属性,当我的 task 打出来的错误栈太多的时候,在序列化的之后超过了 rpc 接口要求的最大数据大小(也就是 maximum akka framesize),导致调用 updateTaskExecutionState 这个 rpc 接口失败,Jobmanager 无法获知这个 task 已经处于 fail 的状态,也无法重启,然后就导致了一系列连锁反应。

解决办法

任务停止,在 flink-conf.yaml 中加入 akka.framesize 参数,调大该值。

akka.framesize: "62914560b"

然后将任务重启,可以观察 Jobmanager Configration 看看参数是否生效。

end

Flink 从入门到精通 系列文章

基于 Apache Flink 的实时监控告警系统
关于数据中台的深度思考与总结(干干货)
日志收集Agent,阴暗潮湿的地底世界

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug

以上是关于宕机一台机器,结果一百多个 Flink 作业挂了的主要内容,如果未能解决你的问题,请参考以下文章

记一次线上MongoDB宕机

nginx做负载均衡时,nginx宕机怎么办

FlinkFlink 报错 flink 1.12.5 启动作业报 partition not found

pos机一机一码啥意思,我要详细一点的,谢谢!!!

分布式系统原理介绍

虚拟化物理机宕机处理