Apache Flink: A Simple Java Working Demo (Based on the Official Demo)

Posted by 你是小KS


Development environment: Eclipse, JDK 1.8, Apache Flink 1.13.0

1. Preface

This article is primarily for my own learning; the content comes mainly from the official documentation.

It covers:

  1. Creating a Maven project based on the official fraud-detection demo
  2. Changing the data source to randomly generated numbers
  3. Handling the collected data with a custom Sink
  4. Packaging the Maven project, uploading it to Flink, and executing it
  5. Checking the final results

2. Create the Maven Project the Official Way and Modify It

1. Create the Maven project with the official command (a single line, no line breaks)

 mvn archetype:generate  -DarchetypeGroupId=org.apache.flink  -DarchetypeArtifactId=flink-walkthrough-datastream-java  -DarchetypeVersion=1.13.0  -DgroupId=frauddetection  -DartifactId=frauddetection  -Dversion=0.1  -Dpackage=spendreport  -DinteractiveMode=false

Execution result:

(Screenshots: output of the archetype generation)

2. Copy the generated files into the Eclipse project, along with the relevant XML dependencies.

(Screenshot: Eclipse project structure)

3. Modify the pom file so the project can be packaged with Maven

<groupId>Apache-Flink-Start</groupId>
	<artifactId>Apache-Flink-Start</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Apache-Flink-Start</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.13.0</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>

	<!-- <repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories> -->

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- This dependency is provided, because it should not be packaged into 
			the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<!-- Add connector dependencies here. They must be in the default scope 
			(compile). -->

		<!-- Example: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> 
			<version>${flink.version}</version> </dependency> -->

		<!-- Add logging framework, to produce console output when running in the 
			IDE. -->
		<!-- These dependencies are excluded from the application JAR by default. -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>1.2.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<transformers>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.hy.flink.test.FraudDetectionJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>


	</build>

3. Wait for the dependencies to finish downloading; the Flink Java Maven environment is then ready.

3. Change the Project's Data Source to Random Generation

1. Create the account transaction class


package com.hy.flink.test.pojo;

import java.util.Date;

/**
 * 
 * @author hy
 * @createTime 2021-05-16 09:58:32
 * @description An account transaction entity class
 *
 */
public class AccountTransation {
	private String accountId; // account ID of this transaction
	private Date transationTime; // transaction time
	private Double transationMoney; // transaction amount
	// getters/setters, no-arg and all-args constructors, toString(), etc. omitted
}
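
The omitted members are what the rest of the code relies on, so here is a minimal sketch of them, inferred from how the class is used later (setters omitted for brevity; these go inside the class body above):

	public AccountTransation() {
	}

	public AccountTransation(String accountId, Date transationTime, Double transationMoney) {
		this.accountId = accountId;
		this.transationTime = transationTime;
		this.transationMoney = transationMoney;
	}

	public String getAccountId() {
		return accountId;
	}

	public Double getTransationMoney() {
		return transationMoney;
	}

	@Override
	public String toString() {
		return "AccountTransation [accountId=" + accountId + ", transationTime=" + transationTime
				+ ", transationMoney=" + transationMoney + "]";
	}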

2. Create the random transaction source class (modeled on the official source class)

package com.hy.flink.test.source;

import java.io.Serializable;
import java.util.Date;
import java.util.Iterator;

import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;

import com.hy.flink.test.pojo.AccountTransation;

/**
 * 
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description A source that generates random account transactions
 *
 */
public class RandomNumberAccountTransactionSource extends FromIteratorFunction<AccountTransation> {

	private static final long serialVersionUID = 1L;

	public RandomNumberAccountTransactionSource() {
		super(new RandomNumberIterator());
	}

	// The iterator must be Serializable because Flink ships it to the task managers.
	private static class RandomNumberIterator
			implements Iterator<AccountTransation>, Serializable {

		private static final long serialVersionUID = 1L;
		private String[] accountIds = { "1001", "1002", "1003", "1004" };

		private RandomNumberIterator() {
		}

		@Override
		public boolean hasNext() {
			// Always true: this source produces an unbounded stream.
			return true;
		}

		@Override
		public AccountTransation next() {
			try {
				// Throttle to roughly ten transactions per second.
				Thread.sleep(100);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			int index = (int) (Math.random() * accountIds.length);
			String accountId = accountIds[index];
			Double money = Math.random() * 1000 + 1;
			return new AccountTransation(accountId, new Date(), money);
		}
	}

}
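
Before wiring the source into the full job, it can be smoke-tested with a tiny local job run straight from the IDE. A minimal sketch (the class name SourceSmokeTest is made up for illustration):

package com.hy.flink.test.source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceSmokeTest {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Print each generated AccountTransation to the console.
		env.addSource(new RandomNumberAccountTransactionSource()).print();
		env.execute("source-smoke-test");
	}
}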

3. Create the final Sink (it receives the collected data and displays it via the log)

package com.hy.flink.test.pojo;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description Logs the information of the collected account transactions
 *
 */
public class AccountAlertSink implements SinkFunction<AccountTransation> {

	private static final long serialVersionUID = 1L;
	// Note: the logger must reference this class, not the official AlertSink.
	private static final Logger LOG = LoggerFactory.getLogger(AccountAlertSink.class);

	@Override
	public void invoke(AccountTransation value, Context context) {
		LOG.info(value.toString());
	}
}

4. Adjust the Project's main Class and the Rest

1. Modify FraudDetector

package com.hy.flink.test;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.hy.flink.test.pojo.AccountTransation;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<String, AccountTransation, AccountTransation> {

	private static final long serialVersionUID = 1L;

	// SMALL_AMOUNT and ONE_MINUTE are leftovers from the official skeleton;
	// only LARGE_AMOUNT is used in this simplified version.
	private static final double SMALL_AMOUNT = 1.00;
	private static final double LARGE_AMOUNT = 500.00;
	private static final long ONE_MINUTE = 60 * 1000;

	@Override
	public void processElement(
			AccountTransation transaction,
			Context context,
			Collector<AccountTransation> collector) throws Exception {

		// Forward every transaction above the threshold,
		// i.e. a potentially fraudulent account.
		if (transaction.getTransationMoney() > LARGE_AMOUNT) {
			collector.collect(transaction);
		}
	}
}
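
For comparison, the official walkthrough's detector is stateful: it only raises an alert when a small transaction is immediately followed by a large one on the same account, tracked with keyed ValueState. A minimal sketch of that pattern adapted to AccountTransation (not used in this simplified demo; it additionally needs imports for ValueState, ValueStateDescriptor, Types, and Configuration):

	private transient ValueState<Boolean> flagState;

	@Override
	public void open(Configuration parameters) {
		// Keyed state: one flag per account ID.
		flagState = getRuntimeContext().getState(
				new ValueStateDescriptor<>("flag", Types.BOOLEAN));
	}

	@Override
	public void processElement(
			AccountTransation transaction,
			Context context,
			Collector<AccountTransation> collector) throws Exception {

		if (flagState.value() != null) {
			// The previous transaction was small; a large one now is suspicious.
			if (transaction.getTransationMoney() > LARGE_AMOUNT) {
				collector.collect(transaction);
			}
			flagState.clear();
		}
		if (transaction.getTransationMoney() < SMALL_AMOUNT) {
			flagState.update(true);
		}
	}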

2. Adjust FraudDetectionJob


package com.hy.flink.test;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.hy.flink.test.pojo.AccountAlertSink;
import com.hy.flink.test.pojo.AccountTransation;
import com.hy.flink.test.source.RandomNumberAccountTransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {

	public static void main(String[] args) throws Exception {
		// Set up the execution environment. It defines the job's properties,
		// creates the data sources, and finally launches the job.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Define the data source.
		DataStream<AccountTransation> transactions = env
				.addSource(new RandomNumberAccountTransactionSource())
				.name("transactions");

		// Process the data: the FraudDetector instance does the actual work.
		DataStream<AccountTransation> alerts = transactions
				.keyBy(AccountTransation::getAccountId)
				.process(new FraudDetector())
				.name("fraud-detector");

		// The sink receives the final results.
		alerts
				.addSink(new AccountAlertSink())
				.name("send-alerts");

		// Finally, start the execution.
		env.execute("Fraud Detection");
	}
}

5. Package the Maven Project

Right-click the project folder and open the Maven build run configuration.

(Screenshots: Eclipse run-configuration dialogs)

Then execute the run configuration and wait for the packaging to complete.

(Screenshot: Maven build output)
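
Equivalently, you can skip the Eclipse run configuration and package from the command line (assuming Maven is on the PATH):

mvn clean package

With the artifactId and version from the pom above, the shaded JAR is produced at target/Apache-Flink-Start-0.0.1-SNAPSHOT.jar.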

6. Upload the JAR to Flink and Run It

1. Upload the JAR directly into the Flink installation directory.

(Screenshot: JAR file in the Flink directory)

2. Run the JAR with the flink CLI

./bin/flink run Apache-Flink-Start-0.0.1-SNAPSHOT.jar

(Screenshot: flink run output)
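
Because the shade plugin writes com.hy.flink.test.FraudDetectionJob into the JAR manifest as the main class, no extra flags are needed. If the manifest entry were missing, the entry class could be passed explicitly with the CLI's -c option:

./bin/flink run -c com.hy.flink.test.FraudDetectionJob Apache-Flink-Start-0.0.1-SNAPSHOT.jar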

7. Check the Running Result in the Web UI

(Screenshot: running job in the web UI)

View the run logs:

(Screenshots: taskmanager logs)

The results are printed directly here.

8. Cancel the Job

(Screenshots: cancelling the job in the web UI)

After cancellation (the backend will log an error):

(Screenshot: error in the backend log)

The job is now cancelled and the run is complete.

9. Summary

1. Apache Flink is a computing framework that mainly processes data streams; a job generally breaks down into setting up a data source, parsing and transforming the data, and emitting the result data.

2. Flink does not care where the data comes from (bounded or unbounded) or where the results are written; internally it only performs the various processing operations on the data.

3. The Source is the data input and the Sink is the result output; process executes the data-processing logic, and the Collector collects (emits) the output records.
