Converting Spark DF to Pandas DF and the other way - Performance
Posted: 2020-05-19 01:53:16

【Question】:

Trying to convert a Spark DF with 8M records to a Pandas DF:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sourcePandas = srcDF.select("*").toPandas()
This takes about 2 minutes.
And the other way, from the Pandas DF back to a Spark DF:
finalDF = spark.createDataFrame(sourcePandas)
This takes far too long and never completes.
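One mitigation worth trying (my suggestion, not from the original post): pass an explicit schema to createDataFrame so Spark skips type inference over the pandas data; the same spark.sql.execution.arrow.enabled flag also covers this direction. A minimal sketch, reusing the schema already known from srcDF:

# Hedged sketch: supply the known Spark schema instead of letting Spark
# infer one from the pandas DataFrame. Caveat: pandas promotes nullable
# integer columns (e.g. CONTAMINENT1_FK, shown as float64 below) to
# float64, so those fields may need relaxing to a floating-point type
# in the schema, or filling/casting back to int before conversion.
finalDF = spark.createDataFrame(sourcePandas, schema=srcDF.schema)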
sourcePandas:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 42 columns):
CONSIGNMENT_PK 10 non-null int32
CERTIFICATE_NO 10 non-null object
ACTOR_NAME 10 non-null object
GENERATOR_FK 10 non-null int32
TRANSPORTER_FK 10 non-null int32
RECEIVER_FK 10 non-null int32
REC_POST_CODE 0 non-null object
WASTEDESC 10 non-null object
WASTE_FK 10 non-null int32
GEN_LICNUM 0 non-null object
VOLUME 10 non-null int32
MEASURE 10 non-null object
WASTE_TYPE 10 non-null object
WASTE_ADD 0 non-null object
CONTAMINENT1_FK 0 non-null float64
CONTAMINENT2_FK 0 non-null float64
CONTAMINENT3_FK 0 non-null float64
CONTAMINENT4_FK 0 non-null float64
TREATMENT_FK 10 non-null int32
ANZSICODE_FK 10 non-null int32
VEH1_REGNO 10 non-null object
VEH1_LICNO 0 non-null object
VEH2_REGNO 0 non-null object
VEH2_LICNO 0 non-null object
GEN_SIGNEE 0 non-null object
GEN_DATE 10 non-null datetime64[ns]
TRANS_SIGNEE 0 non-null object
TRANS_DATE 10 non-null datetime64[ns]
REC_SIGNEE 0 non-null object
REC_DATE 10 non-null datetime64[ns]
DATECREATED 10 non-null datetime64[ns]
DISCREPANCY 0 non-null object
APPROVAL_NUMBER 0 non-null object
TR_TYPE 10 non-null object
REC_WASTE_FK 10 non-null int32
REC_WASTE_TYPE 10 non-null object
REC_VOLUME 10 non-null int32
REC_MEASURE 10 non-null object
DATE_RECEIVED 10 non-null datetime64[ns]
DATE_SCANNED 0 non-null datetime64[ns]
HAS_IMAGE 10 non-null object
LASTMODIFIED 10 non-null datetime64[ns]
dtypes: datetime64[ns](7), float64(4), int32(10), object(21)
memory usage: 3.0+ KB
srcDF
|-- CONSIGNMENT_PK: integer (nullable = true)
|-- CERTIFICATE_NO: string (nullable = true)
|-- ACTOR_NAME: string (nullable = true)
|-- GENERATOR_FK: integer (nullable = true)
|-- TRANSPORTER_FK: integer (nullable = true)
|-- RECEIVER_FK: integer (nullable = true)
|-- REC_POST_CODE: string (nullable = true)
|-- WASTEDESC: string (nullable = true)
|-- WASTE_FK: integer (nullable = true)
|-- GEN_LICNUM: string (nullable = true)
|-- VOLUME: integer (nullable = true)
|-- MEASURE: string (nullable = true)
|-- WASTE_TYPE: string (nullable = true)
|-- WASTE_ADD: string (nullable = true)
|-- CONTAMINENT1_FK: integer (nullable = true)
|-- CONTAMINENT2_FK: integer (nullable = true)
|-- CONTAMINENT3_FK: integer (nullable = true)
|-- CONTAMINENT4_FK: integer (nullable = true)
|-- TREATMENT_FK: integer (nullable = true)
|-- ANZSICODE_FK: integer (nullable = true)
|-- VEH1_REGNO: string (nullable = true)
|-- VEH1_LICNO: string (nullable = true)
|-- VEH2_REGNO: string (nullable = true)
|-- VEH2_LICNO: string (nullable = true)
|-- GEN_SIGNEE: string (nullable = true)
|-- GEN_DATE: timestamp (nullable = true)
|-- TRANS_SIGNEE: string (nullable = true)
|-- TRANS_DATE: timestamp (nullable = true)
|-- REC_SIGNEE: string (nullable = true)
|-- REC_DATE: timestamp (nullable = true)
|-- DATECREATED: timestamp (nullable = true)
|-- DISCREPANCY: string (nullable = true)
|-- APPROVAL_NUMBER: string (nullable = true)
|-- TR_TYPE: string (nullable = true)
|-- REC_WASTE_FK: integer (nullable = true)
|-- REC_WASTE_TYPE: string (nullable = true)
|-- REC_VOLUME: integer (nullable = true)
|-- REC_MEASURE: string (nullable = true)
|-- DATE_RECEIVED: timestamp (nullable = true)
|-- DATE_SCANNED: timestamp (nullable = true)
|-- HAS_IMAGE: string (nullable = true)
|-- LASTMODIFIED: timestamp (nullable = true)
Cluster size:
【Comments】:
What are the shapes of srcDF and finalDF? What is the cluster configuration?
@Dave: Updated the question with the required information.
【Answer 1】:
The memory high-water mark for collecting to pandas and re-parallelizing back to the cluster is roughly 2x the storage cost of the pandas DF. Your ten-row frame is 3 KB, i.e. about 300 bytes per row, so 8M rows comes to roughly 2.5 GB; doubling that for the high-water mark puts us at ~5 GB. The default Spark driver memory is 1 GB, which is far too low for what you are trying to do and leaves the JVM thrashing in GC.

Push the application property spark.driver.memory up to 8g and it should work.
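For reference, a sketch of where that setting has to go. The caveat is that spark.driver.memory must be set before the driver JVM starts, so changing it on an already-running session has no effect:

from pyspark.sql import SparkSession

# Set driver memory at session-creation time; the equivalent for a
# submitted job is `spark-submit --driver-memory 8g` or an entry in
# spark-defaults.conf.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)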
Also, reconsider why you are collecting all of this data to the driver at all. Could you use a pandas UDF in a GROUPED_MAP instead? See the sketch below.
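A minimal sketch of that pattern, assuming the Spark 2.4-era API in use at the time of the question; the grouping key (WASTE_FK) and the function body are placeholders for whatever per-group pandas logic is actually needed:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# GROUPED_MAP pandas UDF: Spark hands each group to this function as a
# pandas DataFrame on the executors, so the pandas work is distributed
# instead of collecting 8M rows onto the driver.
@pandas_udf(srcDF.schema, PandasUDFType.GROUPED_MAP)
def process_group(pdf):
    # ... per-group pandas transformations go here ...
    return pdf

finalDF = srcDF.groupBy("WASTE_FK").apply(process_group)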