[2] Flink, the Sharp Sword of Big Data Stream Processing: A Simple Counting Example with Flink

Posted on 朱清云的技术博客 (Zhu Qingyun's technical blog)



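In the previous post, <<[1] Flink, the Sharp Sword of Big Data Stream Processing: Introduction>>, I gave an overview of what Flink is, what it can do, and what its main architecture looks like. That post's case-reference section also links to some classic use cases. As a first hands-on step, this post walks through a simple Flink program that counts how often each word appears in a text stream, as an entry point into the world of Flink.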

Below are the concrete steps.

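  1. Step 1: Install a JDK (instructions for your platform are easy to find online; the generated project targets Java 1.8).
  2. Step 2: Install Maven.
  3. Step 3: Generate the template project with archetype:generate. The trailing backslashes are line continuations for a Unix-like shell; on Windows cmd, put the whole command on one line.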
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.12.1 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

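After the command runs, Maven generates the project in a new frauddetection directory (named after -DartifactId).

Its pom.xml is as follows: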

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>frauddetection</groupId>
	<artifactId>frauddetection</artifactId>
	<version>0.1</version>
	<packaging>jar</packaging>
	<name>Flink Walkthrough DataStream Java</name>
	<url>https://flink.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.12.1</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.12.1</log4j.version>
	</properties>
	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>

			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${target.java.version}</source>
					<target>${target.java.version}</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>spendreport.FraudDetectionJob</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.0.0,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
</project>

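The class containing the main method (spendreport.FraudDetectionJob, the mainClass configured for the shade plugin in the pom above) is as follows: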

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Source: a stream of simulated transactions
		DataStream<Transaction> transactions = env
			.addSource(new TransactionSource())
			.name("transactions");

		// Transformation: partition by account ID, then run FraudDetector
		// (generated by the archetype in the same package) on each account's transactions
		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");

		// Sink: hand every alert to AlertSink
		alerts.addSink(new AlertSink()).name("send-alerts");

		env.execute("Fraud Detection");
	}
}

As this example provided by Flink shows, a job boils down to three steps: add a Source --> apply Transformations --> write to a Sink.
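The plain-Java sketch below makes this three-stage shape concrete without any Flink dependency. PipelineShape and run are hypothetical names, not Flink's API. A finite List stands in for the source, a java.util.function.Function for the transformation, and a Consumer for the sink. In the Flink job above, addSource, keyBy/process and addSink play the same roles, and records arrive continuously rather than from a fixed list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of the Source -> Transformation -> Sink shape.
// This is NOT Flink's API: a finite Iterable stands in for an unbounded source.
class PipelineShape {

    // Pull each record from the source, transform it, and hand the result to the sink.
    static <I, O> void run(Iterable<I> source, Function<I, O> transformation, Consumer<O> sink) {
        for (I record : source) {
            sink.accept(transformation.apply(record));
        }
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("aaa", "BBBB", "bb");
        List<String> sinkBuffer = new ArrayList<>();
        // Transformation: normalize each record to lower case; the sink collects the results.
        run(source, String::toLowerCase, sinkBuffer::add);
        System.out.println(sinkBuffer); // [aaa, bbbb, bb]
    }
}
```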

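  4. To count words, we need a source to read text from. The netcat tool can simulate a text input stream. On Windows, for example, you can download it from https://eternallybored.org/misc/netcat/

    After downloading, add the unzipped netcat directory to the operating system's PATH so that the nc command can be run directly. Then start it listening on port 9999. On Windows, -L keeps listening after a client disconnects; on Linux or macOS, nc -lk 9999 is the usual equivalent.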
nc -L -p 9999


The input stream is a sequence of strings separated by ";", for example:

aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;
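Each line has to be cut into words before any counting happens. The plain-Java sketch below uses a hypothetical class, TokenizeSketch. It applies the same normalization as the counting code later in this post: split(";"), then toLowerCase() and trim(). It then drops empty tokens.

The sketch also shows why the empty-token filter matters. Java's String.split discards trailing empty strings, so the final ";" in the sample line is harmless. A doubled separator such as ";;", or a token made only of spaces, still yields an empty word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the flatMap + filter steps of the word count (no Flink needed).
class TokenizeSketch {

    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String raw : line.split(";")) {
            String word = raw.toLowerCase().trim();
            if (!word.isEmpty()) { // same role as the FilterFunction: skip empty tokens
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // split() drops the trailing empty string after the last ';' ...
        System.out.println(Arrays.asList("a;b;".split(";"))); // [a, b]
        // ... but keeps empty strings between two separators
        System.out.println(Arrays.asList("a;;b".split(";"))); // [a, , b]
        System.out.println(tokenize("aaa;; BB ;"));           // [aaa, bb]
    }
}
```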
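  5. In the Flink client program, read the text stream sent to 127.0.0.1:9999. socketTextStream connects to nc as a TCP client rather than listening itself, so nc must already be running. The code is as follows: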
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
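Note the direction of the connection. nc -L -p 9999 is the TCP server, and socketTextStream("localhost", 9999) connects to it as a client and reads newline-delimited text. That is why nc has to be started before the job.

The plain-Java sketch below reproduces that exchange over the loopback interface. SocketRolesSketch is a hypothetical name. The server thread plays netcat's role, and the client only reads lines the way the Flink source conceptually does; it is not Flink's implementation. Port 0 lets the operating system pick a free port, so the sketch never clashes with a running nc.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the two roles (not Flink's code):
// the server thread stands in for "nc -L -p 9999",
// the client stands in for socketTextStream(host, port).
class SocketRolesSketch {

    static List<String> exchange(List<String> linesToSend) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) { // port 0: any free port
            // Server role (netcat): accept one client and send newline-terminated lines.
            Thread sender = new Thread(() -> {
                try (Socket client = server.accept();
                     Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
                    for (String line : linesToSend) {
                        out.write(line + "\n");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            sender.start();

            // Client role (Flink source): connect to the server and read one record per line.
            List<String> received = new ArrayList<>();
            try (Socket socket = new Socket(loopback, server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) { // null once the server closes the connection
                    received.add(line);
                }
            }
            sender.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(exchange(Arrays.asList("aaa;bbbb;aaa;", "bb;aaaa;")));
    }
}
```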
  6. The code that counts how many times each word above (aaa, bbbb, bb, ...) occurs is as follows:
    Here, source is the stream object created in step 5 above.
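import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Split each line on ";" and emit one normalized word per token
source.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] words = value.split(";");
        for (String word : words) {
            out.collect(word.toLowerCase().trim());
        }
    }
})
// Drop empty tokens; the words were already trimmed above
.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return !value.isEmpty();
    }
})
// Pair each word with an initial count of 1
.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2<>(value, 1);
    }
})
// Group by the word itself (field f0)
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
    }
})
// Keep a running sum of field 1 per word and print every update
.sum(1)
.print();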
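keyBy is what makes a per-word counter possible. It routes every record with the same key to the same parallel instance of the downstream operator, and that instance keeps the key's running sum in its state.

The sketch below illustrates the idea with plain hash partitioning. KeyRoutingSketch, route and partition are hypothetical names. Flink's actual assignment is more involved: it hashes keys into key groups, which are then mapped to subtasks. So the partition numbers here will not match the numeric prefixes (such as 3>) that print() adds to each line when the job runs with parallelism greater than 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of key-based routing; NOT Flink's exact algorithm.
class KeyRoutingSketch {

    // Same key -> same hashCode -> same partition index in [0, parallelism).
    // Math.floorMod keeps the index non-negative even when hashCode() is negative.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Group incoming words by the partition that would receive them.
    static Map<Integer, List<String>> partition(List<String> words, int parallelism) {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String word : words) {
            partitions.computeIfAbsent(route(word, parallelism), p -> new ArrayList<>()).add(word);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("aaa", "bbbb", "aaa", "bb", "bbbb");
        // Every copy of a word lands in the same partition.
        System.out.println(partition(words, 4));
    }
}
```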
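  7. Execute the streaming job
    Note that env is the StreamExecutionEnvironment created in step 5. The operators above only describe the pipeline; nothing runs until execute() is called: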
 env.execute("StreamingWordCountApp");
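  8. Check the results
    Type the input into the nc window. For every word that arrives, the Flink client prints an updated (word, count) tuple for that word.

    Look only at the last result for each key. The final count is 3 for bbbb, 3 for aaa and 2 for bb, all of which match the input exactly.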
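Only the last result per key matters because sum(1) on a keyed stream is a rolling aggregation. It emits an updated (word, count) tuple every time a word arrives, rather than one final total.

The plain-Java sketch below replays the sample input line shown earlier and produces the same kind of update stream. RollingCountSketch is a hypothetical, single-threaded class that uses no Flink. The last update for each key equals that word's total in the input.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded imitation of keyBy(word).sum(1):
// one update is emitted per incoming word, carrying that word's running total.
class RollingCountSketch {

    static List<Map.Entry<String, Integer>> rollingCounts(String line) {
        Map<String, Integer> state = new HashMap<>(); // per-key running totals
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String raw : line.split(";")) {
            String word = raw.toLowerCase().trim();
            if (word.isEmpty()) {
                continue;
            }
            int count = state.merge(word, 1, Integer::sum); // increment and read back
            emitted.add(new SimpleEntry<>(word, count));
        }
        return emitted;
    }

    // "Only look at the last result for each key": later updates overwrite earlier ones.
    static Map<String, Integer> lastPerKey(List<Map.Entry<String, Integer>> emitted) {
        Map<String, Integer> last = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : emitted) {
            last.put(e.getKey(), e.getValue());
        }
        return last;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> emitted =
                rollingCounts("aaa;bbbb;aaa;aaa;aaaa;bbbb;bb;bbbb;aaaa;bb;");
        for (Map.Entry<String, Integer> e : emitted) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
        System.out.println(lastPerKey(emitted)); // {aaa=3, bbbb=3, aaaa=2, bb=2}
    }
}
```

The word aaaa also reaches 2 here, because the sample line contains it twice.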

Try the code and example above on your own computer!

References:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/
