如何在 PySpark UDF 中使用边缘情况解决分配问题(如 Hungarian/linear_sum_assignment)

Posted

技术标签:

【中文标题】如何在 PySpark UDF 中使用边缘情况解决分配问题(如 Hungarian/linear_sum_assignment)【英文标题】:How to solve an assignment problem (like Hungarian/linear_sum_assignment) with an edge case in PySpark UDF 【发布时间】:2021-09-22 19:38:54 【问题描述】:

我有一个分配问题,我想向 SO 社区询问为我的 spark 数据框(使用 spark 3.1+)实现此任务的最佳方法。我将首先描述问题,然后再进行实施。

问题是:我有最多 N 个任务和最多 N 个个人(在这个问题的情况下,N=10)。每个人都有执行每项任务的成本,其中最低成本为 0 美元,最高成本为 10 美元。这是一个匈牙利算法问题,有一些注意事项。

    在某些情况下,任务少于 10 个和/或个人少于 10 个,可以不为某人分配任务(或不为任务分配个人)。李> [更复杂的边缘情况/我遇到问题的那个] - 列表中可能有一项任务具有标志 multiTask=True(不能超过 1 个multiTask,并且可能存在没有)。如果一个worker的multiTask的cost小于x,他会被自动分配到multiTask,并且multiTask被认为是在优化过程中被占用。 我将分享几个例子。在此示例中,要分配给多任务的 x 值为 1。 如果 10 名工人中有 1 名在多任务上的成本为 0.25,则将他分配给多任务,然后将其他 9 名工人分配给其他 9 项任务 如果 10 名工人中有 2 名工人在 multiTask 上的成本 如果所有 10 个工作人员在 multiTask 上的成本均 如果在 multiTask 上没有任何工作人员的成本

这是 spark 数据框的样子。 注意:为了简单起见,我展示了一个示例,其中 N=3(3 个任务,3 个个人)。

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])

df = spark.createDataFrame(rdd)

您会看到有一个日期/位置,因为我需要为每个日期/位置分组解决此分配问题。 我计划通过使用dense_rank()根据他们的ID为每个工作人员和任务分配一个“索引”,然后使用pandas UDF,根据索引填充N x N numpy数组,并调用linear_sum_assignment函数来解决这个问题.但是,由于我在 multiTask 中布置的第二个边缘案例,我不相信这个计划会奏效。

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1) 
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
  df_dict = pandas_df.to_dict('records')
  # in case there are less than N rows/columns
  N = max(pandas_df.shape[0], pandas_df.shape[1])
  arr = np.zeros((N,N))
  for row in df_dict: 
    # worker_idx will be the row number, task idx will be the col number
    worker_idx = row.get('worker_idx')
    task_idx = row.get('task_idx')
    arr[worker_idx][task_idx] = row.get('cost')
  rids, cids = linear_sum_assignment(n)
  
  return_list = []
  # now want to return a dataframe that says which task_idx a worker has 
  for r, c in zip(rids, cids):
    for d in df_dict: 
      if d.get('worker_idx') == r:
        d['task_assignment'] = c
        return_list.append(d)
  return pd.DataFrame(return_list)
      
  
  
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)

df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))

如您所见,这个案例根本没有涵盖多任务。我想以最有效的方式解决这个问题,所以我不依赖于 pandas udf 或 scipy。

【问题讨论】:

【参考方案1】:

我对您使用的库一无所知,因此无法帮助您编写代码,但我认为您应该分两步完成:

    如果需要将工作人员分配给多任务,则将他们分配给它。如果有人被分配到此任务,请不要将其包含在您的成本矩阵中。 照常使用匈牙利算法(或其他替代算法)为工作人员分配任务。

基本的匈牙利算法仅适用于平方成本矩阵,看起来您已经通过用 0 填充成本矩阵来正确处理该问题,但是对矩形矩阵的算法进行了一些修改。您可能想看看您是否可以使用其中一种替代方法,因为它可能会更快。

【讨论】:

是的,这就是我计划采用的方法。但是,我遇到的麻烦是如何最好地有效地实现它,因为如果我事先为每个人分配一个索引,然后必须删除分配给多任务和多任务的人(可能),他们的行/列将为 0 我无法事先分配索引并在我查看是否有人分配给多任务后分配它们,但这感觉就像很多循环 匈牙利算法是 O(n^3) 或 O(n^4),具体取决于实现。为了减小 n 的大小,在该算法之外增加一两个循环是一个很好的折衷方案。

以上是关于如何在 PySpark UDF 中使用边缘情况解决分配问题(如 Hungarian/linear_sum_assignment)的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark的列中找到列表的平均值?

udf(用户定义函数)如何在 pyspark 中工作?

如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?

如何在pyspark withcolumn中使用udf和class

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark