Spark Streaming Source Code Walkthrough: State Management with updateStateByKey and mapWithState
Posted snail_gesture
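Background:
Spark Streaming splits its work into Jobs by Batch Duration. Often, though, we need to compute over the past day or even the past week of data, and that inevitably requires state management. Every Batch Duration produces a Job, and each Job is made of RDDs, so the question is how to maintain state across batches. The updateStateByKey and mapWithState methods handle this core step.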
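Source code analysis:
1. Neither updateStateByKey nor mapWithState is defined in DStream itself. Both become available through an implicit conversion function that wraps a DStream of key-value pairs in PairDStreamFunctions.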
object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.

  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
  // ... (rest of object DStream omitted)
}
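The implicit-conversion mechanism can be tried without Spark. The following is a minimal sketch in plain Scala; `MiniStream`, `MiniPairFunctions`, `toMiniPairFunctions`, and `countByKey` are hypothetical stand-ins invented for this illustration, not Spark classes. It only shows how a method that the stream class lacks becomes callable on streams of pairs.

```scala
import scala.language.implicitConversions

object ImplicitSketch {
  // Hypothetical stand-in for DStream: wraps the elements of a single batch.
  class MiniStream[T](val batch: Seq[T])

  // Hypothetical stand-in for PairDStreamFunctions: key-based operations only.
  class MiniPairFunctions[K, V](self: MiniStream[(K, V)]) {
    def countByKey(): Map[K, Int] =
      self.batch.groupBy(_._1).map { case (k, vs) => (k, vs.size) }
  }

  // Like toPairDStreamFunctions, the conversion lives in the companion object,
  // so the compiler finds it without any extra import at the call site.
  object MiniStream {
    implicit def toMiniPairFunctions[K, V](s: MiniStream[(K, V)]): MiniPairFunctions[K, V] =
      new MiniPairFunctions[K, V](s)
  }

  val words = new MiniStream(Seq(("a", 1), ("b", 1), ("a", 1)))

  // MiniStream has no countByKey; the compiler rewrites this call to
  // MiniStream.toMiniPairFunctions(words).countByKey().
  val counts: Map[String, Int] = words.countByKey()
}
```

The conversion applies only when the element type is a pair, which is why key-based operations such as updateStateByKey are not offered on a DStream of plain values.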
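updateStateByKey:
1. In PairDStreamFunctions, updateStateByKey is implemented as follows:
updateFunc updates the state on top of the existing history: for each key it receives the new values together with that key's previous state. The method returns a DStream.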
  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)] = ssc.withScope {
    // defaultPartitioner
    updateStateByKey(updateFunc, defaultPartitioner())
  }
2. defaultPartitioner:
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) =
new HashPartitioner(numPartitions)
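As a rough illustration of what hash partitioning does with a key, here is a plain-Scala sketch. It assumes the usual scheme of taking the key's hashCode modulo the partition count and shifting negative results into range; `PartitionerSketch`, `nonNegativeMod`, and `partitionFor` are names made up for this example, and the code is a model of the idea rather than a copy of Spark's HashPartitioner.

```scala
object PartitionerSketch {
  // Modulo that never returns a negative number, since hashCode can be negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Assumption for this sketch: a null key goes to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)
}
```

Because the partition depends only on the key and the partition count, the same key lands in the same partition in every batch, which is what lets the previous state and the new data for a key meet in one place.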
3. The partitioner controls how each RDD is partitioned.
  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of the key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
    ): DStream[(K, S)] = ssc.withScope {
    val cleanedUpdateF = sparkContext.clean(updateFunc)
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(newUpdateFunc, partitioner, true)
  }
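The lifting done by newUpdateFunc, from a per-key function to a function over an iterator of (key, values, previous state) triples, can be reproduced in plain Scala. The sketch below is illustrative only: the running-sum update function and its "drop the key once the sum reaches 10" rule are assumptions made up for the example.

```scala
object UpdateFuncSketch {
  // Per-key update function: keep a running sum; returning None drops the key.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    val total = state.getOrElse(0) + values.sum
    if (total >= 10) None else Some(total) // hypothetical eviction rule
  }

  // Same shape as newUpdateFunc: a None result simply disappears in flatMap.
  val lifted: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
    iterator => iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))

  val result: List[(String, Int)] = lifted(Iterator(
    ("a", Seq(1, 2), Some(3)), // 3 + 3 = 6, kept
    ("b", Seq(5), Some(7)),    // 7 + 5 = 12, update returns None, key removed
    ("c", Seq(4), None)        // new key, state becomes 4
  )).toList
}
```

Here `result` contains only the entries for "a" and "c": a key whose update function returns None is eliminated from the state, as the Scaladoc describes.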
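4. rememberPartitioner is effectively true by default: the two-argument overload passes true when it delegates to the following overload.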
  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. Note, that this function may generate a different
   *                   tuple with a different key than the input key. Therefore keys may be removed
   *                   or added in this way. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = ssc.withScope {
    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
  }
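5. StateDStream persists its RDDs with StorageLevel.MEMORY_ONLY_SER, that is, serialized in memory rather than on disk. Serialization keeps the memory footprint down, which matters because the state can be very large.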
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean,
    initialRDD : Option[RDD[(K, S)]]
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER)
  // ... (rest of the class omitted)
}
- The source of computeUsingPreviousRDD is as follows:
  private [this] def computeUsingPreviousRDD (
    parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
    // Define the function for the mapPartition operation on cogrouped RDD;
    // first map the cogrouped tuple to tuples of required type,
    // and then apply the update function
    val updateFuncLocal = updateFunc
    val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
      val i = iterator.map(t => {
        val itr = t._2._2.iterator
        val headOption = if (itr.hasNext) Some(itr.next()) else None
        (t._1, t._2._1.toSeq, headOption)
      })
      updateFuncLocal(i)
    }
    // On every batch, cogroup walks all the partitions of prevStateRDD, so every
    // existing key is visited even when the batch has no new data for it.
    val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
    val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
    Some(stateRDD)
  }
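So updateStateByKey is not recommended when the state holds a lot of data: the cost of each batch grows with the total number of keys in the state, not with the number of records in the batch.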
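The full-scan behaviour of the cogroup approach can be modelled with ordinary Scala collections. This is a simplified sketch, not Spark code: `CogroupSketch`, `nextState`, and the `updateCalls` counter are hypothetical names, and Maps stand in for the parent and previous-state RDDs.

```scala
object CogroupSketch {
  // Counts how many times the update function runs, to make the cost visible.
  var updateCalls = 0

  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    updateCalls += 1
    Some(state.getOrElse(0) + values.sum)
  }

  // Models parentRDD.cogroup(prevStateRDD): the result covers the union of keys
  // from both sides, so keys with no new data still go through updateFunc.
  def nextState(batch: Seq[(String, Int)], prev: Map[String, Int]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    val allKeys = grouped.keySet ++ prev.keySet
    allKeys.toSeq.flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), prev.get(k)).map(s => (k, s))
    }.toMap
  }

  val prev = Map("a" -> 1, "b" -> 2, "c" -> 3)
  // The batch carries a single record, for key "a" only.
  val next = nextState(Seq(("a", 10)), prev)
}
```

Although the batch contains one record, `updateCalls` ends up at 3, one call per key already present in the state. With millions of keys, that per-batch work is what makes this approach expensive.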
[Figure: implementation of updateStateByKey (image not included)]
mapWithState:
1. mapWithState returns a MapWithStateDStream. State is maintained and updated per key: a user-supplied function maintains the state for data in key-value form.
  /**
   * :: Experimental ::
   * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
   * `this` stream, while maintaining some state data for each unique key. The mapping function
   * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
   * transformation can be specified using [[StateSpec]] class. The state data is accessible
   * as a parameter of type [[State]] in the mapping function.
   *
   * Example of using `mapWithState`:
   * {{{
   *    // A mapping function that maintains an integer state and return a String
   *    // (Annotation) The state here can be seen as a table that records all the
   *    // historical state being maintained.
   *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
   *      // Use state.exists(), state.get(), state.update() and state.remove()
   *      // to manage state, and return the necessary string
   *    }
   *
   *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
   *
   *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
   * }}}
   *
   * @param spec          Specification of this transformation
   * @tparam StateType    Class type of the state data
   * @tparam MappedType   Class type of the mapped data
   */
  @Experimental
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      // StateSpecImpl is the class that wraps the StateSpec operations.
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }
2. The source of MapWithStateDStream is as follows:
/**
 * :: Experimental ::
 * DStream representing the stream of data generated by `mapWithState` operation on a
 * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]].
 * Additionally, it also gives access to the stream of state snapshots, that is, the state data of
 * all keys after a batch has updated them.
 *
 * @tparam KeyType Class of the key
 * @tparam ValueType Class of the value
 * @tparam StateType Class of the state data
 * @tparam MappedType Class of the mapped data
 */
@Experimental
sealed abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType: ClassTag](
    ssc: StreamingContext) extends DStream[MappedType](ssc) {

  /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
  def stateSnapshots(): DStream[(KeyType, StateType)]
}

/** Internal implementation of the [[MapWithStateDStream]] */
private[streaming] class MapWithStateDStreamImpl[
    KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
    dataStream: DStream[(KeyType, ValueType)],
    spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
  extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {

  private val internalStream =
    new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

  override def slideDuration: Duration = internalStream.slideDuration

  override def dependencies: List[DStream[_]] = List(internalStream)

  // The actual computation is delegated to InternalMapWithStateDStream.
  override def compute(validTime: Time): Option[RDD[MappedType]] = {
    internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
  }
  // ... (stateSnapshots() and other members omitted)
}
3. InternalMapWithStateDStream is where the historical state is updated.
/**
 * A DStream that allows per-key state to be maintained, and arbitrary records to be generated
 * based on updates to the state. This is the main DStream that implements the `mapWithState`
 * operation on DStreams.
 *
 * @param parent Parent (key, value) stream that is the source
 * @param spec Specifications of the mapWithState operation
 * @tparam K   Key type
 * @tparam V   Value type
 * @tparam S   Type of the state maintained
 * @tparam E   Type of the mapped data
 */
private[streaming]
class InternalMapWithStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, E])
  extends DStream[MapWithStateRDDRecord[K, S, E]](parent.context) {

  // The in-memory data structure is updated continuously, so keep it in memory.
  persist(StorageLevel.MEMORY_ONLY)
  // ... (rest of the class omitted)
}
4. InternalMapWithStateDStream.compute
  /** Method that generates a RDD for the given time */
  override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
    // Get the previous state or create a new empty state RDD
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that state RDD is always partitioned
          // before creating another state RDD using it
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }

    // Create the RDD for this batch time.
    // Compute the new state RDD with previous state RDD and partitioned data RDD
    // Even if there is no data RDD, use an empty one to create a new state RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }
    val partitionedDataRDD = dataRDD.partitionBy(partitioner)
    val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
      (validTime - interval).milliseconds
    }
    Some(new MapWithStateRDD(
      prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
  }
5. MapWithStateRDD is an RDD that holds both the data for the mapWithState operation and the logic for processing it. Each partition of this RDD is represented by a single MapWithStateRDDRecord.
/**
 * RDD storing the keyed states of `mapWithState` operation and corresponding mapped data.
 * Each partition of this RDD has a single record of type [[MapWithStateRDDRecord]]. This contains a
 * [[StateMap]] (containing the keyed-states) and the sequence of records returned by the mapping
 * function of `mapWithState`.
 * @param prevStateRDD The previous MapWithStateRDD on whose StateMap data `this` RDD
 *                    will be created
 * @param partitionedDataRDD The partitioned data RDD which is used update the previous StateMaps
 *                           in the `prevStateRDD` to create `this` RDD
 * @param mappingFunction  The function that will be used to update state and return new data
 * @param batchTime        The time of the batch to which this RDD belongs to. Use to update
 * @param timeoutThresholdTime The time to indicate which keys are timeout
 */
private[streaming] class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
    private var partitionedDataRDD: RDD[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long]
  ) extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  ) {

  @volatile private var doFullScan = false

  require(prevStateRDD.partitioner.nonEmpty)
  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)

  override val partitioner = prevStateRDD.partitioner

  override def checkpoint(): Unit = {
    super.checkpoint()
    doFullScan = true
  }

  override def compute(
      partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

    val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
    val prevStateRDDIterator = prevStateRDD.iterator(
      stateRDDPartition.previousSessionRDDPartition, context)
    val dataIterator = partitionedDataRDD.iterator(
      stateRDDPartition.partitionedDataRDDPartition, context)

    val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
      prevRecord,
      dataIterator,
      mappingFunction,
      batchTime,
      timeoutThresholdTime,
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    )
    Iterator(newRecord)
  }
  // ... (rest of the class omitted)
}
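6. updateRecordWithData: the RDD itself is immutable, yet it can still process changing data, because the state lives inside the StateMap held by each partition's record.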
  def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
    dataIterator: Iterator[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long],
    removeTimedoutData: Boolean
  ): MapWithStateRDDRecord[K, S, E] = {
    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one
    val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }

    val mappedData = new ArrayBuffer[E]
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, and accordingly
    // update the states touched, and collect the data returned by the mapping function
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
        // Only the records of the current batch are iterated: the user-defined function
        // is applied to them, and the newStateMap data structure is updated accordingly.
        // newStateMap is what holds the historical state.
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }

    // From the RDD's point of view, the partition represented by MapWithStateRDDRecord
    // has not changed; only the data inside it has.
    MapWithStateRDDRecord(newStateMap, mappedData)
  }
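To contrast this with the cogroup approach of updateStateByKey, the core loop of updateRecordWithData can be modelled in plain Scala. The sketch below is a simplification under stated assumptions: `MiniState` is a hypothetical, stripped-down stand-in for State/StateImpl, timeouts are left out, a mutable Map replaces StateMap, and the running-sum mapping function is invented for the example.

```scala
import scala.collection.mutable

object MapWithStateSketch {
  // Hypothetical, much simplified stand-in for State[S] / StateImpl[S].
  final class MiniState[S] {
    private var value: Option[S] = None
    var updated = false
    var removed = false
    def wrap(v: Option[S]): Unit = { value = v; updated = false; removed = false }
    def exists: Boolean = value.isDefined
    def get: S = value.get
    def update(s: S): Unit = { value = Some(s); updated = true }
    def remove(): Unit = { value = None; removed = true }
  }

  // Counts mapping-function invocations, to make the per-batch cost visible.
  var mappingCalls = 0

  // Mapping function: keep a running sum per key and emit (key, newSum).
  def mappingFunction(
      key: String, value: Option[Int], state: MiniState[Int]): Option[(String, Int)] = {
    mappingCalls += 1
    val sum = (if (state.exists) state.get else 0) + value.getOrElse(0)
    state.update(sum)
    Some((key, sum))
  }

  // Models updateRecordWithData without timeouts: copy the previous state map,
  // then touch only the keys that appear in the current batch.
  def updateRecord(
      prev: Map[String, Int],
      batch: Seq[(String, Int)]): (Map[String, Int], Seq[(String, Int)]) = {
    val newStateMap = mutable.Map.empty[String, Int] ++= prev
    val mappedData = mutable.ArrayBuffer.empty[(String, Int)]
    val wrapped = new MiniState[Int]
    batch.foreach { case (key, value) =>
      wrapped.wrap(newStateMap.get(key))
      val returned = mappingFunction(key, Some(value), wrapped)
      if (wrapped.removed) {
        newStateMap.remove(key)
      } else if (wrapped.updated) {
        newStateMap.put(key, wrapped.get)
      }
      mappedData ++= returned
    }
    (newStateMap.toMap, mappedData.toSeq)
  }

  val (state, emitted) = updateRecord(Map("a" -> 1, "b" -> 2, "c" -> 3), Seq(("a", 10)))
}
```

With three keys in the state and one record in the batch, the mapping function runs once, and only the entry for "a" changes. The work per batch follows the batch size rather than the size of the state, which is the main reason mapWithState scales better than updateStateByKey.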
[Figure: implementation of mapWithState (image not included)]