Flink Deployment: Standalone Mode

Posted by 笨小孩撸代码


Installation environment


flink-1.6.2-bin-hadoop27-scala_2.11.tgz

hadoop-2.7.5

Java 1.8

ZooKeeper 3.4.6

OS: CentOS 6.4


1. Download

Download the release directly from the Flink website: http://flink.apache.org/downloads.html

2. Extract

 tar -zxvf flink-1.6.2-bin-hadoop27-scala_2.11.tgz 
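Older releases such as 1.6.2 are typically kept only on the Apache archive rather than on the main downloads page. The helper below is a small sketch that builds the archive URL for a given version, Hadoop bundle and Scala version. It assumes the usual archive layout (archive.apache.org/dist/flink/flink-<version>/...), and the function name flink_archive_url is hypothetical.

```shell
#!/usr/bin/env bash
# Build the Apache archive URL for a Flink binary release (hypothetical helper).
# Assumed layout:
#   https://archive.apache.org/dist/flink/flink-<ver>/flink-<ver>-bin-<hadoop>-scala_<scala>.tgz
# which matches the file name used in this article.
flink_archive_url() {
  local version="$1" hadoop="$2" scala="$3"
  printf 'https://archive.apache.org/dist/flink/flink-%s/flink-%s-bin-%s-scala_%s.tgz\n' \
    "$version" "$version" "$hadoop" "$scala"
}
```

For example, wget "$(flink_archive_url 1.6.2 hadoop27 2.11)" followed by tar -zxvf flink-1.6.2-bin-hadoop27-scala_2.11.tgz -C /opt unpacks the release into /opt/flink-1.6.2, the FLINK_HOME used in the next step.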

3. Add the environment variables to ~/.bash_profile (then run source ~/.bash_profile so the current shell picks them up)

export FLINK_HOME=/opt/flink-1.6.2

export PATH=$FLINK_HOME/bin:$PATH
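Because this step is repeated on every node, it helps if it can be re-run safely. A minimal sketch, assuming bash login shells read ~/.bash_profile and Flink lives at /opt/flink-1.6.2; the function name add_flink_env is hypothetical. It appends each export line only if that exact line is not already present.

```shell
#!/usr/bin/env bash
# Append the FLINK_HOME/PATH exports to a profile file, skipping lines that are
# already present, so the step is idempotent (hypothetical helper).
add_flink_env() {
  local profile="${1:-$HOME/.bash_profile}" flink_home="${2:-/opt/flink-1.6.2}"
  local line
  touch "$profile"
  # The PATH line is single-quoted so $FLINK_HOME and $PATH stay literal in the file
  for line in "export FLINK_HOME=$flink_home" 'export PATH=$FLINK_HOME/bin:$PATH'; do
    # -F: fixed string, -x: whole line, so "$" and "." are not treated as regex
    grep -qxF "$line" "$profile" || printf '%s\n' "$line" >> "$profile"
  done
}
```

Usage: add_flink_env && source ~/.bash_profile && echo "$FLINK_HOME".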

4. Edit $FLINK_HOME/conf/flink-conf.yaml. Start with a simple standalone configuration:

jobmanager.rpc.address: cdh1

jobmanager.rpc.port: 6123

jobmanager.heap.size: 1024m

taskmanager.heap.size: 1024m

taskmanager.numberOfTaskSlots: 4

parallelism.default: 12
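If you prefer to script these edits instead of opening an editor, the sketch below sets each key in place: it rewrites an existing uncommented "key:" line or appends the key if it is missing. It assumes GNU sed (sed -i with no suffix, as on CentOS) and values without a "|" character; set_conf and CONF_FILE are hypothetical names.

```shell
#!/usr/bin/env bash
# Set "key: value" in flink-conf.yaml: rewrite the line if the key is already
# present (uncommented), otherwise append it (hypothetical helper).
# Dots in the key act as regex wildcards, which is harmless for these keys.
set_conf() {
  local key="$1" value="$2" file="$3"
  if grep -q "^${key}:" "$file"; then
    sed -i "s|^${key}:.*|${key}: ${value}|" "$file"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}

# Apply the settings from this step; does nothing if the file does not exist.
CONF_FILE="${CONF_FILE:-${FLINK_HOME:-/opt/flink-1.6.2}/conf/flink-conf.yaml}"
if [ -f "$CONF_FILE" ]; then
  while read -r key value; do
    set_conf "$key" "$value" "$CONF_FILE"
  done <<'EOF'
jobmanager.rpc.address cdh1
jobmanager.rpc.port 6123
jobmanager.heap.size 1024m
taskmanager.heap.size 1024m
taskmanager.numberOfTaskSlots 4
parallelism.default 12
EOF
fi
```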

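5. Edit the two files slaves and masters (also in conf/). slaves lists the TaskManager hosts, and masters lists the JobManager host in the form host:webUIPort.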

[hadoop@cdh1 conf]$ cat slaves 

cdh2

cdh3

cdh4

cdh5

[hadoop@cdh1 conf]$ cat masters 

cdh1:8081
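parallelism.default only works if the cluster offers at least that many task slots, and the total is the number of hosts in slaves multiplied by taskmanager.numberOfTaskSlots: here 4 x 4 = 16, so a default parallelism of 12 fits. The sketch below does this arithmetic from the two files; check_slots is a hypothetical name, and it assumes Flink's defaults of 1 slot and parallelism 1 when a key is absent.

```shell
#!/usr/bin/env bash
# Compare parallelism.default with the slots the standalone cluster will offer:
#   total slots = (hosts in conf/slaves) x taskmanager.numberOfTaskSlots
check_slots() {
  local conf_dir="$1" workers slots parallelism total
  # Count hosts: lines that are neither blank nor comments
  workers=$(grep -cv -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$conf_dir/slaves")
  slots=$(sed -n 's/^taskmanager\.numberOfTaskSlots:[[:space:]]*\([0-9]*\).*/\1/p' "$conf_dir/flink-conf.yaml")
  parallelism=$(sed -n 's/^parallelism\.default:[[:space:]]*\([0-9]*\).*/\1/p' "$conf_dir/flink-conf.yaml")
  slots=${slots:-1}               # assumed default when the key is missing
  parallelism=${parallelism:-1}   # assumed default when the key is missing
  total=$(( workers * slots ))
  echo "workers=$workers slots_per_tm=$slots total_slots=$total parallelism=$parallelism"
  if [ "$parallelism" -gt "$total" ]; then
    echo "WARNING: parallelism.default exceeds the available task slots" >&2
    return 1
  fi
}
```

Running check_slots "$FLINK_HOME/conf" on cdh1 with the files above should report 16 total slots.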

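6. Copy the Flink installation and the environment settings to the other machines. Run these commands once for each machine (cdh3 is shown here):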

scp .bash_profile hadoop@cdh3:~/.bash_profile

scp -r ./flink-1.6.2 hadoop@cdh3:/opt/
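Instead of repeating the two scp commands per machine, a loop over conf/slaves can do it. This is a sketch assuming passwordless SSH for the hadoop user and Flink at /opt/flink-1.6.2; run and sync_flink are hypothetical names, and DRY_RUN=1 prints the commands instead of executing them.

```shell
#!/usr/bin/env bash
# Copy ~/.bash_profile and the Flink directory to every host in a slaves file.
# Set DRY_RUN=1 to print the commands instead of executing them.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "$*"
  else
    "$@"
  fi
}

sync_flink() {
  local slaves_file="$1" flink_home="${2:-/opt/flink-1.6.2}" user="${3:-hadoop}" host
  # Skip blank lines and comments; </dev/null stops scp/ssh from eating the host list
  grep -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$slaves_file" | while read -r host; do
    run scp "$HOME/.bash_profile" "$user@$host:~/.bash_profile" </dev/null
    run scp -r "$flink_home" "$user@$host:$(dirname "$flink_home")/" </dev/null
  done
}
```

Usage on cdh1: DRY_RUN=1 sync_flink "$FLINK_HOME/conf/slaves" to review the commands, then run it again without DRY_RUN.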

7. Start Flink

[hadoop@cdh1 bin]$ ./start-cluster.sh

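8. Once startup finishes, run jps on each machine. The JobManager node shows a StandaloneSessionClusterEntrypoint process and each worker node shows a TaskManagerRunner process: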

[hadoop@cdh1 bin]$ jps

3866 StandaloneSessionClusterEntrypoint

[hadoop@cdh2 ~]$ jps

3534 TaskManagerRunner
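Besides jps on each machine, the JobManager's monitoring REST API can confirm from one place how many TaskManagers registered. A minimal sketch, assuming the /overview endpoint on the web UI port (8081 here) returns flat JSON with "taskmanagers" and "slots-total" fields; json_int_field and cluster_overview are hypothetical names. With the configuration above one would expect 4 TaskManagers and 16 slots.

```shell
#!/usr/bin/env bash
# Extract an integer field from flat JSON on stdin, without needing jq.
json_int_field() {
  sed -n "s/.*\"$1\": *\([0-9][0-9]*\).*/\1/p"
}

# Query /overview on the JobManager and print TaskManager and slot counts.
cluster_overview() {
  local base_url="${1:-http://cdh1:8081}" json
  json=$(curl -sf "$base_url/overview") || { echo "cannot reach $base_url" >&2; return 1; }
  echo "taskmanagers=$(json_int_field taskmanagers <<<"$json") slots-total=$(json_int_field slots-total <<<"$json")"
}
```

Usage: cluster_overview http://cdh1:8081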


This means the cluster is up. Now let's verify it.


Verifying with start-scala-shell.sh

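${FLINK_HOME}/bin/start-scala-shell.sh is the interactive client that ships with Flink. It is handy for testing code snippets during development. It can run against a local cluster or connect to a remote cluster (the help output also lists a yarn mode). Since the cluster is easy to reach in this example, we test directly against it. If the cluster is not conveniently reachable during development, use local mode instead, and connect to the cluster for deployment once the code passes local testing. If the program depends on extra jars, add them with -a <path/to/jar.jar> or --addclasspath <path/to/jar.jar>.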


1. Local mode

${FLINK_HOME}/bin/start-scala-shell.sh local

2. Remote (cluster) mode

${FLINK_HOME}/bin/start-scala-shell.sh remote <hostname> <portnumber>

3. With dependency jars

${FLINK_HOME}/bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>

4. Show help

${FLINK_HOME}/bin/start-scala-shell.sh --help

[hadoop@cdh2 bin]$ ./start-scala-shell.sh --help
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink scala shell with a local Flink cluster
  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
  <host>                   Remote host name as string
  <port>                   Remote port as integer

  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
  -n, --container arg      Number of YARN container to allocate (= Number of TaskManagers)
  -jm, --jobManagerMemory arg
                           Memory for JobManager container
  -nm, --name <value>      Set a custom name for the application on YARN
  -qu, --queue <arg>       Specifies YARN queue
  -s, --slots <arg>        Number of slots per TaskManager
  -tm, --taskManagerMemory <arg>
                           Memory per TaskManager container
  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
  --configDir <value>      The configuration directory.
  -h, --help               Prints this usage text


We verify using remote (cluster) mode:


[hadoop@cdh1 bin]$ ./start-scala-shell.sh remote 192.168.18.160 8081


Run the following example code:

Scala> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")

Scala> val counts = text.
    flatMap { _.toLowerCase.split("\\W+") }.
    map { (_, 1) }.groupBy(0).sum(1)

Scala> counts.print()
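To check the shell's output independently of Flink, the same count can be reproduced with standard Unix tools. This sketch lowercases the text, splits on runs of characters outside [a-z0-9_] (the lowercase equivalent of \W+), and prints each word as (word,count), mimicking how Scala tuples are printed. It sorts by word, whereas Flink's print() order is not sorted; word_count is a hypothetical name.

```shell
#!/usr/bin/env bash
# Reproduce the Scala shell word count: lowercase, split on non-word characters,
# count each word, and print "(word,count)" lines sorted by word.
word_count() {
  tr '[:upper:]' '[:lower:]' \
    | tr -cs 'a-z0-9_' '\n' \
    | grep -v '^$' \
    | LC_ALL=C sort \
    | uniq -c \
    | awk '{ printf "(%s,%d)\n", $2, $1 }'
}

word_count <<'EOF'
To be, or not to be,--that is the question:--
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune
Or to take arms against a sea of troubles,
EOF
```

The four lines contain 26 distinct words, for example (to,4), (the,3), (be,2) and (of,2), so the Flink output should contain the same 26 tuples in some order.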


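The word counts are printed directly in the shell, and the job's details can also be viewed in the web UI at http://cdh1:8081.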


Problem encountered:

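In our case, the communication failure was caused by having Scala installed; removing the Scala environment variables fixed it. The exact root cause is still unclear and remains to be investigated.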


java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)

2018-11-19 01:49:52,298 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Socket Window WordCount (8b38f995aa8e61fd520b61e0888ecd46) switched from state RUNNING to FAILING.
java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
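Independently of the Scala-related cause described above, note that these frames come from SocketTextStreamFunction, the socket source used by the Socket Window WordCount example. A Connection refused there means the source task could not reach a listener at the configured host and port. The source runs on whichever TaskManager it is scheduled on, so with the example's default hostname (localhost) it looks for the listener on that worker, not on the machine where the job was submitted. The sketch below checks whether something is listening, assuming bash's /dev/tcp redirection and the coreutils timeout command are available; port_open is a hypothetical name.

```shell
#!/usr/bin/env bash
# Succeed if a TCP connection to host:port can be opened, using bash's built-in
# /dev/tcp redirection; "timeout" keeps an unreachable host from hanging the check.
port_open() {
  local host="$1" port="$2"
  timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null
}
```

For example, after starting nc -l 9000 on cdh2, port_open cdh2 9000 && ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname cdh2 --port 9000 submits the job only once the listener is reachable, and passes its host explicitly.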


