我的 spark 代码没有使用 Yarn aws EMR 中可用的所有执行器
Posted
技术标签:
【中文标题】我的 spark 代码没有使用 Yarn aws EMR 中可用的所有执行器【英文标题】:My spark code is not using all the executors available in Yarn aws EMR 【发布时间】:2018-01-09 06:49:18 【问题描述】:我编写了在本地运行的 Spark 代码。我创建了一个用户定义的函数,该函数需要应用于通过交叉连接从本地文件读取的两个表创建的数据框。
不知何故,我正在应用的用户定义的函数没有被分发。我已经在所有节点上安装了所需的 python 包。使用spark-submit
我已经指定了内核和内存的数量。这是我的代码:
spark = SparkSession.builder.appName("WordSimilarities").enableHiveSupport().getOrCreate()
spark.sparkContext.parallelize(range(1,1000)).map(imprts)
df = spark.read.csv('./FlightGlobal_distinct_operators.csv', header=False).withColumnRenamed('_c0', 'first').repartition(10)
df.cache()
df2 = spark.read.csv('./TRAC_distinct_operators.csv',header=False).withColumnRenamed('_c0', 'second').repartition(10)
df2.cache()
df3 = df.crossJoin(df2)
df3.write.saveAsTable("hello", format="parquet",mode="overwrite",location="/user/hive/hello/")
df3 = spark.sql("select * from hello").repartition(500)
print(df3.count())
df3.cache()
以及我正在应用的功能
schema = StructType([StructField("col1", FloatType()), StructField("col2", FloatType()), StructField("col3", FloatType()), StructField(
"col4", FloatType()), StructField("col5", FloatType()), StructField("col6", FloatType()), StructField("col7", FloatType())])
allmethodUDF = udf(all_methods_scores, schema)
finalDF = df3.withColumn("complex", allmethodUDF('first', 'second')).select(
'first', 'second', 'complex.col1', 'complex.col2', 'complex.col3', 'complex.col4', 'complex.col6', 'complex.col7')
finalDF.show()
这是我在每个节点上使用的包:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import sys
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf
from pyspark import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np
import editdistance
import jellyfish
import fuzzy
import re
from fuzzywuzzy import fuzz
import itertools
def all_methods_scores(original1, original2):
在所有包上面的那个函数中我做了很多迭代
函数的代码简单地应用于以多列作为输入的每一行。所有 RDD 块都只在一个节点上。
【问题讨论】:
【参考方案1】:这个答案可能太简单了,但是您是否对数据进行了重新分区?如果您正在读取单个 .csv 文件(看起来您正在这样做),您最终将得到一个分区。
尝试对您的数据运行 repartition():
df.repartition(100)
df2.repartition(100)
阅读 CSV 文件后,看看这是否能解决您的问题。
代码顶部的parallelize() 在做什么?
spark.sparkContext.parallelize(range(1,1000)).map(imprts)
【讨论】:
imprts 是 python 中的函数,我确保通过以下 def imprts(x) 将这些包导入每个从属节点: import numpy as np import editdistance import jellyfish import blur import re from blurwuzzy导入 fuzz 导入 itertools 返回 x 这里 spark.sparkContext.parallelize(df) df 是数据帧而不是列表.. 我们可以像这样并行化数据帧吗?实际上这就是我尝试重新分区数据的原因。 你是对的。我应该说“重新分区(n)”。需要根据可用的内核选择参数“n”(我发现 4-6x 通常效果很好)。无论您有 RDD 还是数据帧,从单个文件中读取都会产生单个分区。【参考方案2】:要使用所有可用的执行器,您需要在 spark-submit 命令中指定 --num-executors value>。
【讨论】:
以上是关于我的 spark 代码没有使用 Yarn aws EMR 中可用的所有执行器的主要内容,如果未能解决你的问题,请参考以下文章
spark.yarn.jar和spark.yarn.archive的使用