如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点
Posted
技术标签:
【中文标题】如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点【英文标题】:How to load data at Spark Dataframe in each Worker to prevent loading huge data to Master node 【发布时间】:2021-10-16 16:23:03 【问题描述】:我可以使用以下代码从主节点中的 Oracle 数据库中读取数据:
val spark = SparkSession
.builder
.master("local[4]")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", 4)
.config("spark.task.cpus",1)
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcldb")
.option("dbtable", "table")
.option("user", "orcl")
.option("password", "********")
.load()
然后我可以在 Worker 之间重新分配 Dataframe:
val test = jdbcDF.repartition(8,col("ID_Col"))
test.explain
我的问题是我的数据量很大,无法放入主 RAM。因此,我希望每个节点分别读取自己的数据。我想知道是否有任何方法可以从每个 Worker 的数据库中读取数据并将它们加载到 Spark Dataframe。实际上,我想分别使用Scala或Python将数据加载到每个Worker Node中的Spark Dataframe。
请指导我如何做到这一点?
非常感谢任何帮助。
【问题讨论】:
这能回答你的问题吗? Why spark is slower when compared to sqoop , when it comes to jdbc? 您是希望每个工作人员都读入数据以加快读取速度,还是希望每个工作人员都有一份数据副本?如果是后者,请查看broadcast
。
Afaik 它不通过主人。
亲爱的@JarrodBaker 谢谢你的回答,我确实希望每个工人都读取数据以加快读取速度;此外,我有大量数据,它们不能都在一个节点中(我的意思是 Master 这里)。只要,我知道,Master读取数据并使用broadcast
在节点之间共享数据。这不是我想要的。
@thebluephantom,你能解释一下吗?谢谢。
【参考方案1】:
使用local
,您可以像 YARN 一样使用 not have a Resource Mgr
。你有no Workers
,但你可以并行运行东西,只要local[n]
在具有N个内核的同一台机器上适当设置。
如果您遵循 Alex Ott 的建议并阅读,您将不会加载到 Master。
您可以在使用spark.read.jdbc
读取数据时使用参数lowerBound, upperBound, numPartitions
来提高加载速度,在Workers 上使用Cores
而不是Executors
。这就是本地的含义以及 Spark 的工作原理。
如果您需要进行其他分区,则需要进行后续重新分区。
如果你有足够的内存和磁盘,你会比较慢,但它会处理。
【讨论】:
亲爱的@thebluephantom,感谢您的回答。我根据 Alex Ott 的建议阅读了链接。如果我可以在每个具有不同值的 Worker 节点中使用lowerBound
和 upperBound
,请指导我?我的意思是每个 Worker 都有自己的 lowerBound
和 upperBound
。
见medium.com/@radek.strnad/…
非常感谢您提供的有用链接。
一切进展如何?
列类型为字符串。我必须在该字符串列上分发数据库。我应该将 String 列转换为 Integer,以保持数据含义不变。以上是关于如何在每个 Worker 中的 Spark Dataframe 中加载数据,以防止将大量数据加载到 Master 节点的主要内容,如果未能解决你的问题,请参考以下文章
spark中每个worker节点运行多少个executor进程?
任务中如何确定spark分区数task数目core个数worker节点个数excutor数量
如何在Spark集群的work节点上启动多个Executor?