What is the best PySpark practice to dynamically retrieve/filter a Spark frame based on widget values extracted at the top of a notebook in Databricks?
Posted: 2021-11-24 20:23:42

Question:

Suppose I have a Spark dataframe named df:
<------Time-resolution-------->
+------------+----------+---------+---------+---------+
| Name | date | 00-24 | 00-12 | 12-24 |
+------------+----------+---------+---------+---------+
| X1 |2020-10-20| 137 | 68 | 69 |
| X2 |2020-10-22| 132 | 66 | 66 |
| X3 |2020-10-24| 132 | 64 | 68 |
| X4 |2020-10-25| 587 | 292 | 295 |
| X5 |2020-10-29| 134 | 67 | 67 |
+------------+----------+---------+---------+---------+
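For reproducibility, here is a minimal sketch that builds this sample frame in a Databricks notebook (the column types are assumptions: strings for Name and date, integers for the three time-resolution columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available as `spark` in Databricks

df = spark.createDataFrame(
    [
        ("X1", "2020-10-20", 137, 68, 69),
        ("X2", "2020-10-22", 132, 66, 66),
        ("X3", "2020-10-24", 132, 64, 68),
        ("X4", "2020-10-25", 587, 292, 295),
        ("X5", "2020-10-29", 134, 67, 67),
    ],
    ["Name", "date", "00-24", "00-12", "12-24"],  # hyphenated names need backticks in SQL expressions
)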
I want to create 4 widgets at the top of my notebook in Databricks with PySpark, in the form of dbutils.widgets.dropdown(), populated from the available data:

1. DATE_FROM
2. DATE_TO
3. Time_Resolution_Of_Interest (one of 00-24 | 00-12 | 12-24)
4. Name_Of_Interest (the top 3 names, ranked by the selected time-resolution column in descending order)
I tried the following, based on this answer and that answer.

I can do this for the first and second items:
dbutils.widgets.removeAll()

from pyspark.sql import functions as F

# compute the min and max dates available in the data
max_date = df.select(F.max('date')).first()['max(date)']
min_date = df.select(F.min('date')).first()['min(date)']
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date, choices = ['date'])
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = ['date'])
#dbutils.widgets.text(name = "DATE_FROM", defaultValue = min_date)
#dbutils.widgets.text(name = "DATE_TO", defaultValue = max_date)
For item 3, I had a naive idea:
channel = ['00-24', '00-12', '12-24']
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24" , choices = [str(x) for x in channel] + ["None"])
For the last item, I want to list the names of interest, but I can't map the Rows to Strings and pass them along the way the Scala version does:
# Get the time resolution of interest from the widget
dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")

# Compute the top value in the selected time-resolution column
# (the aggregated column is named 'max(<column>)', hence the alias)
max_Top_Name = df.select(F.max(dropdownColumn).alias('max_val')).first()['max_val']

NUM_OF_NAMES_FOR_DROPDOWN = 5

# Scala version works:
#val Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).map(i=>i.getAs[String]("Name"))
#dbutils.widgets.dropdown("Name", "X1", Name_list.toSeq, "Username Of Interest")

# PySpark version doesn't work: take() already returns a plain list of Row objects,
# so there is no .rdd to call on it
Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).rdd.flatMap(lambda x: x).collect()
dbutils.widgets.dropdown(name = "Name", defaultValue = max_Top_Name, choices = [str(x) for x in Name_list] + ["None"])
Finally, I want to filter the records for that specific name and the selected time resolution over the chosen date range, updating the frame based on this answer, like so:
selected_widgets = ['DATE_FROM', 'DATE_TO', 'Time_Resolution_Of_Interest', 'Name_Of_Interest']
myList = getArgument(selected_widgets).split(",")
display(df.filter(df.isin(myList)))
Through the widget values, I expect to end up with the following table for Name: X1 and time resolution: 00-24, over a date range from 2020-10-20 to 2020-11-20:
+------------+----------+---------+
| Name | date | 00-24 |
+------------+----------+---------+
| X1 |2020-10-20| 137 |
| X1 |2020-10-21| 111 |
| X1 |2020-10-22| 99 |
| X1 |2020-10-23| 123 |
| X1 |2020-10-24| 101 |
| ... | ... | ... |
+------------+----------+---------+
Answer 1:

What you can do is first build the widgets as you did, then get the individual values from the widgets and filter with them to produce the final result. See the sample code below; it may not match your requirements one-to-one, but it should guide you toward what you want.
Create the date widgets:
from pyspark.sql.functions import min, max

dbutils.widgets.removeAll()

# collect every available date, plus the overall min and max
date = [date[0] for date in data.select("date").collect()]
max_min_date = data.select(max('date'), min('date')).first()
min_date = max_min_date['min(date)']
max_date = max_min_date['max(date)']
print(date)
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date, choices = date)
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = date)
Create the time-resolution widget from the schema; this lets you build the list of time columns dynamically:
# every field except the Name and date columns (compare lower-cased,
# since the sample frame capitalizes 'Name')
channel = [f.name for f in data.schema.fields if f.name.lower() not in ['name', 'date']]
print(channel)
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24", choices = [str(x) for x in channel] + ["None"])
Create the name widget:
from pyspark.sql.functions import col

dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")
NUM_OF_NAMES_FOR_DROPDOWN = 5

# sort by the selected time column, descending, and take the top 5 rows
name_limit = [name[0] for name in data.select("Name").orderBy(col(dropdownColumn), ascending=False).take(NUM_OF_NAMES_FOR_DROPDOWN)]
dbutils.widgets.dropdown(name = "Name", defaultValue = 'X1', choices = [str(x) for x in name_limit] + ["None"])
Finally, filter the data based on the widget values:
date_from_val = dbutils.widgets.get("DATE_FROM")
date_to_val = dbutils.widgets.get("DATE_TO")
time_val = dbutils.widgets.get("Time_Resolution_Of_Interest")
name_val = dbutils.widgets.get("Name")

# interpolate the widget values into the SQL expression
# (the braces are required; without them the literal strings would be compared)
result = data.select("Name", time_val).where(f"Name = '{name_val}' and date between '{date_from_val}' and '{date_to_val}'")
display(result)
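As a follow-up (my own variation, not part of the original answer), the same filter can be written with the DataFrame API, which sidesteps quoting mistakes in the SQL string:

from pyspark.sql.functions import col

result = (data
          .filter((col("Name") == name_val) & col("date").between(date_from_val, date_to_val))
          .select("Name", time_val))
display(result)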
Comments:

Please pay attention to this post, as it relates to the data engineering involved in the daily save/load cycle of a model trained on streaming data.