Apache Flink:使用本地的Flink方式执行作业(调试使用)

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink:使用本地的Flink方式执行作业(调试使用)相关的知识,希望对你有一定的参考价值。

1.声明

当前内容主要为本人测试和使用Apache Flink,采用本地的方式完成Flink的基本作业(非远程打包jar提交任务)

当前内容参考:Apache Flink官方文档

当前内容为:

  1. 本地运行方式运行任务
  2. 读取文本文件并打印出来
  3. 操作集合数据

2.基本pom依赖

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.13.0</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- This dependency is provided, because it should not be packaged into 
			the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

3.查找可以在本地运行的环境的方法

1.直接找到StreamExecutionEnvironment这个类并查看方法(以environment的方法)
在这里插入图片描述

这个时候直接使用createLocalEnvironment的方法(这个就是在本地执行,不用打包上传再执行)

4.开始编写demo

文本读取准备的文件abc.txt

张三 18
李四 22
王五 12
赵六 14
老七 19
重八 14

代码:


import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 
 * @author hy
 * @createTime 2021-05-23 08:18:33
 * @description 当前内容主要为测试和使用当前的Flink,主要为本地方式运行这个Flink
 *
 */
public class LocalRunningDataSourceTest {
	public static void main(String[] args) {
		// 采用本地模式
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// 从本地的webUI方式提供createLocalEnvironmentWithWebUI
		// Configuration conf=new Configuration();
		/* conf.set(option, value)*/
		// StreamExecutionEnvironment env =
		// StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 采用默认配置
		
		

		
		// 设定数据来源为集合数据
		DataStream<Person> flintstones = env.
				fromElements(new Person("Fred", 35),
						new Person("Wilma", 35),
				new Person("Pebbles", 2));

		DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
			@Override
			public boolean filter(Person person) throws Exception {
				return person.age >= 18;
			}
		});

		adults.print();

		
		
		// 设置数据来源为当前的文本文件:
		DataStreamSource<String> readTextFile = env.readTextFile("D:\\\\eclipse-workspace\\\\Apache-Flink-Start\\\\resources\\\\abc.txt");
		// 直接读取文件为文本类型,最后进行print操作
		readTextFile.print();
		
		try {
			// 最后开始执行
			JobExecutionResult result = env.execute("Fraud Detection");
			if(result.isJobExecutionResult()) {
				System.out.println("执行完毕......");
			}
			System.out.println();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	static class Person {
		private String name;
		private Integer age;
		// 省略get\\set\\有参\\无参\\toString等方法
	}
}

执行结果:
在这里插入图片描述
测试成功!

5.总结

1.由于在测试环境下,打包上传到Flink太麻烦,所以采取本地运行Flink方式,可以有效的debug和测试发现各种问题

2.Apache Flink和Apache Spark都是支持本地运行的方式运行作业的!

以上是关于Apache Flink:使用本地的Flink方式执行作业(调试使用)的主要内容,如果未能解决你的问题,请参考以下文章

Flink实战系列Flink 本地 Web UI 的配置及使用

flink报错踩坑:org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction

bug记录: 本地idea运行没有问题,打包后出现ClassNotFoundException: org.apache.flink.util.OptionalFailure。

Flink SQL DDL

Flink的安装部署及WordCount测试

Flink的安装部署及WordCount测试