USDP使用笔记使用Flink1.14.3替换自带的老版Flink1.13

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了USDP使用笔记使用Flink1.14.3替换自带的老版Flink1.13相关的知识,希望对你有一定的参考价值。

USDP默认的环境变量

    ┌──────────────────────────────────────────────────────────────────────┐
    │                 • MobaXterm Personal Edition v21.4 •                 │
    │               (SSH client, X server and network tools)               │
    │                                                                      │
    │ ➤ SSH session to root@192.168.88.101                                 │
    │   • Direct SSH      :  ✔                                             │
    │   • SSH compression :  ✔                                             │
    │   • SSH-browser     :  ✔                                             │
    │   • X11-forwarding  :(remote display is forwarded through SSH)  │
    │                                                                      │
    │ ➤ For more info, ctrl+click on help or visit our website.            │
    └──────────────────────────────────────────────────────────────────────┘

Last login: Sat Mar 12 02:05:37 2022 from zhiyong5
[root@zhiyong2 ~]# java -version
java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
[root@zhiyong2 ~]# scala
-bash: scala: 未找到命令
[root@zhiyong2 ~]# which java
/usr/java/jdk1.8.0_202/bin/java
[root@zhiyong2 ~]# which hadoop
/srv/udp/2.0.0.0/yarn/bin/hadoop
[root@zhiyong2 ~]# cat /etc/profile
# /etc/profile

# System wide environment and startup programs, for login setup
# Functions and aliases go in /etc/bashrc

# It's NOT a good idea to change this file unless you know what you
# are doing. It's much better to create a custom.sh shell script in
# /etc/profile.d/ to make custom changes to your environment, as this
# will prevent the need for merging in future updates.

pathmunge () 
    case ":$PATH:" in
        *:"$1":*)
            ;;
        *)
            if [ "$2" = "after" ] ; then
                PATH=$PATH:$1
            else
                PATH=$1:$PATH
            fi
    esac



if [ -x /usr/bin/id ]; then
    if [ -z "$EUID" ]; then
        # ksh workaround
        EUID=`/usr/bin/id -u`
        UID=`/usr/bin/id -ru`
    fi
    USER="`/usr/bin/id -un`"
    LOGNAME=$USER
    MAIL="/var/spool/mail/$USER"
fi

# Path manipulation
if [ "$EUID" = "0" ]; then
    pathmunge /usr/sbin
    pathmunge /usr/local/sbin
else
    pathmunge /usr/local/sbin after
    pathmunge /usr/sbin after
fi

HOSTNAME=`/usr/bin/hostname 2>/dev/null`
HISTSIZE=1000
if [ "$HISTCONTROL" = "ignorespace" ] ; then
    export HISTCONTROL=ignoreboth
else
    export HISTCONTROL=ignoredups
fi

export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL

# By default, we want umask to get set. This sets it for login shell
# Current threshold for system reserved uid/gids is 200
# You could check uidgid reservation validity in
# /usr/share/doc/setup-*/uidgid file
if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then
    umask 002
else
    umask 022
fi

for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do
    if [ -r "$i" ]; then
        if [ "$-#*i" != "$-" ]; then
            . "$i"
        else
            . "$i" >/dev/null
        fi
    fi
done

unset i
unset -f pathmunge
export JAVA_HOME=/usr/java/jdk1.8.0_202
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
SERVICE_BIN=`/bin/bash /srv/.service_env`
export PATH=$SERVICE_BIN:$PATH
[root@zhiyong2 ~]# cd /srv/.service_env
-bash: cd: /srv/.service_env: 不是目录
[root@zhiyong2 ~]# cat /srv/.service_env
#!/bin/bash
# PARENT_PATH
export UDP_SERVICE_PATH=/srv/udp/2.0.0.0

