flink02------1. Custom source

Posted by jj1106


1. Custom sink

  In Flink, a sink is responsible for the final output of the data. To use a custom sink, call the addSink method on a DataStream instance and pass in your own sink class.

Below we define a print sink whose output shows the real subtask index (by default, the built-in print() shows the subtask index + 1).

MyPrintSink

package cn._51doit.flink.day02;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyPrintSink<T> extends RichSinkFunction<T> {

    @Override
    public void invoke(T value, Context context) throws Exception {

        // RichSinkFunction exposes the runtime context, so we can print
        // the real subtask index (the built-in print() prints index + 1)
        int index = getRuntimeContext().getIndexOfThisSubtask();

        System.out.println(index + " > " + value);
    }
}
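For comparison, here is a minimal plain-Java sketch (no Flink runtime needed) of the two prefix formats. The helper method names are hypothetical, but the built-in print() sink really does prefix the subtask index plus one:

```java
public class PrintPrefixDemo {

    // Mimics the prefix of Flink's built-in print() sink: subtask 0 prints as "1> ..."
    static String defaultPrintFormat(int subtaskIndex, Object value) {
        return (subtaskIndex + 1) + "> " + value;
    }

    // Mimics MyPrintSink above: prints the real subtask index
    static String myPrintSinkFormat(int subtaskIndex, Object value) {
        return subtaskIndex + " > " + value;
    }

    public static void main(String[] args) {
        System.out.println(defaultPrintFormat(0, "(spark,1)")); // 1> (spark,1)
        System.out.println(myPrintSinkFormat(0, "(spark,1)")); // 0 > (spark,1)
    }
}
```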

MyPrintSinkDemo

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyPrintSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = wordAndOne.keyBy(0).sum(1);

        res.addSink(new MyPrintSink<>());

        env.execute();
    }
}
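To see what keyBy(0).sum(1) actually emits, here is a plain-Java sketch (no Flink involved) of the running-aggregation semantics: a keyed sum emits an updated total for every incoming record, not just one final result per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningSumSketch {

    // Simulates keyBy(0).sum(1) on a stream of words:
    // one updated (word, runningTotal) record is emitted per input word
    static List<String> runningWordCount(String... words) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String w : words) {
            int total = totals.merge(w, 1, Integer::sum);
            emitted.add("(" + w + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // For the input "spark spark flink", three records are emitted:
        System.out.println(runningWordCount("spark", "spark", "flink"));
        // [(spark,1), (spark,2), (flink,1)]
    }
}
```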

 

2. StreamingFileSink

 StreamingFileSink is widely used. It can write results to the local filesystem or to HDFS, and it supports exactly-once semantics (when checkpointing is enabled).

package cn._51doit.flink.day02;


import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class StreamFileSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
        String path = "E:\\flink";

        // checkpointing must be enabled: StreamingFileSink finalizes part files on checkpoints
        env.enableCheckpointing(10000);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // roll the current part file after at most 30 s
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(30))
                                // roll if nothing has been written to the file for 10 s
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
                                // roll when the part file exceeds 1 GB
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();
        upper.addSink(sink);
        env.execute();

    }
}
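One detail worth double-checking in the demo above: in a Java string literal a backslash starts an escape sequence, so a Windows path must double it. Writing "E:\flink" compiles, but \f is a form-feed escape, so the path silently becomes something other than intended. A small standalone check (plain Java, no Flink):

```java
public class PathEscapeDemo {
    public static void main(String[] args) {
        // "\f" inside a string literal is a form-feed character, not backslash + f
        String wrong = "E:\flink";   // actually "E:" + form feed + "link" -> 7 chars
        String right = "E:\\flink";  // a literal backslash: the intended path -> 8 chars
        System.out.println(wrong.length()); // 7
        System.out.println(right.length()); // 8
    }
}
```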

 

 
