Flink from 0 to 1: A Learning Journey

Posted by 熊老二


Keep at it!!!

 

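Flink Getting-Started Examples

  • Preface
  • 1. APIs and programming model
  • 2. Project setup
    • 1. The pom file
    • 2. log4j.properties
  • 3. First steps with Flink
    • 1. Batch processing: WordCount
    • 2. Stream processing: WordCount
  • 4. Submitting and deploying the application
    • 1. Modify the streaming program
    • 2. Submit from the command line
    • 3. Submit from the web UI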

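Preface

Using the Flink compute engine, this post implements the classic word-frequency program WordCount twice: once with batch processing and once with stream processing. Environment setup and the implementations follow.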


 

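1. APIs and programming model

APIs

Flink offers APIs at several levels. The higher the level, the more abstract and convenient it is to use; the lower the level, the closer it is to the runtime and the harder it is to use.

Programming model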

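A Flink application has three main parts, Source, Transformation, and Sink, which correspond to these steps:

Step 1: read data from a data source (DataSource)
Step 2: transform the data
Step 3: write the results to a data sink (DataSink)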
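
The three steps can be sketched without any Flink dependency. What follows is only an analogy in plain Java, not Flink's API: the class name `PipelineSketch`, the in-memory list standing in for a data source, and the printed list standing in for a sink are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the Source / Transformation / Sink structure.
// This is NOT Flink code; it only mirrors the three steps.
final class PipelineSketch {

    // Step 1 (source): provide the input records; here a fixed in-memory list.
    static List<String> source() {
        return Arrays.asList("hello flink", "hello world");
    }

    // Step 2 (transformation): apply a function to every record.
    static <I, O> List<O> transform(List<I> input, Function<I, O> fn) {
        List<O> output = new ArrayList<>();
        for (I record : input) {
            output.add(fn.apply(record));
        }
        return output;
    }

    // Step 3 (sink): deliver the results; here they are printed and returned.
    static <T> List<T> sink(List<T> results) {
        results.forEach(System.out::println);
        return results;
    }

    static List<String> run() {
        Function<String, String> toUpper = line -> line.toUpperCase();
        return sink(transform(source(), toUpper));
    }

    public static void main(String[] args) {
        run();
    }
}
```

In a real Flink job the source and sink are connectors (files, sockets, and so on) and the transformation runs distributed, but the shape of the program is the same.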

2. Project setup

       1. The pom file

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.10.0</flink.version>
<scala.version>2.11</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>

<!-- Apache Flink dependencies; these should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Required for Flink to access HDFS -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>

<!-- Logging framework, so that running in the IDE produces console output. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>

<plugins>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<!-- Plugin that builds the JAR (all dependencies included) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Optionally set the JAR's entry class -->
<!--<mainClass>xxx.xxxx.flink.StreamWordCount</mainClass>-->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

       2. log4j.properties

log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

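3. First steps with Flink

       1. Batch processing: WordCount

Using the Flink engine, implement a batch WordCount: read text data from the local file system, process it, and print the result to the console.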

package xxx.xxxx.flink.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Offline batch processing with the Flink engine: word-frequency count (WordCount)
 * 1. Prepare the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Output the result - sink
 * 5. Trigger execution - execute
 */
public class FlinkBatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. env: create the ExecutionEnvironment instance
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. source: read text data from the local file system
        DataSource<String> inputDataSet = env.readTextFile("datas/wordcount.data");
        // 3. transformation: process the data with DataSet transformation functions
        // 3.1 Split each line on whitespace
        FlatMapOperator<String, String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.trim().split("\\s+");
                for (String word : words) {
                    out.collect(word); // emit the word
                }
            }
        });
        // 3.2 Turn each word into a 2-tuple meaning "this word occurred once"
        MapOperator<String, Tuple2<String, Integer>> tupleDataSet = wordDataSet.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // 3.3 Group the tuples by word and aggregate within each group
        AggregateOperator<Tuple2<String, Integer>> resultDataSet = tupleDataSet
                .groupBy(0) // group by word, field index 0
                .sum(1);    // sum the counts, field index 1
        // 4. sink: print the result to the console
        // 5. execute: in the DataSet API, print() itself triggers execution, so no
        //    env.execute() call follows it. Calling env.execute() here would fail,
        //    because no new sink has been defined since the last execution.
        resultDataSet.print();
    }
}
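
To check the logic of the three transformation steps without starting Flink, the same computation can be written over an in-memory list. This is a plain-Java sketch, not the DataSet API; the class `BatchWordCountSketch` and its `count` method are hypothetical names. It also skips the empty string that `split` returns for a blank line, which the job above would otherwise count as a word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java check of the split / pair / group-and-sum logic.
final class BatchWordCountSketch {

    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same idea as the flatMap step: split each line on whitespace.
            for (String word : line.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // a blank line splits into a single empty string
                }
                // Same idea as mapping to (word, 1), then grouping and summing.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("spark flink flink", "", "hadoop  spark flink");
        // prints {flink=3, hadoop=1, spark=2}
        System.out.println(count(lines));
    }
}
```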

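       2. Stream processing: WordCount

Write a Flink program that receives words over a TCP socket, splits each line on whitespace, and counts the occurrences of each word.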

package xx.xxxx.flink.stream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Stream processing with the Flink engine: receive data from a socket and count word frequencies in real time (WordCount)
 */
public class FlinkStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. env: get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: receive streaming data from a socket, given a host name and port
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
        // 3. transformation: process the data with DataStream transformation functions
        // 3.1 Split each line on whitespace
        SingleOutputStreamOperator<String> wordDataStream = inputDataStream.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        for (String word : line.trim().split("\\s+")) {
                            out.collect(word);
                        }
                    }
                }
        );
        // 3.2 Turn each word into a 2-tuple meaning "this word occurred once"
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDataStream = wordDataStream.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                }
        );
        // 3.3 Key the tuples by word and aggregate within each key
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = tupleDataStream
                .keyBy(0) // key by word, field index 0
                .sum(1);  // sum the counts, field index 1
        // 4. sink: print the result to the console
        resultDataStream.print();
        // 5. execute: start the streaming application
        env.execute(FlinkStreamWordCount.class.getSimpleName());
    }
}

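Start a NetCat terminal with `nc -lk 9999` (the port the program connects to), then run the Flink program. The word counts are printed to the console as lines arrive.

Appendix: the same streaming job written with Java 8 lambda expressions and the Stream API: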
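
Unlike the batch job, the streaming job does not wait for the end of the input: `keyBy(0).sum(1)` emits an updated count for the key of every incoming record. The plain-Java sketch below imitates that behaviour so the shape of the console output is easier to anticipate. It is not Flink code; `RunningCountSketch` and `process` are hypothetical names, and the map stands in for Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical imitation of a keyed running sum; not Flink's implementation.
final class RunningCountSketch {

    // Stand-in for per-key state: the latest count for each word.
    private final Map<String, Integer> state = new HashMap<>();

    // Processes one input line and returns one "(word,count)" entry per word,
    // each carrying the count as of that word's arrival.
    List<String> process(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // skip the empty string produced by a blank line
            }
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningCountSketch sketch = new RunningCountSketch();
        // prints [(flink,1), (spark,1), (flink,2)]
        System.out.println(sketch.process("flink spark flink"));
        // prints [(flink,3)]
        System.out.println(sketch.process("flink"));
    }
}
```

The same word therefore appears several times in the output, each time with a larger count, rather than once with a final total.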

package xx.xxxx.flink.stream;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Stream processing with the Flink engine: receive data from a socket and count word frequencies in real time (WordCount)
 */
public class LambdaStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. env: get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: receive streaming data from a socket, given a host name and port
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
        // 3. transformation: process the data with DataStream transformation functions
        // 3.1 Split each line on whitespace
        SingleOutputStreamOperator<String> wordDataStream = inputDataStream.flatMap(
                (String line, Collector<String> out) -> Arrays.stream(line.trim().split("\\s+")).forEach(out::collect)
        ).returns(Types.STRING);
        // 3.2 Turn each word into a 2-tuple meaning "this word occurred once"
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDataStream = wordDataStream.map(
                (String word) -> Tuple2.of(word, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT));
        // 3.3 Key the tuples by word and aggregate within each key
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = tupleDataStream
                .keyBy(0)
                .sum(1);
        // 4. sink: print the result to the console
        resultDataStream.print();
        // 5. execute: start the streaming application
        env.execute(LambdaStreamWordCount.class.getSimpleName());
    }
}
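
The `.returns(...)` calls in the lambda version are there because a Java lambda does not record its generic type arguments in its class, whereas an anonymous class does, so the output type cannot be recovered by reflection. The plain-Java sketch below shows that difference using a hypothetical interface `Mapper<I, O>`, which is a stand-in for illustration and not Flink's `MapFunction`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical demonstration of why lambdas need explicit type hints.
final class ErasureSketch {

    // Stand-in for a generic function interface (not a Flink type).
    interface Mapper<I, O> {
        O map(I value);
    }

    // True if the object's class records the type arguments of the
    // interface it implements, so that reflection can read them back.
    static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    static Mapper<String, Integer> anonymous() {
        return new Mapper<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };
    }

    static Mapper<String, Integer> lambda() {
        return value -> value.length();
    }

    public static void main(String[] args) {
        System.out.println(keepsTypeArguments(anonymous())); // true
        System.out.println(keepsTypeArguments(lambda()));    // false
    }
}
```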

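4. Submitting and deploying the application

A Flink program can be submitted in two ways:
    Option 1: from the command line
    Option 2: from the web UI

Compile and package the application as `flink-day01-1.0.0.jar`, without bundling other dependency JARs, and remove the log4j configuration file.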

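Run the streaming application built with Flink: it consumes data from a socket in real time, counts word frequencies, and prints the result to the console.
Note: if writing to HDFS fails with a permission problem, apply the following setting:

# 777 permissions are only acceptable on your own machine
hadoop fs -chmod -R 777 /

and add this to the code:

System.setProperty("HADOOP_USER_NAME", "root");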

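       1. Modify the streaming program

Modify the streaming program so that host and port are passed in as program arguments, and parse them with Flink's utility class ParameterTool, as shown here: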

package xx.xxxx.flink.submit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Stream processing with the Flink engine: receive data from a socket and count word frequencies in real time (WordCount)
 */
public class SubmitStreamWordCount {
    public static void main(String[] args) throws Exception {
        // Read the arguments passed to the application
        final ParameterTool params = ParameterTool.fromArgs(args);
        if (params.getNumberOfParameters() != 2) {
            System.err.println("Usage: SubmitStreamWordCount --host <host> --port <port>");
            return;
        }
        final String hostname = params.get("host");
        final int port = params.getInt("port");
        // 1. env: get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: receive streaming data from a socket, given a host name and port
        DataStreamSource<String> inputDataStream = env.socketTextStream(hostname, port);
        // 3. transformation: process the data with DataStream transformation functions
        // 3.1 Split each line on whitespace
        SingleOutputStreamOperator<String> wordDataStream = inputDataStream.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split("\\s+");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }
        );
        // 3.2 Turn each word into a 2-tuple meaning "this word occurred once"
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDataStream = wordDataStream.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                }
        );
        // 3.3 Key the tuples by word and aggregate within each key
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = tupleDataStream
                .keyBy(0) // key by word, field index 0
                .sum(1);  // sum the counts, field index 1
        // 4. sink: print the result to the console
        resultDataStream.print();
        // 5. execute: start the streaming application
        env.execute(SubmitStreamWordCount.class.getSimpleName());
    }
}
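
The idea behind parsing `--host <host> --port <port>` is to turn `--key value` pairs into a lookup table. The following is a simplified plain-Java stand-in that illustrates only that idea; it is not ParameterTool's implementation and does not reproduce its full behaviour (for example, it rejects keys without values). The names `ArgsSketch` and `parse` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for "--key value" argument parsing.
final class ArgsSketch {

    // Reads the arguments two at a time: a "--key" followed by its value.
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected --key value pairs, got: " + key);
            }
            params.put(key.substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parse(new String[] {"--host", "node1.itcast.cn", "--port", "9999"});
        String host = params.get("host");
        int port = Integer.parseInt(params.get("port"));
        // prints node1.itcast.cn:9999
        System.out.println(host + ":" + port);
    }
}
```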

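       2. Submit from the command line

Submitting this way can target a Standalone cluster or a YARN cluster (Session mode or per-Job detached mode). The example here uses YARN per-Job detached mode.

Step 1: upload the job JAR to the Linux server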

cd /export/server/flink/
rz

Step 2: submit and run

cd /export/server/flink/
bin/flink run --class cn.itcast.flink.submit.SubmitStreamWordCount \
  -m yarn-cluster -yjm 1024 -ytm 1024 \
  flink-day01-1.0.0.jar --host node1.itcast.cn --port 9999

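Step 3: check the job overview

       3. Submit from the web UI

This method is available only for a Flink Standalone cluster or a YARN Session; the example here uses a YARN Session.

Step 1: start the HDFS and YARN clusters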

# Start the services on the first virtual machine
hadoop-daemon.sh start namenode
hadoop-daemons.sh start datanode
yarn-daemon.sh start resourcemanager
yarn-daemons.sh start nodemanager

Step 2: start the YARN Session

cd /export/server/flink/
bin/yarn-session.sh -n 2 -tm 1024 -jm 1024 -s 1 -d
......................................
JobManager Web Interface: http://<third VM>:33619

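Step 3: upload the job JAR and set its parameters

Select the packaged JAR, then fill in the parameter values.

Parameters:

Entry Class: cn.itcast.flink.submit.SubmitStreamWordCount
Parallelism: 2
Program Arguments: --host <first VM> --port 9999

Click [Show Plan] to display the execution plan.

Click [Submit] to run the Flink application.

Step 4: check the job overview

Step 5: check the job results

Done!!!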
