databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值
Posted
技术标签:
【中文标题】databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值【英文标题】:databricks udf broadcast dictionary value returns list of dictionaries; can't access values from dictionaries in that list 【发布时间】:2021-04-01 13:28:04 【问题描述】:下面的代码是基于udf dictionary broadcast 博客发布的。在下面的代码中,广播字典应该返回一个字典列表,即预期从广播字典返回 'key':['key':'value','key2':'value','key':'value ','key2':'值']
当我尝试从列表中的字典访问任何键时,不会返回任何值。我知道这一点是因为我收到错误“分配前引用的局部变量 'tmp'”。此变量应包含由列表中每个字典中定义的操作产生的计算值。谁能告诉我为什么或如何调试这个或为什么字典键没有返回值?我在返回广播密钥的代码中添加了打印语句,但它从未出现在输出或驱动程序日志中。
广播词典
calc dict 'name': 'Lvl1_Name_Score', 'operations': ['operator': 'multi', 'value': 1.5, 'operator': 'div', 'value': 1]
def apply_weights_a(calc_broadcasted,key):
def apply_weights(col_name):
calc_operations = calc_broadcasted.value.get(key)
results = 0
print(f'calc_operations calc_operations')
for op in calc_operations:
print(f'op op')
# for i,op in enumerate(calc['dependencies']
if op['operator'] == 'div': tmp = col_name / float(op['value'])
elif op['operator'] == 'multi': tmp = col_name * float(op['value'])
elif op['operator'] == 'sub':tmp = col_name - float(op['value'])
elif op['operator'] == 'add': tmp = col_name + float(op['value'])
# sdf = sdf['tmp'].apply(lambda x: 0 if x < 0 else x)
results = results + tmp
return results
return udf(apply_weights)
def multi_apply_weights(col_names,weights):
def inner(sdf):
for col_name in col_names:
#print(f"weights weights")
size = len(col_name) #get col_name without _c
calc = [mylist for mylist in weights if mylist['name'] == col_name[:size-2] ]
#print(f"calc calc")
calc_dict = calc[0]
#print(f"calc dict calc[0]")
b = spark.sparkContext.broadcast(calc_dict)
sdf = sdf.withColumn(
col_name,
apply_weights_a(b,'operations')(col_name)
)
return sdf
return inner
sdf = multi_apply_weights(weight_list,config['algorithmScores'])(sdf)
此 spark 代码是转换此 pandas 函数的结果,该函数在一组列上执行 1 到多个不同的计算...
def apply_weights(df, weights):
for calc in weights:
col = calc['name']
for op in calc['operations']:
if op['operator'] == 'div':
df[col + '_w'] = df[col]/float(op['value'])
elif op['operator'] == 'multi':
df[col+ '_w'] = df[col]*float(op['value'])
elif op['operator'] == 'sub':
df[col+ '_w'] = df[col]-float(op['value'])
elif op['operator'] == 'add':
df[col+ '_w'] = df[col]+float(op['value'])
df[col] = df[col].apply(lambda x: 0 if x < 0 else x)
return df
【问题讨论】:
如果你让你的代码更简约,你可能会有更好的运气?有很多代码要进入。见How to create a Minimal, Reproducible Example。 【参考方案1】:我发现我可以通过做一个小的改动来运行原始的 python 代码。这适用于我的 4 个功能中的 3 个。在我的最后一个函数上使用这种方法时,它导致我的集群崩溃并出现错误“火花驱动程序已意外停止并正在重新启动。您的笔记本将自动重新附加'。我已经向 microsoft 开了一张票,以获得额外的帮助。
def apply_weights(kdf, weights) -> ks.DataFrame[zip(kdf.dtypes, kdf.columns)]:
我还解决了广播字典的问题,方法是在广播之前添加一个键。我把广播改成
brdcst = dict("rules":newcolrules)
b = spark.sparkContext.broadcast(brdcst)
calcs = b.value.get('rules') #test broadcasted dictionary
print(f"calcs : calcs") #test broadcasted dictionary
【讨论】:
以上是关于databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值的主要内容,如果未能解决你的问题,请参考以下文章