flink 缓存问题处理过程,看我十八般武艺

Posted 香菜+

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink 缓存问题处理过程,看我十八般武艺相关的知识,希望对你有一定的参考价值。

1、问题描述

作为flink小白,第一次写flink的任务,所以战战兢兢的复写了官方的demo,还好一切顺利,自信心倍增,开始进入业务需求。

但是作为一个新手不踩几个坑那说不过去,所以遇上了第一个大的门槛。

1.1 版本信息

flink版本:1.15.2

java 版本:jdk11

OS:Ubuntu 18.04

1.2 lib 包冲突的问题

第一次使用flink 哪知道flink还有个lib 目录,在使用的过程中出现了包的冲突,主要其他同学在测试的过程中瞎鸡儿搞,放了几个包在lib下,也莫名其妙,不过好在身边有人帮忙解决了问题,将maven中的flink提供的核心包设置为provided,这个很简单。

1.3 缓存问题

这个问题最初不是我遇到的,是同事遇到了,但是花了好多时间也没解决,就放弃了。我接过来处理。这里说下背景,flink我们使用的是standalone 模式,并且是另外一个同事安装的。也不知道做了什么配置,也是第一次写flink,甚至都不知道怎么提交任务,在同事手把手教的情况下上车,所以直接上手难度挺高。

开发的一个任务,提交之后,因为有问题,所以直接报错了,看下报错日志。

看日志还是挺明显的,proto解析失败,遇到bug解决bug,所以后面修改代码,做了异常捕获,但是不管怎么修改,每次提交都是包这个错误,真的是头秃,解决不了。

看下等会一直在说的代码,这个类主要是做kafka数据的解析,将kafka中的数据解析成VehData。

public class BsmSchema implements DeserializationSchema<BsmDTO.VehData> 
    @Override
    public BsmDTO.VehData deserialize(byte[] bytes) throws IOException 
        try 
            if (Objects.nonNull(bytes) && bytes.length != 0) 
                BsmDTO.VehData bsm = BsmDTO.VehData.parseFrom(bytes);
                return bsm;
            
         catch (Exception e) 
        
        return null;
    
    @Override
    public boolean isEndOfStream(BsmDTO.VehData dpe) 
        return false;
    
    @Override
    public TypeInformation<BsmDTO.VehData> getProducedType() 
        return TypeInformation.of(new TypeHint<>() 
        );
    

2、解决过程

2.1 proto 解析失败

这个还是很简单的,因为数据的问题导致数据解析失败,所以直接通过try catch 对异常数据不处理,在本地测试通过后是正常的,打包之后,上传jar ,直接上面同样的错误。无语

2.2 确认try catch

因为报了同样的错误,所以认为没有打包进最新的代码,直接反编译代码,确认代码中存在try catch

2.3 去除proto解析的问题

因为一直报proto解析错误,那我就索性去掉了proto的解析部分,我直接返回了一个null,可以看到不会再有proto解析了。

    public BsmDTO.VehData deserialize(byte[] bytes) throws IOException 
//        try 
//            if (Objects.nonNull(bytes) && bytes.length != 0) 
//                BsmDTO.VehData bsm = BsmDTO.VehData.parseFrom(bytes);
//                return bsm;
//            
//        catch (Exception e) 
//        
        return null;
    

结果就是同样的报错,BsmSchema 的15行,这就很奇怪了,代码里根本没这一行代码,当然也通过反编译确认了没有这个代码。这里基本上验证了Flink运行的根本不是我刚刚提交的最新代码。真的是奇怪啊,这里我也确认了缓存的存在。

2.4 其他的任务运行正常

因为报的proto解析失败,BsmSchema 的15行,刚好有一个其他任务也是使用的这个BsmSchema ,但是其他任务运行正常,所以猜测不是这个类的问题。

2.5 手动清除缓存

web.tmpdir:/tmp/flink-web-600c749d-ca2b-4467-a557-923c955632a8

Job Manager下有这个web.tmpdir 的配置路径,所有上传的jar包都会存在这里。

手动清除之前上传的包,再次上传后,依然同样的报错,无语死了

2.6 控制台运行,试图越过缓存

因为之前一直使用的是web ui 提交的,所以怀疑是web 系统做了缓存,所以直接上传jar包到lib下,通过flink运行

flink run xxx.jar ,运行过后同样的报错,只不过这次展示在控制台。

最终的结果还是报错,报同样的错,有点崩溃了。

2.7 修改包名,试图越过缓存

猜测是缓存的问题,还是想越过缓存,猜测是jar的名字,所以在打包的时候直接修改了jar的名字。

还是同样的报错,没有一丝丝防备,也没有一丝丝顾虑,还是报错。

2.8 复刻环境,重现问题

实在没招了,只能怪环境了,还能咋样,在其他的机器上复刻一个同样版本的flink,然后提交任务发现执行正常,唉,还是那个环境的问题,这到底是为什么呐,难受

2.9 怀疑是包冲突导致

没有什么解决方案,就在这疑神疑鬼,怀疑是包冲突导致的,虽然没有一丝丝的痕迹显示如此,但是因为自己有限的知识,只能瞎猜了,把很多包都设置为provided的,打包上传,报错依旧没有解决。

2.10 求助大佬

寻寻觅觅,冷冷清清,凄凄惨惨戚戚,实在没招了,只能向大佬求助,在描述一顿问题之后,几个大佬都说没遇到过类似的问题,没办法了,只能自力更生。

3、解决方案

经过2天的折腾,实在没招了,也没有人求助了,只能自己瞎捉摸了,中午睡觉都在想这件事。唉,难受。

在下午的时候灵机一动,包名修改了没用,试着修改mainclass 的名字,修改之后,上传,果然解决了,困扰多日的问题终于解决了,在那一刻我被自己的聪明震惊了。哈哈,我可真是个机灵鬼。

4、总结

事后没有去重现这次问题,但是大概猜到了这个业务逻辑,

不知道是什么原因导致将flink 对提交的jar 做了类似缓存的操作,这个缓存的key 就是mainClass 的类名,所以换了类名之后就解决了问题

可能有人会问为什么不重启flink,首先原因是因为有一些任务在运行,重启会影响他们,也会破坏这个问题环境,

最后在解决问题的过程中也学习了一些flink其他的知识。解决问题的成就感满满

以上是关于flink 缓存问题处理过程,看我十八般武艺的主要内容,如果未能解决你的问题,请参考以下文章

Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1.10)

Flink 入门

2021年最新最全Flink系列教程_Flink流批一体API

01-Flink阶段内容介绍

Flink并行度

龙行商旅十八般武艺