Spark Streaming Source Code Walkthrough: No Receivers in Detail
Posted snail_gesture
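Background:
The No Receivers (Direct) approach is being adopted by more and more companies. It gives you tighter control and consistent semantics. It is also the natural way to work with a data source: the source is accessed through a wrapper, and that wrapper is an RDD. Spark Streaming therefore defines a custom RDD for this purpose: KafkaRDD.
Source code analysis: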
1. KafkaRDD:
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
private[kafka]
class KafkaRDD[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag, // messages are encoded in transit, so a Decoder is required
    T <: Decoder[_]: ClassTag,
    R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange], // offsetRanges defines the range of data to read
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges
2. HasOffsetRanges: an RDD is a list of partitions.
/**
* Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
* offset ranges in RDDs generated by the direct Kafka DStream (see
* [[KafkaUtils.createDirectStream()]]).
* {{{
*   // foreachRDD gives access to the partition data of the RDD
*   // generated in the current batch duration.
*   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
*      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
*      ...
*   }
* }}}
*/
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
3. Count: the range here is measured in number of messages. An OffsetRange specifies which range of data to read from which partition of which topic.
/**
* Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
*/
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple

  // An offset is the position of a message within its partition
  /** Number of messages this OffsetRange refers to */
  def count(): Long = untilOffset - fromOffset
}
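To make the inclusive/exclusive convention concrete, here is a minimal sketch in plain Scala. `SimpleOffsetRange` and the topic name are hypothetical stand-ins used only for illustration; this is not Spark's `OffsetRange` class.

```scala
// Hypothetical stand-in for OffsetRange, reduced to the four fields shown above.
final case class SimpleOffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) {
  require(fromOffset <= untilOffset, "fromOffset must not exceed untilOffset")

  // fromOffset is inclusive and untilOffset is exclusive, so no +1 is needed.
  def count: Long = untilOffset - fromOffset
}

object OffsetRangeDemo {
  def main(args: Array[String]): Unit = {
    // Covers offsets 100, 101, 102, 103 and 104.
    val range = SimpleOffsetRange("events", 0, 100L, 105L)
    println(range.count) // prints 5

    // Equal bounds describe an empty range.
    println(SimpleOffsetRange("events", 1, 42L, 42L).count) // prints 0
  }
}
```

Because the end is exclusive, the `untilOffset` of one batch can be reused directly as the `fromOffset` of the next without skipping or repeating a message.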
4. getPartitions in KafkaRDD:
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
  }.toArray
}
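The mapping from offset ranges to RDD partitions can be tried out without Spark. The sketch below mirrors the `zipWithIndex` logic; `SimpleRange`, `SimplePartition`, `partitionsFor` and the broker names are all hypothetical, and a `(topic, partition)` tuple stands in for `TopicAndPartition`.

```scala
object GetPartitionsSketch {
  // Hypothetical, simplified stand-ins for OffsetRange and KafkaRDDPartition.
  final case class SimpleRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  final case class SimplePartition(
      index: Int,
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long,
      host: String,
      port: Int)

  // One RDD partition per offset range. The RDD partition index is the position
  // of the range in the array, not the Kafka partition id.
  def partitionsFor(
      ranges: Array[SimpleRange],
      leaders: Map[(String, Int), (String, Int)]): Array[SimplePartition] =
    ranges.zipWithIndex.map { case (r, i) =>
      val (host, port) = leaders((r.topic, r.partition))
      SimplePartition(i, r.topic, r.partition, r.fromOffset, r.untilOffset, host, port)
    }

  def main(args: Array[String]): Unit = {
    val broker1 = ("broker1", 9092)
    val broker2 = ("broker2", 9092)
    val leaders = Map(("logs", 0) -> broker1, ("logs", 1) -> broker2)
    val ranges = Array(SimpleRange("logs", 0, 0L, 10L), SimpleRange("logs", 1, 5L, 8L))
    partitionsFor(ranges, leaders).foreach(println)
  }
}
```

Note that the leader lookup fails with a `NoSuchElementException` if a range has no entry in `leaders`, which matches the plain map lookup in the excerpt above.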
5. KafkaRDDPartition: effectively a pointer (or reference) to the Kafka data source. It states exactly where the data lives.
/** @param topic kafka topic name
* @param partition kafka partition id
* @param fromOffset inclusive starting offset
* @param untilOffset exclusive ending offset
* @param host preferred kafka host, i.e. the leader at the time the rdd was created
* @param port preferred kafka host's port
*/
private[kafka]
class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String, // the host to read the data from; port works the same way
  val port: Int
) extends Partition {
  // A KafkaRDD partition belongs to exactly one topic
  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}
6. compute in KafkaRDD processes each data partition
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  // Equal fromOffset and untilOffset mean the range contains no messages
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
7. KafkaRDDIterator: fetches the data
private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val kc = new KafkaCluster(kafkaParams)
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]
  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry attempts,
  // to minimize number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }

  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  private def fetchBatch: Iterator[MessageAndOffset] = {
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    val resp = consumer.fetch(req)
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }

  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      iter = fetchBatch
    }
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      val item = iter.next()
      if (item.offset >= part.untilOffset) {
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
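The core of the iterator is its fetch loop: fetch a batch, drop anything before the requested offset, and stop at `untilOffset`. The following is a minimal sketch of that loop over an in-memory log, not the real implementation. `Msg`, `read` and `batchSize` are hypothetical; the "broker" is assumed to hold contiguous offsets starting at 0, and it is made to return batches aligned to `batchSize` so that a fetch can start before the requested offset. Unlike `getNext`, the sketch simply stops if the log ends early instead of asserting.

```scala
object BoundedFetchSketch {
  // Hypothetical stand-in for Kafka's MessageAndOffset.
  final case class Msg(offset: Long, payload: String) {
    def nextOffset: Long = offset + 1
  }

  // Reads offsets [fromOffset, untilOffset) from `log`, fetching `batchSize` messages at a time.
  // Assumes a message's index in `log` equals its offset.
  def read(log: Vector[Msg], fromOffset: Long, untilOffset: Long, batchSize: Int): Iterator[Msg] = {
    require(batchSize > 0, "batchSize must be positive")
    new Iterator[Msg] {
      private var requestOffset = fromOffset
      private var batch: Iterator[Msg] = Iterator.empty

      // Mirrors fetchBatch: the returned batch is aligned to batchSize, so it may
      // begin before requestOffset; those leading messages are dropped.
      private def fetchBatch(): Iterator[Msg] = {
        val start = ((requestOffset / batchSize) * batchSize).toInt
        log.slice(start, start + batchSize).iterator.dropWhile(_.offset < requestOffset)
      }

      def hasNext: Boolean =
        if (requestOffset >= untilOffset) false
        else {
          if (!batch.hasNext) batch = fetchBatch()
          batch.hasNext
        }

      def next(): Msg = {
        if (!hasNext) throw new NoSuchElementException("offset range exhausted")
        val m = batch.next()
        // Remember where the next fetch has to start.
        requestOffset = m.nextOffset
        m
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val log = Vector.tabulate(10)(i => Msg(i.toLong, s"m$i"))
    // Reads offsets 3, 4, 5 and 6 in batches of two.
    read(log, 3L, 7L, 2).foreach(println)
  }
}
```

Tracking `requestOffset` after every message is what lets a new fetch resume at the right position when the current batch runs out.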
8. KafkaCluster: encapsulates the interaction with the Kafka cluster
/**
* Convenience methods for interacting with a Kafka cluster.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
private[spark]
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable
Example: data is normally read through createDirectStream in KafkaUtils.
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
9. When working with Kafka data in Direct mode, you go through createDirectStream:
/**
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
*
* Points to note:
* - No receivers: This stream does not use any receiver. It directly queries Kafka
* - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
* by the stream itself. For interoperability with Kafka monitoring tools that depend on
* Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
* You can access the offsets used in each batch from the generated RDDs (see
* [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
* - Failure Recovery: To recover from driver failures, you have to enable checkpointing
* in the [[StreamingContext]]. The information on consumed offset can be
* recovered from the checkpoint. See the programming guide for details (constraints, etc.).
* - End-to-end semantics: This stream ensures that every records is effectively received and
* transformed exactly once, but gives no guarantees on whether the transformed data are
* outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
* that the output operation is idempotent, or use transactions to output records atomically.
* See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
* to be set with Kafka broker(s) (NOT zookeeper servers), specified in
* host1:port1,host2:port2 form.
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  // kc wraps access to the Kafka cluster
  val kc = new KafkaCluster(kafkaParams)
  // fromOffsets holds the concrete starting offset of each topic/partition
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
10. getFromOffsets:
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
      // When the Kafka direct DStream is created, it talks to the Kafka cluster
      // to obtain its partition and offset information.
      // In essence the stream is implemented by DirectKafkaInputDStream.
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
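The pattern in getFromOffsets is an `Either` for-comprehension: look up the partitions, then pick the earliest or latest offsets depending on `auto.offset.reset`, and stop at the first error. Below is a minimal sketch of that pattern against an in-memory table. Everything in it (`earliest`, `latest`, `getPartitions`, `leaderOffsets`, the topic names) is hypothetical, and it assumes Scala 2.12 or later, where `Either` is right-biased and the `.right` projections from the excerpt are not needed.

```scala
object FromOffsetsSketch {
  type Err = List[String]
  type TP = (String, Int) // (topic, partition), standing in for TopicAndPartition

  // Hypothetical in-memory "cluster": earliest and latest offset per topic/partition.
  val earliest: Map[TP, Long] = Map(("logs", 0) -> 0L, ("logs", 1) -> 3L)
  val latest: Map[TP, Long] = Map(("logs", 0) -> 120L, ("logs", 1) -> 98L)

  // Left carries the errors, Right carries the partitions of the requested topics.
  def getPartitions(topics: Set[String]): Either[Err, Set[TP]] = {
    val found = latest.keySet.filter(tp => topics.contains(tp._1))
    val missing = topics -- found.map(_._1)
    if (missing.nonEmpty) Left(missing.toList.sorted.map(t => s"unknown topic: $t"))
    else Right(found)
  }

  def leaderOffsets(tps: Set[TP], table: Map[TP, Long]): Either[Err, Map[TP, Long]] =
    Right(tps.map(tp => tp -> table(tp)).toMap)

  def getFromOffsets(
      params: Map[String, String],
      topics: Set[String]): Either[Err, Map[TP, Long]] = {
    val reset = params.get("auto.offset.reset").map(_.toLowerCase)
    for {
      tps <- getPartitions(topics)
      // "smallest" starts from the earliest offsets; anything else starts from the latest.
      offsets <- leaderOffsets(tps, if (reset.contains("smallest")) earliest else latest)
    } yield offsets
  }

  def main(args: Array[String]): Unit = {
    println(getFromOffsets(Map("auto.offset.reset" -> "smallest"), Set("logs")))
    println(getFromOffsets(Map.empty, Set("logs")))
    println(getFromOffsets(Map.empty, Set("missing")))
  }
}
```

The sketch returns the `Either` to the caller, whereas the excerpt passes it to `KafkaCluster.checkErrors`.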
11. Each partition of each topic corresponds to one partition of the generated KafkaRDD.
/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* Starting offsets are specified in advance,
* and this DStream is not responsible for committing offsets,
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
* see {@link org.apache.spark.streaming.kafka.KafkaCluster}
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler function for translating each message into the desired type
*/
private[streaming]
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](ssc_) with Logging
12. The DirectKafkaInputDStream source is shown below. Every call to compute produces a KafkaRDD.
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // untilOffsets marks the end of the range to fetch, so you know how many
  // records this batch will process.
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // Create the RDD instance: each batch of the DirectKafkaInputDStream
  // corresponds to exactly one KafkaRDD.
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
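The bookkeeping in compute boils down to: cap the end offsets, build one range per partition, then carry the end offsets over as the next batch's start. The sketch below models just that in plain Scala. `BatchRange`, `nextBatch` and `maxPerPartition` are hypothetical names, and the cap is expressed as a number of messages per partition per batch, which is an assumption about how the `spark.streaming.kafka.maxRatePerPartition` setting is applied rather than the actual body of `clamp`.

```scala
object DirectBatchSketch {
  type TP = (String, Int) // (topic, partition), standing in for TopicAndPartition

  // Hypothetical stand-in for OffsetRange.
  final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Cap each partition's end offset so one batch reads at most maxPerPartition messages from it.
  def clamp(current: Map[TP, Long], latest: Map[TP, Long], maxPerPartition: Long): Map[TP, Long] =
    latest.map { case (tp, leaderOffset) =>
      tp -> math.min(current(tp) + maxPerPartition, leaderOffset)
    }

  // One batch: returns the ranges to read now and the starting offsets of the next batch.
  def nextBatch(
      current: Map[TP, Long],
      latest: Map[TP, Long],
      maxPerPartition: Long): (List[BatchRange], Map[TP, Long]) = {
    val until = clamp(current, latest, maxPerPartition)
    val ranges = current.toList.sortBy(_._1).map { case (tp, from) =>
      BatchRange(tp._1, tp._2, from, until(tp))
    }
    // The exclusive end of this batch is the inclusive start of the next one.
    (ranges, until)
  }

  def main(args: Array[String]): Unit = {
    val latest = Map(("logs", 0) -> 100L, ("logs", 1) -> 7L)
    val start = Map(("logs", 0) -> 10L, ("logs", 1) -> 5L)
    val (first, afterFirst) = nextBatch(start, latest, 20L)
    first.foreach(println)
    val (second, _) = nextBatch(afterFirst, latest, 20L)
    second.foreach(println)
  }
}
```

In the second batch the range for partition 1 is empty (7 to 7), which is exactly the case that KafkaRDD.compute short-circuits with `Iterator.empty`.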
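Overall flow, as traced above: createDirectStream asks the Kafka cluster for the starting offsets, DirectKafkaInputDStream.compute builds one KafkaRDD per batch from the offset ranges, and each KafkaRDD partition reads its own range through KafkaRDDIterator.
What are the benefits of the Direct approach?
1. Direct mode does not buffer incoming data, so buffering cannot cause an out-of-memory error. The Receiver approach has to buffer what it receives.
2. The Receiver approach is awkward to distribute, whereas in Direct mode the data is spread across multiple machines by default.
3. In practice, a drawback of the Receiver approach is that data can arrive faster than it can be processed. This does not happen in Direct mode, because each batch reads its data directly from Kafka.
4. Semantic consistency: in Direct mode the data is guaranteed to be processed.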
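The createDirectStream doc comment quoted earlier notes that end-to-end exactly-once still requires an idempotent or transactional output operation. As a minimal sketch of the idempotent option, the hypothetical `Sink` below keys every record by (topic, partition, offset), so replaying the same offset range after a failure leaves the stored result unchanged. The class and its methods are illustrative only and do not correspond to any Spark or Kafka API.

```scala
import scala.collection.mutable

object IdempotentSinkSketch {
  // Hypothetical sink keyed by (topic, partition, offset).
  final class Sink {
    private val store = mutable.Map.empty[(String, Int, Long), String]

    // Writing the same key again overwrites the entry instead of adding a duplicate.
    def write(topic: String, partition: Int, offset: Long, value: String): Unit =
      store((topic, partition, offset)) = value

    def size: Int = store.size
  }

  // Writes every offset in [fromOffset, untilOffset) of one topic/partition.
  def writeRange(sink: Sink, topic: String, partition: Int, fromOffset: Long, untilOffset: Long): Unit =
    (fromOffset until untilOffset).foreach { o =>
      sink.write(topic, partition, o, s"value-$o")
    }

  def main(args: Array[String]): Unit = {
    val sink = new Sink
    writeRange(sink, "logs", 0, 10L, 15L)
    // Simulate a retried batch: the same range is written a second time.
    writeRange(sink, "logs", 0, 10L, 15L)
    println(sink.size) // prints 5
  }
}
```

This works because the offset range of a batch is fixed in advance, so a retry produces the same keys as the first attempt.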