# HOME
export FLINK_HOME=$UDP_SERVICE_PATH/flink
export FLUME_HOME=$UDP_SERVICE_PATH/flume
export HIVE_HOME=$UDP_SERVICE_PATH/hive
export IMPALA_HOME=$UDP_SERVICE_PATH/impala
export KYLIN_HOME=$UDP_SERVICE_PATH/kylin
export LIVY_HOME=$UDP_SERVICE_PATH/livy
export PHOENIX_HOME=$UDP_SERVICE_PATH/phoenix
export PRESTO_HOME=$UDP_SERVICE_PATH/presto
export TRINO_HOME=$UDP_SERVICE_PATH/trino
export SPARK_HOME=$UDP_SERVICE_PATH/spark
export SQOOP_HOME=$UDP_SERVICE_PATH/sqoop
export DATAX_HOME=$UDP_SERVICE_PATH/datax
export TEZ_HOME=$UDP_SERVICE_PATH/tez
export YARN_HOME=$UDP_SERVICE_PATH/yarn

export ELASTICSEARCH_HOME=$UDP_SERVICE_PATH/elasticsearch
export HBASE_HOME=$UDP_SERVICE_PATH/hbase
export HDFS_HOME=$UDP_SERVICE_PATH/hdfs
export KAFKA_HOME=$UDP_SERVICE_PATH/kafka
export KUDU_HOME=$UDP_SERVICE_PATH/kudu
export ZOOKEEPER_HOME=$UDP_SERVICE_PATH/zookeeper

export HUE_HOME=$UDP_SERVICE_PATH/hue
export KAFKAEAGLE_HOME=$UDP_SERVICE_PATH/kafkaeagle
export KIBANA_HOME=$UDP_SERVICE_PATH/kibana
export ZEPPELIN_HOME=$UDP_SERVICE_PATH/zeppelin
export ZKUI_HOME=$UDP_SERVICE_PATH/zkui

export AIRFLOW_HOME=$UDP_SERVICE_PATH/airflow
export OOZIE_HOME=$UDP_SERVICE_PATH/oozie
export UDS_HOME=$UDP_SERVICE_PATH/uds
export DOLPHINSCHEDULER_HOME=$UDP_SERVICE_PATH/dolphinscheduler

export RANGER_HOME=$UDP_SERVICE_PATH/ranger

export ATLAS_HOME=$UDP_SERVICE_PATH/atlas

export NEO4J_HOME=$UDP_SERVICE_PATH/neo4j

# BIN
export FLINK_BIN=$FLINK_HOME/bin
export FLUME_BIN=$FLUME_HOME/bin
export HIVE_BIN=$HIVE_HOME/bin
export IMPALA_BIN=$IMPALA_HOME/bin
export KYLIN_BIN=$KYLIN_HOME/bin
export LIVY_BIN=$LIVY_HOME/bin
export PHOENIX_BIN=$PHOENIX_HOME/bin
export PRESTO_BIN=$PRESTO_HOME/bin
export TRINO_BIN=$TRINO_HOME/bin
export SPARK_BIN=$SPARK_HOME/bin
export SQOOP_BIN=$SQOOP_HOME/bin
export DATAX_BIN=$DATAX_HOME/bin
export TEZ_BIN=$TEZ_HOME/bin
export YARN_BIN=$YARN_HOME/bin

export ELASTICSEARCH_BIN=$ELASTICSEARCH_HOME/bin
export HBASE_BIN=$HBASE_HOME/bin
export HDFS_BIN=$HDFS_HOME/bin
export KAFKA_BIN=$KAFKA_HOME/bin
export KUDU_BIN=$KUDU_HOME/bin
export ZOOKEEPER_BIN=$ZOOKEEPER_HOME/bin

export HUE_BIN=$HUE_HOME/bin
export KAFKAEAGLE_BIN=$KAFKAEAGLE_HOME/bin
export KIBANA_BIN=$KIBANA_HOME/bin
export ZEPPELIN_BIN=$ZEPPELIN_HOME/bin
export ZKUI_BIN=$ZKUI_HOME/bin

export AIRFLOW_BIN=$AIRFLOW_HOME/bin
export OOZIE_BIN=$OOZIE_HOME/bin
export UDS_BIN=$UDS_HOME/bin
export DOLPHINSCHEDULER_BIN=$DOLPHINSCHEDULER_HOME/bin
export RANGER_BIN=$RANGER_HOME/bin

