Apache Flink: A Simple Java Working Demo (Based on the Official Demo)
Posted by 你是小KS
Development environment: Eclipse, JDK 1.8, Apache Flink 1.13.0
1. Preface
This article is a record of my own learning; the content is based mainly on the official documentation.
It covers:
- Creating the official fraud-detection demo project with Maven
- Changing the data source to randomly generated transactions
- Writing a custom Sink to handle the collected data
- Packaging the Maven project, uploading it to Flink, and executing it
- Viewing the final results
2. Create the Maven project the official way and modify it
1. Generate the Maven project with the official command (a single line, no line breaks):
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.13.0 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
After the command finishes, the frauddetection project skeleton has been generated (the original post shows a screenshot of the output here).
2. Copy the generated files into the Eclipse project, along with the relevant XML dependencies.
3. Edit the pom.xml contents so the project can be packaged with Maven:
<groupId>Apache-Flink-Start</groupId>
<artifactId>Apache-Flink-Start</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Apache-Flink-Start</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<!-- <repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories> -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- In the official pom this dependency has 'provided' scope so it is not
packaged into the JAR; here the scope is commented out so it gets bundled. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<!-- Add connector dependencies here. They must be in the default scope
(compile). -->
<!-- Example: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version> </dependency> -->
<!-- Add logging framework, to produce console output when running in the
IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.hy.flink.test.FraudDetectionJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. Wait for the dependencies to finish downloading; the Flink Java Maven environment is then ready.
3. Change the project's data source to randomly generated data
1. Create the account-transaction entity class:
import java.util.Date;

/**
 *
 * @author hy
 * @createTime 2021-05-16 09:58:32
 * @description An account-transaction entity class
 *
 */
public class AccountTransation {
	private String accountId; // account ID of the transaction
	private Date transationTime; // transaction time
	private Double transationMoney; // transaction amount
	// getters, setters, no-arg and all-args constructors, and toString omitted
}
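For completeness, the omitted members can be filled in along these lines. This is a minimal sketch: only the field names come from the original, and the constructor and method bodies are assumptions.

```java
import java.util.Date;

public class AccountTransation {
    private String accountId;       // account ID of the transaction
    private Date transationTime;    // transaction time
    private Double transationMoney; // transaction amount

    public AccountTransation() {
    }

    public AccountTransation(String accountId, Date transationTime, Double transationMoney) {
        this.accountId = accountId;
        this.transationTime = transationTime;
        this.transationMoney = transationMoney;
    }

    public String getAccountId() { return accountId; }
    public void setAccountId(String accountId) { this.accountId = accountId; }
    public Date getTransationTime() { return transationTime; }
    public void setTransationTime(Date transationTime) { this.transationTime = transationTime; }
    public Double getTransationMoney() { return transationMoney; }
    public void setTransationMoney(Double transationMoney) { this.transationMoney = transationMoney; }

    @Override
    public String toString() {
        return "AccountTransation [accountId=" + accountId + ", transationTime=" + transationTime
                + ", transationMoney=" + transationMoney + "]";
    }
}
```

Note that Flink can treat this as a POJO type (public class, no-arg constructor, getters/setters for every field), so no extra serialization code is needed.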
2. Create a source class that produces random account transactions (modeled on the official TransactionSource):
import java.io.Serializable;
import java.util.Date;
import java.util.Iterator;

import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;

/**
 *
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description A source that generates random account transactions
 *
 */
public class RandomNumberAccountTransactionSource extends FromIteratorFunction<AccountTransation> {
	private static final long serialVersionUID = 1L;

	public RandomNumberAccountTransactionSource() {
		super(new RandomNumberIterator());
	}

	private static class RandomNumberIterator
			implements Iterator<AccountTransation>, Serializable {
		private static final long serialVersionUID = 1L;
		private String[] accountIds = { "1001", "1002", "1003", "1004" };

		private RandomNumberIterator() {
		}

		@Override
		public boolean hasNext() {
			return true; // always true: this makes the source unbounded
		}

		@Override
		public AccountTransation next() {
			try {
				Thread.sleep(100); // throttle to roughly ten transactions per second
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			int index = (int) (Math.random() * accountIds.length);
			String accountId = accountIds[index];
			Double money = Math.random() * 1000 + 1;
			return new AccountTransation(accountId, new Date(), money);
		}
	}
}
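The random expressions in next() can be sanity-checked in isolation. This small snippet (plain Java, no Flink dependencies; the class name is just for illustration) mirrors that logic and confirms the value ranges: since Math.random() returns a value in [0, 1), the index always falls in [0, 3] and the amount in [1, 1001).

```java
public class RandomPickCheck {
    public static void main(String[] args) {
        String[] accountIds = { "1001", "1002", "1003", "1004" };
        for (int i = 0; i < 10_000; i++) {
            // same expressions as in RandomNumberIterator.next()
            int index = (int) (Math.random() * accountIds.length);
            double money = Math.random() * 1000 + 1;
            if (index < 0 || index >= accountIds.length) {
                throw new AssertionError("index out of range: " + index);
            }
            if (money < 1 || money >= 1001) {
                throw new AssertionError("amount out of range: " + money);
            }
        }
    }
}
```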
3. Create the final Sink (it receives the collected data and prints it via the log):
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description Prints the information of the flagged account transactions
 *
 */
public class AccountAlertSink implements SinkFunction<AccountTransation> {
	private static final long serialVersionUID = 1L;
	private static final Logger LOG = LoggerFactory.getLogger(AccountAlertSink.class);

	@Override
	public void invoke(AccountTransation value, Context context) {
		LOG.info(value.toString());
	}
}
4. Adjust the project's main class and the rest
1. Modify FraudDetector:
package com.hy.flink.test;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.hy.flink.test.pojo.AccountTransation;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<String, AccountTransation, AccountTransation> {
	private static final long serialVersionUID = 1L;
	private static final double SMALL_AMOUNT = 1.00; // kept from the official skeleton, unused here
	private static final double LARGE_AMOUNT = 500.00;
	private static final long ONE_MINUTE = 60 * 1000; // kept from the official skeleton, unused here

	@Override
	public void processElement(
			AccountTransation transaction,
			Context context,
			Collector<AccountTransation> collector) throws Exception {
		// forward every transaction above the threshold, i.e. a potentially fraudulent one
		if (transaction.getTransationMoney() > LARGE_AMOUNT) {
			collector.collect(transaction);
		}
	}
}
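The SMALL_AMOUNT and ONE_MINUTE constants come from the official walkthrough, where an alert fires only when a small "probe" transaction is immediately followed by a large one on the same account. That per-account rule can be sketched in plain Java. This is a hypothetical simplification for illustration (class and method names are mine, and it ignores the one-minute window); the real walkthrough implements it inside the KeyedProcessFunction with a keyed ValueState and a timer.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the official "small then large" fraud rule.
public class SmallThenLargeRule {
    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;

    // per-account flag: was the previous transaction a small "probe"?
    private final Map<String, Boolean> lastWasSmall = new HashMap<>();

    /** Returns true if this transaction should raise an alert. */
    public boolean isSuspicious(String accountId, double amount) {
        boolean previousSmall = lastWasSmall.getOrDefault(accountId, false);
        // a large transaction right after a small one on the same account is suspicious
        boolean alert = previousSmall && amount > LARGE_AMOUNT;
        // remember whether this transaction was small, for the next call
        lastWasSmall.put(accountId, amount < SMALL_AMOUNT);
        return alert;
    }
}
```

In the Flink version, the per-account map would be a keyed ValueState&lt;Boolean&gt; (Flink scopes it to the current key automatically) and the one-minute expiry would be enforced with a processing-time timer.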
2. Adjust FraudDetectionJob:
package com.hy.flink.test;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.hy.flink.test.pojo.AccountAlertSink;
import com.hy.flink.test.pojo.AccountTransation;
import com.hy.flink.test.source.RandomNumberAccountTransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
	public static void main(String[] args) throws Exception {
		// set up the execution environment: it defines the job's properties,
		// creates the data sources, and ultimately launches the job
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// define the data source
		DataStream<AccountTransation> transactions = env
				.addSource(new RandomNumberAccountTransactionSource())
				.name("transactions");
		// process the stream
		DataStream<AccountTransation> alerts = transactions
				.keyBy(AccountTransation::getAccountId)
				.process(new FraudDetector()) // this object does the actual processing
				.name("fraud-detector");
		// the sink receives the final results
		alerts
				.addSink(new AccountAlertSink())
				.name("send-alerts");
		// finally, launch the job
		env.execute("Fraud Detection");
	}
}
5. Package the Maven project
Right-click the project folder in Eclipse, start the Maven run, and wait for packaging to finish.
6. Upload the jar to Flink and run it
1. Copy the jar into the Flink installation directory.
2. Run the jar with the Flink CLI:
./bin/flink run Apache-Flink-Start-0.0.1-SNAPSHOT.jar
7. Check the result in the web UI
Open the running job's logs in the Flink web UI; the alert records are printed there directly.
8. Cancel the job
After cancellation the backend logs an error (expected here, since cancelling interrupts the unbounded source); the job is now cancelled and the run is complete.
9. Summary
1. Apache Flink is a computation framework, mainly for processing data streams; a job generally consists of defining a data source, parsing and transforming the data, and emitting the result data.
2. Apache Flink does not care where the data comes from (bounded or unbounded) or where the results are written; internally it only performs the various processing operations on the data.
3. The Source is the data input and the Sink is the result output; processing runs in process(), and the Collector gathers the output records.