repeatedly using same dataframe view in join in pyspark sql
Posted: 2021-05-21 00:31:58

I have the following PySpark code. In it I create three subquery views, then join the three views together to form the final pyspark-sql view. I found that if I run the pyspark-sql against the daily_sku_t view created earlier in the code, it throws a very long error; I've included the first line of that error at the bottom. If I instead first write the daily_sku_t view out to parquet and then read it back in, the code runs without error. The problem is that the write-out and read-back make the whole job take considerably longer. Is there a way around this without having to write the data out? I also tried creating another copy of the view under a different name, but that threw the error as well.
Example:
# workaround: round-trip daily_sku_t through parquet on S3, then
# re-register it under the same view name
daily_sku_t.write.mode("overwrite")\
    .parquet('s3://stuff/copy_daily_sku_t/')
copy_daily_sku_t = sqlContext.read.parquet('s3://stuff/copy_daily_sku_t/')
copy_daily_sku_t.createOrReplaceTempView("daily_sku_t")
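A way to get the same lineage cut without the S3 round trip is to checkpoint the dataframe. This is a minimal sketch, not from the original post, assuming the same spark session and daily_sku_t dataframe: localCheckpoint() materializes the dataframe on the executors and truncates its logical plan, which is what the parquet write/read is doing implicitly.

# A sketch, not the asker's code: localCheckpoint() materializes
# daily_sku_t on the executors and truncates its logical plan,
# mimicking the parquet round trip without touching S3.
daily_sku_t = daily_sku_t.localCheckpoint(eager=True)
daily_sku_t.createOrReplaceTempView("daily_sku_t")

# checkpoint() is the reliable-storage variant; it requires a
# checkpoint directory to be set first (this path is illustrative only):
# spark.sparkContext.setCheckpointDir("s3://stuff/checkpoints/")
# daily_sku_t = daily_sku_t.checkpoint(eager=True)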
Code:
# product_repeat
product_repeat_product_cycle_days_query="""select
dateclosed, s.product_id, s.tz_brandname,
datediff(dateclosed,
lag(dateclosed, 1) over (partition by s.tz_brandname, customer_uuid, s.product_id
ORDER BY
dateclosed ASC, ticketid )) as product_cycle_days
from
(select distinct dateclosed, product_id, tz_brandname, customer_uuid, ticketid
from daily_sku_t
where (customer_uuid is not null) and (trim(customer_uuid) !='')
and (product_id is not null) and (trim(product_id) !='')
-- and (tz_brandname is not null) and (trim(tz_brandname) !='')
and (dateclosed is not null) and (trim(dateclosed) !='')
) s
"""
product_repeat_product_cycle_days=spark.sql(product_repeat_product_cycle_days_query)
product_repeat_product_cycle_days.createOrReplaceTempView("product_repeat_product_cycle_days")
product_cycle_days_sub_query="""select
dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
floor(avg(product_cycle_days)) as product_cycle_days
from
(
select
t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
a.product_cycle_days
from daily_sku_t t
left join product_repeat_product_cycle_days a
on trim(a.product_id)=trim(t.product_id)
and trim(a.dateclosed)=trim(t.dateclosed)
and trim(a.tz_brandname)=trim(t.tz_brandname)
where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
and (t.product_id is not null) and (trim(t.product_id) !='')
and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
)
group by
dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""
product_cycle_days_sub=spark.sql(product_cycle_days_sub_query)
product_cycle_days_sub.createOrReplaceTempView("product_cycle_days_sub")
product_repeat_gross_query="""select
dateclosed, storeid, tz_brandname, producttype, productsubtype, size, product_id,
sum(product_repeat_gross) + (rand() / 10000) as product_repeat_gross
from
(
select
t.dateclosed, t.product_id, t.storeid, t.producttype, t.productsubtype, t.size, t.tz_brandname,
case
when ticketid = first_value(ticketid) over (partition by t.product_id, t.customer_uuid
ORDER BY
t.dateclosed ASC,ticketid rows between unbounded preceding and unbounded following) then 0
else grossreceipts
end as product_repeat_gross
from daily_sku_t t
where (t.customer_uuid is not null) and (trim(t.customer_uuid) !='')
and (t.product_id is not null) and (trim(t.product_id) !='')
and (t.dateclosed is not null) and (trim(t.dateclosed) !='')
)
group by
dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id"""
product_repeat_gross=spark.sql(product_repeat_gross_query)
product_repeat_gross.createOrReplaceTempView("product_repeat_gross")
product_repeat_query="""select a.dateclosed,
a.storeid,
a.tz_brandname,
a.producttype,
a.productsubtype,
a.size,
a.product_id,
b.product_cycle_days,
c.product_repeat_gross
from (select distinct dateclosed, tz_brandname, producttype, productsubtype, size, storeid, product_id from daily_sku_t) a
left join product_repeat_gross c
on trim(a.dateclosed)=trim(c.dateclosed)
and trim(a.storeid)=trim(c.storeid)
and trim(a.tz_brandname)=trim(c.tz_brandname)
and trim(a.producttype)=trim(c.producttype)
and trim(a.productsubtype)=trim(c.productsubtype)
and trim(a.size)=trim(c.size)
and trim(a.product_id)=trim(c.product_id)
left join product_cycle_days_sub b
on trim(a.dateclosed)=trim(b.dateclosed)
and trim(a.storeid)=trim(b.storeid)
and trim(a.tz_brandname)=trim(b.tz_brandname)
and trim(a.producttype)=trim(b.producttype)
and trim(a.productsubtype)=trim(b.productsubtype)
and trim(a.size)=trim(b.size)
and trim(a.product_id)=trim(b.product_id)
"""
product_repeat=spark.sql(product_repeat_query)
product_repeat.createOrReplaceTempView("product_repeat")
Top of the error:
An error was encountered:
'Resolved attribute(s) _w5#7888 missing from discounts#7149,_w5# ....
Comments:
Have you tried setting spark.conf.set("spark.sql.crossJoin.enabled", "true") before performing the JOIN operation?
Answer 1:
Within a single query you can achieve this by using a WITH clause (common table expressions), as shown below.
spark.sql("""
with tableA as select ..., --tableA subquery goes here
tableB as select ..., --tableB subquery goes here
tableC as select --tableC subquery goes here
//refer the temp tables created above in the following query
select * from tableD join tableA join tableB....""")
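Applied here, all three subqueries become CTEs over daily_sku_t inside a single spark.sql() call, so no intermediate temp views need registering. A self-contained toy sketch of the pattern (the schema and names below are illustrative, not the asker's):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy stand-in for daily_sku_t
spark.createDataFrame(
    [("2021-05-01", "p1", 10.0), ("2021-05-02", "p1", 12.0)],
    ["dateclosed", "product_id", "gross"],
).createOrReplaceTempView("daily_sku_t")

out = spark.sql("""
with by_day as (
    select dateclosed, sum(gross) as day_gross
    from daily_sku_t group by dateclosed
),
by_product as (
    select product_id, sum(gross) as product_gross
    from daily_sku_t group by product_id
)
select t.*, d.day_gross, p.product_gross
from daily_sku_t t
left join by_day d on d.dateclosed = t.dateclosed
left join by_product p on p.product_id = t.product_id
""")
out.show()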