export ATLAS_BIN=$ATLAS_HOME/bin
export NEO4J_BIN=$NEO4J_HOME/bin
SERVICE_BIN=$FLINK_BIN:$FLUME_BIN:$HIVE_BIN:$IMPALA_BIN:$KYLIN_BIN:$LIVY_BIN:$PHOENIX_BIN:$PRESTO_BIN:$SPARK_BIN:$SQOOP_BIN:$DATAX_BIN:$TEZ_BIN:$YARN_BIN:$ELASTICSEARCH_BIN:$HBASE_BIN:$HDFS_BIN:$KAFKA_BIN:$KUDU_BIN:$ZOOKEEPER_BIN:$HUE_BIN:$KAFKAEAGLE_BIN:$KIBANA_BIN:$ZEPPELIN_BIN:$ZKUI_BIN:$AIRFLOW_BIN:$OOZIE_BIN:$UDS_BIN:$DOLPHINSCHEDULER_BIN:$RANGER_BIN:$ATLAS_BIN:$TRINO_BIN:$NEO4J_BIN

echo $SERVICE_BIN

可以看出,USDP已经默认添加了很多环境变量,对新手和非专业运维人员相当友好!

使用自带的Flink1.13测试Word Count

[root@zhiyong2 ~]# cd
[root@zhiyong2 ~]# ll
总用量 20
-rw-------. 1 root root  1639 3月   1 05:40 anaconda-ks.cfg
drwxr-xr-x. 2 root root    60 3月   1 23:11 logs
-rw-r--r--  1 root root 14444 3月  11 22:36 test1.txt
[root@zhiyong2 ~]# touch wordtest1.txt
[root@zhiyong2 ~]# ll
总用量 20
-rw-------. 1 root root  1639 3月   1 05:40 anaconda-ks.cfg
drwxr-xr-x. 2 root root    60 3月   1 23:11 logs
-rw-r--r--  1 root root 14444 3月  11 22:36 test1.txt
-rw-r--r--  1 root root     0 3月  14 22:18 wordtest1.txt
[root@zhiyong2 ~]# vim wordtest1.txt
[root@zhiyong2 ~]# cat wordtest1.txt
hello
word
world
hehe
haha
haha
haha
hehe
digital
monster
[root@zhiyong2 ~]# which flink
/srv/udp/2.0.0.0/flink/bin/flink
[root@zhiyong2 batch]# cd /srv/udp/2.0.0.0/flink/examples/batch
[root@zhiyong2 batch]# ll
总用量 144
-rwxr-xr-x. 1 hadoop hadoop 14542 10月  9 17:26 ConnectedComponents.jar
-rwxr-xr-x. 1 hadoop hadoop 15872 10月  9 17:26 DistCp.jar
-rwxr-xr-x. 1 hadoop hadoop 17240 10月  9 17:26 EnumTriangles.jar
-rwxr-xr-x. 1 hadoop hadoop 19668 10月  9 17:26 KMeans.jar
-rwxr-xr-x. 1 hadoop hadoop 16005 10月  9 17:26 PageRank.jar
-rwxr-xr-x. 1 hadoop hadoop 12535 10月  9 17:26 TransitiveClosure.jar
-rwxr-xr-x. 1 hadoop hadoop 26331 10月  9 17:26 WebLogAnalysis.jar
-rwxr-xr-x. 1 hadoop hadoop 10432 10月  9 17:26 WordCount.jar
[root@zhiyong2 ~]# flink run /srv/udp/2.0.0.0/flink/examples/batch/WordCount.jar --input /root/wordtest1.txt

直接运行肯定会报错:

[root@zhiyong2 ~]# flink run /srv/udp/2.0.0.0/flink/examples/batch/WordCount.jar --input /root/wordtest1.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/yarn/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Printing result to stdout. Use --output to specify output path.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
        at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
        at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
        at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 11 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
        ... 22 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
        ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: 拒绝连接: localhost/127.0.0.1:8081
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: 拒绝连接: localhost/127.0.0.1:8081
Caused by: java.net.ConnectException: 拒绝连接
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NiosocketChannel.doFinishConnect(NioSocketChannel.java:330)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)

