Flink系列文档-(YY06)-Flink编程API-Sink

Posted 大摇不摆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink系列文档-(YY06)-Flink编程API-Sink相关的知识,希望对你有一定的参考价值。

sink算子是将计算结果最终输出的算子

不同的sink算子可以数据输出到不同的目标,如写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。

1 打印输出print

打印是最简单的一个Sink,通常是用来做实验和测试时使用。

/**
 * sink:  将数据展示在控制台上
 */
users.print("输出的数据是:  ") ;
see.execute("sinks") ;

2 文件sink 

以下writeAs...方法均已被标记为deprecated

 writeAsText 以文本格式输出

该方法是将数据以文本格式实时的写入到指定的目录中,目录中的文件名称是该Sink所在subtask的Index + 1。可以额外指定一个参数writeMode,默认是WriteMode.NO_OVERWRITE。如果是WriteMode.OVERWRITE,会将以前的文件覆盖。下面的方法都已经过期了!

  users.writeAsText("/data/yy01/", FileSystem.WriteMode.NO_OVERWRITE);
  users.writeAsText("/data/yy01/", FileSystem.WriteMode.OVERWRITE);

  writeAsCsv 以csv格式输出 

该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。

该Sink并不是将数据实时的写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。

 users.writeAsCsv("", FileSystem.WriteMode.NO_OVERWRITE);
 users.writeAsCsv("", FileSystem.WriteMode.NO_OVERWRITE);

  writeUsingOutputFormat 以指定的格式输出

该方法是将数据以指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类。

    users.writeUsingOutputFormat(new TextOutputFormat<>(new Path("/data/yy01"))) ;
    users.writeUsingOutputFormat(new CsvOutputFormat<>(new Path("/data/yy01"))) ;

  writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。输出之前,指定的网络端口服务必须已经启动。

DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
lines.writeToSocket("localhost", 9999, new SimpleStringSchema());

3 扩展Sink

3.1 StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

  streamFileSink中输出的文件,其生命周期会经历3中状态:

  •  - in-progress Files  当前文件正在写入中
  •  - Pending Files    当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  •  - Finished Files    在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 ! 

数据文件格式是行式存储格式

package com.blok;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.functions.sink.WriteSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
/**
 * @Date: 22.11.9
 * @Author: Hang.Nian.YY
 * @WX: 17710299606
 * @Tips: 学大数据 ,到多易教育
 * @Description:
 */
public class _15Base_API_Sinks 
    public static void main(String[] args) throws Exception 
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        
        // 开启checkpoint机制
        see.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE) ;
        see.getCheckpointConfig().setCheckpointStorage("file:///e://flink/data_chk");
        /**
         * 1  加载原始数据
         */
        // 输入的数据格式   id,name
        DataStreamSource<String> ds = see.socketTextStream("doitedu01", 8899);


        /**
         * 将加收到的数据流  以行格式输出到文件中
         */
        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("data/yy01"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                // 每间隔指定的时间  生成一个新的文件
                .withRolloverInterval(10000)  // 10s
                // 文件达到指定的大小后  滚动到另一个文件
                .withMaxPartSize(1024*5)   //5KB
                // 数据不活动时间  指定时间内没有数据 则生成一个新的文件
                .withInactivityInterval(1000).build()
                )
                // 分桶策略 , 划分子文件夹的策略
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withBucketCheckInterval(5)
                // 文件的前后缀
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("hang-").withPartSuffix(".yy").build())
                .build();
        // ds.addSink()  SinkFunction实现类对象,用addSink() 来添加
        ds.sinkTo(fileSink);  // Sink 的实现类对象,用 sinkTo()来添加
        see.execute("sinks");
    

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:

以上是关于Flink系列文档-(YY06)-Flink编程API-Sink的主要内容,如果未能解决你的问题,请参考以下文章

第01讲:Flink 的应用场景和架构模型

第12讲:Flink 常用的 Source 和 Connector

第02讲:Flink 入门程序 WordCount 和 SQL 实现

Flink系列论文导读(上)

Flink 1.12 Release 文档解读

Flink 1.12 Release 文档解读