[2] Flink, the Sharp Sword of Big Data Stream Processing: A Simple Counting Example with Flink
Posted on 朱清云's technical blog
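The previous post, "[1] Flink, the Sharp Sword of Big Data Stream Processing: Introduction", gave an overview of what Flink is, what it can do and what its main architecture looks like, and its case-references section linked to several classic use cases. As a first hands-on step, this post walks through a simple example that uses Flink to count how often each word appears in a stream.
The steps are as follows.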
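- Step 1: install a JDK (installation guides are easy to find online).
- Step 2: install Maven.
- Step 3: generate a template project with archetype:generate (the trailing \ characters continue the command in a Unix-like shell; in Windows cmd, enter the whole command on a single line):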
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.12.1 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
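After the command runs, the project is generated in the frauddetection directory, including the FraudDetectionJob and FraudDetector classes in the spendreport package.
Its pom.xml is as follows: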
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>frauddetection</groupId>
<artifactId>frauddetection</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Walkthrough DataStream Java</name>
<url>https://flink.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.1</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>spendreport.FraudDetectionJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
The class containing the main method is as follows:
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: simulated credit-card transactions from flink-walkthrough-common
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");
        // Transformation: partition by account ID, then run FraudDetector (generated in the same package)
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");
        // Sink: AlertSink logs each alert
        alerts.addSink(new AlertSink()).name("send-alerts");
        // Nothing runs until execute() submits the job
        env.execute("Fraud Detection");
    }
}
As this example provided by Flink shows, a job consists of three steps: add a Source --> apply Transformations --> add a Sink.
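The three-stage shape is not specific to Flink. As a rough plain-Java analogy (java.util.stream, not the Flink API; the class name PipelineShape is hypothetical), a finite list plays the Source, flatMap/map/filter play the Transformations, and collecting into a list plays the Sink. Unlike a Flink job, this runs once over a bounded input instead of continuously over an unbounded stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy (not the Flink API) of the Source -> Transformation -> Sink shape.
public class PipelineShape {

    // Runs the three stages over a fixed list of lines and returns what the "sink" received.
    static List<String> run(List<String> sourceLines) {
        Stream<String> source = sourceLines.stream();                 // Source: where records come from
        Stream<String> transformed = source
                .flatMap(line -> Arrays.stream(line.split(";")))      // Transformation: one line -> many words
                .map(String::trim)
                .filter(word -> !word.isEmpty());
        return transformed.collect(Collectors.toList());              // Sink: where results end up
    }

    public static void main(String[] args) {
        List<String> received = run(Arrays.asList("aaa;bbbb;", "bb; aaa;"));
        received.forEach(System.out::println);                        // prints aaa, bbbb, bb, aaa (one per line)
    }
}
```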
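- Because we want to count words, the job needs a source that supplies text. The netcat tool can simulate sending such an input stream. Taking Windows as an example, you can download it from https://eternallybored.org/misc/netcat/
After downloading, add the directory where you unpacked netcat to the operating system's PATH so that the nc command can be used directly. Then start listening on port 9999; in this Windows build, -L keeps listening after a client disconnects and -p sets the port (on Linux or macOS, nc -lk 9999 is the usual equivalent):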
nc -L -p 9999
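If installing netcat is inconvenient, a small Java program can play the same role. This is a minimal sketch, assuming one client (the Flink socket source) connects once: the class name LineServer and its default port 9999 are hypothetical choices, and unlike nc -L it does not re-listen after the client disconnects. Each line typed on stdin is sent with a trailing '\n', which is the default record delimiter of socketTextStream, and flushed immediately so Flink sees it as soon as it is typed. As with nc, start it before the Flink job, because the Flink source connects to it as a client.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -L -p 9999": forwards lines typed on stdin to one TCP client.
public class LineServer {

    // Copies each line from in to out with a trailing '\n' and flushes right away,
    // so every line becomes one record as soon as it is typed. Returns the number of lines sent.
    static int forwardLines(BufferedReader in, Writer out) throws IOException {
        int sent = 0;
        String line;
        while ((line = in.readLine()) != null) {
            out.write(line);
            out.write('\n');
            out.flush();
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port + "; start the Flink job now.");
            // Accept a single client; unlike nc -L, there is no re-listening afterwards.
            try (Socket client = server.accept();
                 Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
                forwardLines(stdin, out);
            }
        }
    }
}
```

Run it with java LineServer (or java LineServer 9999), then type lines such as the sample input into its console.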
The input is a stream of strings separated by ";", for example:
aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;
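To see how one such line turns into words, here is a plain-Java sketch (not Flink code; the class name Tokenize is hypothetical) of the same split, lower-case, trim and filter logic the Flink job applies to each line. Java's String.split drops trailing empty strings, so the final ";" produces no empty token, but ";;" in the middle of a line does, which is why a filter step is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java version of the per-line flatMap + filter logic, applied to one input line.
public class Tokenize {

    static List<String> words(String line) {
        List<String> result = new ArrayList<>();
        for (String word : line.split(";")) {        // split on ";" (trailing empty strings are dropped)
            String w = word.toLowerCase().trim();     // same normalization as the flatMap step
            if (!w.isEmpty()) {                       // same role as the filter step
                result.add(w);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(words("aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;"));
        // [aaa, bbbb, aaa, aaa, aaaa, bbbb, bb, bbbb, aaaa, bb]
        System.out.println(words("AAA;;  bb ;"));
        // [aaa, bb]
    }
}
```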
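- Start the Flink client, which connects to 127.0.0.1:9999 and reads the text stream sent from there (Flink acts as the socket client and nc as the server). The code is: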
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
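- The code that counts how many times aaa, bbbb, bb and the other words appear is as follows.
Here source is the stream object created in the previous step.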
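// Required imports (top of the source file):
// import org.apache.flink.api.common.functions.FilterFunction;
// import org.apache.flink.api.common.functions.FlatMapFunction;
// import org.apache.flink.api.common.functions.MapFunction;
// import org.apache.flink.api.java.functions.KeySelector;
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.util.Collector;
source.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Split each line on ";" and emit every word, lower-cased and trimmed
        for (String word : value.split(";")) {
            out.collect(word.toLowerCase().trim());
        }
    }
}).filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return !value.isEmpty(); // drop empty tokens, e.g. from ";;"
    }
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2<>(value, 1); // (word, 1)
    }
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0; // group by the word
    }
}).sum(1).print(); // running total of field 1 per word, printed to stdout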
- Run the streaming job
Note: env is the StreamExecutionEnvironment created in the socket-source step above.
env.execute("StreamingWordCountApp");
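In Flink, the DataStream calls above only assemble a dataflow graph; the job starts running when env.execute() is called. Java's own streams behave in an analogous way, and the plain-Java sketch below (not Flink code; the class name LazyPipeline is hypothetical) makes this observable: the map function is not invoked at all until the terminal collect call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy (not Flink): defining a pipeline does no work until a terminal call,
// much as a Flink job does nothing until env.execute().
public class LazyPipeline {

    // Returns {calls before the terminal operation, calls after it}.
    static int[] callsBeforeAndAfter() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("aaa", "bbbb", "aaa")
                .map(word -> {
                    calls.incrementAndGet();   // count how often the "transformation" really runs
                    return word.toUpperCase();
                });
        int before = calls.get();              // pipeline defined, nothing executed yet
        List<String> result = pipeline.collect(Collectors.toList()); // terminal op, like execute()
        int after = calls.get();
        System.out.println(result);            // [AAA, BBBB, AAA]
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = callsBeforeAndAfter();
        System.out.println("before: " + calls[0] + ", after: " + calls[1]); // before: 0, after: 3
    }
}
```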
- Execution results
Type the input into nc; the Flink client then prints the running count for each word to its console.
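Look only at the last result for each key: the last time bbbb appears its count is 3, the last count for aaa is 3, and the last count for bb is 2. All of them match the input exactly.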
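Several lines appear per key because keyBy(...).sum(1) keeps a running total per word and emits an updated tuple for every incoming record, not one final total. The plain-Java simulation below (not Flink code; the class name RunningCount is hypothetical, and a HashMap stands in for Flink's keyed state) reproduces that update sequence for the sample input. In the real job, when parallelism is greater than 1, print() also prefixes each line with a subtask index such as "2> ", and updates may appear interleaved in a different order, but the last line printed for each key still carries its total.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink) of what keyBy(word).sum(1).print() emits:
// one updated (word,count) pair per incoming word.
public class RunningCount {

    static List<String> updates(String... lines) {
        Map<String, Integer> totals = new HashMap<>();   // stands in for Flink's per-key state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {                      // totals survive across lines, as in the streaming job
            for (String raw : line.split(";")) {
                String word = raw.toLowerCase().trim();
                if (word.isEmpty()) {
                    continue;                            // the filter step
                }
                int total = totals.merge(word, 1, Integer::sum); // add field 1 (always 1) to the running sum
                emitted.add("(" + word + "," + total + ")");     // same text as Tuple2.toString()
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        updates("aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;").forEach(System.out::println);
    }
}
```

For the sample line this prints (aaa,1), (bbbb,1), (aaa,2), (aaa,3), (aaaa,1), (bbbb,2), (bb,1), (bbbb,3), (aaaa,2), (bb,2), one per line, so the final values are aaa 3, bbbb 3, bb 2 and aaaa 2.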
Try the code and example above on your own machine!
References:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/