Flink Source Code Walkthrough: StreamExecutionEnvironment
Posted by 数据中台知行合一
Abstract: This post annotates the StreamExecutionEnvironment class from the Flink 1.12 source code to take a closer look at the functionality it provides as the execution context of streaming jobs. The class is more than 2,000 lines long, and dissecting it gives a good top-level overview for learning and understanding Flink streaming jobs; annotating the whole source took over five hours.
First, the class diagram (image not reproduced here).
Now, on to the source analysis.
/**
* The StreamExecutionEnvironment is the context in which a streaming program is executed. A
* {@link LocalStreamEnvironment} will cause execution in the current JVM, a
* {@link RemoteStreamEnvironment} will cause execution on a remote setup.
*
* <p>The environment provides methods to control the job execution (such as setting the parallelism
* or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
*
* @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
* @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
*/
public class StreamExecutionEnvironment {
/** The default name to use for a streaming job if no other name has been specified. */
/* Default job name. */
public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
/** The time characteristic that is used if none other is set. */
/* Defaults to processing time. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
/** The default buffer timeout (max delay of records in the network stack). */
/* Default network buffer timeout: 100 ms. */
private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
/**
* The environment of the context (local by default, cluster if invoked through command line).
*/
/* Factory for the context execution environment; local by default. */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
/* ThreadLocal holding the StreamExecutionEnvironmentFactory, so each thread sees its own context factory. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();
/** The default parallelism used when creating a local environment. */
/* Default local parallelism: the number of available CPU cores. */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
// ------------------------------------------------------------------------
/** The execution configuration for this environment. */
/* Execution configuration for this environment. */
private final ExecutionConfig config = new ExecutionConfig();
/** Settings that control the checkpointing behavior. */
/* Checkpointing (distributed snapshot) configuration for the job. */
private final CheckpointConfig checkpointCfg = new CheckpointConfig();
/* List of transformations that make up the job, in the order they were defined in the program. */
protected final List<Transformation<?>> transformations = new ArrayList<>();
/* Buffer timeout. */
private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
/* Whether operators may be chained together (enabled by default). Chained operators run in the same
 * sub-task (the same TaskManager slot), which avoids cross-network transfer, context switches and
 * serialization, so chained pipelines run faster. */
protected boolean isChainingEnabled = true;
/** The state backend used for storing k/v state and state snapshots. */
/* State backend used to store k/v state and state snapshots. */
private StateBackend defaultStateBackend;
/** The time characteristic used by the data streams. */
/* Time characteristic: event time, ingestion time, or processing time; defaults to processing time. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
/* Files registered with the distributed cache, kept in a list. */
protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
/* Loader that determines which PipelineExecutor to use. */
private final PipelineExecutorServiceLoader executorServiceLoader;
/* Job configuration. */
private final Configuration configuration;
/* User-code class loader. */
private final ClassLoader userClassloader;
/* Job listeners, kept in a list. */
private final List<JobListener> jobListeners = new ArrayList<>();
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
/* No-argument constructor. */
public StreamExecutionEnvironment() {
this(new Configuration());
// unfortunately, StreamExecutionEnvironment always (implicitly) had a public constructor.
// This constructor is not useful because the execution environment cannot be used for
// execution. We're keeping this to appease the binary compatibiliy checks.
}
/**
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link PipelineExecutor}.
*/
/* Builds the environment from the given Configuration; the default class loader is used. */
public StreamExecutionEnvironment(final Configuration configuration) {
this(configuration, null);
}
/**
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
*/
/* Builds the environment from a user-supplied Configuration and user-code class loader, using the
   default executor service loader (which discovers executors via SPI; locally this resolves to the
   local executor factory). Discoverable executor factories include:
   CollectionExecutorFactory, EmbeddedExecutorFactory, IDReportingExecutorFactory,
   KubernetesSessionClusterExecutorFactory, LocalExecutorFactory, RemoteExecutorFactory,
   WebSubmissionExecutorFactory, YarnJobClusterExecutorFactory, YarnSessionClusterExecutorFactory */
public StreamExecutionEnvironment(
final Configuration configuration,
final ClassLoader userClassloader) {
this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
}
/**
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the {@link PipelineExecutorServiceLoader} and
* user code {@link ClassLoader}.
*/
/* Builds the environment from the given executor service loader, Configuration and user-code class loader. */
public StreamExecutionEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userClassloader) {
/* Throws NullPointerException (construction fails) if the executor service loader is null. */
this.executorServiceLoader = checkNotNull(executorServiceLoader);
/* Throws NullPointerException (construction fails) if the configuration is null. */
this.configuration = checkNotNull(configuration);
this.userClassloader = userClassloader == null ? getClass().getClassLoader() : userClassloader;
// the configuration of a job or an operator can be specified at the following places:
// i) at the operator level using e.g. parallelism using the SingleOutputStreamOperator.setParallelism().
// ii) programmatically by using e.g. the env.setRestartStrategy() method
// iii) in the configuration passed here
//
// if specified in multiple places, the priority order is the above.
//
// Given this, it is safe to overwrite the execution config default values here because all other ways assume
// that the env is already instantiated so they will overwrite the value passed here.
/* Operator-level settings take precedence over job-level (programmatic) settings, which in turn take precedence over the configuration passed in here. */
this.configure(this.configuration, this.userClassloader);
}
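/* Usage sketch (not part of the Flink source; imports omitted): it illustrates the priority order
 * described in the constructor comment above, with the configuration passed to the constructor
 * being level (iii).
 *
 *     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 *     env.setRestartStrategy(RestartStrategies.noRestart());   // (ii) programmatic, overrides (iii)
 *     env.socketTextStream("localhost", 9999)
 *        .map(String::toUpperCase)
 *        .setParallelism(2);                                   // (i) operator level, highest priority
 */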
protected Configuration getConfiguration() {
return this.configuration;
}
protected ClassLoader getUserClassloader() {
return userClassloader;
}
/**
* Gets the config object.
*/
/* Returns the execution config of the job. */
public ExecutionConfig getConfig() {
return config;
}
/**
* Get the list of cached files that were registered for distribution among the task managers.
*/
/* Returns the files registered for distribution among the TaskManagers. */
public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
return cacheFile;
}
/**
* Gets the config JobListeners.
*/
/* Returns the listeners to be notified on job status changes; avoid blocking work inside listener callbacks. */
public List<JobListener> getJobListeners() {
return jobListeners;
}
/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as map,
* batchReduce) to run with x parallel instances. This method overrides the
* default parallelism for this environment. The
* {@link LocalStreamEnvironment} uses by default a value equal to the
* number of hardware contexts (CPU cores / threads). When executing the
* program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup.
*
* @param parallelism The parallelism
*/
/* Sets the parallelism of the job; this overrides the default parallelism (the number of CPU cores for local execution). */
public StreamExecutionEnvironment setParallelism(int parallelism) {
config.setParallelism(parallelism);
return this;
}
/**
* Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)
* is Short.MAX_VALUE.
*
* <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
* defines the number of key groups used for partitioned state.
*
* @param maxParallelism Maximum degree of parallelism to be used for the program.,
* with 0 < maxParallelism <= 2^15 - 1
*/
/* Sets the maximum parallelism of the job; the upper limit is Short.MAX_VALUE. */
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
/* Throws IllegalArgumentException if the argument check fails. */
Preconditions.checkArgument(maxParallelism > 0 &&
maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
"maxParallelism is out of bounds 0 < maxParallelism <= " +
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
/* Overrides any max parallelism previously set via the Configuration. */
config.setMaxParallelism(maxParallelism);
return this;
}
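/* Usage sketch (not from the source): a typical combination of default and maximum parallelism.
 *
 *     env.setParallelism(4);       // default parallelism of every operator in this job
 *     env.setMaxParallelism(128);  // upper bound for rescaling; also the number of key groups
 */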
/**
* Gets the parallelism with which operation are executed by default.
* Operations can individually override this value to use a specific
* parallelism.
*
* @return The parallelism used by operations, unless they override that
* value.
*/
public int getParallelism() {
return config.getParallelism();
}
/**
* Gets the maximum degree of parallelism defined for the program.
* Returns the maximum parallelism defined for this job.
* <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
* defines the number of key groups used for partitioned state.
* It is the upper limit for dynamic scaling and also determines the number of key groups used for partitioned state.
* @return Maximum degree of parallelism
*/
public int getMaxParallelism() {
return config.getMaxParallelism();
}
/**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
* can result in three logical modes:
* Sets the maximum flushing interval of the output buffers, in milliseconds; by default the buffers are flushed frequently to keep latency low. The parameter results in three logical modes:
* <ul>
* <li>A positive integer triggers flushing periodically by that integer</li>
* (a positive value flushes periodically at that interval)
* <li>0 triggers flushing after every record thus minimizing latency</li>
* (0 flushes after every record, minimizing latency)
* <li>-1 triggers flushing only when the output buffer is full thus maximizing
* throughput</li>
* (-1 flushes only when the output buffer is full, maximizing throughput)
* </ul>
*
* @param timeoutMillis
* The maximum time between two output flushes.
*/
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
/* The argument must be -1 or a non-negative value. */
if (timeoutMillis < -1) {
throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
}
this.bufferTimeout = timeoutMillis;
return this;
}
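/* Usage sketch (not from the source): the three buffer-timeout modes described above.
 *
 *     env.setBufferTimeout(100);  // flush at least every 100 ms (the default)
 *     env.setBufferTimeout(0);    // flush after every record: lowest latency, lowest throughput
 *     env.setBufferTimeout(-1);   // flush only when a buffer is full: highest throughput
 */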
/**
* Gets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. For clarification on the extremal values see
* {@link #setBufferTimeout(long)}.
* Returns the maximum flushing interval of the output buffers.
* @return The timeout of the buffer.
*/
public long getBufferTimeout() {
return this.bufferTimeout;
}
/**
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
* avoiding serialization and de-serialization.
* Disables operator chaining for streaming operators. Chaining co-locates non-shuffle operators in the same thread, fully avoiding serialization and de-serialization.
* Do not disable chaining unless there is a concrete reason to.
* @return StreamExecutionEnvironment with chaining disabled.
*/
public StreamExecutionEnvironment disableOperatorChaining() {
this.isChainingEnabled = false;
return this;
}
/**
* Returns whether operator chaining is enabled.
* Returns whether operator chaining is enabled.
* @return {@code true} if chaining is enabled, false otherwise.
*/
public boolean isChainingEnabled() {
return isChainingEnabled;
}
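/* Usage sketch (not from the source; HeavyMapper is a placeholder class): chaining can also be
 * controlled per operator via SingleOutputStreamOperator#disableChaining() or #startNewChain(),
 * instead of switching it off globally.
 *
 *     env.disableOperatorChaining();                     // global switch (rarely needed)
 *     stream.map(new HeavyMapper()).disableChaining();   // isolate just this operator
 */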
// ------------------------------------------------------------------------
// Checkpointing Settings
// ------------------------------------------------------------------------
/**
* Gets the checkpoint config, which defines values like checkpoint interval, delay between
* checkpoints, etc.
* Returns the checkpoint config, which defines the checkpoint interval, the delay between checkpoints, and so on.
* @return The checkpoint config.
*/
public CheckpointConfig getCheckpointConfig() {
return checkpointCfg;
}
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint. This method selects
* {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
* Enables checkpointing for the streaming job: the distributed state of the dataflow is snapshotted periodically, and on failure
* the dataflow is restarted from the latest completed checkpoint. This method selects EXACTLY_ONCE guarantees.
* <p>The job draws checkpoints periodically, in the given interval. The state will be
* stored in the configured state backend.
* The job draws checkpoints periodically at the given interval and stores the state in the configured state backend. Supported backends:
* the memory state backend (limited to roughly 5 MB of state by default; suited to local debugging and small jobs),
* the file system state backend (suited to HA setups and jobs with large state, long windows or large key/value state), and
* the RocksDB state backend (same use cases as the file system backend; the usual choice in production).
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
* Checkpointing iterative dataflows is not properly supported yet, so an iterative job will not start if checkpointing is enabled.
* To override this, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
* @param interval Time interval between state checkpoints in milliseconds.
* Time interval between state checkpoints, in milliseconds.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
checkpointCfg.setCheckpointInterval(interval);
return this;
}
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
* Enables checkpointing for the streaming job: the distributed state of the dataflow is snapshotted periodically, and on failure
* the dataflow is restarted from the latest completed checkpoint.
* <p>The job draws checkpoints periodically, in the given interval. The system uses the
* given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
* The state will be stored in the configured state backend.
* The checkpointing mode can be set to exactly-once or at-least-once.
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
* Checkpointing iterative dataflows is not properly supported yet, so an iterative job will not start if checkpointing is enabled.
* To override this, use the enableCheckpointing(long, CheckpointingMode, boolean) method.
* @param interval
* Time interval between state checkpoints in milliseconds.
* Time interval between state checkpoints, in milliseconds.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
* Sets the checkpointing mode: "exactly once" or "at least once".
* Note that exactly-once adds latency and does not by itself make data exchanged with external systems exactly-once.
* With at-least-once, records may be processed more than once after a failure and recovery.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
checkpointCfg.setCheckpointingMode(mode);
checkpointCfg.setCheckpointInterval(interval);
return this;
}
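/* Usage sketch (not from the source): enabling exactly-once checkpoints every 60 s and tuning the
 * related CheckpointConfig options.
 *
 *     env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
 *     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
 *     env.getCheckpointConfig().setCheckpointTimeout(120_000);
 */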
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* <p>The job draws checkpoints periodically, in the given interval. The state will be
* stored in the configured state backend.
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. If the "force" parameter is set to true, the system will execute the
* job nonetheless.
* Checkpointing iterative dataflows is not properly supported yet; if the force parameter is set to true, the job is executed anyway, with checkpointing enabled despite that limitation.
* @param interval
* Time interval between state checkpoints in millis.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
* @param force
* If true checkpointing will be enabled for iterative jobs as well.
*
* @deprecated Use {@link #enableCheckpointing(long, CheckpointingMode)} instead.
* Forcing checkpoints will be removed in the future.
* Forcing checkpoints will be removed in a future version.
*/
"deprecation") (
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
checkpointCfg.setCheckpointingMode(mode);
checkpointCfg.setCheckpointInterval(interval);
checkpointCfg.setForceCheckpointing(force);
return this;
}
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint. This method selects
* {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
*
* <p>The job draws checkpoints periodically, in the default interval. The state will be
* stored in the configured state backend.
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
*
* @deprecated Use {@link #enableCheckpointing(long)} instead.
*/
public StreamExecutionEnvironment enableCheckpointing() {
/* Default checkpoint interval: 500 ms. */
checkpointCfg.setCheckpointInterval(500);
return this;
}
/**
* Returns the checkpointing interval or -1 if checkpointing is disabled.
*
* <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}.
*
* @return The checkpointing interval or -1
*/
/* Returns the checkpoint interval, or -1 if checkpointing is disabled. */
public long getCheckpointInterval() {
return checkpointCfg.getCheckpointInterval();
}
/**
* Returns whether checkpointing is force-enabled.
*
* @deprecated Forcing checkpoints will be removed in future version.
*/
"deprecation") (
public boolean isForceCheckpointing() {
return checkpointCfg.isForceCheckpointing();
}
/**
* Returns the checkpointing mode (exactly-once vs. at-least-once).
*
* <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}.
*
* @return The checkpoint mode
*/
/* Returns the checkpointing mode: exactly-once or at-least-once. */
public CheckpointingMode getCheckpointingMode() {
return checkpointCfg.getCheckpointingMode();
}
/**
* Sets the state backend that describes how to store and checkpoint operator state. It defines
* both which data structures hold state during execution (for example hash tables, RockDB,
* or other data stores) as well as where checkpointed data will be persisted.
* Sets the state backend, which describes how operator state is stored and checkpointed. It defines both
* the data structures that hold state during execution (for example hash tables, RocksDB or other data stores)
* and where checkpointed data is persisted.
* <p>State managed by the state backend includes both keyed state that is accessible on
* {@link org.apache.flink.streaming.api.datastream.KeyedStream keyed streams}, as well as
* state maintained directly by the user code that implements
* {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction CheckpointedFunction}.
* The state managed by the state backend includes two kinds:
* 1. keyed state accessible on KeyedStreams, and
* 2. state maintained directly by user code that implements CheckpointedFunction.
* <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
* maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
* but can checkpoint only small states (some counters).
* The MemoryStateBackend, for example, keeps state as objects on the heap. It is lightweight and has no extra dependencies, but can only checkpoint small state (a few counters, say).
* <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
* stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
* file system (like HDFS, S3, MapR FS, Alluxio, etc) this will guarantee that state is not lost upon
* failures of individual nodes and that streaming program can be executed highly available and strongly
* consistent (assuming that Flink is run in high-availability mode).
* In contrast, the FsStateBackend stores checkpoint state in files on a replicated file system such as HDFS,
* Amazon S3, MapR FS or Alluxio, which keeps the state from being lost on individual node failures and allows
* the job to run highly available and strongly consistent (assuming Flink runs in high-availability mode).
* @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
* Returns this StreamExecutionEnvironment, so calls can be chained.
*
* @see #getStateBackend()
*/
public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
/* Throws NullPointerException if the argument is null. */
this.defaultStateBackend = Preconditions.checkNotNull(backend);
return this;
}
/**
* @deprecated Use {@link #setStateBackend(StateBackend)} instead.
*/
public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
this.defaultStateBackend = Preconditions.checkNotNull(backend);
return this;
}
/**
* Gets the state backend that defines how to store and checkpoint state.
*
* @see #setStateBackend(StateBackend)
*/
public StateBackend getStateBackend() {
return defaultStateBackend;
}
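/* Usage sketch (not from the source; the HDFS path is a placeholder): switching from the default
 * in-memory backend to a file-system backend so that checkpoints survive node failures.
 *
 *     env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
 */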
/**
* Sets the restart strategy configuration. The configuration specifies which restart strategy
* will be used for the execution graph in case of a restart.
*
* @param restartStrategyConfiguration Restart strategy configuration to be set
*/
/* Sets the restart strategy of the job. Available configurations include:
   1. FailureRateRestartStrategyConfiguration: restart as long as the failure rate stays below a limit.
   2. FallbackRestartStrategyConfiguration: fall back to the strategy configured in flink-conf.yaml.
   3. FixedDelayRestartStrategyConfiguration: restart a fixed number of times with a fixed delay.
   4. NoRestartStrategyConfiguration: never restart.
*/
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
config.setRestartStrategy(restartStrategyConfiguration);
}
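/* Usage sketch (not from the source): the two most common restart strategies set programmatically.
 *
 *     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
 *     // or: keep restarting as long as no more than 3 failures occur within 5 minutes
 *     env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
 */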
/**
* Returns the specified restart strategy configuration.
*
* @return The restart strategy configuration to be used
*/
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
return config.getRestartStrategy();
}
/**
* Sets the number of times that failed tasks are re-executed. A value of
* zero effectively disables fault tolerance. A value of {@code -1}
* indicates that the system default value (as defined in the configuration)
* should be used.
* Sets the number of times that failed tasks are re-executed.
* A value of 0 effectively disables fault tolerance.
* A value of -1 means the system default (from the configuration) is used.
*
* @param numberOfExecutionRetries
* The number of times the system will try to re-execute failed tasks.
*
* @deprecated This method will be replaced by {@link #setRestartStrategy}. The
* {@link RestartStrategies#fixedDelayRestart(int, Time)} contains the number of
* execution retries.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
}
/**
* Gets the number of times the system will try to re-execute failed tasks.
* A value of {@code -1} indicates that the system default value (as defined
* in the configuration) should be used.
*
* @return The number of times the system will try to re-execute failed tasks.
*
* @deprecated This method will be replaced by {@link #getRestartStrategy}.
*/
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
}
// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
/**
* Adds a new Kryo default serializer to the Runtime.
* Adds a new default Kryo serializer to the runtime.
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes
* by java serialization.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializer
* The serializer to use.
*/
public <T extends Serializer<?> & Serializable>void addDefaultKryoSerializer(Class<?> type, T serializer) {
config.addDefaultKryoSerializer(type, serializer);
}
/**
* Adds a new Kryo default serializer to the Runtime.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializerClass
* The class of the serializer to use.
*/
public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
config.addDefaultKryoSerializer(type, serializerClass);
}
/**
* Registers the given type with a Kryo Serializer.
* Registers the given type with a Kryo serializer.
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes
* by java serialization.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializer
* The serializer to use.
*/
public <T extends Serializer<?> & Serializable>void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
config.registerTypeWithKryoSerializer(type, serializer);
}
/**
* Registers the given Serializer via its class as a serializer for the
* given type at the KryoSerializer.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializerClass
* The class of the serializer to use.
*/
"rawtypes") (
public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
config.registerTypeWithKryoSerializer(type, serializerClass);
}
/**
* Registers the given type with the serialization stack. If the type is
* eventually serialized as a POJO, then the type is registered with the
* POJO serializer. If the type ends up being serialized with Kryo, then it
* will be registered at Kryo to make sure that only tags are written.
* If the registered type is eventually serialized as a POJO, it is registered with the POJO serializer.
* If it ends up being serialized with Kryo, it is registered at Kryo so that only tags are written.
* @param type
* The class of the type to register.
*/
public void registerType(Class<?> type) {
if (type == null) {
throw new NullPointerException("Cannot register null type class.");
}
/* Create the TypeInformation for the type via the TypeExtractor. */
TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);
if (typeInfo instanceof PojoTypeInfo) {
config.registerPojoType(type);
} else {
config.registerKryoType(type);
}
}
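/* Usage sketch (not from the source; MyEvent, ThirdPartyType and ThirdPartyTypeSerializer are
 * placeholder classes): registering types up front so Flink picks the POJO serializer where
 * possible and a known Kryo serializer otherwise.
 *
 *     env.registerType(MyEvent.class);                        // POJO -> PojoSerializer, else Kryo tag
 *     env.registerTypeWithKryoSerializer(ThirdPartyType.class, ThirdPartyTypeSerializer.class);
 *     env.getConfig().disableGenericTypes();                  // optional: fail fast on Kryo fallback
 */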
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
* Sets the time characteristic: processing time, event time, or ingestion time.
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
/* Processing time: turn automatic watermark emission off (interval 0). */
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
/* Event or ingestion time: use the default watermark interval of 200 ms. */
getConfig().setAutoWatermarkInterval(200);
}
}
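/* Usage sketch (not from the source): choosing event time and relaxing the automatic watermark interval.
 *
 *     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // watermark interval defaults to 200 ms
 *     env.getConfig().setAutoWatermarkInterval(1000);                // emit watermarks once per second instead
 */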
/**
* Gets the time characteristic.
*
* @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
*
* @return The time characteristic.
*/
public TimeCharacteristic getStreamTimeCharacteristic() {
return timeCharacteristic;
}
/**
* Sets all relevant options contained in the {@link ReadableConfig} such as e.g.
* {@link StreamPipelineOptions#TIME_CHARACTERISTIC}. It will reconfigure
* {@link StreamExecutionEnvironment}, {@link ExecutionConfig} and {@link CheckpointConfig}.
*
* <p>It will change the value of a setting only if a corresponding option was set in the
* {@code configuration}. If a key is not present, the current value of a field will remain
* untouched.
*
* @param configuration a configuration to read the values from
* @param classLoader a class loader to use when loading classes
*/
public void configure(ReadableConfig configuration, ClassLoader classLoader) {
/* Time characteristic. */
configuration.getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC)
.ifPresent(this::setStreamTimeCharacteristic);
/* State backend. */
Optional.ofNullable(loadStateBackend(configuration, classLoader))
.ifPresent(this::setStateBackend);
/* Operator chaining on/off. */
configuration.getOptional(PipelineOptions.OPERATOR_CHAINING)
.ifPresent(c -> this.isChainingEnabled = c);
/* Output buffer timeout. */
configuration.getOptional(ExecutionOptions.BUFFER_TIMEOUT)
.ifPresent(t -> this.setBufferTimeout(t.toMillis()));
/* Job listeners. */
configuration.getOptional(DeploymentOptions.JOB_LISTENERS)
.ifPresent(listeners -> registerCustomListeners(classLoader, listeners));
/* Files for the distributed cache. */
configuration.getOptional(PipelineOptions.CACHED_FILES)
.ifPresent(f -> {
this.cacheFile.clear();
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
});
/* Apply the remaining options to the ExecutionConfig and, below, to the CheckpointConfig. */
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
}
/* Registers user-defined job listeners. */
private void registerCustomListeners(final ClassLoader classLoader, final List<String> listeners) {
for (String listener : listeners) {
try {
final JobListener jobListener = InstantiationUtil.instantiate(
listener, JobListener.class, classLoader);
jobListeners.add(jobListener);
} catch (FlinkException e) {
throw new WrappingRuntimeException("Could not load JobListener : " + listener, e);
}
}
}
/* Loads the state backend configured in the given configuration, if any. */
private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader classLoader) {
try {
return StateBackendLoader.loadStateBackendFromConfig(
configuration,
classLoader,
null);
} catch (DynamicCodeLoadingException | IOException e) {
throw new WrappingRuntimeException(e);
}
}
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
/**
* Creates a new data stream that contains a sequence of numbers. This is a parallel source,
* if you manually set the parallelism to {@code 1}
* (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)})
* the generated sequence of elements is in order.
*
* @param from
* The number to start at (inclusive)
* @param to
* The number to stop at (inclusive)
* @return A data stream, containing all number in the [from, to] interval
*/
/* Creates a data stream containing a sequence of numbers. */
public DataStreamSource<Long> generateSequence(long from, long to) {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
/* Add the stateful sequence source. */
return addSource(new StatefulSequenceSource(from, to), "Sequence Source");
}
/**
* Creates a new data stream that contains the given elements. The elements must all be of the
* same type, for example, all of the {@link String} or {@link Integer}.
*
* <p>The framework will try and determine the exact type from the elements. In case of generic
* elements, it may be necessary to manually supply the type information via
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
* stream source with a degree of parallelism one.
*
* @param data
* The array of elements to create the data stream from.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
/* Creates a data stream from the given elements.
 * The resulting source has a parallelism of 1.
 */
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
/* Derive the concrete type from the first element; all elements must be of the same type. */
typeInfo = TypeExtractor.getForObject(data[0]);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
/* Wrap the array in a List and delegate to fromCollection. */
return fromCollection(Arrays.asList(data), typeInfo);
}
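/* Usage sketch (not from the source): element-based sources, with an explicit TypeInformation for
 * the generic case.
 *
 *     DataStream<Tuple2<String, Integer>> pairs =
 *             env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
 *     DataStream<String> words =
 *             env.fromCollection(Arrays.asList("to", "be", "or"), Types.STRING);
 */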
/**
* Creates a new data set that contains the given elements. The framework will determine the type according to the
* based type user supplied. The elements should be the same or be the subclass to the based type.
* The sequence of elements must not be empty.
* Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
* degree of parallelism one.
*
* @param type
* The based class type in the collection.
* @param data
* The array of elements to create the data stream from.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given array of elements
*/
/* Creates a data stream from the given elements, with the base type specified explicitly. */
public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForClass(type);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(Arrays.asList(data), typeInfo);
}
/**
* Creates a data stream from the given non-empty collection. The type of the data stream is that of the
* elements in the collection.
*
* <p>The framework will try and determine the exact type from the collection elements. In case of generic
* elements, it may be necessary to manually supply the type information via
* {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
* parallelism one.
*
* @param data
* The collection of elements to create the data stream from.
* @param <OUT>
* The generic type of the returned data stream.
* @return
* The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
Preconditions.checkNotNull(data, "Collection must not be null");
if (data.isEmpty()) {
throw new IllegalArgumentException("Collection must not be empty");
}
OUT first = data.iterator().next();
if (first == null) {
throw new IllegalArgumentException("Collection must not contain null elements");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForObject(first);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(data, typeInfo);
}
/**
* Creates a data stream from the given non-empty collection.
*
* <p>Note that this operation will result in a non-parallel data stream source,
* i.e., a data stream source with parallelism one.
*
* @param data
* The collection of elements to create the data stream from
* @param typeInfo
* The TypeInformation for the produced data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the given collection
*/
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "Collection must not be null");
// must not have null elements and mixed elements
FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
SourceFunction<OUT> function;
try {
function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
/**
* Creates a data stream from the given iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens,
* the type of data returned by the iterator must be given explicitly in the form of the type
* class (this is due to the fact that the Java compiler erases the generic type information).
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e.,
* a data stream source with a parallelism of one.
*
* @param data
* The iterator of elements to create the data stream from
* @param type
* The class of the data produced by the iterator. Must not be a generic class.
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the elements in the iterator
* @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
*/
/* Creates a source from the given iterator. */
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
return fromCollection(data, TypeExtractor.getForClass(type));
}
/**
* Creates a data stream from the given iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens,
* the type of data returned by the iterator must be given explicitly in the form of the type
* information. This method is useful for cases where the type is generic.
* In that case, the type class (as given in
* {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
*
* <p>Note that this operation will result in a non-parallel data stream source, i.e.,
* a data stream source with parallelism one.
*
* @param data
* The iterator of elements to create the data stream from
* @param typeInfo
* The TypeInformation for the produced data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream representing the elements in the iterator
*/
/* Creates a source named "Collection Source" from the given iterator. */
public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be null");
SourceFunction<OUT> function = new FromIteratorFunction<>(data);
return addSource(function, "Collection Source", typeInfo);
}
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable,
* allowing the framework to create a parallel data stream source that returns the elements in
* the iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens, the type
* of data returned by the iterator must be given explicitly in the form of the type class
* (this is due to the fact that the Java compiler erases the generic type information).
*
* @param iterator
* The iterator that produces the elements of the data stream
* @param type
* The class of the data produced by the iterator. Must not be a generic class.
* @param <OUT>
* The type of the returned data stream
* @return A data stream representing the elements in the iterator
*/
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type) {
return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
}
/**
* Creates a new data stream that contains elements in the iterator. The iterator is splittable,
* allowing the framework to create a parallel data stream source that returns the elements in
* the iterator.
*
* <p>Because the iterator will remain unmodified until the actual execution happens, the type
* of data returned by the iterator must be given explicitly in the form of the type
* information. This method is useful for cases where the type is generic. In that case, the
* type class (as given in
* {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)} does not
* supply all type information.
*
* @param iterator
* The iterator that produces the elements of the data stream
* @param typeInfo
* The TypeInformation for the produced data stream.
* @param <OUT>
* The type of the returned data stream
* @return A data stream representing the elements in the iterator
*/
public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo) {
return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
}
// private helper for passing different names
private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
typeInfo, String operatorName) {
return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
}
/**
* Reads the given file line-by-line and creates a data stream that contains a string with the
* contents of each such line. The file will be read with the UTF-8 character set.
*
* <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
* {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
* them to the downstream readers to read the actual data,
* and exits, without waiting for the readers to finish reading. This implies that no more
* checkpoint barriers are going to be forwarded after the source exits, thus having no
* checkpoints after that point.
*
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The data stream that represents the data read from the given file as text lines
*/
/* Reads a text file line by line into a source, decoded as UTF-8; local file systems and HDFS are supported. */
public DataStreamSource<String> readTextFile(String filePath) {
return readTextFile(filePath, "UTF-8");
}
/**
* Reads the given file line-by-line and creates a data stream that contains a string with the
* contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
* used to read the files.
*
* <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
* {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
* forwards them to the downstream readers to read the actual data,
* and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
* barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
*
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param charsetName
* The name of the character set used to read the file
* @return The data stream that represents the data read from the given file as text lines
*/
/* Reads a text file line by line into a source, decoded with the charset given by charsetName; local file systems and HDFS are supported. */
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
/* The file path must not be null or blank. */
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
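/* Usage sketch (not from the source; the path is a placeholder): reading a text file once, line by line.
 *
 *     DataStream<String> lines = env.readTextFile("hdfs://namenode:8020/logs/access.log");
 */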
/**
* Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
* Reads the user-specified filePath with the given FileInputFormat.
* <p>Since all data streams need specific information about their types, this method needs to determine the
* type of the data produced by the input format. It will attempt to determine the data type by reflection,
* unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
* In the latter case, this method will invoke the
* {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
* type produced by the input format.
* The data type produced by the input format is determined by reflection,
* unless the format implements ResultTypeQueryable, in which case getProducedType() is used.
* <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
* {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
* forwards them to the downstream readers to read the actual data,
* and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
* barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
* No checkpoints are taken after the source exits.
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param inputFormat
* The input format used to create the data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data read from the given file
*/
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath) {
return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
}
/**
* Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
* on the provided {@link FileProcessingMode}.
*
* <p>See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
*
* @param inputFormat
* The input format used to create the data stream
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
* The watch type: either monitor the path and react to new data, or process once and exit.
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
* Interval between consecutive path scans when monitoring periodically.
* @param filter
* The files to be excluded from the processing
* Files to be excluded from processing.
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data read from the given file
*
* @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
* {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}
*
*/
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
FilePathFilter filter) {
inputFormat.setFilesFilter(filter);
TypeInformation<OUT> typeInformation;
try {
typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
} catch (Exception e) {
throw new InvalidProgramException("The type returned by the input format could not be " +
"automatically determined. Please specify the TypeInformation of the produced type " +
"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
}
return readFile(inputFormat, filePath, watchType, interval, typeInformation);
}
/**
* Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
* on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
* for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and
* exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, the user
* can specify a custom {@link FilePathFilter}. As a default implementation you can use
* {@link FilePathFilter#createDefaultFilter()}.
*
* <p>Since all data streams need specific information about their types, this method needs to determine the
* type of the data produced by the input format. It will attempt to determine the data type by reflection,
* unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
* In the latter case, this method will invoke the
* {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
* type produced by the input format.
*
* <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
* the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
* to be processed, forwards them to the downstream readers to read the actual data,
* and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
* are going to be forwarded after the source exits, thus having no checkpoints after that point.
*
* @param inputFormat
* The input format used to create the data stream
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data read from the given file
*/
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval) {
TypeInformation<OUT> typeInformation;
try {
typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
} catch (Exception e) {
throw new InvalidProgramException("The type returned by the input format could not be " +
"automatically determined. Please specify the TypeInformation of the produced type " +
"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
}
return readFile(inputFormat, filePath, watchType, interval, typeInformation);
}
/**
* Creates a data stream that contains the contents of file created while system watches the given path. The file
* will be read with the system's default character set.
*
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
* @param intervalMillis
* The interval of file watching in milliseconds
* @param watchType
* The watch type of file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system processes
* only
* new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes all contents of
* appended file. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes only appended
* contents
* of files.
* 1. ONLY_NEW_FILES: process only new files.
* 2. REPROCESS_WITH_APPENDED: re-process the entire contents of a file once it has been appended to.
* 3. PROCESS_ONLY_APPENDED: process only the appended contents of files.
* @return The DataStream containing the given directory.
*
* @deprecated Use {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} instead.
*/
"deprecation") (
public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) {
DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
filePath, intervalMillis, watchType), "Read File Stream source");
return source.flatMap(new FileReadFunction());
}
/**
* Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
* Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
* the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the
* path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed,
* the user can specify a custom {@link FilePathFilter}. As a default implementation you can use
* {@link FilePathFilter#createDefaultFilter()}.
*
* <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
* the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
* to be processed, forwards them to the downstream readers to read the actual data,
* and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
* are going to be forwarded after the source exits, thus having no checkpoints after that point.
*
* @param inputFormat
* The input format used to create the data stream
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
* @param typeInformation
* Information on the type of the elements in the output stream
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data read from the given file
*/
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
TypeInformation<OUT> typeInformation) {
Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");
inputFormat.setFilePath(filePath);
return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
}
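/* Usage sketch (not from the source; paths are placeholders): continuously monitoring a directory,
 * rescanning it every 10 seconds. Note that with PROCESS_CONTINUOUSLY a modified file is re-processed
 * in its entirety, which breaks exactly-once guarantees for its earlier contents.
 *
 *     TextInputFormat format = new TextInputFormat(new Path("file:///tmp/in"));
 *     DataStream<String> lines = env.readFile(
 *             format, "file:///tmp/in",
 *             FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000,
 *             BasicTypeInfo.STRING_TYPE_INFO);
 */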
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. On the termination of the socket server connection retries can be
* initiated.
* Creates a data stream that receives strings from a socket, decoded with the system's default charset. Reconnection can be attempted when the connection terminates.
*
* <p>Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
* the socket was gracefully terminated.
*
* @param hostname
* The host name which a server socket binds
* Host to which the server socket is bound.
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* Port to which the server socket is bound; 0 means an automatically allocated port.
* @param delimiter
* A character which splits received strings into records
* Character used to split the received data into records, typically a carriage return or newline.
* @param maxRetry
* The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
* Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
* while
* a negative value ensures retrying forever.
* Maximum number of seconds to keep retrying while the socket is temporarily down; a reconnection is attempted every second. 0 terminates the reader as soon as the socket goes down; a negative value retries forever.
* @return A data stream containing the strings received from the socket
* Returns the stream of strings received from the socket.
*
* @deprecated Use {@link #socketTextStream(String, int, String, long)} instead.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
return socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. On the termination of the socket server connection retries can be
* initiated.
* Creates a data stream that receives strings from a socket, decoded with the system's default charset. Reconnection can be attempted when the connection terminates.
*
* <p>Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
* the socket was gracefully terminated.
* Note that the socket itself does not report an abort, so retries are only initiated when the socket was gracefully terminated.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @param delimiter
* A string which splits received strings into records
* @param maxRetry
* The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
* Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
* while
* a negative value ensures retrying forever.
* @return A data stream containing the strings received from the socket
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. The reader is terminated immediately when the socket is down.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @param delimiter
* A character which splits received strings into records
* @return A data stream containing the strings received from the socket
*
* @deprecated Use {@link #socketTextStream(String, int, String)} instead.
*/
/* No retry when the connection terminates. */
"deprecation") (
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set. The reader is terminated immediately when the socket is down.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @param delimiter
* A string which splits received strings into records
* @return A data stream containing the strings received from the socket
*/
/* The reader stops immediately when the socket goes down. */
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
/**
* Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
* decoded by the system's default character set, using "\n" as delimiter. The reader is terminated immediately when
* the socket is down.
*
* @param hostname
* The host name which a server socket binds
* @param port
* The port number which a server socket binds. A port number of 0 means that the port number is automatically
* allocated.
* @return A data stream containing the strings received from the socket
*/
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, "\n");
}
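/* Usage sketch (illustrative, not part of the Flink source; "localhost"/9999 are placeholder values and
 * the usual flink-streaming-java imports are assumed): reading newline-delimited text from a socket,
 * plus a variant with a custom delimiter that retries forever.
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("localhost", 9999);            // "\n" delimiter, no retry
DataStream<String> records = env.socketTextStream("localhost", 9999, ",", -1); // "," delimiter, retry forever
lines.print();
env.execute("socket-demo");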
/**
* Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
*
* <p>Since all data streams need specific information about their types, this method needs to determine the
* type of the data produced by the input format. It will attempt to determine the data type by reflection,
* unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
* In the latter case, this method will invoke the
* {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
* type produced by the input format.
*
* <p><b>NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
* (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
* {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
* them to the downstream readers to read the actual data, and exits,
* without waiting for the readers to finish reading. This implies that no more checkpoint
* barriers are going to be forwarded after the source exits, thus having no checkpoints.
*
* @param inputFormat
* The input format used to create the data stream
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data created by the input format
*/
/** Annotation: creates an input stream from an InputFormat implementation. The element type is
 * determined by reflection, unless the format implements ResultTypeQueryable, in which case
 * getProducedType() is used.
 */
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
}
/**
* Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
*
* <p>The data stream is typed to the given TypeInformation. This method is intended for input formats
* where the return type cannot be determined by reflection analysis, and that do not implement the
* {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
*
* <p><b>NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
* (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
* {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
* them to the downstream readers to read the actual data, and exits,
* without waiting for the readers to finish reading. This implies that no more checkpoint
* barriers are going to be forwarded after the source exits, thus having no checkpoints.
* Annotation: the source exits once the splits are forwarded, so no checkpoints are taken afterwards.
*
* @param inputFormat
* The input format used to create the data stream
* @param typeInfo
* The information about the type of the output type
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data created by the input format
*/
/* Creates an input stream with an explicitly supplied type information */
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
DataStreamSource<OUT> source;
/* If the input format is file based */
if (inputFormat instanceof FileInputFormat) {
@SuppressWarnings("unchecked")
FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
/* The files are processed once and the source exits; the -1 interval is ignored by the callee in PROCESS_ONCE mode */
source = createFileInput(format, typeInfo, "Custom File source",
FileProcessingMode.PROCESS_ONCE, -1);
} else {
source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;
}
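/* Usage sketch (illustrative; the file path is a placeholder and env is obtained via
 * StreamExecutionEnvironment.getExecutionEnvironment()): since TextInputFormat is a FileInputFormat,
 * createInput routes it through createFileInput with FileProcessingMode.PROCESS_ONCE, i.e. the file is
 * read once and the source then finishes.
 */
TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input.txt"));
DataStream<String> fileLines = env.createInput(format, BasicTypeInfo.STRING_TYPE_INFO);
fileLines.print();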
private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName) {
/** Wraps the input format into a source function that reads the input splits.
 * Class hierarchy: InputFormatSourceFunction -> RichParallelSourceFunction -> AbstractRichFunction -> RichFunction -> Function
 */
InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo);
return addSource(function, sourceName, typeInfo);
}
private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
long interval) {
/* The caller also null-checks inputFormat */
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
/* Either the files are processed once (PROCESS_ONCE), or the monitoring interval must be at least the minimum allowed interval */
Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
"The path monitoring interval cannot be less than " +
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
/* Function that monitors the path and creates the input splits */
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
/* Operator factory for the reader that actually consumes the splits */
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
new ContinuousFileReaderOperatorFactory<>(inputFormat);
/* The monitoring source runs with parallelism 1; the downstream split reader uses the configured job parallelism */
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
.transform("Split Reader: " + sourceName, typeInfo, factory);
return new DataStreamSource<>(source);
}
/**
* Adds a Data Source to the streaming topology.
*
* <p>By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
* implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
* org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
* will have the parallelism of the environment. To change this afterwards call {@link
* org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
*
* @param function
* the user defined function
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
/**
 * Annotation: adds a data source to the stream topology (StreamGraph).
 * Sources have a parallelism of 1 by default; to run a source in parallel, implement
 * ParallelSourceFunction or extend RichParallelSourceFunction, in which case the source
 * picks up the parallelism of the environment.
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
return addSource(function, "Custom Source");
}
/**
* Adds a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
*
* @param function
* the user defined function
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
/* Adds a source with an explicit source name */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
return addSource(function, sourceName, null);
}
/**
* Adds a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
*
* @param function
* the user defined function
* @param <OUT>
* type of the returned stream
* @param typeInfo
* the user defined type information for the stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
return addSource(function, "Custom Source", typeInfo);
}
/**
* Adds a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
*
* @param function
* the user defined function
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @param typeInfo
* the user defined type information for the stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}
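/* Usage sketch (illustrative; env obtained via getExecutionEnvironment()): a minimal counting source.
 * Because it only implements SourceFunction it keeps the default parallelism of 1; it would have to
 * implement ParallelSourceFunction (or extend RichParallelSourceFunction) to run in parallel.
 */
DataStream<Long> numbers = env.addSource(new SourceFunction<Long>() {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 0;
        while (running) {
            ctx.collect(n++);      // emit the next value
            Thread.sleep(100);     // throttle the source a little
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}, "counting-source");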
/**
* Add a data {@link Source} to the environment to get a {@link DataStream}.
*
* @param source
* the user defined source
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
/* Experimental in this version: adds a source based on the new unified Source interface */
public <OUT> DataStreamSource<OUT> continuousSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
/**
 * A watermark strategy is essentially a policy on record timestamps; a common choice is
 * event time minus an allowed delay.
 * A watermark is best understood as a delayed-trigger mechanism: during window processing, Flink only
 * computes a window (aggregation, grouping, ...) once it believes all of the window's data has arrived,
 * otherwise it keeps waiting. Watermarks measure event-time progress (i.e. how complete the arrived data
 * is), so that correct and continuous results can still be produced with out-of-order or late events.
 * How is the watermark computed?
 * Watermark = largest event time seen so far (maxEventTime) - configured delay (t)
 * When does a window fire?
 * A window whose end time is less than or equal to the current watermark is triggered and evaluated.
 */
return continuousSource(source, timestampsAndWatermarks, sourceName, null);
}
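/* Usage sketch (illustrative; MyEvent and its getTimestamp() method are hypothetical): the kind of
 * WatermarkStrategy that continuousSource expects, here "bounded out-of-orderness", i.e.
 * watermark = max event time seen - 5 seconds, combined with a timestamp assigner.
 */
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());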
/**
* Add a data {@link Source} to the environment to get a {@link DataStream}.
*
* @param source
* the user defined source
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @param typeInfo
* the user defined type information for the stream
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> continuousSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo) {
final TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource<>(
this,
checkNotNull(source, "source"),
checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
checkNotNull(resolvedTypeInfo),
checkNotNull(sourceName));
}
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* <p>The program execution will be logged and displayed with a generated
* default name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception which occurs during job execution.
*/
/* Triggers program execution; the default job name is "Flink Streaming Job".
 * All parts of the program that end in a sink (e.g. printing, forwarding to a message queue) are executed.
 */
public JobExecutionResult execute() throws Exception {
return execute(DEFAULT_JOB_NAME);
}
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* <p>The program execution will be logged and displayed with the provided name
*
* @param jobName
* Desired name of the job
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception which occurs during job execution.
*/
/* Executes with an explicit job name */
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
return execute(getStreamGraph(jobName));
}
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* @param streamGraph the stream graph representing the transformations
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception which occurs during job execution.
*/
/* Executes the given stream graph and returns the job result, including the elapsed time and accumulators */
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
/* Submit the stream graph asynchronously.
 * The returned JobClient gives access to the job id, job status, triggering checkpoints,
 * stopping with a savepoint, accumulator results and the final execution result.
 */
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
/* DeploymentOptions includes TARGET, ATTACHED, SHUTDOWN_IF_ATTACHED and JOB_LISTENERS.
 * In attached mode the client stays connected until the job finishes, so the result can be fetched directly.
 */
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
} else {
/* Detached mode: the client returns right after submission, so the execution result is not available here */
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
/* Notify the registered job listeners, one after another, that the job finished */
jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
/* Notify all listeners that the job execution failed */
jobListeners.forEach(jobListener -> {
jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
});
ExceptionUtils.rethrowException(t);
// never reached, only make javac happy
return null;
}
}
/**
* Register a {@link JobListener} in this environment. The {@link JobListener} will be
* notified on specific job status changed.
*/
/* Registers a job listener */
public void registerJobListener(JobListener jobListener) {
checkNotNull(jobListener, "JobListener cannot be null");
jobListeners.add(jobListener);
}
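/* Usage sketch (illustrative): a listener that logs submission and completion. */
env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (throwable == null) {
            System.out.println("submitted job " + jobClient.getJobID());
        }
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
        if (throwable == null) {
            System.out.println("job finished in " + jobExecutionResult.getNetRuntime() + " ms");
        }
    }
});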
/**
* Clear all registered {@link JobListener}s.
*/
/* Clears all registered listeners */
public void clearJobListeners() {
this.jobListeners.clear();
}
/**
* Triggers the program asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* <p>The program execution will be logged and displayed with a generated
* default name.
*
* @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception which occurs during job execution.
*/
/* Submits the job asynchronously; the caller tracks it through the returned JobClient */
public final JobClient executeAsync() throws Exception {
return executeAsync(DEFAULT_JOB_NAME);
}
/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* <p>The program execution will be logged and displayed with the provided name
*
* @param jobName desired name of the job
* @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception which occurs during job execution.
*/
public JobClient executeAsync(String jobName) throws Exception {
return executeAsync(getStreamGraph(checkNotNull(jobName)));
}
/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* @param streamGraph the stream graph representing the transformations
* @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
* @throws Exception which occurs during job execution.
*/
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
/* Looks up the pipeline executor factory matching the configured execution.target */
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
try {
/* Wait for the submission to complete and obtain the JobClient */
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
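/* Usage sketch (illustrative): submitting asynchronously and tracking the job through the JobClient. */
JobClient client = env.executeAsync("async-demo");
System.out.println("job id: " + client.getJobID());
client.getJobStatus().thenAccept(status -> System.out.println("current status: " + status));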
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
* clears previously registered {@link Transformation transformations}.
*
* @return The streamgraph representing the transformations
*/
public StreamGraph getStreamGraph() {
return getStreamGraph(DEFAULT_JOB_NAME);
}
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
* clears previously registered {@link Transformation transformations}.
*
* @param jobName Desired name of the job
* @return The streamgraph representing the transformations
*/
/* Returns the stream graph of the job; previously registered transformations are cleared */
public StreamGraph getStreamGraph(String jobName) {
return getStreamGraph(jobName, true);
}
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the streaming job
* with the option to clear previously registered {@link Transformation transformations}. Clearing the
* transformations allows, for example, to not re-execute the same operations when calling
* {@link #execute()} multiple times.
*
* @param jobName Desired name of the job
* @param clearTransformations Whether or not to clear previously registered transformations
* @return The streamgraph representing the transformations
*/
/* Returns the stream graph with the given job name; if clearTransformations is true, all transformations registered so far are cleared */
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
return streamGraph;
}
/* Builds the generator that turns the registered transformations into a StreamGraph */
private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
return new StreamGraphGenerator(transformations, config, checkpointCfg)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}
/**
* Creates the plan with which the system will execute the program, and
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
*
* @return The execution plan of the program, as a JSON String.
*/
/* Returns the execution plan of the job as a JSON string */
public String getExecutionPlan() {
return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}
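/* Usage sketch (illustrative; "localhost"/9999 are placeholders): printing the JSON plan once the
 * transformations have been defined; the output can be pasted into the Flink plan visualizer.
 */
env.socketTextStream("localhost", 9999).print();
System.out.println(env.getExecutionPlan());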
/**
* Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
* is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
*/
public <F> F clean(F f) {
if (getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, getConfig().getClosureCleanerLevel(), true);
}
ClosureCleaner.ensureSerializable(f);
return f;
}
/**
* Adds an operator to the list of operators that should be executed when calling
* {@link #execute}.
*
* <p>When calling {@link #execute()} only the operators that where previously added to the list
* are executed.
*
* <p>This is not meant to be used by users. The API methods that create operators must call
* this method.
*/
/* Records a transformation so that it is picked up by execute() */
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
// --------------------------------------------------------------------------------------------
// Factory methods for ExecutionEnvironments
// --------------------------------------------------------------------------------------------
/**
* Creates an execution environment that represents the context in which the
* program is currently executed. If the program is invoked standalone, this
* method returns a local execution environment, as returned by
* {@link #createLocalEnvironment()}.
*
* @return The execution environment of the context in which the program is
* executed.
*/
/* Creates the context execution environment; if the program is invoked standalone (not through a client/CLI), a local environment is returned */
public static StreamExecutionEnvironment getExecutionEnvironment() {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
* environment was created in. The default parallelism of the local
* environment is the number of hardware contexts (CPU cores / threads),
* unless it was specified differently by {@link #setParallelism(int)}.
*
* @return A local execution environment.
*/
/* Creates a multi-threaded local environment in the same JVM; unless set otherwise, the parallelism defaults to the number of hardware contexts (CPU cores) */
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalParallelism);
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
* environment was created in. It will use the parallelism specified in the
* parameter.
*
* @param parallelism
* The parallelism for the local environment.
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
return createLocalEnvironment(parallelism, new Configuration());
}
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
* environment was created in. It will use the parallelism specified in the
* parameter.
*
* @param parallelism
* The parallelism for the local environment.
* @param configuration
* Pass a custom configuration into the cluster
* @return A local execution environment with the specified parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
final LocalStreamEnvironment currentEnvironment;
currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
return currentEnvironment;
}
/**
* Creates a {@link LocalStreamEnvironment} for local program execution that also starts the
* web monitoring UI.
*
* <p>The local execution environment will run the program in a multi-threaded fashion in
* the same JVM as the environment was created in. It will use the parallelism specified in the
* parameter.
*
* <p>If the configuration key 'rest.port' was set in the configuration, that particular
* port will be used for the web UI. Otherwise, the default port (8081) will be used.
*/
/* Creates a local environment with the web UI; uses rest.port if configured, otherwise the default port 8081 */
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
if (!conf.contains(RestOptions.PORT)) {
// explicitly set this option so that it's not set to 0 later
conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
return createLocalEnvironment(defaultLocalParallelism, conf);
}
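/* Usage sketch (illustrative; requires flink-runtime-web on the classpath): a local environment whose
 * web UI listens on 8082 instead of the default 8081.
 */
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);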
/**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use no parallelism, unless the parallelism is set
* explicitly via {@link #setParallelism}.
*
* @param host
* The host name or address of the master (JobManager), where the
* program should be executed.
* Annotation: the address of the JobManager that the job is submitted to
* @param port
* The port of the master (JobManager), where the program should
* be executed.
* @param jarFiles
* The JAR files with code that needs to be shipped to the
* cluster. If the program uses user-defined functions,
* user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* Annotation: these jar files have to be reachable when the job is shipped; placing them on a shared (e.g. NFS) file system is one common option.
* @return A remote environment that executes the program on a cluster.
*/
/* Creates a remote environment: the client ships (parts of) the program to the cluster for execution.
 * Note that all file paths used in the program must be accessible from the cluster.
 * No parallelism is applied unless setParallelism is called explicitly.
 * Parameters: JobManager host, port, and the jar files to ship.
 */
public static StreamExecutionEnvironment createRemoteEnvironment(
String host, int port, String... jarFiles) {
return new RemoteStreamEnvironment(host, port, jarFiles);
}
/**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use the specified parallelism.
*
* @param host
* The host name or address of the master (JobManager), where the
* program should be executed.
* @param port
* The port of the master (JobManager), where the program should
* be executed.
* @param parallelism
* The parallelism to use during the execution.
* @param jarFiles
* The JAR files with code that needs to be shipped to the
* cluster. If the program uses user-defined functions,
* user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
/* Creates a remote environment with an explicit job parallelism (JobManager host, port, jar files) */
public static StreamExecutionEnvironment createRemoteEnvironment(
String host, int port, int parallelism, String... jarFiles) {
RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
env.setParallelism(parallelism);
return env;
}
/**
* Creates a {@link RemoteStreamEnvironment}. The remote environment sends
* (parts of) the program to a cluster for execution. Note that all file
* paths used in the program must be accessible from the cluster. The
* execution will use the specified parallelism.
*
* @param host
* The host name or address of the master (JobManager), where the
* program should be executed.
* @param port
* The port of the master (JobManager), where the program should
* be executed.
* @param clientConfig
* The configuration used by the client that connects to the remote cluster.
* @param jarFiles
* The JAR files with code that needs to be shipped to the
* cluster. If the program uses user-defined functions,
* user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
/* Creates a remote environment with a custom client configuration (JobManager host, port, jar files) */
public static StreamExecutionEnvironment createRemoteEnvironment(
String host, int port, Configuration clientConfig, String... jarFiles) {
return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
}
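/* Usage sketch (illustrative; host, port and jar path are placeholders): submitting to an already
 * running cluster with an explicit parallelism of 2.
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, 2, "/path/to/my-job.jar");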
/**
* Gets the default parallelism that will be used for the local execution environment created by
* {@link #createLocalEnvironment()}.
*
* @return The default local parallelism
*/
/* Returns the default parallelism used by createLocalEnvironment() */
public static int getDefaultLocalParallelism() {
return defaultLocalParallelism;
}
/**
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
*
* @param parallelism The parallelism to use as the default local parallelism.
*/
/* Sets the default parallelism used by createLocalEnvironment() */
public static void setDefaultLocalParallelism(int parallelism) {
defaultLocalParallelism = parallelism;
}
// --------------------------------------------------------------------------------------------
// Methods to control the context and local environments for execution from packaged programs
// --------------------------------------------------------------------------------------------
/* Installs the context environment factory (used when the program is submitted through a client) */
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
/* Resets the context environment factory */
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
threadLocalContextEnvironmentFactory.remove();
}
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (which will be distributed via BlobServer), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
* Annotation: registers a file in the distributed cache under the given name so that user-defined
* functions can access it via a local path at runtime. The file may be a local file (distributed via
* the BlobServer) or live on a distributed file system; the runtime copies it to a local cache when
* needed, so shipping very large files adds network-copy latency.
*
* <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
* {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
*/
public void registerCachedFile(String filePath, String name) {
registerCachedFile(filePath, name, false);
}
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (which will be distributed via BlobServer), or files in a distributed file system.
* The runtime will copy the files temporarily to a local cache, if needed.
*
* <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
* {@link org.apache.flink.api.common.cache.DistributedCache} via
* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
* @param executable flag indicating whether the file should be executable
* Annotation: whether the file should be flagged as executable in the local cache (e.g. a shipped script or binary)
*/
public void registerCachedFile(String filePath, String name, boolean executable) {
this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
}
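/* Usage sketch (illustrative; the HDFS path is a placeholder and "lines" is some existing
 * DataStream<String>): registering a file and reading the local copy back inside a rich function.
 */
env.registerCachedFile("hdfs:///config/lookup.txt", "lookup");
DataStream<String> enriched = lines.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        File lookup = getRuntimeContext().getDistributedCache().getFile("lookup");
        // load lookup data from the local copy of the cached file here
    }

    @Override
    public String map(String value) {
        return value;   // enrich the record using the loaded data
    }
});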
// Private helpers.
/* Resolves the element type either via the ResultTypeQueryable interface or by reflection */
@SuppressWarnings("unchecked")
private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
Object source,
String sourceName,
Class<?> baseSourceClass,
TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = typeInfo;
if (source instanceof ResultTypeQueryable) {
resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
}
if (resolvedTypeInfo == null) {
try {
resolvedTypeInfo = TypeExtractor.createTypeInfo(
baseSourceClass,
source.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
return (T) resolvedTypeInfo;
}
}