如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点

Posted

技术标签:

【中文标题】如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点【英文标题】:How to load data at Spark Dataframe in each Worker to prevent loading huge data to Master node 【发布时间】:2021-10-16 16:23:03 【问题描述】:

我可以使用以下代码从主节点中的 Oracle 数据库中读取数据:

 val spark = SparkSession
            .builder
            .master("local[4]")
            .config("spark.executor.memory", "8g")
            .config("spark.executor.cores", 4)
            .config("spark.task.cpus",1)
            .appName("Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate()

 val jdbcDF = spark.read
              .format("jdbc")
              .option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcldb")
              .option("dbtable", "table")
              .option("user", "orcl")
              .option("password", "********")
              .load()

然后我可以在 Worker 之间重新分配 Dataframe:

  val test = jdbcDF.repartition(8,col("ID_Col"))
  test.explain

我的问题是我的数据量很大,无法放入主 RAM。因此,我希望每个节点分别读取自己的数据。我想知道是否有任何方法可以从每个 Worker 的数据库中读取数据并将它们加载到 Spark Dataframe。实际上,我想分别使用Scala或Python将数据加载到每个Worker Node中的Spark Dataframe。

请指导我如何做到这一点?

非常感谢任何帮助。

【问题讨论】:

这能回答你的问题吗? Why spark is slower when compared to sqoop , when it comes to jdbc? 您是希望每个工作人员都读入数据以加快读取速度,还是希望每个工作人员都有一份数据副本?如果是后者,请查看broadcast Afaik 它不通过主人。 亲爱的@JarrodBaker 谢谢你的回答,我确实希望每个工人都读取数据以加快读取速度;此外,我有大量数据,它们不能都在一个节点中(我的意思是 Master 这里)。只要,我知道,Master读取数据并使用broadcast在节点之间共享数据。这不是我想要的。 @thebluephantom,你能解释一下吗?谢谢。 【参考方案1】:

使用local,您可以像 YARN 一样使用 not have a Resource Mgr。你有no Workers,但你可以并行运行东西,只要local[n]在具有N个内核的同一台机器上适当设置。

如果您遵循 Alex Ott 的建议并阅读,您将不会加载到 Master。

您可以在使用spark.read.jdbc 读取数据时使用参数lowerBound, upperBound, numPartitions 来提高加载速度,在Workers 上使用Cores 而不是Executors。这就是本地的含义以及 Spark 的工作原理。

如果您需要进行其他分区,则需要进行后续重新分区。

如果你有足够的内存和磁盘,你会比较慢,但它会处理。

【讨论】:

亲爱的@thebluephantom,感谢您的回答。我根据 Alex Ott 的建议阅读了链接。如果我可以在每个具有不同值的 Worker 节点中使用 lowerBoundupperBound,请指导我?我的意思是每个 Worker 都有自己的 lowerBoundupperBound 见medium.com/@radek.strnad/… 非常感谢您提供的有用链接。 一切进展如何? 列类型为字符串。我必须在该字符串列上分发数据库。我应该将 String 列转换为 Integer,以保持数据含义不变。

以上是关于如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点的主要内容,如果未能解决你的问题,请参考以下文章

spark优化

spark中每个worker节点运行多少个executor进程?

任务中如何确定spark分区数task数目core个数worker节点个数excutor数量

如何在Spark集群的work节点上启动多个Executor?

如何为 apache spark worker 更改每个节点的内存

Spark 异常:worker 中的 Python 版本 3.4 与驱动程序 3.5 中的版本不同