[4] Flink大数据流式处理利剑: Flink集群安装和运行
Posted 朱清云的技术博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[4] Flink大数据流式处理利剑: Flink集群安装和运行相关的知识,希望对你有一定的参考价值。
在上一个章节《[3] Flink大数据流式处理利剑: Flink的部署架构》 笔者提到了Flink的基本部署架构,这一个章节笔者就代理大家来安装一个最新的Flink集群。兵马未动粮草先行,所以首先要去Flink的官方网站上去下载Flink的安装包。Flink当前的最新版本是 1.14.3
可以看到1.14.3 有两个版本,一个是运行在Scala2.11上面,一个是运行在Scala2.12上面;笔者就安装一个Scala2.12版本吧!下载地址:https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz。笔者在Linux CentOs 机器上运行下面的命令:
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz --no-check-certificate
tar -vxf flink-1.14.3-bin-scala_2.12.tgzls
cd flink-1.14.3
执行tree命令后,其目录文件结构如下:
.
├── bin
│ ├── bash-java-utils.jar
│ ├── config.sh
│ ├── find-flink-home.sh
│ ├── flink
│ ├── flink-console.sh
│ ├── flink-daemon.sh
│ ├── historyserver.sh
│ ├── jobmanager.sh
│ ├── kubernetes-jobmanager.sh
│ ├── kubernetes-session.sh
│ ├── kubernetes-taskmanager.sh
│ ├── pyflink-shell.sh
│ ├── sql-client.sh
│ ├── standalone-job.sh
│ ├── start-cluster.sh
│ ├── start-zookeeper-quorum.sh
│ ├── stop-cluster.sh
│ ├── stop-zookeeper-quorum.sh
│ ├── taskmanager.sh
│ ├── yarn-session.sh
│ └── zookeeper.sh
├── conf
│ ├── flink-conf.yaml
│ ├── log4j-cli.properties
│ ├── log4j-console.properties
│ ├── log4j.properties
│ ├── log4j-session.properties
│ ├── logback-console.xml
│ ├── logback-session.xml
│ ├── logback.xml
│ ├── masters
│ ├── workers
│ └── zoo.cfg
├── examples
│ ├── batch
│ │ ├── ConnectedComponents.jar
│ │ ├── DistCp.jar
│ │ ├── EnumTriangles.jar
│ │ ├── KMeans.jar
│ │ ├── PageRank.jar
│ │ ├── TransitiveClosure.jar
│ │ ├── WebLogAnalysis.jar
│ │ └── WordCount.jar
│ ├── gelly
│ │ └── flink-gelly-examples_2.12-1.14.3.jar
│ ├── python
│ │ ├── datastream
│ │ │ ├── event_time_timer.py
│ │ │ ├── __init__.py
│ │ │ ├── process_json_data.py
│ │ │ ├── state_access.py
│ │ │ └── word_count.py
│ │ └── table
│ │ ├── __init__.py
│ │ ├── mixing_use_of_datastream_and_table.py
│ │ ├── multi_sink.py
│ │ ├── pandas
│ │ │ ├── conversion_from_dataframe.py
│ │ │ ├── __init__.py
│ │ │ └── pandas_udaf.py
│ │ ├── process_json_data.py
│ │ ├── process_json_data_with_udf.py
│ │ ├── windowing
│ │ │ ├── __init__.py
│ │ │ ├── over_window.py
│ │ │ ├── session_window.py
│ │ │ ├── sliding_window.py
│ │ │ └── tumble_window.py
│ │ └── word_count.py
│ ├── streaming
│ │ ├── Iteration.jar
│ │ ├── SessionWindowing.jar
│ │ ├── SocketWindowWordCount.jar
│ │ ├── StateMachineExample.jar
│ │ ├── TopSpeedWindowing.jar
│ │ ├── Twitter.jar
│ │ ├── WindowJoin.jar
│ │ └── WordCount.jar
│ └── table
│ ├── AdvancedFunctionsExample.jar
│ ├── ChangelogSocketExample.jar
│ ├── GettingStartedExample.jar
│ ├── StreamSQLExample.jar
│ ├── StreamWindowSQLExample.jar
│ ├── UpdatingTopCityExample.jar
│ └── WordCountSQLExample.jar
├── lib
│ ├── flink-csv-1.14.3.jar
│ ├── flink-dist_2.12-1.14.3.jar
│ ├── flink-json-1.14.3.jar
│ ├── flink-shaded-zookeeper-3.4.14.jar
│ ├── flink-table_2.12-1.14.3.jar
│ ├── log4j-1.2-api-2.17.1.jar
│ ├── log4j-api-2.17.1.jar
│ ├── log4j-core-2.17.1.jar
│ └── log4j-slf4j-impl-2.17.1.jar
├── LICENSE
├── licenses
│ ├── LICENSE.antlr-java-grammar-files
│ ├── LICENSE.antlr-runtime
│ ├── LICENSE-aopalliance
│ ├── LICENSE.asm
│ ├── LICENSE.automaton
│ ├── LICENSE.base64
│ ├── LICENSE.bouncycastle
│ ├── LICENSE.google-auth-library-credentials
│ ├── LICENSE.grizzled-slf4j
│ ├── LICENSE-hdrhistogram
│ ├── LICENSE.icu4j
│ ├── LICENSE.influx
│ ├── LICENSE.janino
│ ├── LICENSE.javax.activation
│ ├── LICENSE.jaxb
│ ├── LICENSE.jdom
│ ├── LICENSE.jline
│ ├── LICENSE.jsr166y
│ ├── LICENSE.jzlib
│ ├── LICENSE.kryo
│ ├── LICENSE.minlog
│ ├── LICENSE.protobuf
│ ├── LICENSE.py4j
│ ├── LICENSE.pyrolite
│ ├── LICENSE-re2j
│ ├── LICENSE.reflections
│ ├── LICENSE.scala
│ ├── LICENSE.scopt
│ ├── LICENSE.slf4j-api
│ ├── LICENSE-stax2api
│ └── LICENSE.webbit
├── log
├── NOTICE
├── opt
│ ├── flink-azure-fs-hadoop-1.14.3.jar
│ ├── flink-cep_2.12-1.14.3.jar
│ ├── flink-cep-scala_2.12-1.14.3.jar
│ ├── flink-gelly_2.12-1.14.3.jar
│ ├── flink-gelly-scala_2.12-1.14.3.jar
│ ├── flink-oss-fs-hadoop-1.14.3.jar
│ ├── flink-python_2.12-1.14.3.jar
│ ├── flink-queryable-state-runtime-1.14.3.jar
│ ├── flink-s3-fs-hadoop-1.14.3.jar
│ ├── flink-s3-fs-presto-1.14.3.jar
│ ├── flink-shaded-netty-tcnative-dynamic-2.0.39.Final-14.0.jar
│ ├── flink-shaded-zookeeper-3.5.9.jar
│ ├── flink-sql-client_2.12-1.14.3.jar
│ ├── flink-state-processor-api_2.12-1.14.3.jar
│ └── python
│ ├── cloudpickle-1.2.2-src.zip
│ ├── py4j-0.10.8.1-src.zip
│ └── pyflink.zip
├── plugins
│ ├── external-resource-gpu
│ │ ├── flink-external-resource-gpu-1.14.3.jar
│ │ ├── gpu-discovery-common.sh
│ │ └── nvidia-gpu-discovery.sh
│ ├── metrics-datadog
│ │ └── flink-metrics-datadog-1.14.3.jar
│ ├── metrics-graphite
│ │ └── flink-metrics-graphite-1.14.3.jar
│ ├── metrics-influx
│ │ └── flink-metrics-influxdb-1.14.3.jar
│ ├── metrics-jmx
│ │ └── flink-metrics-jmx-1.14.3.jar
│ ├── metrics-prometheus
│ │ └── flink-metrics-prometheus-1.14.3.jar
│ ├── metrics-slf4j
│ │ └── flink-metrics-slf4j-1.14.3.jar
│ ├── metrics-statsd
│ │ └── flink-metrics-statsd-1.14.3.jar
│ └── README.txt
└── README.txt
26 directories, 146 files
其目录结构如下:
[root@localhost flink-1.14.3]# tree . -d
.
├── bin
├── conf
├── examples
│ ├── batch
│ ├── gelly
│ ├── python
│ │ ├── datastream
│ │ └── table
│ │ ├── pandas
│ │ └── windowing
│ ├── streaming
│ └── table
├── lib
├── licenses
├── log
├── opt
│ └── python
└── plugins
├── external-resource-gpu
├── metrics-datadog
├── metrics-graphite
├── metrics-influx
├── metrics-jmx
├── metrics-prometheus
├── metrics-slf4j
└── metrics-statsd
26 directories
然后执行,启动集群的命令:
[flink@localhost bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.
同一台机器上启动集群
默认情况,其会在本地启动1一个Job manager节点,三个task manager节点。验证的方式是用浏览器打开 http://127.0.0.1:8081/#/overview[如果没有安装在本机,请把127.0.0.1替换成相应的IP地址]
由此可以看到:
#1)只有 1个Job Manager(IP地址为127.0.0.1)
#2) 3个Task Manager(在同一台机器上,使用不同的端口)
我们看到其有3个Task Manager节点和1个Job Manager节点在同一台服务器上(127.0.0.1,本机)
如果我们不想在同一台机器上安装这三台Flink的应用服务,那么我们应该怎么办?
不同台机器上启动集群
如果我们不想在同一台机器上安装这三台Flink的应用服务,假设我们有三台虚拟机,其IP地址如下:
- 192.168.1.100
- 192.168.1.101
- 192.168.1.101;
我们现在以192.168.1.100为Job manager(master)节点;192.168.1.101,192.168.1.102为Task Manager节点(Slave)节点;则其架构如下:
具体的配置步骤如下:
步骤1:修改<flink_home>\\conf\\flink-conf.yaml 文件
打开flink-conf.yaml文件,修改 jobmanager.rpc.address的地址为192.168.1.100,也就是192.168.1.100为master节点
步骤2:修改<flink_home>\\conf\\master文件,把其地址修改为192.168.1.100:8081
步骤3:修改<flink_home>\\conf\\slave文件,在其里面添加2行TaskManager(Slave)的IP地址:
192.168.1.101
192.168.1.102
步骤4: 把上面的flink软件使用scp同步到192.168.1.101和192.168.1.102目录下
scp flink-1.14.3-bin-scala_2.12.tgz flink@192.168.1.101:/opt/flink
scp flink-1.14.3-bin-scala_2.12.tgz flink@192.168.1.102:/opt/flink
步骤5. 在各自的三个节点上的bin目录下面执行:start-cluster.sh
start-cluster.sh
注意在执行的过程中需要输入其他节点的密码。
步骤6:打开主节点的地址,可以看到安装集群安装成功。
如何保证Job Manager节点的高可用
上面的配置,能够保证Task Manager节点(Slave)节点高可用,但是Job Manager(Master)节点只有一个,如果Job Manager(Master)节点挂了,则整个集群都挂了;那么如何保证Job Manager(Master)节点的高可用呢?Flink给我们提供了两种不同的方式:
- 一种是ZooKeeper的方式 : Job Manager(Master)节点可以不用部署在K8S上
- 一种是直接借助K8S的特性:Job Manager(Master)节点必须部署在K8S上
具体方式,请参考Flink HA
通过命令行在集群上运行一个例子
上面已经把Flink的集群搭建好了,下面来看如何运行一个例子。比如在Flink的安装包路径的examples文件夹下,有很多例子,咱们以SocketWindowWordCount.jar为例子,试跑一个。
SocketWindowWordCount.jar的代码如下:
package org.apache.flink.streaming.examples.socket;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount
public static void main(String[] args) throws Exception
String hostname;
int port;
try
ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
catch (Exception e)
System.err.println("No port specified. Please run 'SocketWindowWordCount --hostname <hostname> --port <port>', where hostname (localhost by default) and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text into the command line");
return;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource dataStreamSource = env.socketTextStream(hostname, port, "\\n");
SingleOutputStreamOperator singleOutputStreamOperator = dataStreamSource.flatMap(new FlatMapFunction<String, WordWithCount>()
public void flatMap(String value, Collector<SocketWindowWordCount.WordWithCount> out)
for (String word : value.split("\\\\s"))
out.collect(new SocketWindowWordCount.WordWithCount(word, 1L));
).keyBy(value -> value.word).window((WindowAssigner)TumblingProcessingTimeWindows.of(Time.seconds(5L))).reduce(new ReduceFunction<WordWithCount>()
public SocketWindowWordCount.WordWithCount reduce(SocketWindowWordCount.WordWithCount a, SocketWindowWordCount.WordWithCount b)
return new SocketWindowWordCount.WordWithCount(a.word, a.count + b.count);
);
singleOutputStreamOperator.print().setParallelism(1);
env.execute("Socket Window WordCount");
public static class WordWithCount
public String word;
public long count;
public WordWithCount()
public WordWithCount(String word, long count)
this.word = word;
this.count = count;
public String toString()
return this.word + " : " + this.count;
- 步骤1
由上面的代码可知,其需要指定一个端口(我们假定是9999)去读取需要处理的数据流。
我们通过nc命令模拟一个Socker数据流:
yum install nc
nc -lk 9999
- 步骤2:
所以我们在192.168.1.100(master节点)的bin目录下,找到flink的客户端,然后执行下面的命令
./flink run -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount ../examples/streaming/SocketWindowWordCount.jar --hostname 192.100.30.131 --port 9999
-
步骤3:登录到Web UI控制台
登录Web UI控制台后,我们将会看到已经部署的任务
-
步骤4:在nc控制台输入字符串,如下图:
注意,其字符串以空格为区分。 -
步骤5:回到Flink UI控制台,查看结果
在Flink的UI控制台,点击Task Manger,找到 Free Slot值为0的节点,然后在stdout tab下可以看到其输出,如下: -
步骤6 取消job
上面的测试完成后,我们可以取消当前job,可以点击Running Jobs --> Cancel Job
这样在完成的Job列表里面就会有Cancel状态的Job
通过Flink Web管理界面在集群上运行一个例子
通过浏览器打开Flink的Web管理界面,在左边点击“Submit New Job”按钮,则可以看到一个可以直接上传Flink应用程序的按钮;输入响应的参数,就可以点击提交了,其效果和通过命令行的方式一模一样,笔者就不在赘述。
值得一提的是,其有一个并行度的参数(Parallelism),如果输入2个话,其将会有两个并发:
- FlatMap的处理
- Window窗口处理
总结
上面笔者总结了如何在一个机器上安装集群,以及在多个机器上安装集群;并提到了通过zookeeper可以让Flink集群的Master节点也保持高可用;最后演示了如何部署一个SocketWindowWordCount.jar的例子,并在Flink的Web UI上面观察其job的执行情况和输出,下一个章节笔者将会带领大家今日其具体的Flink的用法。
以上是关于[4] Flink大数据流式处理利剑: Flink集群安装和运行的主要内容,如果未能解决你的问题,请参考以下文章
[2] Flink大数据流式处理利剑: 用Flink进行统计的一个简单例子