A Taste of Stream-Batch Unification in Flink 1.14.3

Posted by 虎鲸不是鱼

Foreword

Flink has been talking up stream-batch unification since 1.10, and 1.14 is said to be a milestone release, so I took it for a spin.

Changes

DataSet Disappears

As I vaguely recall, old versions like Flink 1.8 looked a lot like Spark, likewise split between DataStream for stream processing and DataSet for batch processing. In the new version:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkDatasetDemo1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hehe", "haha", "哈哈", "哈哈");// older versions returned DataSet here
        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
        DataSource<String> data1 = env.fromElements(str1);// older versions returned DataSet here

        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction1())
                .groupBy(0).sum(1);
        result.print();

        System.out.println("**************************");

        SortPartitionOperator<Tuple2<String, Integer>> result1 = data1.flatMap(new FlatMapFunction2())
                .groupBy(0).sum(1).sortPartition(1, Order.DESCENDING);
        result1.print();
    }

    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }

    private static class FlatMapFunction2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split("\\s+");
            for (int i = 0; i < split.length; i++) {
                out.collect(new Tuple2<>(split[i], 1));
            }
        }
    }
}


After running it:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,1)
(haha,1)
(哈哈,2)
**************************
(哈哈1,2)
(hehe1,1)
(haha1,1)

Process finished with exit code 0

The results, of course, are unchanged, but the DataSet I remember is gone, replaced by DataSource. Clicking into it shows:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.configuration.Configuration;

/**
 * An operation that creates a new data set (data source). The operation acts as the data set on
 * which to apply further transformations. It encapsulates additional configuration parameters, to
 * customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

    private final InputFormat<OUT, ?> inputFormat;

    private final String dataSourceLocationName;

    private Configuration parameters;

    private SplitDataProperties<OUT> splitDataProperties;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(
            ExecutionEnvironment context,
            InputFormat<OUT, ?> inputFormat,
            TypeInformation<OUT> type,
            String dataSourceLocationName) {
        super(context, type);

        this.dataSourceLocationName = dataSourceLocationName;

        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }

        this.inputFormat = inputFormat;

        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }

    /**
     * Gets the input format that is executed by this data source.
     *
     * @return The input format that is executed by this data source.
     */
    @Internal
    public InputFormat<OUT, ?> getInputFormat() {
        return this.inputFormat;
    }

    /**
     * Pass a configuration to the InputFormat.
     *
     * @param parameters Configuration parameters
     */
    public DataSource<OUT> withParameters(Configuration parameters) {
        this.parameters = parameters;
        return this;
    }

    /** @return Configuration for the InputFormat. */
    public Configuration getParameters() {
        return this.parameters;
    }

    /**
     * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the {@link
     * org.apache.flink.core.io.InputSplit}s of this DataSource for configurations.
     *
     * <p>SplitDataProperties can help to generate more efficient execution plans.
     *
     * <p><b> IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
     * </b>
     *
     * @return The SplitDataProperties for the InputSplits of this DataSource.
     */
    @PublicEvolving
    public SplitDataProperties<OUT> getSplitDataProperties() {
        if (this.splitDataProperties == null) {
            this.splitDataProperties = new SplitDataProperties<OUT>(this);
        }
        return this.splitDataProperties;
    }

    // --------------------------------------------------------------------------------------------

    protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
        String name =
                this.name != null
                        ? this.name
                        : "at "
                                + dataSourceLocationName
                                + " ("
                                + inputFormat.getClass().getName()
                                + ")";
        if (name.length() > 150) {
            name = name.substring(0, 150);
        }

        @SuppressWarnings({"unchecked", "rawtypes"})
        GenericDataSourceBase<OUT, ?> source =
                new GenericDataSourceBase(
                        this.inputFormat, new OperatorInformation<OUT>(getType()), name);
        source.setParallelism(parallelism);
        if (this.parameters != null) {
            source.getParameters().addAll(this.parameters);
        }
        if (this.splitDataProperties != null) {
            source.setSplitDataProperties(this.splitDataProperties);
        }
        return source;
    }
}
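As an aside, the withParameters() hook in the listing above forwards a Configuration to the InputFormat at runtime. A minimal usage sketch, not from the original post (the /tmp/input path is made up; recursive.file.enumeration is a standard FileInputFormat option):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

public class WithParametersDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();
        conf.setBoolean("recursive.file.enumeration", true); // also read nested directories

        // readFile() returns DataSource<String>, so withParameters() is available
        env.readFile(new TextInputFormat(new Path("/tmp/input")), "/tmp/input")
                .withParameters(conf)
                .print();
    }
}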

Digging further down:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {

And further still:

package org.apache.flink.api.java;

import org.apache.flink.annotation.Public;
// ... intermediate imports omitted ...
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 *
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},
 *   <li>{@link DataSet#join(DataSet)}, or
 *   <li>{@link DataSet#coGroup(DataSet)}.
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */
@Public
public abstract class DataSet<T> {

The new version has deprecated working with DataSet directly; batch processing now goes through the brand-new DataSource!!!

So what we actually use today is DataSource, a subclass of Operator, which in turn extends the abstract DataSet.
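Since DataSource still extends DataSet through Operator, upcasting compiles fine; a minimal sketch to confirm the hierarchy:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSourceHierarchyDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = env.fromElements("a", "b"); // the concrete subclass...
        DataSet<String> asDataSet = source; // ...is still a DataSet, so this compiles
        asDataSet.print();
    }
}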

DataStream Now Has Implementation Classes

A data source that mocks one record per second:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

/**
 * @program: study
 * @description: Data source for the Flink WordCount; emits 1 record per second
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class WordCountSource1ps implements SourceFunction<String> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean needRun = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (needRun) {
            ArrayList<String> result = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                result.add("zhiyong" + i);
            }
            sourceContext.collect(result.get(new Random().nextInt(20)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}

The DataStream program:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collection;

/**
 * @program: study
 * @description: Flink DataStream demo
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class FlinkDataStreamDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);// avoid the "insufficient number of network buffers" error

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        DataStream<Tuple2<String, Integer>> result2 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0)
                // deprecated method
                .sum(1);

//        SingleOutputStreamOperator<Tuple2<String, Integer>> result3 = env.addSource(new WordCountSource1ps())
//                .flatMap(new FlatMapFunction1())
//                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
//                    @Override
//                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
//                        return value.f0;
//                    }
//                })
//                .window(new WindowAssigner<Tuple2<String, Integer>, Window>() {
//                    @Override
//                    public Collection<Window> assignWindows(Tuple2<String, Integer> element, long timestamp, WindowAssignerContext context) {
//                        return null;
//                    }
//
//                    @Override
//                    public Trigger<Tuple2<String, Integer>, Window> getDefaultTrigger(StreamExecutionEnvironment env) {
//                        return null;
//                    }
//
//                    @Override
//                    public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
//                        return null;
//                    }
//
//                    @Override
//                    public boolean isEventTime() {
//                        return false;
//                    }
//                })
//                .sum(1);

        // print sinks so the pipelines have outputs, then launch the job
        result1.print();
        result2.print();
        env.execute("FlinkDataStreamDemo1");
    }

    // same word-splitting function as in the DataSet demo above
    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }
}
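For completeness: the officially recommended replacement for the DataSet API is running the DataStream API in BATCH execution mode, which is the heart of the 1.14 stream-batch unification story. A minimal sketch, not from the original post, of the word count above as a bounded batch job:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // one API, switchable execution mode: STREAMING (default), BATCH, or AUTOMATIC
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("hehe", "haha", "哈哈", "哈哈")
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas lose generic type info
                .keyBy(t -> t.f0)
                .sum(1)
                .print(); // BATCH mode emits one final count per key
        env.execute();
    }
}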