Flink Reading and Writing Ceph S3: A Getting-Started Summary
Posted by 董可伦
Preface
First, install Ceph. See my earlier article "Ceph分布式集群安装配置" (Ceph distributed cluster installation and configuration) for details.
Versions
Flink: 1.10.1
Hadoop: HDP 3.1.1.3.1.0.0-78
JAR
Download flink-s3-fs-hadoop-1.10.1.jar from the Maven repository at https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop, picking the version that matches your Flink.
Then create a folder named s3-fs-hadoop under $FLINK_HOME/plugins and copy the downloaded flink-s3-fs-hadoop-1.10.1.jar into it.
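A minimal sketch of this setup, assuming the downloaded JAR sits in the current directory:
mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
cp flink-s3-fs-hadoop-1.10.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/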
Reading and Writing from the Flink Shell
This version of Flink ships with a Scala shell (other versions may not), located at bin/start-scala-shell.sh.
Configuring flink-conf.yaml
Add the following configuration (for the shell we use yarn-per-job mode):
execution.target: yarn-per-job
s3.access.key: access_key
s3.secret.key: secret_key
s3.endpoint: ip:7480
s3.connection.ssl.enabled: false
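As a quick sanity check before going further, you can verify that the RadosGW endpoint responds (ip:7480 stands for the endpoint configured above; an anonymous request typically returns an XML ListAllMyBucketsResult):
curl http://ip:7480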
Creating Test Files with s3cmd
As in the previous article, first create a test file and the test buckets.
Create the file for testing reads
Create a bucket
s3cmd mb s3://txt
Bucket 's3://txt/' created
Generate a test txt file locally
vi test.txt
1
2
3
4
Upload test.txt to s3://txt
s3cmd put test.txt s3://txt
upload: 'test.txt' -> 's3://txt/test.txt' [1 of 1]
8 of 8 100% in 0s 45.82 B/s done
Create the bucket for testing writes
s3cmd mb s3://test-s3-write
Bucket 's3://test-s3-write/' created
Starting the Flink Scala Shell
bin/start-scala-shell.sh yarn
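If the shell cannot locate the YARN cluster, exporting the Hadoop configuration directory first is a common prerequisite; the path below is an assumption for an HDP environment:
export HADOOP_CONF_DIR=/etc/hadoop/conf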
Test Code
// test read
benv.readTextFile("s3://txt/test.txt").print()
1
2
3
4
// test write
val data = benv.fromElements("abc", "def")
data.writeAsText("s3://test-s3-write/test_flink")
benv.execute("Save2S3")
benv.readTextFile("s3://test-s3-write/test_flink").print()
abc
def
Verify with s3cmd
s3cmd ls s3://test-s3-write/
DIR s3://test-s3-write/test_df/
2022-09-30 03:13 8 s3://test-s3-write/test_flink
Verifying by Submitting a Program JAR
I originally wanted to read and write S3 remotely from IDEA, but I could not find a way to extend the Flink plugins directory locally, so I gave up on that and verified by submitting a JAR via flink run instead.
POM Dependencies
<!-- flink.version and scala.binary.version are Maven properties;
     for this article they would be 1.10.1 and e.g. 2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Code
package com.dkl.s3.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;

public class Flink_S3_Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the test file from S3 and print it
        env.readTextFile("s3://txt/test.txt").print();
        env.execute("Read S3");

        // Write two elements to S3, overwriting any existing output file
        env.fromElements("abc", "def").writeAsText("s3://test-s3-write/test_flink_program", OVERWRITE);
        env.execute("Write S3");

        // Read back what was just written; each execute() call runs as a separate job
        env.readTextFile("s3://test-s3-write/test_flink_program").print();
        env.execute("Read S3");
    }
}
Complete Code
The complete code has been uploaded to GitHub for anyone who needs it: https://github.com/dongkelun/S3_Demo/tree/master/Flink_S3_Demo
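Assuming a standard Maven layout, the JAR can be built with:
mvn clean package
## the artifact lands in target/; copy it to wherever you run flink from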
Submission
Here I submit to a standalone cluster: yarn-per-job output logs are hard to track down, and while yarn-session logs can be viewed, our development environment uses a Knox proxy whose service erred when viewing yarn-session logs. Standalone mode makes it easy to inspect the output logs for verification.
First start the standalone cluster, and remove the execution.target setting from the configuration file.
bin/start-cluster.sh
Then submit the JAR; each of the three env.execute() calls in the program launches a separate job, so three JobIDs appear:
bin/flink run -c com.dkl.s3.flink.Flink_S3_Demo ~/Flink_S3_Demo-1.0.jar
Job has been submitted with JobID d7cee212111d76ffe3d4ed5905a484ec
Program execution finished
Job with JobID d7cee212111d76ffe3d4ed5905a484ec has finished.
Job Runtime: 2637 ms
Job has been submitted with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2
Program execution finished
Job with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2 has finished.
Job Runtime: 1859 ms
Job has been submitted with JobID cfeecfa9226a3ae1d97728e80fa0dbff
Program execution finished
Job with JobID cfeecfa9226a3ae1d97728e80fa0dbff has finished.
Job Runtime: 277 ms
Verifying the Results
Web UI address (default): ip:8081
s3cmd ls s3://test-s3-write/
DIR s3://test-s3-write/test_df/
2022-09-30 03:13 8 s3://test-s3-write/test_flink
2022-10-01 02:00 8 s3://test-s3-write/test_flink_program
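To double-check the contents, the written object can also be downloaded and inspected locally (the local file name here is arbitrary):
s3cmd get s3://test-s3-write/test_flink_program ./test_flink_program
cat ./test_flink_program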
yarn-per-job Submission Mode
Add the following to flink-conf.yaml:
execution.target: yarn-per-job
yarn-session
## first start a yarn-session
bin/yarn-session.sh -nm flink-s3 -d
## point the -yid parameter at the yarn-session's application_id
bin/flink run -yid application_1664327097090_0051 -c com.dkl.s3.flink.Flink_S3_Demo ~/Flink_S3_Demo-1.0.jar
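When finished, the session can be found and stopped with the standard YARN CLI (using the application id from above):
yarn application -list
yarn application -kill application_1664327097090_0051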
Troubleshooting
Since this was my first time writing Flink code, I ran into some beginner-level problems; they are recorded here.
Exception 1
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: indata-10-110-105-164.indata.com/10.110.105.164:45401
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: indata-10-110-105-164.indata.com/10.110.105.164:45401
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
This one is hard to diagnose from the exception message alone: the actual cause is that the output file already exists, so the write mode must be set to OVERWRITE. The message gives almost no hint of this.
import org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
data.writeAsText("s3://test-s3-write/test_flink", OVERWRITE)
Exception 2
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
... 44 more
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)
I searched a lot of material online without solving this exception. It finally turned out that more than one Flink standalone cluster was running in my environment, presumably because multiple Flink versions are installed.
jps
892872 HRegionServer
4030074 FlinkYarnSessionCli
1685009 -- process information unavailable
2313 TaskManagerRunner
2211114 JournalNode
18444 HistoryServer
15949
2225644 NodeManager
20751 PrestoServer
1758666 YarnJobClusterEntrypoint
1617 StandaloneSessionClusterEntrypoint
2210614
4030501 YarnSessionClusterEntrypoint
19041 KyuubiServer
30050 ZeppelinServer
11688 AlluxioWorker
1757680 YarnJobClusterEntrypoint
5293 StandaloneSessionClusterEntrypoint
12147 AlluxioJobWorker
14068 gateway.jar
1735660 TaskManagerRunner
1758764 YarnJobClusterEntrypoint
1759021 Jps
1207 QuorumPeerMain
5816 TaskManagerRunner
7993 Kafka
16570 QueryServer
26111 HistoryServer
Kill all Flink-related processes first, then restart the standalone cluster, and the problem goes away.
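A sketch of how to spot the duplicate standalone daemons in the jps listing above, then kill them by PID (double-check each PID first; e.g. 1617 and 5293 are the two StandaloneSessionClusterEntrypoint processes shown):
jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'
kill 1617 5293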
Other Issues
The remaining issue is the log viewing mentioned above: in YARN mode I fiddled for a long time without finding the output logs (though I knew the program had succeeded from the files it wrote). It finally turned out to be caused by the malfunctioning Knox proxy, which is why I fell back to standalone mode. The submission steps for each mode are recorded above.
References
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/filesystems/s3/