Datastream 开发打包问题
Posted 阿里云云栖号
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Datastream 开发打包问题相关的知识,希望对你有一定的参考价值。
简介:Datastream作业开发时往往会遇到一些jar包冲突等问题,本文主要讲解作业开发时需要引入哪些依赖以及哪些需要被打包进作业的jar中,从而避免不必要的依赖被打入了作业jar中以及可能产生的依赖冲突。
Datastream作业开发时往往会遇到一些jar包冲突等问题,本文主要讲解作业开发时需要引入哪些依赖以及哪些需要被打包进作业的jar中,从而避免不必要的依赖被打入了作业jar中以及可能产生的依赖冲突。
一个Datastream作业主要涉及下述依赖:
Flink的核心依赖以及应用程序自身的依赖
每一个Flink应用程序都依赖于一系列相关的库,其中至少应该包括Flink的API. 许多应用程序还依赖于连接器相关的库(如 Kafka, Cassandra等).在运行Flink应用程序时,无论是在运行在分布式的环境下还是在本地IDE进行测试,Flink的运行时相关依赖都是必须的。
与大多数运行用户自定义应用程序的系统一样,Flink 中有两大类依赖项:
- Flink核心依赖:Flink 本身由一组运行系统所必需的类和依赖项组成,例如协调器、网络、检查点、容错、API、算子(例如窗口)、资源管理等。 所有这些类和依赖项的集合构成了 Flink 运行时的核心,在 Flink 应用程序启动时必须存在。这些核心类和依赖项都被打包在 flink-dist jar 中。 它们是 Flink 的 lib 文件夹的一部分,也是Flink基础容器镜像的一部分。这些依赖之于Flink就像Java 运行所需的包含 String 和 List 等类的核心库(rt.jar、charsets.jar 等)之于Java。Flink的核心依赖不包含任何连接器或扩展库(CEP、SQL、ML等),这使得Flink的核心依赖尽可能小,以避免默认情况下类路径中有过多的依赖项,同时减少依赖冲突。
- 用户应用程序依赖项:指特定用户应用程序所需的所有连接器、Format或扩展库。用户应用程序通常被打包成一个 jar文件,其中包含应用程序代码以及所需的连接器和库依赖项。用户应用程序依赖项不应包括 Flink DataStream API 和运行时依赖项,因为这些已经被包含在了Flink 的核心依赖中。
依赖配置步骤
1.添加基础依赖
每一个Flink应用程序的开发至少需要添加对相关API的基础依赖。
手动配置项目时,需要添加对Java/Scala API的依赖(这里以Maven为例,在其他构建工具(Gradle,SBT等)中可以使用同样的依赖)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.12.3</version>
<scope>provided</scope>
</dependency>
重要提示:请注意,所有这些依赖项都将其范围设置为"provided"。这意味着需要对它们进行编译,但不应将它们打包到项目生成的应用程序jar文件中——这些依赖项是Flink核心依赖项,在实际运行时已经被加载。
强烈建议将依赖项设置成"provided"的范围,如果未将它们设置为"provided",最好的情况下会导致生成的jar变得臃肿,因为它还包含所有Flink核心依赖项。而最怀的情况下,添加到应用程序jar文件中的Flink核心依赖项与您自己的一些依赖项会发生版本冲突(通常通过Flink的反向类加载机制来避免)。
关于IntelliJ的注意事项:为了使应用程序在IntelliJ IDEA中运行,有必要在运行配置中勾选"Include dependencies with "Provided" scope"选项框。如果没有该选项(可能是由于使用较旧的IntelliJ IDEA版本),那么一个简单的解决方法是创建一个调用应用程序 main() 方法的测试用例。
2.添加连接器和库的依赖
大多数应用程序的运行需要特定的连接器或库,例如Kafka、Cassandra等连接器。这些连接器不是Flink核心依赖项的一部分,必须作为额外依赖项添加到应用程序中。
下述代码是添加Kafka连接器依赖项的示例(Maven语法):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.3</version>
</dependency>
我们建议将应用程序代码和它所有的依赖以jar-with-dependencies 的形式打包到一个application jar中。这个应用程序jar包可以被提交到已经存在的Flink集群上去,或者被加入到Flink应用程序的容器镜像中去。
从Maven作业模版(见下文Maven作业模版部分)创建的项目,通过mvn clean package命令会自动把依赖打到应用程序的jar包中去。对于没有使用模版进行配置的情况,建议使用Maven Shade Plugin (配置如附录所示) 来构建包含依赖的jar包。
重要提示:对于Maven(和其他构建工具)来说,要将依赖项正确打包到应用程序jar中,这些应用程序依赖项的scope必须指定为"compile"(与核心依赖项不同,核心依赖项的scope必须指定为"provided")。
注意事项
Scala版本
Scala的不同版本(2.11,2.12等)相互之间是不兼容的。因此,Scala 2.11对应的Flink版本不能用于使用Scala 2.12的应用程序。
所有依赖(或传递依赖)于Scala的Flink依赖项都以构建它们的Scala版本作为后缀,例如flink-streaming-scala_2.11。
只使用Java进行开发时可以选择任何Scala版本,使用Scala开发时需要选择与其应用程序的Scala版本匹配的Flink依赖版本。
注:2.12.8之后的Scala版本与之前的2.12.x版本不兼容,因此Flink项目无法将其2.12.x版本升级到2.12.8之后的版本。用户可以在本地自己编译对应Scala版本的Flink。为了使其能够正常工作,需要添加-Djapicmp.skip以在构建时跳过二进制兼容性检查。
Hadoop依赖
一般的规则: 永远不要将Hadoop相关依赖直接添加到应用程序中. (唯一的例外是将现有的Hadoop输入/输出Format与Flink的Hadoop兼容包一起使用时)
如果希望将Flink与Hadoop结合使用,则需要包含Hadoop依赖的Flink启动项,而不是将Hadoop添加为应用程序依赖项。Flink将使用HADOOP_CLASSPATH环境变量指定的Hadoop依赖项,可通过以下方式进行设置:
export HADOOP_CLASSPATH**=**
hadoop classpath``
这种设计有两个主要原因:
- 一些与Hadoop的交互可能发生在Flink的核心模块中,并且在用户应用程序启动之前,例如为检查点设置HDFS、通过Hadoop的Kerberos令牌进行身份验证,或者在YARN上进行部署等。
- Flink的反向类加载机制从核心依赖项中隐藏了许多可传递的依赖项。这不仅适用于Flink自己的核心依赖项,而且适用于Hadoop的依赖项。这样,应用程序就可以使用相同依赖项的不同版本,而不会发生依赖项冲突(相信我们,这是一件大事,因为Hadoop依赖树非常庞大。)
如果在IDE内部的测试或开发过程中需要Hadoop依赖项(例如HDFS访问),请将这些依赖项的scope配置为
test 或则 provided。
Transform table connector/format resources #
Flink使用Java的Service Provider Interfaces (SPI) 机制通过特定标识符加载table的connector/format工厂。由于每个table的connector/format的名为org.apache.flink.table.factories.Factory的SPI资源文件位于同一目录:META-INF/services下,因此在构建使用多个table connector/format的项目的uber jar时,这些资源文件将相互覆盖,这将导致Flink无法正确加载工厂类。
在这种情况下,推荐的方法是通过maven shade插件的ServicesResourceTransformer转换META-INF/services目录下的这些资源文件。给定示例的pom.xml文件内容如下,其中包含连接器flink-sql-connector-hive-3.1.2和flink-parquet format。
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>myProject</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- other project dependencies ...-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-3.1.2__2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet__2.11<</artifactId>
<version>1.13.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- ... -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
在配置了ServicesResourceTransformer之后, 项目构建uber-jar时,META-INF/services目录下的这些资源文件会被整合在一起而不是相互覆盖。
Maven作业模版
强烈建议使用该方式进行配置,可以减少很多重复的配置工作。
前置要求
唯一的环境要求是安装了Maven 3.0.4(或更高版本)和Java 8.x。
创建项目
使用以下两种方式中的一种创建项目:
- 使用Maven archetypes
$ mvn archetype:generate \\
-DarchetypeGroupId=org.apache.flink \\
-DarchetypeArtifactId=flink-quickstart-java \\
-DarchetypeVersion=1.12.3
这允许您命名新创建的项目。它将以交互方式要求您输入groupId、artifactId和包名。
- 运行quickstart脚本
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.3
我们建议您将此项目导入IDE以开发和测试它。IntelliJ IDEA原生支持Maven项目。如果使用Eclipse,可以使用m2e插件导入Maven项目。默认情况下,某些Eclipse捆绑包包含该插件,否则需要您手动安装。
请注意:默认的Java JVM heap size对于Flink来说可能太小了。你必须手动增加它。在Eclipse中,选择RunConfigurations->Arguments并写入VM Arguments框:-Xmx800m。在IntelliJ IDEA中,更改JVM选项的推荐方法是使用Help | Edit Custom VM Options选项菜单。细节见这篇文章.
构建项目
如果要生成/打包项目,请转到项目目录并运行"mvn clean package"命令。执行后将会得到一个JAR文件:target/-.jar,其中包含您的应用程序,以及作为依赖项添加到应用程序的连接器和库。
注意:如果使用与StreamingJob不同的类作为应用程序的主类/入口点,我们建议您相应地更改pom.xml文件中的mainClass设置。这样,Flink就可以直接从JAR文件运行应用程序,而无需另外指定主类。
附录: 构建带依赖的jar包的模版
要构建包含连接器和库所需的所有依赖项的应用程序JAR,可以使用以下shade插件定义:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
本文为阿里云原创内容,未经允许不得转载。
以上是关于Datastream 开发打包问题的主要内容,如果未能解决你的问题,请参考以下文章
4.Flink入门案例前置说明准备环境代码实现-DataSet-了解DataStream--匿名内部类--处理批DataStream-匿名内部类-处理流LambdaOn-Yarn-掌握
FusionInsight MRS Flink DataStream API读写Hudi实践