事实上,必须先开启集群才能跑任务:

[root@zhiyong2 ~]# start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zhiyong2.
Starting taskexecutor daemon on host zhiyong2.
[root@zhiyong2 ~]# flink run /srv/udp/2.0.0.0/flink/examples/batch/WordCount.jar --input /root/wordtest1.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/yarn/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 332d103c5c3d037924473730e0a4155c
Program execution finished
Job with JobID 332d103c5c3d037924473730e0a4155c has finished.
Job Runtime: 1794 ms
Accumulator Results:
- dbb2606e28c38627e089d3f0e6864a3d (java.util.ArrayList) [7 elements]


(digital,1)
(haha,3)
(hehe,2)
(hello,1)
(monster,1)
(word,1)
(world,1)

打开网站:

http://zhiyong2:8081/#/overview

可以看到任务成功提交:

可以查看进程:

[root@zhiyong2 ~]# ps aux | grep flink

不用了就可以关闭Flink:

[root@zhiyong2 ~]# stop-cluster.sh
Stopping taskexecutor daemon (pid: 284215) on host zhiyong2.
Stopping standalonesession daemon (pid: 283695) on host zhiyong2.
[root@zhiyong2 ~]# ps aux | grep flink
root     311122  0.0  0.0 112824   980 pts/0    R+   22:44   0:00 grep --color=auto flink

USDP自带的Flink是1.13,有点老,配置也有问题。安装个1.14新版,还能体验流批一体的欢乐。

简单部署Flink1.14.3

准备安装路径:

[root@zhiyong2 ~]# mkdir -p /export/software/
[root@zhiyong2 ~]# mkdir -p /export/server/

上传压缩包:

[root@zhiyong2 ~]# ls /export/software/
flink-1.14.3-bin-scala_2.12.tgz

解压缩:

[root@zhiyong2 ~]# tar -zxvf /export/software/flink-1.14.3-bin-scala_2.12.tgz -C /export/server/

测试Word Count:

[root@zhiyong2 ~]# /export/server/flink-1.14.3/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zhiyong2.
Starting taskexecutor daemon on host zhiyong2.
[root@zhiyong2 ~]# /export/server/flink-1.14.3/bin/flink run /srv/udp/2.0.0.0/flink/examples/batch/WordCount.jar --input /root/wordtest1.txt
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 98d71fc1eb07630148e3709db89e3fd0
Program execution finished
Job with JobID 98d71fc1eb07630148e3709db89e3fd0 has finished.
Job Runtime: 2085 ms
Accumulator Results:
- 147191476201ac1195539c611aeb794c (java.util.ArrayList) [7 elements]


(digital,1)
(haha,3)
(hehe,2)
(hello,1)
(monster,1)
(word,1)
(world,1)

打开网站:

http://zhiyong2:8081/#/overview

可以看到:

这是货真价实的Flink1.14!!!看到这头大松鼠,说明下载的包没问题。

为了能方便地使用USDP已经配置好的环境变量和同步配置等功能,干脆将USDP的Flink1.13做替换,反正1.14是里程碑,1.13老版本大概率用不上了。。。替换之前先停止当前Flink进程:

[root@zhiyong2 ~]# /export/server/flink-1.14.3/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 317225) on host zhiyong2.
Stopping standalonesession daemon (pid: 316935) on host zhiyong2.

替换自带的Flink1.13

备份老版本flink

先备份原来的文件,出错后还可以复原:

