如何在单个 Spark 作业中摄取不同的 Spark 数据帧

Posted

技术标签:

【中文标题】如何在单个 Spark 作业中摄取不同的 Spark 数据帧【英文标题】:How to ingest different spark dataframes in a single spark job 【发布时间】:2020-05-04 20:33:08 【问题描述】:

我想在处理不同输入源但使用尽可能少的计算资源的 Spark 中编写 ETL 管道,并且在使用“传统”Spark ETL 方法时遇到问题。

我有许多需要保存到 DeltaLake 表中的流数据源。每个数据源只是 s3 中的一个文件夹,其中包含 avro 文件。每个数据源都有不同的模式。每个数据源都应该保存在它自己的 DeltaLake 表中。除了 avro -> delta 之外,几乎不需要转换,只需要从文件名派生的一些附加字段进行丰富。 新文件的添加速度适中,从一分钟一次到一天一次,具体取决于数据源。当新数据到达时,我有一个 kafka 通知,描述了什么样的数据和 s3 文件路径。

假设有两个数据源 - A 和 B。A 是 s3://bucket/A/* 文件,B - s3://bucket/B/*。每当添加新文件时,我都会收到一条带有有效负载 'datasource': 'A', filename: 's3://bucket/A/file1', ... other fields 的 kafka 消息。 A 文件应该去 delta 表 s3://delta/A/, B - s3://delta/B/

如何在单个 Spark 应用程序中以最小的延迟将它们全部摄取? 随着需求数据的不断涌现,听起来像是流媒体。但是在火花流中,需要预先定义流模式,并且我有不同的源,不同的模式是未知的。

为每个数据源启动一个专用的 spark 应用程序不是一种选择 - 有 100 多个数据源到达的文件非常小。拥有 100 多个 spark 应用程序是浪费金钱。都应该使用大小适中的单个集群来摄取。

我现在唯一的想法:在驱动程序进程中运行一个普通的 kafka 消费者,为每条记录读取一个数据帧,丰富额外的字段并坚持到它的增量表。更多并行性 - 使用多条消息并在未来运行它们,因此多个作业同时运行。 一些伪代码,在驱动进程中:

val consumer = KafkaConsumer(...)
consumer.foreachrecord =>
    val ds = record.datasource
    val file = record.filename
    val df = spark.read.format(avro).load(file)
        .withColumn('id', record.id)
    val dest = s"s3://delta/$record.datasourceName"
    df.write.format('delta').save(dest)
    consumer.commit(offset from record)

听起来不错(PoC 显示它有效),但我想知道是否还有其他选择?任何其他想法表示赞赏。 Spark 在 DataBricks 平台上运行。

【问题讨论】:

【参考方案1】:

Spark 不限制您为每个数据源摄取都使用一个 spark 应用程序,您可以将数据源分组到几个 spark 应用程序中,或者您可以为所有数据源使用一个 spark 应用程序,如果 spark 应用程序有,这是一种可行的方法有足够的资源来摄取和处理所有数据源。

你可以这样做:

object StreamingJobs extends SparkApp 

  // consume from Kafka Topic 1
  StreamProcess_1.runStream(spark)

  // consume from Kafka Topic 2
  StreamProcess_2.runStream(spark)

  //  consume from Kafka Topic n
  StreamProcess_N.runStream(spark)

  // wait until termination
  spark.streams.awaitAnyTermination()


也许还有另一个用于批处理的 spark 作业

object BatchedJobs extends SparkApp 

  // consume from data source 1
  BatchedProcess_1.run(spark)

  // consume from  data source 2
  BatchedProcess_2.run(spark)

  //  consume from  data source n
  BatchedProcess_N.run(spark) 


【讨论】:

我想,问题更多是关于“如何加载具有不同模式的多个表”,而不是“如何在 Databricks 中进行并行作业”。我也在寻找答案。

以上是关于如何在单个 Spark 作业中摄取不同的 Spark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

Spark:从具有不同内存/核心配置的单个JVM作业同时启动

从 google pubsub 到 spark 流的数据摄取速度很慢

如何在单个 Spark 作业中调用多个 writeStream 操作?

如何将 Spark SQL 批处理作业结果写入 Apache Druid?

10期末大作业

如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?