调试flink源码

Posted 浪尖聊大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了调试flink源码相关的知识,希望对你有一定的参考价值。

本文主要是讲讲flink的源码编译,案例运行,flink源码调试过程。调试flink的源码及案例,需要先clone工程,编一下源码,去掉规范检查,修改工程,最后才是调试运行。

1. clone工程

git@github.com:apache/flink.git

接着在idea点击路径

File--->New--->Project from Version Control--->git

弹出窗口

调试flink源码

工程clone完成之后,可以在idea 的右下角切换到自己所用的分支,我的分支是1.6.

调试flink源码

切换完成之后,分支显示为:

调试flink源码

2. 编译源码

源码编译可以直接用idea的maven插件。

调试flink源码

报错如下:

调试flink源码

修改一下根目录下的pom.xml文件

去掉代码风格检查,注释掉这个的主要原因是我们要改源码,不注释掉无法编译通过。

<plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <version>2.17</version>
         <dependencies>
           <dependency>
             <groupId>com.puppycrawl.tools</groupId>
             <artifactId>checkstyle</artifactId>
             <!-- Note: match version with docs/internals/ide_setup.md -->
             <version>8.4</version>
           </dependency>
         </dependencies>
         <executions>
           <execution>
             <id>validate</id>
             <phase>validate</phase>
             <goals>
               <goal>check</goal>
             </goals>
           </execution>
         </executions>
         <configuration>
           <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <configLocation>/tools/maven/checkstyle.xml</configLocation>
           <logViolationsToConsole>true</logViolationsToConsole>
           <failOnViolation>true</failOnViolation>
         </configuration>
       </plugin>

再次编译,即可。

3. 运行kafka案例

点开工程栏,找到flink-examples模块,然后找到kafka案例,如下:

调试flink源码

将kafka的example修改为可运行的案例,官方demo是通过打包提交到集群的方式运行,需要传参的,而我们直接在idea中运行,不需要穿参数。代码修改如下:

Properties props = new Properties();
   props.put("bootstrap.servers", "mt-mdh.local:9093");
   props.put("zookeeper.connect","localhost:2181");
   props.put("group.id","test");

   props.put("metadata.fetch.timeout.ms","10000");
   props.put("metadata.max.age.ms","30000");
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.getConfig().disableSysoutLogging();
   env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
   env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

   DataStream<KafkaEvent> input = env
       .addSource(
         new FlinkKafkaConsumer010<>(
           "",
           new KafkaEventSchema(),
           props)
         .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
       .keyBy("word")
       .map(new RollingAdditionMapper());

   input.addSink(
       new FlinkKafkaProducer010<>(
           "bar",
           new KafkaEventSchema(),
           props));

   env.execute("Kafka 0.10 Example");

然后,右键,run。发现,并不能顺心如意的运行,还是报了一堆错误。。。

调试flink源码

实际上,只需要改一些run的运行配置即可避免该错误。

调试flink源码

在导航栏,run---> Edit Configurations

调试flink源码

修改为

再运行,就ok了。

关于debug,只要run运行成功之后,直接可以debug的。。。

flink的源码调试debug及阅读经验,敬请期待后续,文章,也可以点击原文阅读加入浪尖知识星球。


推荐阅读:





点赞,然后分享给小伙伴吧~

以上是关于调试flink源码的主要内容,如果未能解决你的问题,请参考以下文章

FLINK源代码调试方式

FLINK源代码调试方式

手把手教你获取编译和调试Flink的源代码

谷歌浏览器调试jsp 引入代码片段,如何调试代码片段中的js

Flink on yarn 远程调试

「Flink」配置使用Flink调试WebUI