Posted davidwang456
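Flink supports stateful computation. It distinguishes two kinds of state: Keyed State and Operator State. To persist state data, Flink provides the checkpoint mechanism; to manage state data, it provides several state backends, such as MemoryStateBackend.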
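The earlier Flink articles used the word count example, but none of them included state management. In other words, if a task fails during processing, its in-memory state is lost and all of the data has to be recomputed.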
To provide fault tolerance and message-processing guarantees (at least once, exactly once), Flink introduces state and checkpoints.
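First, distinguish the two concepts. State generally refers to the state of one specific task/operator. A checkpoint is a global snapshot of a Flink job's state at a particular moment, that is, it contains the state of all tasks/operators.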
A checkpoint takes a snapshot of the state of all tasks at some moment and stores it in memory, a file system, RocksDB, and so on. Flink takes checkpoints periodically to achieve fault tolerance and recovery.
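To make the snapshot-and-restore idea concrete, here is a minimal plain-Java sketch. It is a toy model and not the Flink API: the class `CheckpointSketch`, its methods, and the task name `map-0` are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not the Flink API: every name here is hypothetical.
class CheckpointSketch {
    // Working state of each task: task name -> running count.
    private final Map<String, Long> taskState = new HashMap<>();
    // Last completed checkpoint: a copy of every task's state at one moment.
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    // Process one record on the given task by incrementing its counter.
    void process(String task) {
        taskState.merge(task, 1L, Long::sum);
    }

    // Take a global snapshot of all task states.
    void checkpoint() {
        lastCheckpoint = new HashMap<>(taskState);
    }

    // Simulate a failure: in-memory state is lost, then restored from the snapshot.
    void failAndRestore() {
        taskState.clear();
        taskState.putAll(lastCheckpoint);
    }

    long count(String task) {
        return taskState.getOrDefault(task, 0L);
    }

    static long demo() {
        CheckpointSketch job = new CheckpointSketch();
        job.process("map-0");
        job.process("map-0");
        job.checkpoint();
        job.process("map-0"); // processed after the checkpoint, so not covered by it
        job.failAndRestore();
        return job.count("map-0");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```

In this sketch the third record is lost after the restore. It would have to be processed again to bring the count back to 3, so only the work done since the last checkpoint is repeated, not the whole computation.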
/**
 * This is the core interface for <i>stateful transformation functions</i>, meaning functions
 * that maintain state across individual stream records.
 * While more lightweight interfaces exist as shortcuts for various types of state, this interface offer the
 * greatest flexibility in managing both <i>keyed state</i> and <i>operator state</i>.
 *
 * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight
 * ways to setup stateful functions typically used instead of the full fledged
 * abstraction represented by this interface.
 *
 * <h1>Initialization</h1>
 * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when
 * the parallel instance of the transformation function is created during distributed execution.
 * The method gives access to the {@link FunctionInitializationContext} which in turn gives access
 * to the to the {@link OperatorStateStore} and {@link KeyedStateStore}.
 *
 * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access to the data structures
 * in which state should be stored for Flink to transparently manage and checkpoint it, such as
 * {@link org.apache.flink.api.common.state.ValueState} or
 * {@link org.apache.flink.api.common.state.ListState}.
 *
 * <p><b>Note:</b> The {@code KeyedStateStore} can only be used when the transformation supports
 * <i>keyed state</i>, i.e., when it is applied on a keyed stream (after a {@code keyBy(...)}).
 *
 * <h1>Snapshot</h1>
 * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a
 * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically
 * make sure that the checkpointed data structures (obtained in the initialization phase) are up
 * to date for a snapshot to be taken. The given snapshot context gives access to the metadata
 * of the checkpoint.
 *
 * <p>In addition, functions can use this method as a hook to flush/commit/synchronize with
 * external systems.
 *
 * <h1>Example</h1>
 * The code example below illustrates how to use this interface for a function that keeps counts
 * of events per key and per parallel partition (parallel instance of the transformation function
 * during distributed execution).
 * The example also changes of parallelism, which affect the count-per-parallel-partition by
 * adding up the counters of partitions that get merged on scale-down. Note that this is a
 * toy example, but should illustrate the basic skeleton for a stateful function.
 *
 * <p><pre>{@code
 * public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
 *
 *     private ReducingState<Long> countPerKey;
 *     private ListState<Long> countPerPartition;
 *
 *     private long localCount;
 *
 *     public void initializeState(FunctionInitializationContext context) throws Exception {
 *         // get the state data structure for the per-key state
 *         countPerKey = context.getKeyedStateStore().getReducingState(
 *                 new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
 *
 *         // get the state data structure for the per-partition state
 *         countPerPartition = context.getOperatorStateStore().getOperatorState(
 *                 new ListStateDescriptor<>("perPartitionCount", Long.class));
 *
 *         // initialize the "local count variable" based on the operator state
 *         for (Long l : countPerPartition.get()) {
 *             localCount += l;
 *         }
 *     }
 *
 *     public void snapshotState(FunctionSnapshotContext context) throws Exception {
 *         // the keyed state is always up to date anyways
 *         // just bring the per-partition state in shape
 *         countPerPartition.clear();
 *         countPerPartition.add(localCount);
 *     }
 *
 *     public T map(T value) throws Exception {
 *         // update the states
 *         countPerKey.add(1L);
 *         localCount++;
 *
 *         return value;
 *     }
 * }
 * }</pre>
 *
 * <hr>
 *
 * <h1><a name="shortcuts">Shortcuts</a></h1>
 * There are various ways that transformation functions can use state without implementing the
 * full-fledged {@code CheckpointedFunction} interface:
 *
 * <h4>Operator State</h4>
 * Checkpointing some state that is part of the function object itself is possible in a simpler way
 * by directly implementing the {@link ListCheckpointed} interface.
 * That mechanism is similar to the previously used {@link Checkpointed} interface.
 *
 * <h4>Keyed State</h4>
 * Access to keyed state is possible via the {@link RuntimeContext}'s methods:
 * <pre>{@code
 * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
 *
 *     private ValueState<Long> count;
 *
 *     public void open(Configuration cfg) throws Exception {
 *         count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
 *     }
 *
 *     public T map(T value) throws Exception {
 *         Long current = count.get();
 *         count.update(current == null ? 1L : current + 1);
 *
 *         return value;
 *     }
 * }
 * }</pre>
 *
 * @see ListCheckpointed
 * @see RuntimeContext
 */
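The Javadoc above mentions that per-partition counters are added up when partitions are merged on scale-down. The plain-Java sketch below illustrates only that idea. It is not the Flink API: `RescaleSketch` and `restore` are hypothetical names, and the round-robin assignment of snapshotted entries to new instances is an assumption of this sketch, not a statement about how Flink distributes list state.

```java
import java.util.Arrays;
import java.util.List;

// Toy model, not the Flink API: names and the round-robin assignment are assumptions.
class RescaleSketch {
    // Each old parallel instance snapshotted one counter. On restore, the entries
    // are split across the new instances and each instance sums what it receives,
    // mirroring the "for (Long l : countPerPartition.get()) localCount += l" loop.
    static long[] restore(List<Long> snapshottedCounts, int newParallelism) {
        long[] localCounts = new long[newParallelism];
        for (int i = 0; i < snapshottedCounts.size(); i++) {
            localCounts[i % newParallelism] += snapshottedCounts.get(i);
        }
        return localCounts;
    }

    public static void main(String[] args) {
        // Four old partitions are merged into two new ones.
        long[] counts = restore(Arrays.asList(5L, 7L, 11L, 2L), 2);
        System.out.println(counts[0] + " " + counts[1]); // prints "16 9"
    }
}
```

However the entries are assigned, the sum over all new instances equals the sum over the old ones, which is why the function stores its counter in a list rather than as a single value tied to one instance.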
2.1. The snapshotState call sequence is as follows:
/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
/**
 * A <b>State Backend</b> defines how the state of a streaming application is stored and
 * checkpointed. Different State Backends store their state in different fashions, and use
 * different data structures to hold the state of a running application.
 *
 * <p>For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend}
 * keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the
 * JobManager. The backend is lightweight and without additional dependencies, but not highly available
 * and supports only small state.
 *
 * <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend}
 * keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem
 * (typically a replicated highly-available filesystem, like <a href="https://hadoop.apache.org/">HDFS</a>,
 * <a href="https://ceph.com/">Ceph</a>, <a href="https://aws.amazon.com/documentation/s3/">S3</a>,
 * <a href="https://cloud.google.com/storage/">GCS</a>, etc).
 *
 * <p>The {@code RocksDBStateBackend} stores working state in <a href="http://rocksdb.org/">RocksDB</a>,
 * and checkpoints the state by default to a filesystem (similar to the {@code FsStateBackend}).
 *
 * <h2>Raw Bytes Storage and Backends</h2>
 *
 * The {@code StateBackend} creates services for <i>raw bytes storage</i> and for <i>keyed state</i>
 * and <i>operator state</i>.
 *
 * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
 * service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager
 * to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state
 * backends to store checkpointed state.
 *
 * <p>The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} created by this state
 * backend define how to hold the working state for keys and operators. They also define how to checkpoint
 * that state, frequently using the raw bytes storage (via the {@code CheckpointStreamFactory}).
 * However, it is also possible that for example a keyed state backend simply implements the bridge to
 * a key/value store, and that it does not need to store anything in the raw byte storage upon a
 * checkpoint.
 *
 * <h2>Serializability</h2>
 *
 * State Backends need to be {@link java.io.Serializable serializable}, because they distributed
 * across parallel processes (for distributed execution) together with the streaming application code.
 *
 * <p>Because of that, {@code StateBackend} implementations (typically subclasses
 * of {@link AbstractStateBackend}) are meant to be like <i>factories</i> that create the proper
 * states stores that provide access to the persistent storage and hold the keyed- and operator
 * state data structures. That way, the State Backend can be very lightweight (contain only
 * configurations) which makes it easier to be serializable.
 *
 * <h2>Thread Safety</h2>
 *
 * State backend implementations have to be thread-safe. Multiple threads may be creating
 * streams and keyed-/operator state backends concurrently.
 */
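The "Serializability" paragraph above describes a backend as a lightweight, configuration-only factory. The plain-Java sketch below shows that design in isolation. It is not the Flink API: `BackendSketch`, `ToyBackend`, `createKeyedStore`, `shipToWorker`, and the checkpoint path are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Toy model, not the Flink API: every name here is hypothetical.
class BackendSketch {

    // The "backend" carries configuration only, so it is cheap to serialize.
    static class ToyBackend implements Serializable {
        private static final long serialVersionUID = 1L;
        final String checkpointDir;

        ToyBackend(String checkpointDir) {
            this.checkpointDir = checkpointDir;
        }

        // Acts as a factory: the working-state store is created on the worker,
        // after the backend has been shipped there.
        Map<String, Long> createKeyedStore() {
            return new HashMap<>();
        }
    }

    // Round-trips the backend through Java serialization, as if sending it to a worker.
    static ToyBackend shipToWorker(ToyBackend backend) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(backend);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToyBackend) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ToyBackend onWorker = shipToWorker(new ToyBackend("file:///tmp/checkpoints"));
        Map<String, Long> store = onWorker.createKeyedStore();
        store.put("flink", 1L);
        System.out.println(onWorker.checkpointDir + " " + store.get("flink"));
    }
}
```

Because the state store is created only after deserialization, the serialized object never contains any state data, which is the property the Javadoc attributes to the factory approach.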
/**
 * Savepoints are manually-triggered snapshots from which a program can be
 * resumed on submission.
 *
 * <p>In order to allow changes to the savepoint format between Flink versions,
 * we allow different savepoint implementations (see subclasses of this
 * interface).
 *
 * <p>Savepoints are serialized via a {@link SavepointSerializer}.
 */
5. Queryable State
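Queryable State, as the name suggests, is state that can be queried: it can be read while the streaming computation is running, whereas other stream-processing frameworks require state to be written to an external system before it can be queried. Currently, queryable state mainly targets partitionable state, such as keyed state.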
Put simply, once a job defines queryable state, an external program can use QueryableStateClient to look up the current value of that state by job id, state name, and key.
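The lookup by job id, state name, and key can be pictured with the plain-Java sketch below. It is a toy in-memory registry and not the Flink API: `QueryableStateSketch`, `update`, `query`, and the sample identifiers are hypothetical, and the real client talks to the cluster over the network rather than reading a local map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model, not the Flink API: every name here is hypothetical.
class QueryableStateSketch {
    // (jobId, stateName) -> key -> current value of the state for that key.
    private final Map<List<String>, Map<String, Long>> published = new HashMap<>();

    // Called by the running job whenever the state for a key changes.
    void update(String jobId, String stateName, String key, long value) {
        published
            .computeIfAbsent(Arrays.asList(jobId, stateName), k -> new HashMap<>())
            .put(key, value);
    }

    // Called from outside the job: returns the live value, if one is published.
    Optional<Long> query(String jobId, String stateName, String key) {
        Map<String, Long> state = published.get(Arrays.asList(jobId, stateName));
        return state == null ? Optional.empty() : Optional.ofNullable(state.get(key));
    }

    public static void main(String[] args) {
        QueryableStateSketch registry = new QueryableStateSketch();
        registry.update("job-1", "word-count", "flink", 3L);
        System.out.println(registry.query("job-1", "word-count", "flink").orElse(-1L)); // prints 3
        System.out.println(registry.query("job-1", "word-count", "spark").isPresent()); // prints false
    }
}
```

The point of the sketch is the addressing scheme: a query names the job, the state, and the key, and gets back whatever value the job holds for that key at that moment.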
5.1 QueryableStateClient
/**
 * Client for querying Flink's managed state.
 *
 * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
 * The state instance created from this descriptor will be published for queries when it's
 * created on the Task Managers and the location will be reported to the Job Manager.
 *
 * <p>The client connects to a {@code Client Proxy} running on a given Task Manager. The
 * proxy is the entry point of the client to the Flink cluster. It forwards the requests
 * of the client to the Job Manager and the required Task Manager, and forwards the final
 * response back the client.
 *
 * <p>The proxy, initially resolves the location of the requested KvState via the JobManager. Resolved
 * locations are cached. When the server address of the requested KvState instance is determined, the
 * client sends out a request to the server. The returned final answer is then forwarded to the Client.
 */
/**
 * Returns a future holding the serialized request result.
 *
 * @param jobId                     JobID of the job the queryable state
 *                                  belongs to
 * @param queryableStateName        Name under which the state is queryable
 * @param keyHashCode               Integer hash code of the key (result of
 *                                  a call to {@link Object#hashCode()}
 * @param serializedKeyAndNamespace Serialized key and namespace to query
 *                                  KvState instance with
 * @return Future holding the serialized result
 */
private CompletableFuture<KvStateResponse> getKvState(
        final JobID jobId,
        final String queryableStateName,
        final int keyHashCode,
        final byte[] serializedKeyAndNamespace) {
    LOG.debug("Sending State Request to {}.", remoteAddress);
    try {
        KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
        return client.sendRequest(remoteAddress, request);
    } catch (Exception e) {
        LOG.error("Unable to send KVStateRequest: ", e);
        return FutureUtils.getFailedFuture(e);
    }
}
5.2 KvStateServer
/**
 * An interface for the Queryable State Server running on each Task Manager in the cluster.
 * This server is responsible for serving requests coming from the {@link KvStateClientProxy
 * Queryable State Proxy} and requesting <b>locally</b> stored state.
 */
6. Summary