A First Look at Stream-Batch Unification in Flink 1.14.3
Posted by 虎鲸不是鱼
Preface
Flink has been talking about stream-batch unification ever since 1.10, and 1.14 is said to be a milestone release, so I decided to give it a try.
Changes
DataSet is gone
If I remember right, the old Flink 1.8 looked a lot like Spark, likewise split into Stream processing and DataSet batch processing. In the new version:
package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkDatasetDemo1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("hehe", "haha", "哈哈", "哈哈");// old versions returned DataSet
        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
        DataSource<String> data1 = env.fromElements(str1);// old versions returned DataSet

        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction1())
                .groupBy(0).sum(1);
        result.print();

        System.out.println("**************************");

        SortPartitionOperator<Tuple2<String, Integer>> result1 = data1.flatMap(new FlatMapFunction2())
                .groupBy(0).sum(1).sortPartition(1, Order.DESCENDING);
        result1.print();
    }

    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }

    private static class FlatMapFunction2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split("\\s+");
            for (int i = 0; i < split.length; i++) {
                out.collect(new Tuple2<>(split[i], 1));
            }
        }
    }
}
After running it:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,1)
(haha,1)
(哈哈,2)
**************************
(哈哈1,2)
(hehe1,1)
(haha1,1)
Process finished with exit code 0
The results are of course unchanged, but the DataSet I remember is gone, replaced by DataSource. Clicking into it, you can see:
package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.configuration.Configuration;

/**
 * An operation that creates a new data set (data source). The operation acts as the data set on
 * which to apply further transformations. It encapsulates additional configuration parameters, to
 * customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

    private final InputFormat<OUT, ?> inputFormat;

    private final String dataSourceLocationName;

    private Configuration parameters;

    private SplitDataProperties<OUT> splitDataProperties;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(
            ExecutionEnvironment context,
            InputFormat<OUT, ?> inputFormat,
            TypeInformation<OUT> type,
            String dataSourceLocationName) {
        super(context, type);

        this.dataSourceLocationName = dataSourceLocationName;

        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }

        this.inputFormat = inputFormat;

        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }

    /**
     * Gets the input format that is executed by this data source.
     *
     * @return The input format that is executed by this data source.
     */
    @Internal
    public InputFormat<OUT, ?> getInputFormat() {
        return this.inputFormat;
    }

    /**
     * Pass a configuration to the InputFormat.
     *
     * @param parameters Configuration parameters
     */
    public DataSource<OUT> withParameters(Configuration parameters) {
        this.parameters = parameters;
        return this;
    }

    /** @return Configuration for the InputFormat. */
    public Configuration getParameters() {
        return this.parameters;
    }

    /**
     * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the {@link
     * org.apache.flink.core.io.InputSplit}s of this DataSource for configurations.
     *
     * <p>SplitDataProperties can help to generate more efficient execution plans.
     *
     * <p><b> IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
     * </b>
     *
     * @return The SplitDataProperties for the InputSplits of this DataSource.
     */
    @PublicEvolving
    public SplitDataProperties<OUT> getSplitDataProperties() {
        if (this.splitDataProperties == null) {
            this.splitDataProperties = new SplitDataProperties<OUT>(this);
        }
        return this.splitDataProperties;
    }

    // --------------------------------------------------------------------------------------------

    protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
        String name =
                this.name != null
                        ? this.name
                        : "at "
                                + dataSourceLocationName
                                + " ("
                                + inputFormat.getClass().getName()
                                + ")";
        if (name.length() > 150) {
            name = name.substring(0, 150);
        }

        @SuppressWarnings({"unchecked", "rawtypes"})
        GenericDataSourceBase<OUT, ?> source =
                new GenericDataSourceBase(
                        this.inputFormat, new OperatorInformation<OUT>(getType()), name);
        source.setParallelism(parallelism);
        if (this.parameters != null) {
            source.getParameters().addAll(this.parameters);
        }
        if (this.splitDataProperties != null) {
            source.setSplitDataProperties(this.splitDataProperties);
        }
        return source;
    }
}
Digging further:
package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* Base class of all operators in the Java API.
*
* @param <OUT> The type of the data set produced by this operator.
* @param <O> The type of the operator, so that we can return it.
*/
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
And further still:
package org.apache.flink.api.java;
import org.apache.flink.annotation.Public;
// ... intermediate imports omitted ...
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A DataSet represents a collection of elements of the same type.
*
* <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
*
* <ul>
* <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},
* <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},
* <li>{@link DataSet#join(DataSet)}, or
* <li>{@link DataSet#coGroup(DataSet)}.
* </ul>
*
* @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
*/
@Public
public abstract class DataSet<T> {
The new version has moved away from operating on DataSet directly; the brand-new DataSource is now what you use for batch processing!
As the sources above show, what you actually get today is DataSource, a subclass of Operator, which in turn is a subclass of DataSet.
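Since DataSource is still a DataSet underneath, code written against the abstract DataSet type keeps compiling. A minimal sketch to illustrate the upcast (the class name DataSetCompatDemo is my own, not from the original project):

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DataSetCompatDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // fromElements() now returns the concrete subclass DataSource ...
        DataSource<String> source = env.fromElements("a", "b");

        // ... but it can still be assigned to a DataSet reference, so
        // old code written against the DataSet API keeps compiling.
        DataSet<String> dataSet = source;
        dataSet.print();
    }
}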
DataStream now has implementation classes
A data source that mocks one record per second:
package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

/**
 * @program: study
 * @description: WordCount source for Flink; produces one record per second
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class WordCountSource1ps implements SourceFunction<String> {

    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean needRun = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (needRun) {
            ArrayList<String> result = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                result.add("zhiyong" + i);
            }
            sourceContext.collect(result.get(new Random().nextInt(20)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}
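As an aside, SourceFunction is the legacy source interface; 1.14 also ships the unified FLIP-27 Source API, which knows whether a source is bounded or unbounded and is therefore the stream-batch-unified way to read data. A minimal sketch using the built-in NumberSequenceSource (class and job names are my own placeholders):

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnifiedSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The FLIP-27 Source interface carries its boundedness with it, so
        // the same source type plugs into both streaming and batch execution.
        env.fromSource(
                        new NumberSequenceSource(0, 19), // bounded source of longs 0..19
                        WatermarkStrategy.noWatermarks(),
                        "numbers")
                .map(i -> "zhiyong" + i)
                .print();

        env.execute("UnifiedSourceDemo");
    }
}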
The DataStream program:
package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collection;

/**
 * @program: study
 * @description: Flink DataStream demo
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class FlinkDataStreamDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);// avoid "insufficient network buffers" errors

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        DataStream<Tuple2<String, Integer>> result2 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0)
                // deprecated method
                .sum(1);

//        SingleOutputStreamOperator<Tuple2<String, Integer>> result3 = env.addSource(new WordCountSource1ps())
//                .flatMap(new FlatMapFunction1())
//                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
//                    @Override
//                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
//                        return value.f0;
//                    }
//                })
//                .window(new WindowAssigner<Tuple2<String, Integer>, Window>() {
//                    @Override
//                    public Collection<Window> assignWindows(Tuple2<String, Integer> element, long timestamp, WindowAssignerContext context) {
//                        return null;
//                    }
//
//                    @Override
//                    public Trigger<Tuple2<String, Integer>, Window> getDefaultTrigger(StreamExecutionEnvironment env) {
//                        return null;
//                    }
//
//                    @Override
//                    public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
//                        return null;
//                    }
//
//                    @Override
//                    public boolean isEventTime() {
//                        return false;
//                    }
//                });

        result1.print();

        // streaming jobs only run once execute() is called
        env.execute("FlinkDataStreamDemo1");
    }

    // simple splitter, same logic as in the DataSet demo above
    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }
}
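Note that the demo above still runs in classic streaming mode. The stream-batch unification that 1.14 advertises is exposed on this same API through the runtime execution mode: with bounded sources, one and the same DataStream program can execute as a batch job. A minimal sketch (class and job names are my own placeholders):

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBatchModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Switch the same DataStream API to batch execution; STREAMING and
        // AUTOMATIC are the other options. BATCH requires bounded sources.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("hehe", "haha", "哈哈", "哈哈") // a bounded source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        env.execute("FlinkBatchModeDemo");
    }
}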