Apache Flink:使用本地的Flink方式执行作业(调试使用)
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink:使用本地的Flink方式执行作业(调试使用)相关的知识,希望对你有一定的参考价值。
1.声明
当前内容主要为本人测试和使用Apache Flink,采用本地的方式完成Flink的基本作业(非远程打包jar提交任务)
当前内容参考:Apache Flink官方文档
当前内容为:
- 本地运行方式运行任务
- 读取文本文件并打印出来
- 操作集合数据
2.基本pom依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into
the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
3.查找可以在本地运行的环境的方法
1.直接找到StreamExecutionEnvironment
这个类并查看方法(以environment的方法)
这个时候直接使用createLocalEnvironment
的方法(这个就是在本地执行,不用打包上传再执行
)
4.开始编写demo
文本读取准备的文件abc.txt
张三 18
李四 22
王五 12
赵六 14
老七 19
重八 14
代码:
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*
* @author hy
* @createTime 2021-05-23 08:18:33
* @description 当前内容主要为测试和使用当前的Flink,主要为本地方式运行这个Flink
*
*/
public class LocalRunningDataSourceTest {
public static void main(String[] args) {
// 采用本地模式
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// 从本地的webUI方式提供createLocalEnvironmentWithWebUI
// Configuration conf=new Configuration();
/* conf.set(option, value)*/
// StreamExecutionEnvironment env =
// StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 采用默认配置
// 设定数据来源为集合数据
DataStream<Person> flintstones = env.
fromElements(new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
// 设置数据来源为当前的文本文件:
DataStreamSource<String> readTextFile = env.readTextFile("D:\\\\eclipse-workspace\\\\Apache-Flink-Start\\\\resources\\\\abc.txt");
// 直接读取文件为文本类型,最后进行print操作
readTextFile.print();
try {
// 最后开始执行
JobExecutionResult result = env.execute("Fraud Detection");
if(result.isJobExecutionResult()) {
System.out.println("执行完毕......");
}
System.out.println();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static class Person {
private String name;
private Integer age;
// 省略get\\set\\有参\\无参\\toString等方法
}
}
执行结果:
测试成功!
5.总结
1.由于在测试环境下,打包上传到Flink太麻烦,所以采取本地运行Flink方式,可以有效的debug和测试发现各种问题
2.Apache Flink和Apache Spark
都是支持本地运行的方式运行作业的!
以上是关于Apache Flink:使用本地的Flink方式执行作业(调试使用)的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink 本地 Web UI 的配置及使用
flink报错踩坑:org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction
bug记录: 本地idea运行没有问题,打包后出现ClassNotFoundException: org.apache.flink.util.OptionalFailure。