Spark Streaming Source Code Walkthrough: No Receivers in Detail

Posted by snail_gesture


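Background:
The No Receivers (Direct) approach is used more and more in production. It gives the application tighter control and consistent semantics. It is also the natural way to work with a data source: the source is accessed through a wrapper, and that wrapper is an RDD. This is why Spark Streaming defines a custom RDD for Kafka, KafkaRDD.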

Source code analysis:
1. KafkaRDD:

/**
 * A batch-oriented interface for consuming from Kafka.
 * Starting and ending offsets are specified in advance,
 * so that you can control exactly-once semantics.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
 * @param messageHandler function for translating each message into the desired type
 */
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag, // key decoder: messages are encoded in transit, so a Decoder is needed
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange], // offsetRanges defines the range of data to read
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
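  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
  // class body omitted here; getPartitions and compute are shown further below
}

2. HasOffsetRanges: an RDD is a list of partitions, and this trait exposes the offset range behind each of them.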
/**
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
 * {{{
 *   // foreachRDD gives access to the RDD generated for the current batch duration,
 *   // and through it to the data range of each of its partitions.
 *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
 *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 *      ...
 *   }
 * }}}
 */
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

3. Count: the range here is a number of messages. An OffsetRange specifies which range of offsets to read from which partition of which topic.
/**
 * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
 * can be created with `OffsetRange.create()`.
 * @param topic Kafka topic name
 * @param partition Kafka partition id
 * @param fromOffset Inclusive starting offset
 * @param untilOffset Exclusive ending offset
 */

final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple

  // An offset is the position of a message within its partition.
  /** Number of messages this OffsetRange refers to */
  def count(): Long = untilOffset - fromOffset
  // remaining members omitted
}
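
The half-open convention (inclusive fromOffset, exclusive untilOffset) is what makes count() a plain subtraction and lets adjacent ranges line up without gaps or overlap. The following is a minimal sketch in plain Scala with no Spark or Kafka dependency; SimpleOffsetRange and splitAt are hypothetical names used only for illustration and are not part of the Spark API.

```scala
object OffsetRangeSketch {
  // Hypothetical stand-in for OffsetRange: covers offsets in [fromOffset, untilOffset).
  final case class SimpleOffsetRange(
      topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    require(fromOffset <= untilOffset, "fromOffset must not exceed untilOffset")

    // Number of messages in the range; zero when both ends are equal.
    def count: Long = untilOffset - fromOffset
    def isEmpty: Boolean = fromOffset == untilOffset

    // Split into two adjacent ranges: the exclusive end of the first
    // is the inclusive start of the second, so no message is lost or repeated.
    def splitAt(offset: Long): (SimpleOffsetRange, SimpleOffsetRange) = {
      require(offset >= fromOffset && offset <= untilOffset, "split point outside range")
      (copy(untilOffset = offset), copy(fromOffset = offset))
    }
  }

  def main(args: Array[String]): Unit = {
    val r = SimpleOffsetRange("events", 0, 100L, 250L)
    val (a, b) = r.splitAt(180L)
    println(s"total=${r.count} first=${a.count} second=${b.count}")
  }
}
```

Because the two halves share the split offset as end and start, their counts always add up to the count of the original range.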
4. getPartitions in KafkaRDD: one KafkaRDDPartition is created for each OffsetRange.
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
  }.toArray
}
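
The mapping in getPartitions can be tried out without Spark. The sketch below is a simplified model under stated assumptions: TopicPartition, OffsetSpan and PartitionRef are hypothetical stand-ins for TopicAndPartition, OffsetRange and KafkaRDDPartition, and the broker names are made up.

```scala
object GetPartitionsSketch {
  // Hypothetical, simplified stand-ins for the Spark/Kafka types used above.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetSpan(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  final case class PartitionRef(
      index: Int, topic: String, partition: Int,
      fromOffset: Long, untilOffset: Long, host: String, port: Int)

  // One RDD partition per offset range; the partition index is the position in the array,
  // and host/port come from the leader recorded for that topic partition.
  def toPartitions(
      ranges: Array[OffsetSpan],
      leaders: Map[TopicPartition, (String, Int)]): Array[PartitionRef] =
    ranges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicPartition(o.topic, o.partition))
      PartitionRef(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }

  def main(args: Array[String]): Unit = {
    val ranges = Array(OffsetSpan("events", 0, 0L, 10L), OffsetSpan("events", 1, 5L, 9L))
    val leaders = Map(
      TopicPartition("events", 0) -> ("broker-a", 9092),
      TopicPartition("events", 1) -> ("broker-b", 9092))
    toPartitions(ranges, leaders).foreach(println)
  }
}
```

The point of the design is that a partition carries only metadata (where the data is and which offsets to read), so creating partitions is cheap and no data moves until compute runs.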

5. KafkaRDDPartition: effectively a pointer (or reference) to the Kafka data source. It states exactly where the data is.
/** @param topic kafka topic name
  * @param partition kafka partition id
  * @param fromOffset inclusive starting offset
  * @param untilOffset exclusive ending offset
  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
  * @param port preferred kafka host's port
  */
private[kafka]
class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String, // host to read the data from; port is the matching port
  val port: Int
) extends Partition {
  // A partition of a KafkaRDD belongs to exactly one topic.
  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}

6. compute in KafkaRDD: computes each data partition.
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  // If fromOffset equals untilOffset, the range is empty and there is nothing to read.
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

7. KafkaRDDIterator: fetches the data.
private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val kc = new KafkaCluster(kafkaParams)
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]
  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry attempts,
  // to minimize number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }

  private def handleFetchErr(resp: FetchResponse): Unit = {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  private def fetchBatch: Iterator[MessageAndOffset] = {
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    val resp = consumer.fetch(req)
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }

  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      iter = fetchBatch
    }
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      val item = iter.next()
      if (item.offset >= part.untilOffset) {
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
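
The fetch loop above (request a batch at requestOffset, drop anything before it, advance requestOffset with every message, stop at untilOffset) can be modelled without a broker. The sketch below is only an illustration: FakeLog is a hypothetical in-memory log that returns fixed-size chunks, which is an assumption made to reproduce the "batch may start before the requested offset" case. Unlike the original, it simply stops when the log runs out instead of asserting.

```scala
object FetchLoopSketch {
  final case class Msg(offset: Long, payload: String) {
    def nextOffset: Long = offset + 1
  }

  // Hypothetical in-memory "broker": returns the whole fixed-size chunk that contains
  // the requested offset, so a batch may start before the offset that was asked for.
  final class FakeLog(messages: Vector[Msg], chunkSize: Int) {
    def fetch(requestOffset: Long): Iterator[Msg] = {
      val chunkStart = (requestOffset / chunkSize) * chunkSize
      messages.iterator.filter(m => m.offset >= chunkStart && m.offset < chunkStart + chunkSize)
    }
  }

  // Reads exactly the offsets in [fromOffset, untilOffset), one batch at a time.
  def read(log: FakeLog, fromOffset: Long, untilOffset: Long): Iterator[Msg] =
    new Iterator[Msg] {
      private var requestOffset = fromOffset
      private var batch: Iterator[Msg] = Iterator.empty

      // Fetch a new batch only when the current one is used up and more offsets are needed.
      private def refill(): Unit =
        if (!batch.hasNext && requestOffset < untilOffset) {
          batch = log.fetch(requestOffset).dropWhile(_.offset < requestOffset)
        }

      def hasNext: Boolean = {
        refill()
        requestOffset < untilOffset && batch.hasNext
      }

      def next(): Msg = {
        if (!hasNext) throw new NoSuchElementException("offset range exhausted")
        val m = batch.next()
        requestOffset = m.nextOffset
        m
      }
    }

  def main(args: Array[String]): Unit = {
    val log = new FakeLog(Vector.tabulate(20)(i => Msg(i.toLong, s"m$i")), chunkSize = 4)
    println(read(log, 6L, 13L).map(_.offset).mkString(","))
  }
}
```

Tracking requestOffset separately from the batch iterator is what allows a retry or a new fetch to resume at the right message, regardless of where the broker chose to start the batch.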

8. KafkaCluster: wraps the interaction with the Kafka cluster.
/**
 * Convenience methods for interacting with a Kafka cluster.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
 */
private[spark]
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  // class body omitted
}

Example: data is usually read with createDirectStream from KafkaUtils.

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

9. When reading from Kafka in Direct mode, the stream is created through the following method:
/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once (see points below).
 *
 * Points to note:
 *  - No receivers: This stream does not use any receiver. It directly queries Kafka
 *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
 *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
 *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
 *    You can access the offsets used in each batch from the generated RDDs (see
 *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
 *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
 *    in the [[StreamingContext]]. The information on consumed offset can be
 *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
 *  - End-to-end semantics: This stream ensures that every record is effectively received and
 *    transformed exactly once, but gives no guarantees on whether the transformed data are
 *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
 *    that the output operation is idempotent, or use transactions to output records atomically.
 *    See the programming guide for more details.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
 *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
 *   host1:port1,host2:port2 form.
 *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
 *   to determine where the stream starts (defaults to "largest")
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

  // kc wraps access to the Kafka cluster
  val kc = new KafkaCluster(kafkaParams)
  // fromOffsets holds the concrete starting offset of each topic partition
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

10. getFromOffsets:
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
        // When the Kafka direct DStream is created, it talks to the Kafka cluster to obtain
        // the partition and offset information. The stream itself is a DirectKafkaInputDStream.
        (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
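
The shape of getFromOffsets is a chain of lookups that can each fail, combined in a for-comprehension over Either. The sketch below models that shape under stated assumptions: FakeCluster and its methods are hypothetical stand-ins for KafkaCluster, and it relies on the right-biased Either of Scala 2.12 and later, so the .right projections of the original are not needed.

```scala
object FromOffsetsSketch {
  type Err = List[String]
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical stand-in for the metadata that KafkaCluster would query from the brokers.
  final class FakeCluster(
      earliest: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long]) {
    def getPartitions(topics: Set[String]): Either[Err, Set[TopicPartition]] = {
      val found = latest.keySet.filter(tp => topics.contains(tp.topic))
      val missing = topics -- found.map(_.topic)
      if (missing.isEmpty) Right(found)
      else Left(missing.toList.sorted.map(t => s"unknown topic: $t"))
    }
    def earliestOffsets(tps: Set[TopicPartition]): Either[Err, Map[TopicPartition, Long]] =
      Right(tps.map(tp => tp -> earliest(tp)).toMap)
    def latestOffsets(tps: Set[TopicPartition]): Either[Err, Map[TopicPartition, Long]] =
      Right(tps.map(tp => tp -> latest(tp)).toMap)
  }

  // "smallest" starts from the earliest offsets; any other value, or no value, starts from the latest.
  // The first Left short-circuits the chain, so errors surface without exceptions.
  def fromOffsets(
      cluster: FakeCluster,
      params: Map[String, String],
      topics: Set[String]): Either[Err, Map[TopicPartition, Long]] = {
    val reset = params.get("auto.offset.reset").map(_.toLowerCase)
    for {
      tps <- cluster.getPartitions(topics)
      offsets <- (if (reset.contains("smallest")) cluster.earliestOffsets(tps)
                  else cluster.latestOffsets(tps))
    } yield offsets
  }

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("events", 0)
    val cluster = new FakeCluster(Map(tp0 -> 3L), Map(tp0 -> 42L))
    println(fromOffsets(cluster, Map("auto.offset.reset" -> "smallest"), Set("events")))
    println(fromOffsets(cluster, Map.empty, Set("events")))
  }
}
```

In the original, KafkaCluster.checkErrors turns a failed result into an exception; the sketch leaves the Either to the caller instead.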
11. Each partition of each Kafka topic corresponds to one partition of the generated KafkaRDD.
/**
 *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
 * each given Kafka topic/partition corresponds to an RDD partition.
 * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
 *  of messages
 * per second that each '''partition''' will accept.
 * Starting offsets are specified in advance,
 * and this DStream is not responsible for committing offsets,
 * so that you can control exactly-once semantics.
 * For an easy interface to Kafka-managed offsets,
 *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
 * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
 *  starting point of the stream
 * @param messageHandler function for translating each message into the desired type
 */
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[K]: ClassTag,
  T <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
  // class body omitted here; compute is shown next
}

12. compute in DirectKafkaInputDStream: every call produces one KafkaRDD.
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // untilOffsets is the end of the range to fetch, so the number of messages in this batch is known up front.
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // Create the RDD instance: each batch of the DirectKafkaInputDStream corresponds to exactly one KafkaRDD.
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
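
How currentOffsets advances from batch to batch can be sketched in plain Scala. The body of clamp is not shown in this article, so the version below is an assumption: it caps each partition at a fixed number of messages per batch (in the spirit of spark.streaming.kafka.maxRatePerPartition) and never goes past the latest offset. TopicPartition, Span and planBatch are hypothetical names.

```scala
object BatchPlanningSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class Span(tp: TopicPartition, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Assumed behaviour of clamp: read at most maxPerPartition messages from one
  // partition in a single batch, and never read past the latest known offset.
  def clamp(
      current: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      maxPerPartition: Option[Long]): Map[TopicPartition, Long] =
    maxPerPartition match {
      case Some(max) => latest.map { case (tp, lo) => tp -> math.min(current(tp) + max, lo) }
      case None => latest
    }

  // Plans one batch: returns the ranges to read and the offsets the next batch starts from.
  def planBatch(
      current: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      maxPerPartition: Option[Long]): (List[Span], Map[TopicPartition, Long]) = {
    val until = clamp(current, latest, maxPerPartition)
    val spans = current.toList.map { case (tp, from) => Span(tp, from, until(tp)) }
    (spans, until)
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    var current = Map(tp -> 0L)
    val latest = Map(tp -> 250L)
    for (batch <- 1 to 3) {
      val (spans, next) = planBatch(current, latest, Some(100L))
      println(s"batch $batch: " + spans.map(s => s"${s.fromOffset}->${s.untilOffset}").mkString(" "))
      current = next // the end of this batch is the start of the next one
    }
  }
}
```

Since the end of one batch becomes the start of the next, consecutive batches cover the log without gaps or overlap, which is the property the exactly-once guarantee on the input side rests on.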

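The overall flow is as follows: createDirectStream asks the Kafka cluster for the starting offsets (getFromOffsets) and creates a DirectKafkaInputDStream; for every batch, DirectKafkaInputDStream.compute determines untilOffsets and builds a KafkaRDD; KafkaRDD.getPartitions maps each OffsetRange to a KafkaRDDPartition; KafkaRDD.compute then reads each partition through a KafkaRDDIterator.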

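What are the advantages of the Direct approach?
1. Direct does not buffer incoming data, so that buffer cannot cause an out-of-memory error. A Receiver, by contrast, has to buffer what it receives.
2. A Receiver is awkward to distribute, whereas with Direct the data is by default already spread across multiple machines.
3. In practice, a drawback of the Receiver approach is that data can arrive faster than it can be processed. This does not happen with Direct, because each batch reads its data directly from Kafka.
4. Consistent semantics: with Direct, every message is guaranteed to be processed. As the createDirectStream documentation notes, exactly-once output additionally requires idempotent or transactional output operations.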
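
The idempotent-output option mentioned in the createDirectStream documentation can be illustrated with a small sketch. Everything here is hypothetical: KeyedSink is an in-memory stand-in for an external store, and the choice of (topic, partition, offset) as the write key is an assumption that works because those three values identify a Kafka message uniquely.

```scala
import scala.collection.mutable

object IdempotentSinkSketch {
  final case class Record(topic: String, partition: Int, offset: Long, value: String)

  // Hypothetical sink: the write key is (topic, partition, offset), so writing the
  // same record again overwrites the earlier copy instead of adding a duplicate.
  final class KeyedSink {
    private val rows = mutable.Map.empty[(String, Int, Long), String]

    def write(batch: Seq[Record]): Unit =
      batch.foreach(r => rows.update((r.topic, r.partition, r.offset), r.value))

    def size: Int = rows.size
  }

  def main(args: Array[String]): Unit = {
    val batch = (0L until 5L).map(o => Record("events", 0, o, s"v$o"))
    val sink = new KeyedSink
    sink.write(batch)
    sink.write(batch) // simulated replay of the same batch after a failure
    println(sink.size)
  }
}
```

Because a replayed batch covers exactly the same offset ranges, writing it a second time leaves the store unchanged.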
