Spark/Glue:.count() 或在约 20MM 记录和 1 个工作人员的数据帧上生成字段列表时的性能问题

Posted

技术标签:

【中文标题】Spark/Glue:.count() 或在约 20MM 记录和 1 个工作人员的数据帧上生成字段列表时的性能问题【英文标题】:Spark/Glue: performance issue when .count() or when generating fields' list on dataframe of ~20MM records and 1 worker 【发布时间】:2022-01-23 23:36:11 【问题描述】:

我正在尝试使用 AWS Glue 运行一个简单的 ETL 流程。

过程很简单:使用 JDBC 连接器从数据库中读取 20 多个表,然后将它们汇入 S3。一切正常,唯一的问题是运行作业所需的时间(2 小时以上)。

主要瓶颈是由一些非常大的表(16 到 2000 万条记录)造成的,而且我必须提取行数和字段列表。 粘合作业使用 Python 3、Spark 3、2 个工作人员(其中 1 个驱动程序)。

我先看表:

df = sparkSession.read.format("jdbc").option("url", connection_url).option("dbtable", table).option("driver", DRIVER).load()

然后我将其转换为 GlueDynamicFrame(因为我更容易在其上运行操​​作):

df = DynamicFrame.fromDF(df, glueContext, "df")

然后我继续计算行数:

n_rows = df.count()

这开始了痛苦:对于某些表(最大的表),仅返回此值需要 10 到 20 分钟。我已经研究并(我认为)理解了 Spark 中惰性评估和计算的概念,但在我看来,这个操作无论如何都应该少一些,而且我肯定做错了什么。无论如何,然后我继续生成一个字段列表:

fields = [df.schema().fields[x].name for x in range(0, len(df.schema().fields))]

同样,运行 10 到 20 分钟。最终,我下沉了数据框:

glueContext.write_dynamic_frame.\
            from_options(frame = df,
                        connection_type = "s3",
                        connection_options = "path": path,
                                              "partitionKeys": [partition],
                        format = "parquet")

同样,这些大表需要很长时间。

值得一提的是,我从包含少量行的 db 表中提取。我提到这一点是因为我在阅读表格后立即阅读了重新分区的可能解决方案,但是重新分区 3 行的 DataFrame 是零意义的。

唯一系统地做它的方法是首先计算行数,然后基于 n_rows 重新分区,但这已经需要永远。另外,我已经读过分区数应该与工人数有一定的关系。我有 1 个工人,所以 1 个分区对我来说似乎是合乎逻辑的。

我的问题是:我做错了什么?我应该在阅读时增加工人数量并相应地重新分配吗?或者还有什么其他的解决方案? 非常感谢您的任何建议!

编辑:最终增加工作人员的数量、缓存以及使用 lower_bound/upper_bound 进行读取分区有很大帮助。但对我帮助最大的是避免 df.count() 成为瘟疫。没有那个操作,工作的持续时间会减少 80%...不知道为什么,因为我还是个初学者,但是这个非常简单的操作不可能真的那么贵...

【问题讨论】:

【参考方案1】:

我相信您没有使用由numPartitions 选项控制的并行 JDBC 读取机制

你必须找到一个最佳的numPartitions号码

    根据分配的 Executor cores,一个 Executor core 执行一个分区。 将在 Executor 中并行执行的数据分区应完全适合内存以避免溢出。
df = spark.read. \
format("jdbc"). \
option("url", "URL"). \
option("user", "<username>"). \
option("password", "<password>"). \
option("dbtable", "<table>"). \
option("partitionColumn", "partitionColumn"). \
option("lowerBound", "<lowest partition number>"). \
option("upperBound", "<largest partition number>"). \
option("numPartitions", "<number of partitions>"). \
load()

【讨论】:

【参考方案2】:

我们在迁移过程中遇到了同样的挑战,并基于以下优化方法进行了优化。

优化:01

就像你提到的n_rows = df.count() 是昂贵的操作,并尽量从你的代码中避免这个过程。

优化:02 [生成字段列表]

我们已尝试通过示例记录 1 从源中获取架构。

src_connect_string = 'url':"jdbc:teradata://conntionstring,TMODE=TERA", 'user' : "username", 'password' : "mypassword",'query':"select  * from tablename limit 1 ",'driver' :"com.teradata.jdbc.TeraDriver"
df_td_src=spark.read.format("jdbc").options(**src_connect_string).load()
src_td_columns=df_td_src.schema.names

优化:03

找到一个读取过程或写入过程花费较长时间的位置。基于此,我们可以使该进程以并发方式运行。例如,由于我们的写作过程需要更长的时间,我们以并发的方式进行写作。 ref 的示例代码。

jdbcurl = f"jdbc:teradata://server/database=db, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - query")

data_df = spark.read \
          .format('jdbc') \
          .options(url= jdbcurl, user= user,password= pw, query=query, driver= driver,numPartitions=100) \
          .option('customSchema', schema[1:-1]) \
          .option('ConnectionRetries', '3') \
          .option('ConnectionRetryInterval', '2000') \
          .option("fetchSize",100000) \
          .load()

# display(data_df)
from pyspark.sql.functions import *
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
from pyspark.sql import functions as f
from pyspark.sql.functions import col
date_range = ['2017-01-28']


def writeS3(curr_date):
    print(f"Starting S3 write for date - curr_date")
    data_df1 = data_df.withColumn("date1", f.from_unixtime(f.unix_timestamp(data_df.LD_TS), "yyyy-MM-dd"))
    display(data_df1)
    print(curr_date)
    save_df = data_df1.filter(f"date1='curr_date'").drop('date1')
    save_df.write.parquet(f"s3://location")

jobs = []
results_done = []

total_days = 30

with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
  print(f"raw_bucket/db/table/")
  for curr_date in date_range:
      print(f"Starting S3 write for date - curr_date")
      jobs.append(e.submit(writeS3, curr_date))
#       result_done = job.result()
#       print(f"Job Completed - result_done")
print("Task complete")

【讨论】:

方法 3 没有任何意义,因为 Spark 已经默认并行工作了。 绝对!!我同意 Spark 以分布式方式工作,就像并行一样。这种方法是在多线程概念中运行代码 但这没有任何意义,因为底层的 Spark 进程已经使用了线程。在多个线程中运行 PySpark 命令不会提高性能。【参考方案3】:

在处理 16-20 百万条记录时,我肯定会增加工人数量。您真的很想利用 Spark 的并行处理能力。

另外,.count() 是一个强制 Spark 执行计划的操作。如果您想继续使用该 DataFrame,您应该使用.cache() 来提高性能。

【讨论】:

以上是关于Spark/Glue:.count() 或在约 20MM 记录和 1 个工作人员的数据帧上生成字段列表时的性能问题的主要内容,如果未能解决你的问题,请参考以下文章

如何配置 Spark / Glue 以避免在 Glue 作业成功执行后创建空的 $_folder_$

AWS Glue - Spark 作业 - 如何增加内存限制或更有效地运行?

尽管 ping 通,Websocket 在约 2 小时后获得连接壁橱

使用 cursor.getCount() 获取计数或在 SQL 子句上使用 COUNT 执行 rawQuery?

为啥 C# 数组仍然限制在约 21 亿个元素中

CLLocationmanager 在约 30 秒后抛出