基于在 DataBrick 中的笔记本顶部提取小部件值来动态检索/过滤 Spark 框架的最佳 PySpark 实践是啥?

Posted

技术标签:

【中文标题】基于在 DataBrick 中的笔记本顶部提取小部件值来动态检索/过滤 Spark 框架的最佳 PySpark 实践是啥?【英文标题】:What is the best PySpark practice to dynamically retrieve/filter the Spark frame based on extracting widget values on top of notebook in DataBrick?基于在 DataBrick 中的笔记本顶部提取小部件值来动态检索/过滤 Spark 框架的最佳 PySpark 实践是什么? 【发布时间】:2021-11-24 20:23:42 【问题描述】:

假设我有一个名为 df 的 Spark 数据框:

                        <------Time-resolution-------->
+------------+----------+---------+---------+---------+
|    Name    |   date   |  00-24  |  00-12  |  12-24  |
+------------+----------+---------+---------+---------+
|     X1     |2020-10-20|   137   |   68    |   69    |
|     X2     |2020-10-22|   132   |   66    |   66    |
|     X3     |2020-10-24|   132   |   64    |   68    |
|     X4     |2020-10-25|   587   |   292   |   295   |
|     X5     |2020-10-29|   134   |   67    |   67    |
+------------+----------+---------+---------+---------+

我想在 DataBricks 介质中使用 PySpark 在我的笔记本顶部创建 4 个小部件,以 dbutils.widgets.dropdown() 的形式从可用数据中获取如下:

DATE_FROM DATE_TO Time_Resolution_Of_Interest(00-24|00-12|12-24 之一) Name_Of_Interest(前 3 个名称​​基于感兴趣的时间分辨率列的降序排列

我根据这个answer和那个answer尝试了以下内容:

我可以为下面的第一个和第二个项目做到这一点:

dbutils.widgets.removeAll()

# compute the list of all dates from maximum date available till today
max_date = df.select(F.max('date')).first()['max(date)']
min_date = df.select(F.min('date')).first()['min(date)']
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date , choices = ['date'])
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = ['date'])
#dbutils.widgets.text(name = "DATE_FROM", defaultValue = min_date")
#dbutils.widgets.text(name = "DATE_TO",   defaultValue = max_date)

对于第 3 项,我有一个愚蠢的想法:

channel = ['00-24', '00-12', '12-24']
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24" , choices = [str(x) for x in channel] + ["None"])

对于最后一项,我想列出感兴趣的名称,但我无法映射 String 并像 Scala 版本一样传递它

#Get interested Time resolution from widget
dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")
# compute the list 5 top names in interested time resolution 
max_Top_Name = df.select(F.max(dropdownColumn)).first()[dropdownColumn]

NUM_OF_NAMES_FOR_DROPDOWN = 5

#Scala version works
#val Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).map(i=>i.getAs[String]("Name"))
#dbutils.widgets.dropdown("Name", "X1", Name_list.toSeq , "Username Of Interes")

#PySpark version doesn't work
Name_list = df.select("Name").take(NUM_OF_NAMES_FOR_DROPDOWN).rdd.flatMap(lambda x: x).collect()
dbutils.widgets.dropdown(name = "Name", defaultValue = max_Top_Name , choices = [str(x) for x in Name_list] + ["None"])

最后,我想过滤该特定名称的记录并随着时间的推移选择时间分辨率,并根据answer 更新框架,如下所示:

selected_widgets = ['DATE_FROM', 'DATE_TO', 'Time_Resolution_Of_Interest', 'Name_Of_Interest']
myList = getArgument(selected_widgets).split(",")
display(df.filter(df.isin(myList)))

我希望通过小部件值来达到下表名称:X1 和时间分辨率:00-24 在一定时间内date2020-10-202020-11-20

+------------+----------+---------+
|    Name    |   date   |  00-24  | 
+------------+----------+---------+
|     X1     |2020-10-20|   137   |  
|     X1     |2020-10-21|   111   | 
|     X1     |2020-10-22|   99    | 
|     X1     |2020-10-23|   123   | 
|     X1     |2020-10-24|   101   |  
|    ...     |    ...   |   ...   |  
+------------+----------+---------+

【问题讨论】:

【参考方案1】:

您可以做的是首先像您一样构建小部件,然后从小部件中获取单个值并过滤它们以获得最终结果。请参阅下面的示例代码,这可能与您的要求不匹配 1-1,但应该指导您达到您想要的。

创建日期小部件:

from pyspark.sql.functions import min, max

dbutils.widgets.removeAll()

# compute the list of all dates from maximum date available till today
date = [date[0] for date in data.select("date").collect()]
max_min_date = data.select(max('date'),min('date')).first()
min_date = max_min_date['min(date)']
max_date = max_min_date['max(date)']
print(date)
print(min_date)
print(max_date)

dbutils.widgets.dropdown(name = "DATE_FROM", defaultValue = min_date , choices = date)
dbutils.widgets.dropdown(name = "DATE_TO", defaultValue = max_date, choices = date)

使用模式创建时间分辨率小部件,这将允许您构建时间列的动态列表:

channel = [f.name for f in data.schema.fields if f.name not in ['name', 'date']]
print(channel)
dbutils.widgets.dropdown(name = "Time_Resolution_Of_Interest", defaultValue = "00-24" , choices = [str(x) for x in channel] + ["None"])

创建名称小部件:

from pyspark.sql.functions import col
dropdownColumn = dbutils.widgets.get("Time_Resolution_Of_Interest")
NUM_OF_NAMES_FOR_DROPDOWN = 5

#sort by selected time column desc and take 5 rows
name_limit = [name[0] for name in 
data.select("Name").orderBy(col(dropdownColumn), ascending=False).take(NUM_OF_NAMES_FOR_DROPDOWN)]
dbutils.widgets.dropdown(name = "Name", defaultValue = 'X1' , choices = [str(x) for x in name_limit] + ["None"])

最后,根据小部件值过滤数据:

date_from_val = dbutils.widgets.get("DATE_FROM")
date_to_val = dbutils.widgets.get("DATE_TO")
time_val = dbutils.widgets.get("Time_Resolution_Of_Interest")
name_val = dbutils.widgets.get("Name")

result = data.select("name", time_val).where(f"name = 'name_val' and date between 'date_from_val' and  'date_to_val'")

display(result)

【讨论】:

请注意这个post,因为它与流数据上训练模型的日常保存\加载过程中的数据工程有关。

以上是关于基于在 DataBrick 中的笔记本顶部提取小部件值来动态检索/过滤 Spark 框架的最佳 PySpark 实践是啥?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合

学习笔记:在Opencv下基于ORB的图像特征提取

从 Databrick 文件系统读取文件

从 databrick 在 adls gen 1 中写入 tsv 文件时行分隔符更改

取消绑定 wxPython 中的默认按钮行为

论文笔记OPTIPROMPT:用prompt提取预训练模型中的客观事实