SparkException while porting pyspark code to scala for Spark 2.4.3

Posted: 2019-11-20 18:36:58

Question:

I have working code in Python, and now, per a new request, I am rewriting the same logic in Scala. However, I am running into errors.

Python code:

from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

#Step-1: Split fits_assembly_name column#

The initial dataframe df looks as follows:

+-------+-----------------+----------------+----------------------------------------------------------------------------------+
| Itemno|fits_model_number|fits_assembly_id|fits_assembly_name                                                                |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+
|0450056|                 |           44011|BODY, DECALS - Z19VFK99LK/LE (702498)                                             |
|0450056|                 |          135502|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+


# '\0' in the replacement is the NUL character (the string is not raw); it is used
# as a temporary separator so the model token can be split out afterwards
df0 = df.withColumn('new_col', split(regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'),'\0')) \
    .selectExpr(
        'Itemno'
      , 'fits_model_number'    
      , 'fits_assembly_id'
      , 'fits_assembly_name'
      , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
      , "coalesce(new_col[1], '') as models"
)

display(df0)

OUTPUT:
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
| Itemno|fits_model_number|fits_assembly_id|fits_assembly_name                                                                |assembly_name                                                 |           models|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|0450056|                 |           44011|BODY, DECALS - Z19VFK99LK/LE (702498)                                             |BODY, DECALS (702498)                                         |    Z19VFK99LK/LE|
|0450056|                 |          135502|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+

#Step-2: convert string into array of strings
df1 = df0.withColumn('temp0', split('fits_model_number', r'(?:(?![/_])\p{Punct}|\s)+')) \
        .withColumn('temp0', expr("filter(temp0, x -> x <> '')"))

I am not sure how to convert regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'),'\0')) into Scala. I tried regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""),"""\0""")), but hit a compile-time error when running display(df0).
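For context, the core of the mismatch is the \0 escape, which means different things in the two languages. Below is a minimal sketch of the difference together with one possible direct port; the Spark snippet is an untested illustration, not code from the original post:

// In Python, '\0' in a normal (non-raw) string literal is the NUL character (U+0000),
// so the pyspark code joins the parts with a NUL and later splits on it.
// In Scala, a triple-quoted string is raw: """\0""" keeps a literal backslash
// followed by '0', which Java's regex engine rejects as an incomplete octal escape.
val nul = "\u0000"  // one-character string containing the NUL character
val raw = """\0"""  // two characters: a backslash and the digit 0
assert(nul.length == 1 && raw.length == 2)

// A direct port of the Python logic could therefore look like:
val df0 = df.withColumn(
  "new_col",
  split(
    regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", "$1$3" + nul + "$2"),
    nul  // NUL is an ordinary literal character in the split regex
  )
)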

Could you please help with what needs to be updated in Step-1 and Step-2 so that the Scala code runs this logic?

P.S.: This is how far I have got in Scala so far:

import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

// Step-1: Split fits_assembly_name column to remove model#

val df0 = df.withColumn("new_col", split(regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""),"""\0""")).selectExpr(
        "Itemno"
      , "fits_model_number"    
      , "fits_assembly_id"
      , "fits_assembly_name"
      , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
      , "coalesce(new_col[1], '') as models"
)

But I get an error when running display(df0).

Comments:

You need to escape the backslashes in the regex pattern: "^(.*?)\\s+-\\s+(\\S+)(.*)$". In Python we use raw strings r'..', so the backslashes do not need escaping there.

@jxc: I made a mistake in the split argument. In the Scala docs they mention I can do either "\\d" or """\d""". I fixed it. Thanks for the prompt reply!

@jxc: I have posted https://***.com/questions/58982095/update-query-to-rearrange-data-frame-by-season-and-resolve-to-char-issue-in-scal. Please let me know what you think. Regards!

Answer 1:

To create the 2 new columns from new_col by splitting the string produced by regexp_replace, you can update the pattern to use 4 capture groups and use $1$4$2$3 in the replacement.

Then there are a few single quotes that have to become double quotes (in Scala, single quotes delimit Char literals, not Strings).

^(.*?)(\s+-\s+)(\S+)(.*)$

Regex demo

For the new string, split on the pattern \s+-\s+, which matches the dash enclosed in whitespace.
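As a quick sanity check outside Spark, the replace-then-split round trip can be tried in a plain Scala REPL (my own illustration, using a sample value from the question's data):

val pattern = """^(.*?)(\s+-\s+)(\S+)(.*)$"""
val sample  = "BODY, DECALS - Z19VFK99LK/LE (702498)"

// Move the model token ($3) to the end, keeping the " - " delimiter ($2) in the middle
val replaced = sample.replaceAll(pattern, "$1$4$2$3")
// replaced == "BODY, DECALS (702498) - Z19VFK99LK/LE"

// Split on the whitespace-dash-whitespace delimiter
val parts = replaced.split("""\s+-\s+""")
// parts(0) == "BODY, DECALS (702498)"
// parts(1) == "Z19VFK99LK/LE"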

The code could look like:

val df0 = df
  .withColumn(
    "new_col", split(
      regexp_replace(
        $"fits_assembly_name",
        """^(.*?)(\s+-\s+)(\S+)(.*)$""",
        "$1$4$2$3" // move the model token ($3) to the end, keep " - " ($2) as the separator
      ),
      """\s+-\s+""" // then split on the dash surrounded by whitespace
    )
  )
  .selectExpr(
    "Itemno"
    , "fits_model_number"
    , "fits_assembly_id"
    , "fits_assembly_name"
    , "coalesce(new_col[0], fits_assembly_name) as assembly_name"
    , "coalesce(new_col[1], '') as models"
  )
df0.show(false)

Output:

+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|Itemno |fits_model_number|fits_assembly_id|fits_assembly_name                                                                |assembly_name                                                 |models           |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|0450056|a,b              |44011           |BODY, DECALS - Z19VFK99LK/LE (702498)                                             |BODY, DECALS (702498)                                         |Z19VFK99LK/LE    |
|0450056|c.d              |135502          |DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+

For the second part there was no data in fits_model_number, but the pattern you use, (?:(?![/_])\p{Punct}|\s)+, matches 1+ punctuation characters other than / and _, or 1+ whitespace characters.
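For instance, in a plain Scala REPL the pattern behaves like this on a made-up model string (hypothetical test value, not from the question's data):

// ',' '.' and spaces split the string; '/' and '_' are protected by the negative lookahead
val tokens = "a,b.c d/e f_g".split("""(?:(?![/_])\p{Punct}|\s)+""")
// tokens: Array(a, b, c, d/e, f_g)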

You can update the code to:

val df1 = df0
  .withColumn("temp0", split(
    col("fits_model_number"),
    """(?:(?![/_])\pPunct|\s)+""")
  )
  .withColumn(
    "temp0",
    expr("filter(temp0, x -> x <> '')")
  )
df1.show()

Output (with some test data):

+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
| Itemno|fits_model_number|fits_assembly_id|  fits_assembly_name|       assembly_name|           models|       temp0|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
|0450056|         a,b,test|           44011|BODY, DECALS - Z1...|BODY, DECALS (702...|    Z19VFK99LK/LE|[a, b, test]|
|0450056|              c.d|          135502|DRIVE TRAIN, TRAN...|DRIVE TRAIN, TRAN...|V08AB26/GB26/LB26|      [c, d]|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+

Comments:

That worked. Thanks for the detailed response and the kind help. I had made a mistake in the split.