[root@zhiyong2 ~]# cd /export/
[root@zhiyong2 export]# chmod 777 /export/software
[root@zhiyong2 export]# chmod 777 /export/server/
[root@zhiyong2 export]# ll
总用量 0
drwxrwxrwx 3 root root 26 3月  14 22:32 server
drwxrwxrwx 2 root root 45 3月  14 22:30 software
[root@zhiyong2 export]# cd
[root@zhiyong2 software]# cd
[root@zhiyong2 ~]# su - hadoop
上一次登录:一 3月 14 22:59:03 CST 2022
[hadoop@zhiyong2 ~]$ cp -r /srv/udp/2.0.0.0/flink /export/software/
[hadoop@zhiyong2 ~]$ cd /export/software/
[hadoop@zhiyong2 software]$ ll
总用量 331744
drwxr-xr-x 11 hadoop hadoop       157 3月  14 23:00 flink
-rw-r--r--  1 root   root   339705501 3月  14 22:30 flink-1.14.3-bin-scala_2.12.tgz
[hadoop@zhiyong2 software]$ exit
登出

替换zhiyong2的Flink

[root@zhiyong2 ~]# rm -rf /srv/udp/2.0.0.0/flink
[root@zhiyong2 ~]# chmod 777 -R /srv/udp/2.0.0.0/
[root@zhiyong2 ~]# cd /export/server/
[root@zhiyong2 server]# ll
总用量 0
drwxr-xr-x 10 501 games 156 1月  11 07:45 flink-1.14.3
[root@zhiyong2 server]# su - hadoop
上一次登录:一 3月 14 23:13:04 CST 2022
[hadoop@zhiyong2 ~]$ cd /export/server/
[hadoop@zhiyong2 server]$ ll
总用量 0
drwxr-xr-x 10 501 games 156 1月  11 07:45 flink-1.14.3
[hadoop@zhiyong2 server]$ cp -r ./flink-1.14.3/ ./flink
[hadoop@zhiyong2 server]$ ll
总用量 0
drwxr-xr-x 10 hadoop hadoop 156 3月  14 23:14 flink
drwxr-xr-x 10    501 games  156 1月  11 07:45 flink-1.14.3
[hadoop@zhiyong2 server]$ cp -r ./flink /srv/udp/2.0.0.0/flink
[hadoop@zhiyong2 flink]$ cd /srv/udp/2.0.0.0
[hadoop@zhiyong2 2.0.0.0]$ ll
总用量 24
drwxrwxrwx. 11 hadoop  hadoop   169 3月   1 23:12 dolphinscheduler
drwxrwxrwx. 11 elastic elastic  261 3月   1 23:09 elasticsearch
drwxr-xr-x  10 hadoop  hadoop   156 3月  14 23:15 flink
drwxrwxrwx.  7 hadoop  hadoop   186 3月   1 23:10 flume
drwxrwxrwx   8 hadoop  hadoop   201 3月   3 00:17 hbase
drwxrwxrwx. 12 hadoop  hadoop   206 3月   1 23:06 hdfs
drwxrwxrwx. 13 hadoop  hadoop   229 3月   1 23:08 hive
drwxrwxrwx. 15 hue     hue     4096 3月   1 23:11 hue
drwxrwxrwx.  8 hadoop  hadoop   120 3月   1 23:09 kafka
drwxrwxrwx.  3 root    root      67 3月   1 23:05 node_exporter
drwxrwxrwx   3 root    root      17 3月  11 22:19 old
drwxrwxrwx.  6 hadoop  hadoop  4096 3月   1 23:07 phoenix
drwxrwxrwx. 11 hadoop  hadoop  4096 3月   1 23:11 ranger
drwxrwxrwx. 12 hadoop  hadoop   170 3月   1 23:08 spark
drwxrwxrwx. 10 hadoop  hadoop  4096 3月   1 23:08 sqoop
drwxrwxrwx.  6 hadoop  hadoop  4096 3月   1 23:07 tez
drwxrwxrwx. 12 hadoop  hadoop   206 3月   1 23:07 yarn
drwxrwxrwx. 12 hadoop  hadoop  4096 3月   1 23:05 zookeeper
[hadoop@zhiyong2 2.0.0.0]$ exit
登出
[root@zhiyong2 server]# chmod 777 -R /srv/udp/2.0.0.0/
[root@zhiyong2 server]# cd /srv/udp/2.0.0.0/
[root@zhiyong2 2.0.0.0]# ll
总用量 24
drwxrwxrwx. 11 hadoop  hadoop   169 3月   1 23:12 dolphinscheduler
drwxrwxrwx. 11 elastic elastic  261 3月   1 23:09 elasticsearch
drwxrwxrwx  10 hadoop  hadoop   156 3月  14 23:15 flink
drwxrwxrwx.  7 hadoop  hadoop   186 3月   1 23:10 flume
drwxrwxrwx   8 hadoop  hadoop   201 3月   3 00:17 hbase
drwxrwxrwx. 12 hadoop  hadoop   206 3月   1 23:06 hdfs
drwxrwxrwx. 13 hadoop  hadoop   229 3月   1 23:08 hive
drwxrwxrwx. 15 hue     hue     4096 3月   1 23:11 hue
drwxrwxrwx.  8 hadoop  hadoop   120 3月   1 23:09 kafka
drwxrwxrwx.  3 root    root      67 3月   1 23:05 node_exporter
drwxrwxrwx   3 root    root      17 3月  11 22:19 old
drwxrwxrwx.  6 hadoop  hadoop  4096 3月   1 23:07 phoenix
drwxrwxrwx. 11 hadoop  hadoop  4096 3月   1 23:11 ranger
drwxrwxrwx. 12 hadoop  hadoop   170 3月   1 23:08 spark
drwxrwxrwx. 10 hadoop  hadoop  4096 3月   1 23:08 sqoop
drwxrwxrwx.  6 hadoop  hadoop  4096 3月   1 23:07 tez
drwxrwxrwx. 12 hadoop  hadoop   206 3月   1 23:07 yarn
drwxrwxrwx. 12 hadoop  hadoop  4096 3月   1 23:05 zookeeper

