Flink 读写 Ceph S3入门学习总结

Posted 董可伦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 读写 Ceph S3入门学习总结相关的知识,希望对你有一定的参考价值。

前言

首先安装好Ceph,可以参考我前面的文章Ceph分布式集群安装配置

版本

Flink: 1.10.1
hadoop: hdp版本 3.1.1.3.1.0.0-78

jar包

flink-s3-fs-hadoop-1.10.1.jar,从maven仓库下载即可,下载地址:https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop,找到对应的版本下载即可
然后在$FLINK_HOME/plugins目录下创建文件夹s3-fs-hadoop,将下载的flink-s3-fs-hadoop-1.10.1.jar拷贝到该目录下

Flink Shell 读写

我这个版本的Flink是有scala-shell终端的,别的版本可能没有,路径 bin/start-scala-shell.sh

配置flink-conf.yaml

添加配置(Shell我们采用模式yarn-per-job):

execution.target: yarn-per-job
s3.access.key: access_key
s3.secret.key: secret_key
s3.endpoint: ip:7480
s3.connection.ssl.enabled: false

s3cmd创建测试文件

和上篇文章一样,先创建测试文件和测试Bucket

创建用于测试读的文件

创建Bucket

s3cmd mb s3://txt
Bucket 's3://txt/' created

本地生成测试txt

vi test.txt
1
2
3
4

将test.txt上传到s3://txt

s3cmd put test.txt s3://txt
upload: 'test.txt' -> 's3://txt/test.txt'  [1 of 1]
 8 of 8   100% in    0s    45.82 B/s  done

创建用于测试写的Bucket

s3cmd mb s3://test-s3-write
Bucket 's3://test-s3-write/' created

启动 Flink-Scala-Shell

bin/start-scala-shell.sh yarn

测试代码

// 测试读
benv.readTextFile("s3://txt/test.txt").print()
1
2
3
4
// 测试写
val data = benv.fromElements("abc", "def")
data.writeAsText("s3://test-s3-write/test_flink")
benv.execute("Save2S3")
benv.readTextFile("s3://test-s3-write/test_flink").print()
abc
def

s3cmd验证

 s3cmd ls s3://test-s3-write/
                          DIR  s3://test-s3-write/test_df/
2022-09-30 03:13            8  s3://test-s3-write/test_flink

程序Jar包提交验证

本来想在IDEA本地远程读写S3验证,但是我没有找到本地扩展Flink plugins的方法,无奈先放弃,采用flink run 提交jar包的形式验证

pom依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>$flink.version</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_$scala.binary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

代码

package com.dkl.s3.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;

public class Flink_S3_Demo 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("s3://txt/test.txt").print();
        env.execute("Read S3");

        env.fromElements("abc", "def").writeAsText("s3://test-s3-write/test_flink_program", OVERWRITE);
        env.execute("Write S3");

        env.readTextFile("s3://test-s3-write/test_flink_program").print();
        env.execute("Read S3");
    

完整代码

完整代码已上传到GitHub,有需要的同学可自行下载:https://github.com/dongkelun/S3_Demo/tree/master/Flink_S3_Demo

提交

这里采用提交到standalone的模式,因为yarn-per-job的输出日志不太好查找,而虽然yarn-session也能看,但是由于我们开发环境用了Knox代理,而查看yarn-session日志时服务有异常,所以采用standalone模式,方便查看输出日志以便验证

这里需要首先启动standalone集群,并且去掉配置文件里的execution.target

bin/start-cluster.sh

然后提交jar包运行

bin/flink run -c com.dkl.s3.flink.Flink_S3_Demo ~/Flink_S3_Demo-1.0.jar

Job has been submitted with JobID d7cee212111d76ffe3d4ed5905a484ec
Program execution finished
Job with JobID d7cee212111d76ffe3d4ed5905a484ec has finished.
Job Runtime: 2637 ms

Job has been submitted with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2
Program execution finished
Job with JobID 98d2cbf5b1292b0b7edfeebb4a1d64e2 has finished.
Job Runtime: 1859 ms

Job has been submitted with JobID cfeecfa9226a3ae1d97728e80fa0dbff
Program execution finished
Job with JobID cfeecfa9226a3ae1d97728e80fa0dbff has finished.
Job Runtime: 277 ms

结果验证

Web UI地址(默认):ip:8081

s3cmd ls s3://test-s3-write/
                          DIR  s3://test-s3-write/test_df/
2022-09-30 03:13            8  s3://test-s3-write/test_flink
2022-10-01 02:00            8  s3://test-s3-write/test_flink_program

yarn-per-job 提交方式

flink-conf.yaml 里添加配置

execution.target: yarn-per-job

yarn-session

## 首先启动yarn-session
bin/yarn-session.sh  -nm flink-s3 -d

## 通过yid参数指定yarn-session的application_id即可
bin/flink run -yid application_1664327097090_0051 -c com.dkl.s3.flink.Flink_S3_Demo ~/Flink_S3_Demo-1.0.jar

异常解决

因为是第一次写Flink代码,会遇到一些初级问题,这里记录一下

异常1

Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: indata-10-110-105-164.indata.com/10.110.105.164:45401
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
  ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: indata-10-110-105-164.indata.com/10.110.105.164:45401
Caused by: java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

这里根据异常信息还是比较难定位的,原因是因为文件已经存在,我们需要指定模式为overwrite,信息太不明显了~

import org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
data.writeAsText("s3://test-s3-write/test_flink", OVERWRITE)

异常2

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        ... 44 more
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (92348e650f9b2cc3c194cc287fc404e4)

这个异常也是网上找了很多资料都没有解决,最后发现是因为我的环境上起了不止一个flink standalone,应该是因为有多个版本的flink

jps
892872 HRegionServer
4030074 FlinkYarnSessionCli
1685009 -- process information unavailable
2313 TaskManagerRunner
2211114 JournalNode
18444 HistoryServer
15949
2225644 NodeManager
20751 PrestoServer
1758666 YarnJobClusterEntrypoint
1617 StandaloneSessionClusterEntrypoint
2210614
4030501 YarnSessionClusterEntrypoint
19041 KyuubiServer
30050 ZeppelinServer
11688 AlluxioWorker
1757680 YarnJobClusterEntrypoint
5293 StandaloneSessionClusterEntrypoint
12147 AlluxioJobWorker
14068 gateway.jar
1735660 TaskManagerRunner
1758764 YarnJobClusterEntrypoint
1759021 Jps
1207 QuorumPeerMain
5816 TaskManagerRunner
7993 Kafka
16570 QueryServer
26111 HistoryServer

首先把所有的Flink相关的进程kill,然后再启动standalone集群就可以了

其他问题

其他问题就是上面提到过的输出日志查看,yarn模式捣鼓了半天也没有找到输出日志(但是通过写成功的文件知道程序是运行成功的),最后发现是因为knox代理异常导致的,所以采用standalone解决,关于每个模式的提交方法我记录在上面了。

参考

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/filesystems/s3/

以上是关于Flink 读写 Ceph S3入门学习总结的主要内容,如果未能解决你的问题,请参考以下文章

ceph学习笔记之六 数据读写过程

ceph储存的S3接口实现(支持断点续传)

Ceph 配置S3和swift接口访问集群

Ceph object Gateway 之 S3 API

Ceph分布式存储之三-S3接口编程

ceph学习笔记之三Object