Pyspark:将 sql 查询转换为 pyspark?

Posted

技术标签:

【中文标题】Pyspark:将 sql 查询转换为 pyspark?【英文标题】:Pyspark: Convert sql query to pyspark? 【发布时间】:2022-01-18 12:02:24 【问题描述】:

我有两组带有一些案例陈述的查询。我需要在 pyspark 中转换相同的逻辑。

**First query**
case
when appointment_date is null
then 0
--Ticket resolved without having to send a truck out
when nvl(resolution,'') in (
'CSTXCL - OK BY PHONE'
,'OK AT TIME CALLED'
,'CONFIRMED OK BY PHONE'
,'RESOLVED THROUGH FOLLOW UP'
,'OK BY PHONE CALL'
)
or nvl(resolution,'') ilike '%NO VAN ROLL%'
then 0
when wo_status in ('PENDING','CANCELLED')
then 0
when stype = 'install'
and (
btrim(job_task,'\"') ilike '%Disco%'
or btrim(job_task,'\"') ilike '%Reconnect%'
or btrim(job_task,'\"') ilike '%Wireless Uninstall%'
or btrim(job_task,'\"') ilike '%Remove%'
or btrim(job_task,'\"') ilike '%Retrieve%'
)
and btrim(job_task,'\"') not ilike '%[!n]Install%'
and btrim(job_task,'\"') not ilike '%[!se]Connect%'
and btrim(job_task,'\"') not ilike 'Install%'
and btrim(job_task,'\"') not ilike '%(COPPER TO FTTH)%'
then 0
else 1
end as truck_roll
**Second query**

case when wo_status = 'COMPLETED'  and nvl(resolution,'') not in ('CANCELLING ORDER','CANCEL ORDER','CLOSE SRO')
then 1 else 0 end as completed, --these resolutions indicate cancelled tickets, so even if they come in as closed, call it cancelled
case when wo_status = 'CREATED' then 1 else 0 end as created,
case when wo_status = 'PENDING'  and nvl(resolution,'') not in ('CANCELLING ORDER','CANCEL ORDER','CLOSE SRO') then 1 else 0 end as pending,
case when wo_status = 'CANCELLED'  or ( wo_status in ('COMPLETED','PENDING' ) and nvl(resolution,'') in ('CANCELLING ORDER','CANCEL ORDER','CLOSE SRO') ) then 1 else 0 end as cancelled.

如果有人知道如何在 pyspark 中实现相同的逻辑,那将不胜感激。

第一组查询我试过了。

df.withColumn('truck_roll', when(df.appointment_date.isNull(), 0).when(df.nvl(resolution,'').isin('CSTXCL - OK BY PHONE','OK AT TIME CALLED','CONFIRMED OK BY PHONE','RESOLVED THROUGH FOLLOW UP','OK BY PHONE CALL')
      |(df.nvl(resolution,'')like(''%NO VAN ROLL%''),0))

在这之后我被困住了。我需要有 btrim 的案例。我怎样才能做到这一点。

【问题讨论】:

欢迎来到 *** :) 提出问题时,建议创建一个最小的、可重现的示例 (***.com/help/minimal-reproducible-example) 以简化回答您问题的人。请参阅下面的分析器,我已经简化了您的代码,以包含您所面临的问题的最小示例,而无需不必要的代码。 我已经给出了如何在 pyspark 中实现case-statement 的答案(请参阅下面的答案),我知道这是您的问题。如果您有更详细的问题,我建议您创建单独的问题,例如一个关于如何实现case-statements 的问题,一个关于如何实现and/or 等的问题。 (假设您没有找到任何类似的问题)。如果您仍然有问题,请提供您尝试过的 pyspark 代码(同样,可重现性极低)以及为什么它不起作用,我可以看看 :)。 @Cleared 感谢您的回复。我想要这个场景的一个例子(不在):当 wo_status = 'COMPLETED' 和 nvl(resolution,'') 不在 ('CANCELLING ORDER','CANCEL ORDER','CLOSE SRO') 然后 1 else 0 end已完成 我该如何处理这个..你能帮忙 请查看@Cleared上方更新的问题 【参考方案1】:

您可以使用when-函数(参见pyspark.sql.functions.when)

所以,为了简化您的查询:

case
    when job_task is null then 0
    when job_task = 'ABC' then 1
    else 2
end as truck_roll

将在pyspark中翻译成以下内容

df.withColumn(
    'truck_roll',
    when(df.job_task.isNull(), 0)
    .when(df.job_task == 'ABC', 1)
   .otherwise(2)
)

即您可以像上面一样嵌套when 以拥有多个“案例”,而不是else 您使用otherwise

【讨论】:

我对您的答案进行了格式化,以便阅读(并更正了缺失的“)”)。

以上是关于Pyspark:将 sql 查询转换为 pyspark?的主要内容,如果未能解决你的问题,请参考以下文章

将 SQL 连接查询转换为 pyspark 语法

如何将 sql 查询转换为 Pandas Dataframe 和 PySpark Dataframe

发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

Pyspark:将 pyspark.sql.row 转换为 Dataframe

pyspark 中的 df.show() 问题

将 SQL 代码转换为 PySpark 的问题;我在哪里用 groupby 和 count 创建一个新的 DF