databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值

Posted

技术标签:

【中文标题】databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值【英文标题】:databricks udf broadcast dictionary value returns list of dictionaries; can't access values from dictionaries in that list 【发布时间】:2021-04-01 13:28:04 【问题描述】:

下面的代码是基于udf dictionary broadcast 博客发布的。在下面的代码中,广播字典应该返回一个字典列表,即预期从广播字典返回 'key':['key':'value','key2':'value','key':'value ','key2':'值']

当我尝试从列表中的字典访问任何键时,不会返回任何值。我知道这一点是因为我收到错误“分配前引用的局部变量 'tmp'”。此变量应包含由列表中每个字典中定义的操作产生的计算值。谁能告诉我为什么或如何调试这个或为什么字典键没有返回值?我在返回广播密钥的代码中添加了打印语句,但它从未出现在输出或驱动程序日志中。

广播词典

calc dict 'name': 'Lvl1_Name_Score', 'operations': ['operator': 'multi', 'value': 1.5, 'operator': 'div', 'value': 1]



def apply_weights_a(calc_broadcasted,key): 
    def apply_weights(col_name):
        calc_operations = calc_broadcasted.value.get(key)
        results = 0
        print(f'calc_operations calc_operations')
        for op in calc_operations:
            print(f'op op')
            # for i,op in enumerate(calc['dependencies']
            if op['operator'] == 'div': tmp = col_name / float(op['value'])
            elif op['operator'] == 'multi': tmp = col_name * float(op['value'])
            elif op['operator'] == 'sub':tmp = col_name - float(op['value'])
            elif op['operator'] == 'add': tmp = col_name + float(op['value'])
        # sdf = sdf['tmp'].apply(lambda x: 0 if x < 0 else x)
        results = results + tmp
        return results
    return udf(apply_weights)


def multi_apply_weights(col_names,weights): 
    def inner(sdf):
        for col_name in col_names:
            #print(f"weights weights")
            size = len(col_name)  #get col_name without _c
            calc = [mylist for mylist in weights if mylist['name'] == col_name[:size-2] ]
            #print(f"calc calc")
            calc_dict = calc[0]
            #print(f"calc dict calc[0]")
            b = spark.sparkContext.broadcast(calc_dict)
            sdf = sdf.withColumn(
                col_name,
                apply_weights_a(b,'operations')(col_name)
            )
        return sdf
    return inner

sdf = multi_apply_weights(weight_list,config['algorithmScores'])(sdf)
                                    

此 spark 代码是转换此 pandas 函数的结果,该函数在一组列上执行 1 到多个不同的计算...

def apply_weights(df, weights):
    for calc in weights:
        col = calc['name']
        for op in calc['operations']:
            if op['operator'] == 'div':
                df[col + '_w'] = df[col]/float(op['value'])
            elif op['operator'] == 'multi':
                df[col+ '_w'] = df[col]*float(op['value'])
            elif op['operator'] == 'sub':
                df[col+ '_w'] = df[col]-float(op['value'])
            elif op['operator'] == 'add':
                df[col+ '_w'] = df[col]+float(op['value'])
        df[col] = df[col].apply(lambda x: 0 if x < 0 else x)

    return df

【问题讨论】:

如果你让你的代码更简约,你可能会有更好的运气?有很多代码要进入。见How to create a Minimal, Reproducible Example。 【参考方案1】:

我发现我可以通过做一个小的改动来运行原始的 python 代码。这适用于我的 4 个功能中的 3 个。在我的最后一个函数上使用这种方法时,它导致我的集群崩溃并出现错误“火花驱动程序已意外停止并正在重新启动。您的笔记本将自动重新附加'。我已经向 microsoft 开了一张票,以获得额外的帮助。

def apply_weights(kdf, weights) -> ks.DataFrame[zip(kdf.dtypes, kdf.columns)]:

我还解决了广播字典的问题,方法是在广播之前添加一个键。我把广播改​​成

brdcst = dict("rules":newcolrules)
b = spark.sparkContext.broadcast(brdcst) 

calcs = b.value.get('rules') #test broadcasted dictionary
print(f"calcs : calcs") #test broadcasted dictionary

【讨论】:

以上是关于databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Python 字典作为列值执行 SQL 插入命令

Spark 2.4 上带有字典的 UDF

Pyspark UDF 无法使用大字典

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

为啥字典查找会导致“不可散列类型”错误,但使用直接值不会?

将字典保存为 pyspark 数据框并加载它 - Python、Databricks