解释 Spark 结构化流执行器和 Kafka 分区之间的映射

Posted

技术标签:

【中文标题】解释 Spark 结构化流执行器和 Kafka 分区之间的映射【英文标题】:Explain mapping between Spark Structured Streaming executors and Kafka partitions 【发布时间】:2017-10-09 06:11:32 【问题描述】:

我已经在 Kafka 主题上部署了一个包含 4 个工作人员的结构化流,其中包含 4 个分区。

我假设将为 4 个分区部署 4 个工作人员,工作人员分区之间存在一对一的映射。

但是,事实并非如此。所有分区都由同一个 Executor 提供服务。我通过检查执行程序的线程 ID 和日志来确认这一点。

是否有任何文档显示 Kafka 分区和 Spark 结构化流之间的相关性。另外,有没有我们可以调整的旋钮。

【问题讨论】:

【参考方案1】:

相关性为“1:n(executor:partitions)”:一个Kafka partition只能被一个executor消费,一个executor可以消费多个Kafka partition。

这与 Spark Streaming 一致。


对于结构化流,默认模型是“微批处理模型”,“连续处理模型”仍处于“实验”状态。

对于“微批处理模型”,在“KafkaSource.scala”中有

 *   - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
 *     data from Kafka topic + partition is consistently read by the same executors across
 *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
 *     docs on [[KafkaSourceRDD]] for more details.

在“KafkaSourceRDD”中

/**
 * An RDD that reads data from Kafka based on offset ranges across multiple partitions.
 * Additionally, it allows preferred locations to be set for each topic + partition, so that
 * the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
 * and cached KafkaConsumers (see [[KafkaDataConsumer]] can be used read data efficiently.
 *
 * ...
 */
private[kafka010] class KafkaSourceRDD(

我们知道默认位置策略是LocationStrategies.PreferConsistent


对于“连续处理模型”,在“KafkaContinuousReader.scala”中

  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = 
    ...
    startOffsets.toSeq.map 
      case (topicPartition, start) =>
        KafkaContinuousDataReaderFactory(
          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[DataReaderFactory[UnsafeRow]]
    .asJava
  

/**
 * A data reader factory for continuous Kafka processing. This will be serialized and transformed
 * into a full reader on executors.
 *
 * @param topicPartition The (topic, partition) pair this task is responsible for.
 * ...
 */
case class KafkaContinuousDataReaderFactory(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] 
  override def createDataReader(): KafkaContinuousDataReader = 
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  

我们可以知道每个(topic, partition) 将被包含在一个工厂中,然后将在一个执行器中。

【讨论】:

虽然在文档中执行器将从 kafka 主题分区中消耗,但当我尝试读取流时,我发现驱动程序正在从所有分区而不是执行器中消耗【参考方案2】:

如果您使用 DirectStream API,则相关性为 1:1(sparkcore:partition)。来自spark streaming guide,

Kafka 0.10 的 Spark Streaming 集成在设计上类似于 0.8 Direct Stream 方法。它提供简单的并行性,1:1 Kafka 分区和 Spark 分区之间的对应关系,以及 访问偏移量和元数据

【讨论】:

谢谢维涅什。但是,我希望挖掘结构化流式处理方法。结构化流也一样吗? 结构化流和旧的火花流的本质区别在于,在火花流中你得到一个 DStream,你在结构化流中得到一个流数据帧。 1:1 并行性保持不变。

以上是关于解释 Spark 结构化流执行器和 Kafka 分区之间的映射的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?

Spark和Kafka环境安装

使用 python 构建 Spark 结构化流

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

通过点击流分析确定热门主题,Apache Spark + Kafka 组合了解一下!

Kafka 主题分区到 Spark 流