Flink from Beginner to Loving It (Flink Environment Deployment: Single Machine)
I. Preparation
Install Java
yum install java-11-openjdk -y
[root@localhost opt]# java -version
openjdk version "11.0.8" 2020-07-14 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.8+10-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode, sharing)
Install the -devel package of the same JDK (it provides the jps command)
yum install java-11-openjdk-devel -y
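To confirm which JDK is actually active before going further, a small sketch can parse the `java -version` output. `java_major_version` is a hypothetical helper, not part of Flink or the JDK. It assumes the usual `version "..."` line shown above and handles both the old `1.8.0_xxx` and the newer `11.0.x` numbering.

```shell
# Hypothetical helper: read `java -version` output on stdin and print the major version.
java_major_version() {
  # Pull the quoted version string from a line like: openjdk version "11.0.8" 2020-07-14 LTS
  ver=$(sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$ver" in
    1.*) echo "$ver" | cut -d. -f2 ;;   # old scheme: 1.8.0_262 -> 8
    *)   echo "$ver" | cut -d. -f1 ;;   # new scheme: 11.0.8 -> 11
  esac
}

# Usage on the target host (java -version writes to stderr):
# java -version 2>&1 | java_major_version
# command -v jps >/dev/null && echo "jps found" || echo "jps missing: install the -devel package"
```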
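II. Local mode
1. Get the release package (upload it, or download it with wget as below), extract it to the target directory, and change its owner and group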
cd /opt/
wget https://apache.claz.org/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz
tar zxvf flink-1.11.2-bin-scala_2.11.tgz
mv flink-1.11.2 flink
chown -R root:root flink
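Optionally, the downloaded archive can be checked against the SHA-512 checksum that Apache publishes alongside each release, ideally before running `tar`. A minimal sketch, assuming `sha512sum` from coreutils is available and that you paste the expected hash yourself; `verify_sha512` is a hypothetical helper name.

```shell
# Hypothetical helper: compare a file's SHA-512 digest with an expected hex string.
verify_sha512() {
  file=$1
  # Normalize the expected hash to lowercase, which is what sha512sum prints.
  expected=$(printf '%s' "$2" | tr 'A-F' 'a-f')
  actual=$(sha512sum "$file" | cut -d' ' -f1)
  if [ "$actual" = "$expected" ]; then
    echo "OK: $file"
  else
    echo "MISMATCH: $file" >&2
    return 1
  fi
}

# Usage (paste the hash from the official .sha512 file):
# verify_sha512 flink-1.11.2-bin-scala_2.11.tgz '<expected sha512 hex>'
```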
2. Prepare a test dataset (for testing only; the content can be anything)
Write the following into /root/words.txt:
a b c d
a
f
e d e w 2
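If you prefer to create the file non-interactively, a heredoc does the job. A minimal sketch; `make_words_file` is a hypothetical helper that writes exactly the four lines above to whatever path you pass.

```shell
# Hypothetical helper: write the sample dataset from this article to the given path.
make_words_file() {
  cat > "$1" <<'EOF'
a b c d
a
f
e d e w 2
EOF
}

# Usage:
# make_words_file /root/words.txt
```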
3. Start the interactive Scala shell from the Flink directory and try it out
/opt/flink/bin/start-scala-shell.sh local
Batch - Use the 'benv' and 'btenv' variable # entry point of the batch environment
* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")
*
* val batchTable = btenv.fromDataSet(dataSet)
* btenv.registerTable("tableName", batchTable)
* val result = btenv.sqlQuery("SELECT * FROM tableName").collect
HINT: You can use print() on a DataSet to print the contents or collect()
a sql query result back to the shell.
Streaming - Use the 'senv' and 'stenv' variable # entry point of the streaming environment
* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
*
* val streamTable = stenv.fromDataStream(dataStream, 'num)
* val resultTable = streamTable.select('num).where('num % 2 === 1 )
* resultTable.toAppendStream[Row].print()
* senv.execute("My streaming program")
HINT: You can only print a DataStream to the shell in local mode.
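Result (the "illegal reflective access" WARNING lines appear because Flink 1.11 is running on Java 11 here; they do not affect the result):
scala> benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()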
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider (file:/opt/flink/lib/flink-dist_2.11-1.11.2.jar) to method java.lang.ref.Reference.waitForReferenceProcessing()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.util.JavaGcCleanerWrapper$PendingCleanersRunnerProvider
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
(2,1)
(a,2)
(b,1)
(c,1)
(d,2)
(e,2)
(f,1)
(w,1)
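To double-check these counts without Flink, the same word count can be done with awk. This is only a sanity check: `count_words` is a hypothetical helper, it splits on whitespace (equivalent to `split(" ")` for this file), and it sorts the tuples by word because the order in which Flink prints groups is not something to rely on.

```shell
# Count words with awk as an independent cross-check of the Flink result.
# Prints "(word,count)" tuples, sorted by word with a fixed (C) collation.
count_words() {
  awk '{ for (i = 1; i <= NF; i++) count[$i]++ }
       END { for (w in count) printf "(%s,%d)\n", w, count[w] }' "$1" | LC_ALL=C sort
}

# Usage:
# count_words /root/words.txt
```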
III. Standalone cluster mode (single machine)
1. Start the services
[root@localhost opt]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.
jps shows that the processes have started:
[root@localhost ~]# jps
8016 Jps
26851 FlinkShell
7652 StandaloneSessionClusterEntrypoint #job manager
7959 TaskManagerRunner
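When scripting the startup, this check can be automated by looking for the two daemon names shown above in the `jps` output. A minimal sketch; `check_flink_daemons` is a hypothetical helper that reads `jps` output from standard input and fails if either daemon is missing.

```shell
# Hypothetical helper: read `jps` output on stdin and check that both standalone
# daemons (the JobManager entrypoint and the TaskManager) are listed.
check_flink_daemons() {
  procs=$(cat)
  missing=0
  for name in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    # jps prints "<pid> <MainClassName>"; match the class name column exactly.
    if printf '%s\n' "$procs" | awk -v n="$name" '$2 == n { found = 1 } END { exit !found }'; then
      echo "running: $name"
    else
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
# jps | check_flink_daemons
```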
The Flink Dashboard is now reachable at http://10.0.83.71:8081 (use your own host's IP address).
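The dashboard can take a few seconds to come up after `start-cluster.sh` returns. A sketch of a polling loop, assuming `curl` is installed; `wait_for_url` is a hypothetical helper, and `/overview` is, as far as I know, the Flink REST endpoint that returns the cluster summary (TaskManagers, slots, running jobs) the dashboard displays.

```shell
# Hypothetical helper: poll a URL once per second until it answers
# or the timeout (in seconds, default 30) expires.
wait_for_url() {
  url=$1
  timeout=${2:-30}
  i=0
  while [ "$i" -lt "$timeout" ]; do
    # -s silent, -f fail on HTTP errors, --max-time caps each attempt.
    if curl -sf --max-time 2 -o /dev/null "$url"; then
      echo "up: $url"
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "timed out: $url" >&2
  return 1
}

# Usage (replace the IP with your own host):
# wait_for_url http://10.0.83.71:8081/overview 60 && curl -s http://10.0.83.71:8081/overview
```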
2. Submit a test job
Use words.txt as the input again and specify an output path:
/opt/flink/bin/flink run /opt/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out2
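Two practical notes, hedged: with default settings, re-running the job with the same `--output` path will likely fail because the output already exists, and with parallelism greater than 1 the result is written as a directory of part files rather than a single file. A minimal sketch covering both; `show_output` is a hypothetical helper.

```shell
# Hypothetical helper: print a file-sink result whether it was written as a
# single file (parallelism 1) or as a directory of part files (parallelism > 1).
show_output() {
  if [ -d "$1" ]; then
    cat "$1"/*
  else
    cat "$1"
  fi
}

# Usage: clear the old result before re-running, then inspect the new one.
# rm -rf /root/out2
# /opt/flink/bin/flink run /opt/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out2
# show_output /root/out2
```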
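If the submission fails, delete the information left over from a previous run (this usually happens after reinstalling Flink or switching between deployment modes):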
rm -rf /tmp/.yarn-properties-root
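The file name follows the pattern `.yarn-properties-<user>`; as I understand it, it is left behind by a YARN session and makes `flink run` try to submit to that session instead of the standalone cluster. A minimal sketch that removes it only if present; `clean_yarn_props` is a hypothetical helper that defaults to `/tmp` and the current user.

```shell
# Hypothetical helper: remove a stale YARN session properties file for a user.
clean_yarn_props() {
  dir=${1:-/tmp}
  user=${2:-$(id -un)}
  f="$dir/.yarn-properties-$user"
  if [ -f "$f" ]; then
    rm -f "$f"
    echo "removed: $f"
  else
    echo "nothing to remove: $f"
  fi
}

# Usage:
# clean_yarn_props            # /tmp and the current user
# clean_yarn_props /tmp root  # equivalent to the rm command above when run as root
```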
3. Stop the cluster
[root@localhost bin]# /opt/flink/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 11494) on host localhost.localdomain.
Stopping standalonesession daemon (pid: 11187) on host localhost.localdomain.