Flink系列文档-(YY06)-Flink编程API-Sink
Posted 大摇不摆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink系列文档-(YY06)-Flink编程API-Sink相关的知识,希望对你有一定的参考价值。
sink算子是将计算结果最终输出的算子
不同的sink算子可以将数据输出到不同的目标,如写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
1 打印输出print
打印是最简单的一个Sink,通常是用来做实验和测试时使用。
/** * sink: 将数据展示在控制台上 */ users.print("输出的数据是: ") ; see.execute("sinks") ;
2 文件sink
以下writeAs...方法均已被标记为deprecated
writeAsText 以文本格式输出
该方法是将数据以文本格式实时的写入到指定的目录中,目录中的文件名称是该Sink所在subtask的Index + 1。可以额外指定一个参数writeMode,默认是WriteMode.NO_OVERWRITE。如果是WriteMode.OVERWRITE,会将以前的文件覆盖。下面的方法都已经过期了!
users.writeAsText("/data/yy01/", FileSystem.WriteMode.NO_OVERWRITE);
users.writeAsText("/data/yy01/", FileSystem.WriteMode.OVERWRITE);
writeAsCsv 以csv格式输出
该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。
该Sink并不是将数据实时的写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。
users.writeAsCsv("", FileSystem.WriteMode.NO_OVERWRITE);
users.writeAsCsv("", FileSystem.WriteMode.NO_OVERWRITE);
writeUsingOutputFormat 以指定的格式输出
该方法是将数据以指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类。
users.writeUsingOutputFormat(new TextOutputFormat<>(new Path("/data/yy01"))) ;
users.writeUsingOutputFormat(new CsvOutputFormat<>(new Path("/data/yy01"))) ;
writeToSocket 输出到网络端口
该方法是将数据输出到指定的Socket网络地址端口。输出之前,指定的网络端口服务必须已经启动。
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
lines.writeToSocket("localhost", 9999, new SimpleStringSchema());
3 扩展Sink
3.1 StreamFileSink
该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。
streamFileSink中输出的文件,其生命周期会经历3中状态:
- - in-progress Files 当前文件正在写入中
- - Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
- - Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !
数据文件格式是行式存储格式
package com.blok;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.functions.sink.WriteSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
/**
* @Date: 22.11.9
* @Author: Hang.Nian.YY
* @WX: 17710299606
* @Tips: 学大数据 ,到多易教育
* @Description:
*/
public class _15Base_API_Sinks
public static void main(String[] args) throws Exception
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8888);
StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
see.setParallelism(1);
// 开启checkpoint机制
see.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE) ;
see.getCheckpointConfig().setCheckpointStorage("file:///e://flink/data_chk");
/**
* 1 加载原始数据
*/
// 输入的数据格式 id,name
DataStreamSource<String> ds = see.socketTextStream("doitedu01", 8899);
/**
* 将加收到的数据流 以行格式输出到文件中
*/
FileSink<String> fileSink = FileSink
.forRowFormat(new Path("data/yy01"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
// 每间隔指定的时间 生成一个新的文件
.withRolloverInterval(10000) // 10s
// 文件达到指定的大小后 滚动到另一个文件
.withMaxPartSize(1024*5) //5KB
// 数据不活动时间 指定时间内没有数据 则生成一个新的文件
.withInactivityInterval(1000).build()
)
// 分桶策略 , 划分子文件夹的策略
.withBucketAssigner(new DateTimeBucketAssigner<>())
.withBucketCheckInterval(5)
// 文件的前后缀
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("hang-").withPartSuffix(".yy").build())
.build();
// ds.addSink() SinkFunction实现类对象,用addSink() 来添加
ds.sinkTo(fileSink); // Sink 的实现类对象,用 sinkTo()来添加
see.execute("sinks");
其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。
所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
以上是关于Flink系列文档-(YY06)-Flink编程API-Sink的主要内容,如果未能解决你的问题,请参考以下文章
第12讲:Flink 常用的 Source 和 Connector