Repeatedly using the same dataframe view in a join in PySpark SQL

Posted: 2021-05-21 00:31:58

Question:

I have the following code in PySpark. In it I create three subquery views and then join the three views together to form the final PySpark SQL view. I found that if I run the PySpark SQL using the daily_sku_t view created earlier in the code, it throws a very long error, the first line of which I've included below. If, instead, I first write the daily_sku_t view out to parquet and then read it back in, the code runs without error. The problem is that the write-out and read-back make the whole job take much longer. Is there a way around this without having to do the write-out? I tried creating another copy of the view under a different name, but that threw the error as well.

Example:

daily_sku_t.write.mode("overwrite")\
.parquet('s3://stuff/copy_daily_sku_t/')

copy_daily_sku_t=sqlContext.read.parquet('s3://stuff/copy_daily_sku_t/')

copy_daily_sku_t.createOrReplaceTempView("daily_sku_t")
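
(Not from the original post: a lighter-weight sketch of the same idea, under the assumption that it is the lineage truncation from the write/read-back that avoids the error. localCheckpoint() materializes the DataFrame and cuts its logical plan without the S3 round trip.)

# Hypothetical alternative to the parquet round trip: localCheckpoint()
# materializes the DataFrame and truncates its logical plan, which is
# the effect the write-out/read-back achieves. Whether it avoids this
# particular analyzer error is untested.
copy_daily_sku_t = daily_sku_t.localCheckpoint()  # eager by default

copy_daily_sku_t.createOrReplaceTempView("daily_sku_t")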

Code:

# product_repeat

product_repeat_product_cycle_days_query="""select
            dateclosed, s.product_id, s.tz_brandname,
             datediff(dateclosed,
            lag(dateclosed, 1) over (partition by s.tz_brandname, customer_uuid, s.product_id
        ORDER BY
            dateclosed ASC, ticketid )) as product_cycle_days
        from
            (select distinct dateclosed, product_id, tz_brandname, customer_uuid, ticketid 
            from daily_sku_t
            where (customer_uuid is not null) and (trim(customer_uuid) !='')
            and (product_id is not null) and (trim(product_id) !='')
            -- and (tz_brandname is not null) and (trim(tz_brandname) !='')
            and (dateclosed is not null) and (trim(dateclosed) !='')
            ) s 
    
        """

product_repeat_product_cycle_days=spark.sql(product_repeat_product_cycle_days_query)

product_repeat_product_cycle_days.createOrReplaceTempView("product_repeat_product_cycle_days")

product_cycle_days_sub_query="""select
        dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id, 
        floor(avg(product_cycle_days)) as product_cycle_days
    from
        (
        select
            t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
            a.product_cycle_days
        from daily_sku_t t
        left join product_repeat_product_cycle_days a
        on trim(a.product_id)=trim(t.product_id)
        and trim(a.dateclosed)=trim(t.dateclosed)
        and trim(a.tz_brandname)=trim(t.tz_brandname)
        where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
            and (t.product_id is not null) and (trim(t.product_id) !='')
            and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
        
        )
    group by
        dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""

product_cycle_days_sub=spark.sql(product_cycle_days_sub_query)

product_cycle_days_sub.createOrReplaceTempView("product_cycle_days_sub")


product_repeat_gross_query="""select
        dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
        sum(product_repeat_gross) + (rand() / 10000) as product_repeat_gross
    from
        (
        select
            t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
            case
                when ticketid = first_value(ticketid) over (partition by t.product_id, t.customer_uuid
            ORDER BY
                t.dateclosed ASC,ticketid rows between unbounded preceding and unbounded following) then 0
                else grossreceipts
            end as product_repeat_gross
        from daily_sku_t t
        where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
            and (t.product_id is not null) and (trim(t.product_id) !='')
            and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
        
        )
    group by
        dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""

product_repeat_gross=spark.sql(product_repeat_gross_query)

product_repeat_gross.createOrReplaceTempView("product_repeat_gross")



product_repeat_query="""select a.dateclosed, 
a.storeid, 
a.tz_brandname, 
a.producttype, 
a.productsubtype, 
a.size, 
a.product_id,
b.product_cycle_days,
c.product_repeat_gross
from (select distinct dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id from daily_sku_t) a
left join product_repeat_gross c
on trim(a.dateclosed)=trim(c.dateclosed)
and trim(a.storeid)=trim(c.storeid) 
and trim(a.tz_brandname)=trim(c.tz_brandname) 
and trim(a.producttype)=trim(c.producttype) 
and trim(a.productsubtype)=trim(c.productsubtype) 
and trim(a.size)=trim(c.size) 
and trim(a.product_id)=trim(c.product_id)
left join product_cycle_days_sub b
on trim(a.dateclosed)=trim(b.dateclosed) 
and trim(a.storeid)=trim(b.storeid) 
and trim(a.tz_brandname)=trim(b.tz_brandname) 
and trim(a.producttype)=trim(b.producttype) 
and trim(a.productsubtype)=trim(b.productsubtype) 
and trim(a.size)=trim(b.size) 
and trim(a.product_id)=trim(b.product_id)


"""

product_repeat=spark.sql(product_repeat_query)

product_repeat.createOrReplaceTempView("product_repeat")

Top of the error:

An error was encountered:
'Resolved attribute(s) _w5#7888 missing from discounts#7149,_w5# ....

Comments:

Have you tried setting spark.conf.set("spark.sql.crossJoin.enabled", "true") before performing the JOIN?

Answer 1:

You can do this in a single query by using a WITH clause, as shown below.

spark.sql("""
with tableA as (select ...),  -- tableA subquery goes here
tableB as (select ...),       -- tableB subquery goes here
tableC as (select ...)        -- tableC subquery goes here
-- refer to the CTEs defined above in the final query
select * from tableD join tableA ... join tableB ...""")
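
For concreteness, here is a minimal self-contained sketch of that pattern; the toy data and the per_product/per_day CTE names are invented for illustration and are not from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for daily_sku_t; column names borrowed from the question.
spark.createDataFrame(
    [("2021-05-01", "p1", 10.0),
     ("2021-05-01", "p2", 5.0),
     ("2021-05-02", "p1", 12.0)],
    ["dateclosed", "product_id", "grossreceipts"],
).createOrReplaceTempView("daily_sku_t")

# Each derived "view" becomes a CTE inside a single statement, so the
# base view is referenced within one query plan and there are no
# intermediate temp views to re-resolve.
result = spark.sql("""
with per_product as (
    select product_id, sum(grossreceipts) as total_gross
    from daily_sku_t
    group by product_id
),
per_day as (
    select dateclosed, sum(grossreceipts) as daily_gross
    from daily_sku_t
    group by dateclosed
)
select t.dateclosed, t.product_id, p.total_gross, d.daily_gross
from daily_sku_t t
left join per_product p on p.product_id = t.product_id
left join per_day d on d.dateclosed = t.dateclosed
""")
result.show()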
