Converting Spark DF to Pandas DF and the other way - Performance

Posted: 2020-05-19 01:53:16

Question:

I am trying to convert a Spark DF with 8m records to a Pandas DF:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sourcePandas = srcDF.select("*").toPandas()

This takes about 2 minutes.

Going the other way, from Pandas back to a Spark DF:

finalDF = spark.createDataFrame(sourcePandas)

takes too long and never completes.

sourcePandas

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 42 columns):
CONSIGNMENT_PK     10 non-null int32
CERTIFICATE_NO     10 non-null object
ACTOR_NAME         10 non-null object
GENERATOR_FK       10 non-null int32
TRANSPORTER_FK     10 non-null int32
RECEIVER_FK        10 non-null int32
REC_POST_CODE      0 non-null object
WASTEDESC          10 non-null object
WASTE_FK           10 non-null int32
GEN_LICNUM         0 non-null object
VOLUME             10 non-null int32
MEASURE            10 non-null object
WASTE_TYPE         10 non-null object
WASTE_ADD          0 non-null object
CONTAMINENT1_FK    0 non-null float64
CONTAMINENT2_FK    0 non-null float64
CONTAMINENT3_FK    0 non-null float64
CONTAMINENT4_FK    0 non-null float64
TREATMENT_FK       10 non-null int32
ANZSICODE_FK       10 non-null int32
VEH1_REGNO         10 non-null object
VEH1_LICNO         0 non-null object
VEH2_REGNO         0 non-null object
VEH2_LICNO         0 non-null object
GEN_SIGNEE         0 non-null object
GEN_DATE           10 non-null datetime64[ns]
TRANS_SIGNEE       0 non-null object
TRANS_DATE         10 non-null datetime64[ns]
REC_SIGNEE         0 non-null object
REC_DATE           10 non-null datetime64[ns]
DATECREATED        10 non-null datetime64[ns]
DISCREPANCY        0 non-null object
APPROVAL_NUMBER    0 non-null object
TR_TYPE            10 non-null object
REC_WASTE_FK       10 non-null int32
REC_WASTE_TYPE     10 non-null object
REC_VOLUME         10 non-null int32
REC_MEASURE        10 non-null object
DATE_RECEIVED      10 non-null datetime64[ns]
DATE_SCANNED       0 non-null datetime64[ns]
HAS_IMAGE          10 non-null object
LASTMODIFIED       10 non-null datetime64[ns]
dtypes: datetime64[ns](7), float64(4), int32(10), object(21)
memory usage: 3.0+ KB

srcDF

 |-- CONSIGNMENT_PK: integer (nullable = true)
 |-- CERTIFICATE_NO: string (nullable = true)
 |-- ACTOR_NAME: string (nullable = true)
 |-- GENERATOR_FK: integer (nullable = true)
 |-- TRANSPORTER_FK: integer (nullable = true)
 |-- RECEIVER_FK: integer (nullable = true)
 |-- REC_POST_CODE: string (nullable = true)
 |-- WASTEDESC: string (nullable = true)
 |-- WASTE_FK: integer (nullable = true)
 |-- GEN_LICNUM: string (nullable = true)
 |-- VOLUME: integer (nullable = true)
 |-- MEASURE: string (nullable = true)
 |-- WASTE_TYPE: string (nullable = true)
 |-- WASTE_ADD: string (nullable = true)
 |-- CONTAMINENT1_FK: integer (nullable = true)
 |-- CONTAMINENT2_FK: integer (nullable = true)
 |-- CONTAMINENT3_FK: integer (nullable = true)
 |-- CONTAMINENT4_FK: integer (nullable = true)
 |-- TREATMENT_FK: integer (nullable = true)
 |-- ANZSICODE_FK: integer (nullable = true)
 |-- VEH1_REGNO: string (nullable = true)
 |-- VEH1_LICNO: string (nullable = true)
 |-- VEH2_REGNO: string (nullable = true)
 |-- VEH2_LICNO: string (nullable = true)
 |-- GEN_SIGNEE: string (nullable = true)
 |-- GEN_DATE: timestamp (nullable = true)
 |-- TRANS_SIGNEE: string (nullable = true)
 |-- TRANS_DATE: timestamp (nullable = true)
 |-- REC_SIGNEE: string (nullable = true)
 |-- REC_DATE: timestamp (nullable = true)
 |-- DATECREATED: timestamp (nullable = true)
 |-- DISCREPANCY: string (nullable = true)
 |-- APPROVAL_NUMBER: string (nullable = true)
 |-- TR_TYPE: string (nullable = true)
 |-- REC_WASTE_FK: integer (nullable = true)
 |-- REC_WASTE_TYPE: string (nullable = true)
 |-- REC_VOLUME: integer (nullable = true)
 |-- REC_MEASURE: string (nullable = true)
 |-- DATE_RECEIVED: timestamp (nullable = true)
 |-- DATE_SCANNED: timestamp (nullable = true)
 |-- HAS_IMAGE: string (nullable = true)
 |-- LASTMODIFIED: timestamp (nullable = true)

Cluster size

Comments:

What is the shape of srcDF / finalDF? What is the cluster configuration?

@Dave: Updated the question with the requested information.

Answer 1:

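The memory high-water mark for collecting to pandas and then re-parallelizing to the cluster is roughly 2x the storage cost of the pandas DF. The ten-row dataframe is 3 KB, so 8M rows come to about 2.5 GB. Doubling that for the high-water mark brings us to ~5 GB. The default Spark driver memory is 1 GB, which is far too low for what you want to do and causes the JVM to thrash in garbage collection: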

    Push the Application Property spark.driver.memory up to 8G; that should be enough.

    Re-evaluate why you are collecting all of this data to the driver. Could you use a pandas UDF in GROUPED_MAP mode instead?
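
The back-of-the-envelope estimate in this answer can be reproduced with a few lines of plain Python. This is only a sketch: the function name `estimate_driver_memory_bytes` and the `overhead_factor` parameter are made up for illustration, the 2x factor is the answer's rule of thumb rather than a measured value, and the 3.0 KB figure comes from the `info()` output in the question. The `+` in `3.0+ KB` means pandas did not measure the object (string) columns deeply, so the real footprint is likely higher than this estimate.

```python
def estimate_driver_memory_bytes(sample_bytes, sample_rows, total_rows,
                                 overhead_factor=2.0):
    """Extrapolate a pandas sample's footprint to the full row count.

    Returns (full_frame_bytes, peak_bytes), where peak_bytes applies the
    assumed overhead factor for the collect / re-parallelize round trip.
    """
    if sample_rows <= 0:
        raise ValueError("sample_rows must be positive")
    bytes_per_row = sample_bytes / sample_rows
    full_frame_bytes = bytes_per_row * total_rows
    return full_frame_bytes, full_frame_bytes * overhead_factor


GIB = 1024 ** 3

# Figures from the question: 10 rows take about 3.0 KB; the target is 8M rows.
frame_bytes, peak_bytes = estimate_driver_memory_bytes(
    sample_bytes=3.0 * 1024, sample_rows=10, total_rows=8_000_000
)

# Spark's default spark.driver.memory is 1g.
default_driver_bytes = 1 * GIB

print(f"pandas frame estimate: {frame_bytes / GIB:.1f} GiB")
print(f"peak estimate (2x):    {peak_bytes / GIB:.1f} GiB")
print("fits in default driver memory:", peak_bytes <= default_driver_bytes)
```

When raising the limit, note that spark.driver.memory is normally supplied at launch time (for example `spark-submit --driver-memory 8g`, or in spark-defaults.conf), because in client mode the driver JVM is already running by the time application code could call `spark.conf.set`.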