测试Word Count:

[root@zhiyong2 ~]# start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zhiyong2.
Starting taskexecutor daemon on host zhiyong2.
[root@zhiyong2 ~]# flink run /srv/udp/2.0.0.0/flink/examples/batch/WordCount.jar --input /root/wordtest1.txt
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID b62bdeda4d704b20948eb1587d8035bd
Program execution finished
Job with JobID b62bdeda4d704b20948eb1587d8035bd has finished.
Job Runtime: 3427 ms
Accumulator Results:
- 0ad1f1ef00a3af540a29da2754412f2b (java.util.ArrayList) [7 elements]


(digital,1)
(haha,3)
(hehe,2)
(hello,1)
(monster,1)
(word,1)
(world,1)
[root@zhiyong2 ~]# stop-cluster.sh
Stopping taskexecutor daemon (pid: 464306) on host zhiyong2.
Stopping standalonesession daemon (pid: 463969) on host zhiyong2.

关闭集群前同样可以打开网站:

http://zhiyong2:8081/#/overview

看到:

zhiyong2的flink替换完成。

同步3台机器的Flink版本

集群的组件版本一定要保持一致!!!否则会发生各种奇怪的错误。由于zhiyong-2集群是备用集群,大概率不会运行计算组件,故本次只替换zhiyong-1集群。

    ┌──────────────────────────────────────────────────────────────────────┐
    │                 • MobaXterm Personal Edition v21.4 •                 │
    │               (SSH client, X server and network tools)               │
    │                                                                      │
    │ ➤ SSH session to root@192.168.88.102                                 │
    │   • Direct SSH      :  ✔                                             │
    │   • SSH compression :  ✔                                             │
    │   • SSH-browser     :  ✔                                             │
    │   • X11-forwarding  :(remote display is forwarded through SSH)  │
    │                                                                      │
    │ ➤ For more info, ctrl+click on help or visit our website.    

以上是关于USDP使用笔记使用Flink1.14.3替换自带的老版Flink1.13的主要内容,如果未能解决你的问题,请参考以下文章

USDP使用笔记Flink配置及简单测试

USDP使用笔记Flink配置及简单测试

USDP使用笔记Flink配置及简单测试

Flink1.14.3流批一体体验

Flink1.14.3流批一体体验

USDP使用笔记打通双集群HDFS实现跨nameservice访问