SparkException while porting pyspark code to Scala for Spark 2.4.3
Posted: 2019-11-20 18:36:58

Question: I have working code written in Python, and now, per a new request, I am rewriting the same logic in Scala. However, I am running into some errors.
Python code:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
#Step-1: Split fits_assembly_name column#
The initial dataframe df looks like the following:
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+
| Itemno|fits_model_number|fits_assembly_id|fits_assembly_name                                                                 |
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+
|0450056|                 |           44011|BODY, DECALS - Z19VFK99LK/LE (702498)                                              |
|0450056|                 |          135502|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08) |
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+
df0 = df.withColumn('new_col', split(regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'),'\0')) \
.selectExpr(
'Itemno'
, 'fits_model_number'
, 'fits_assembly_id'
, 'fits_assembly_name'
, "coalesce(new_col[0], fits_assembly_name) as assembly_name"
, "coalesce(new_col[1], '') as models"
)
display(df0)
OUTPUT:
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+---------------------------------------------------------------+-----------------+
| Itemno|fits_model_number|fits_assembly_id|fits_assembly_name                                                                 |assembly_name                                                  |models           |
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+---------------------------------------------------------------+-----------------+
|0450056|                 |           44011|BODY, DECALS - Z19VFK99LK/LE (702498)                                              |BODY, DECALS (702498)                                          |Z19VFK99LK/LE    |
|0450056|                 |          135502|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08) |DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08) |V08AB26/GB26/LB26|
+-------+-----------------+----------------+-----------------------------------------------------------------------------------+---------------------------------------------------------------+-----------------+
#Step-2: convert string into array of strings
df1 = df0.withColumn('temp0', split('fits_model_number', r'(?:(?![/_])\pPunct|\s)+')) \
.withColumn('temp0', expr("filter(temp0, x -> x <> '')"))
I'm not sure how to convert regexp_replace('fits_assembly_name', r'^(.*?)\s+-\s+(\S+)(.*)$', '$1$3\0$2'), '\0') to Scala. I tried regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""), """\0"""), but I hit a compile-time error when running display(df0).

Could you please help me with what needs to be updated in Step 1 and Step 2 so that the Scala code runs for this logic?
P.S.: This is how I have started in Scala so far:
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._
// Step-1: Split fits_assembly_name column to remove model#
val df0 = df.withColumn("new_col", split(regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", """$1$3\0$2"""),"""\0""")).selectExpr(
"Itemno"
, "fits_model_number"
, "fits_assembly_id"
, "fits_assembly_name"
, "coalesce(new_col[0], fits_assembly_name) as assembly_name"
, "coalesce(new_col[1], '') as models"
)
But I get an error when running display(df0).
Comments:
You need to escape the backslashes in the regex pattern: "^(.*?)\\s+-\\s+(\\S+)(.*)$". In Python we use raw strings r'..', so there is no need to escape the backslashes there.
@jxc: I made a mistake in the split argument. The Scala docs mention that I can write either "\\d" or """\d""". I fixed it. Thanks for the quick reply!
@jxc: I have posted https://***.com/questions/58982095/update-query-to-rearrange-data-frame-by-season-and-resolve-to-char-issue-in-scal. Please let me know your thoughts. Regards!
Answer 1:
To create the 2 new columns from new_col by splitting the string produced by regexp_replace, you can update the pattern to use 4 capture groups and use $1$4$2$3 in the replacement. There are also a few single quotes that have to be changed to double quotes.

^(.*?)(\s+-\s+)(\S+)(.*)$

Regex demo

For the new string, split on the pattern \s+-\s+, which matches the dash with whitespace on both sides.
The code could look like:
val df0 = df
.withColumn(
"new_col", split(
regexp_replace(
$"fits_assembly_name",
"""^(.*?)(\s+-\s+)(\S+)(.*)$""",
"$1$4$2$3"
),
"""\s+-\s+"""
)
)
.selectExpr(
"Itemno"
, "fits_model_number"
, "fits_assembly_id"
, "fits_assembly_name"
, "coalesce(new_col[0], fits_assembly_name) as assembly_name"
, "coalesce(new_col[1], '') as models"
)
df0.show(false)
Output:
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|Itemno |fits_model_number|fits_assembly_id|fits_assembly_name |assembly_name |models |
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
|0450056|a,b |44011 |BODY, DECALS - Z19VFK99LK/LE (702498) |BODY, DECALS (702498) |Z19VFK99LK/LE |
|0450056|c.d |135502 |DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|DRIVE TRAIN, TRANSMISSION (6 SPEED) ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26|
+-------+-----------------+----------------+----------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------+
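As a side note (my own variant, not part of this answer): if you would rather keep the NUL-character delimiter from the original pyspark version instead of splitting on \s+-\s+, a sketch of that approach in Scala could look like the following. "\u0000" in an ordinary Scala string literal is a real NUL character, so it plays the role of Python's '\0'; df0Alt is just a hypothetical name.

// Same 3-group pattern as the Python code, with a NUL character as the
// temporary separator between the assembly name and the model string.
val nul = "\u0000"
val df0Alt = df
  .withColumn(
    "new_col",
    split(
      regexp_replace($"fits_assembly_name", """^(.*?)\s+-\s+(\S+)(.*)$""", "$1$3" + nul + "$2"),
      nul
    )
  )
  .selectExpr(
    "Itemno",
    "fits_model_number",
    "fits_assembly_id",
    "fits_assembly_name",
    "coalesce(new_col[0], fits_assembly_name) as assembly_name",
    "coalesce(new_col[1], '') as models"
  )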
For the second part, there is no data shown for fits_model_number, but the pattern you use, (?:(?![/_])\pPunct|\s)+, will match 1+ punctuation characters other than / or _, or match 1+ whitespace characters.

You can update the code to:
val df1 = df0
.withColumn("temp0", split(
col("fits_model_number"),
"""(?:(?![/_])\pPunct|\s)+""")
)
.withColumn(
"temp0",
expr("filter(temp0, x -> x <> '')")
)
df1.show()
Output (with some test data):
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
| Itemno|fits_model_number|fits_assembly_id| fits_assembly_name| assembly_name| models| temp0|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
|0450056| a,b,test| 44011|BODY, DECALS - Z1...|BODY, DECALS (702...| Z19VFK99LK/LE|[a, b, test]|
|0450056| c.d| 135502|DRIVE TRAIN, TRAN...|DRIVE TRAIN, TRAN...|V08AB26/GB26/LB26| [c, d]|
+-------+-----------------+----------------+--------------------+--------------------+-----------------+------------+
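A small follow-up suggestion of mine (not from the original answer): on Spark 2.4 you could also drop the empty tokens with the built-in array_remove function instead of the filter higher-order function, which avoids the SQL expr string. A sketch, with df1Alt as a hypothetical name:

import org.apache.spark.sql.functions.{array_remove, col, split}

// array_remove(col, "") removes all empty-string elements while keeping order;
// note it keeps nulls, whereas filter(temp0, x -> x <> '') would drop them.
val df1Alt = df0
  .withColumn("temp0", split(col("fits_model_number"), """(?:(?![/_])\pPunct|\s)+"""))
  .withColumn("temp0", array_remove(col("temp0"), ""))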
Comments:
It worked. Thank you for the detailed response and your kind help. I made a mistake in the split.