将 Apache Spark Scala 重写为 PySpark

Posted

技术标签:

【中文标题】将 Apache Spark Scala 重写为 PySpark【英文标题】:Rewriting Apache Spark Scala into PySpark 【发布时间】:2020-11-22 22:29:46 【问题描述】:

社区,我对 Scala 不熟悉,对 PySpark 也不是很好。但是,我对 Scala 不太熟悉,因此希望有人可以让我知道是否有人可以帮助我将以下 Apache Spark Scala 重写为 PySpark。

如果你要问到目前为止我做了什么来帮助自己,我会诚实地说很少,因为我仍处于编码的早期阶段。

所以,如果您可以帮助将以下代码重新编码到 PySpark 中,或者让我走上正确的道路以便我自己重新编码,那将非常有帮助

import org.apache.spark.sql.DataFrame

def readParquet(basePath: String): DataFrame = 
  val parquetDf = spark
  .read
  .parquet(basePath)
  return parquetDf


def num(df: DataFrame): Int = 
  val numPartitions = df.rdd.getNumPartitions
  return numPartitions



def ram(size: Int): Int = 
  val ramMb = size
  return ramMb


def target(size: Int): Int = 
  val targetMb = size
  return targetMb



def dp(): Int = 
  val defaultParallelism  = spark.sparkContext.defaultParallelism
  return defaultParallelism


def files(dp: Int, multiplier: Int, ram: Int, target: Int): Int = 
  val maxPartitions = Math.max(dp * multiplier, Math.ceil(ram / target).toInt)
  return maxPartitions



def split(df: DataFrame, max: Int): DataFrame = 
  val repartitionDf = df.repartition(max)
  return repartitionDf


def writeParquet(df: DataFrame, targetPath: String) 
  return df.write.format("parquet").mode("overwrite").save(targetPath)


import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-repartition-optimizer-app").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 2001) // example
val parquetDf = readParquet("/blogs/source/airlines.parquet/")
val numPartitions = num(parquetDf)
val ramMb = ram(6510) // approx. df cache size
val targetMb = target(128) // approx. partition size (between 50 and 200 mb)
val defaultParallelism = dp()
val maxPartitions = files(defaultParallelism, 2, ramMb, targetMb)
val repartitionDf = split(parquetDf, maxPartitions)
writeParquet(repartitionDf, "/blogs/optimized/airlines.parquet/")

【问题讨论】:

我将开始在 pyspark.sql 模块中搜索方法,并从 pyspark.rdd.RDD 文档中搜索 rdd 方法。例如,您可以从 pyspark.sql 模块文档中搜索术语“parquet”,并找出您的 readParquet 函数在做什么。链接:spark.apache.org/docs/latest/api/python/pyspark.sql.html 和 spark.apache.org/docs/1.1.1/api/python/… @eemilk,感谢您与我们联系。我猜你的意思是,除非你精通这两种语言,否则将 Scala 重新编码为 PySpark 并不容易,对吗? 我还没有使用 scala 编程,但是这些函数看起来很容易重写到 python 和 pyspark 中 -> 语法几乎与 spark 完全一样。它只是控制火花的API,因此搜索火花方法,例如repartition 来自 pyspark.sql 模块,您可以阅读它的作用并在 python 中正确使用它和 pyspark。但绝对你需要了解一些关于 python 的知识,例如函数,循环等。 @eemilk,再次感谢您与我们联系。我同意,这很容易。我刚刚遇到了一段 scala 代码,它被证明很难转换为 PySpark。我不断收到错误消息:未定义名称“数学”当我运行以下函数时def files(dp, multiplier, ram, target): maxPartitions = math.max(dp * multiplier, math.ceil(ram / target).toInt) return maxPartitions有什么想法吗? w3schools.com/python/python_math.asp 【参考方案1】:

我只需要自己将 Scala 代码重新编码为 PySpark。

【讨论】:

【参考方案2】:

已通过在 pyspark 中包含以下模块来解决此问题。

import module

【讨论】:

以上是关于将 Apache Spark Scala 重写为 PySpark的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

将 scala 中的伴随对象转换为 Python

将决策树训练分类器的模型输出保存为 Spark Scala 平台中的文本文件

如何将 Scala Spark Dataframe 转换为 LinkedHashMap[String, String]

如何在 Apache Spark 中将 Scala UDF 转换为 Java 版本?

在 Apache Spark (Scala) 上获取两个数据帧的差异