增加了子集匹配维度的区间树?

Posted

技术标签:

【中文标题】增加了子集匹配维度的区间树?【英文标题】:Interval tree with added dimension of subset matching? 【发布时间】:2017-06-16 12:56:13 【问题描述】:

这是一个关于一个有点复杂的问题的算法问题。基础是这样的:

一个基于可用槽预留槽的调度系统。插槽有一定的标准,我们称之为标签。如果可用槽的标记集是保留槽的超集,则保留通过这些标记与可用槽匹配。

作为一个具体的例子,以这个场景为例:

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+

在 11:00 到 12:30 之间可以预订标签 AB,从 12:00 到 13:30 CD 可以预订,并且有一个从大约 12:00 到 12:30 重叠。

11:00  12:00  13:00
+--------+
| A, B   |
+--------+
       +--------+
       | C, D   |
       +--------+
  xxxxxx
  x A  x
  xxxxxx

这里已经预订了A,因此在11:15-ish 和12:00-ish 之间无法预订AB

简而言之就是这个想法。可用插槽没有具体限制:

一个可用的插槽可以包含任意数量的标签 任意数量的时隙可以随时重叠 槽的长度是任意的 保留可以包含任意数量的标签

系统中唯一需要遵守的规则是:

添加预留时,至少有一个剩余的可用插槽必须与预留中的所有标签匹配

澄清一下:如果同时有两个可用的插槽,例如标签A,那么此时可以为A 进行两个预留,但不能再预留了。

我正在使用 interval tree 的修改实现;快速概览:

所有可用槽都添加到区间树中(保留重复/重叠) 所有保留槽都被迭代并且: 从树中查询与预订时间匹配的所有可用时隙 与预订标签匹配的第一个被切片,并从树中移除切片

当这个过程完成后,剩下的就是剩余的可用槽片,我可以查询是否可以为特定时间进行新的预订并添加它。

数据结构看起来像这样:


  type: 'available', 
  begin: 1497857244, 
  end: 1497858244, 
  tags: [ foo: 'bar' ,  baz: 42 ]


  type: 'reserved', 
  begin: 1497857345, 
  end: 1497857210, 
  tags: [ foo: 'bar' ]

标签本身就是键值对象,它们的列表就是一个“标签集”。如果有帮助,可以将它们序列化;到目前为止,我使用的是 Python set 类型,这使得比较很容易。时隙开始/结束时间是树中的 UNIX 时间戳。我并不是特别喜欢这些特定的数据结构,如果有用,我可以重构它们。


我面临的问题是这不是没有错误的;每隔一段时间,预订就会潜入与其他预订发生冲突的系统中,我还不知道这到底是怎么发生的。当标签以复杂的方式重叠时,它也不是很聪明,需要计算最佳分布,以便所有预留可以尽可能地适合可用的插槽;事实上,目前在重叠场景中如何将预订与可用时隙进行匹配是不确定的。

我想知道的是:区间树主要用于此目的,但我当前的系统将标签集匹配作为附加维度添加到此系统是笨重且固定的; 是否有可以优雅地处理此问题的数据结构或算法?

必须支持的操作:

    向系统查询与特定标签集匹配的可用插槽(考虑可能会降低可用性但它们本身不属于所述标签集的保留;例如,在上面的示例中查询B 的可用性)。李> 确保不能将没有匹配的可用插槽的预留添加到系统中。

【问题讨论】:

您没有很清楚地解释输入。只要标签正确匹配,是否可以将给定预留分配给任何插槽?或者预订是否附有时间间隔? 我已经阐明了使用的数据结构。是的,预订只需与可用空档的标签和时间相匹配即可。 不是在树中拥有所有可用的插槽然后从中删除它们,而是从一棵空树开始然后尝试用保留填充它是否有意义,这样就不会有冲突? @Florian 我对任何方法都完全开放。根据我发布的数据结构(甚至是可延展的),我只有一堆值,并且需要能够 1)获得可按标签集过滤的仍然可用的插槽,以及 2)检查是否可以添加新的预订。 @Florian 以 Apple Store Genius Bar 预订之类的方式为例:您只是在预订一个与 某人 谈论您的 iPhone 的时间,比如说,哪个特别你得到的插槽或人。届时只有有知识的人可以与您讨论您的 iPhone,但会有多个人,您将在那时和那里弄清楚您究竟得到了哪一个。跨度> 【参考方案1】:

您的问题可以使用constraint programming 解决。在 python 中,这可以使用python-constraint 库来实现。

首先,我们需要一种方法来检查两个插槽是否彼此一致。这是一个函数,如果两个插槽共享一个标签并且它们的帧重叠,则返回 true。在python中,这可以使用以下函数来实现

def checkNoOverlap(slot1, slot2):
    shareTags = False
    for tag in slot1['tags']:
        if tag in slot2['tags']:
            shareTags = True
            break    
    if not shareTags: return True
    return not (slot2['begin'] <= slot1['begin'] <= slot2['end'] or 
                slot2['begin'] <= slot1['end'] <= slot2['end'])

我不确定您是否希望标签完全相同(如 foo: bar 等于 foo: bar)或只有键(如 foo: bar 等于 foo: qux),但是你可以在上面的函数中改变它。

一致性检查

我们可以将 python-constraint 模块用于您要求的两种功能。

第二个功能是最简单的。为了实现这一点,我们可以使用函数isConsistent(set),它将提供的数据结构中的插槽列表作为输入。然后该函数会将所有插槽提供给 python-constraint 并检查插槽列表是否一致(没有 2 个不应该重叠的插槽,重叠)并返回一致性。

def isConsistent(set):
        #initialize python-constraint context
        problem = Problem()
        #add all slots the context as variables with a singleton domain
        for i in range(len(set)):
            problem.addVariable(i, [set[i]])        
        #add a constraint for each possible pair of slots
        for i in range(len(set)):
            for j in range(len(set)):
                #we don't want slots to be checked against themselves
                if i == j:
                    continue
                #this constraint uses the checkNoOverlap function
                problem.addConstraint(lambda a,b: checkNoOverlap(a, b), (i, j))
        # getSolutions returns all the possible combinations of domain elements
        # because all domains are singleton, this either returns a list with length 1 (consistent) or 0 (inconsistent)
        return not len(problem.getSolutions()) == 0

只要用户想添加一个预留槽,就可以调用这个函数。可以将输入槽添加到已经存在的槽列表中,并且可以检查一致性。如果一致,则保留新的插槽。否则,新的插槽重叠,应该被拒绝。

寻找可用的插槽

这个问题有点棘手。我们可以使用与上述相同的功能,并进行一些重大更改。我们现在不想将新插槽与现有插槽一起添加,而是将所有可能的插槽添加到已经存在的插槽中。然后我们可以检查所有可能的槽与保留槽的一致性,并询问约束系统是否一致。

因为如果我们不做任何限制,可能的槽数是无限的,所以我们首先需要为程序声明一些参数:

MIN = 149780000 #available time slots can never start earlier then this time
MAX = 149790000 #available time slots can never start later then this time
GRANULARITY = 1*60 #possible time slots are always at least one minut different from each other

我们现在可以继续主函数了。它看起来很像一致性检查,但我们现在添加了一个变量来发现所有可用的插槽,而不是来自用户的新插槽。

def availableSlots(tags, set):
    #same as above
    problem = Problem()
    for i in range(len(set)):
        problem.addVariable(i, [set[i]])
    #add an extra variable for the available slot is added, with a domain of all possible slots
    problem.addVariable(len(set), generatePossibleSlots(MIN, MAX, GRANULARITY, tags))
    for i in range(len(set) +1):
        for j in range(len(set) +1):
            if i == j:
                continue
            problem.addConstraint(lambda a, b: checkNoOverlap(a, b), (i, j))
    #extract the available time slots from the solution for clean output
    return filterAvailableSlots(problem.getSolutions())

我使用一些辅助函数来保持代码更简洁。它们都包含在此处。

def filterAvailableSlots(possibleCombinations):
    result = []
    for slots in possibleCombinations:
        for key, slot in slots.items():
            if slot['type'] == 'available':
                result.append(slot)

    return result

def generatePossibleSlots(min, max, granularity, tags):
    possibilities = []
    for i in range(min, max - 1, granularity):
        for j in range(i + 1, max, granularity):
            possibleSlot = 
                              'type': 'available',
                              'begin': i,
                              'end': j,
                              'tags': tags
            
            possibilities.append(possibleSlot)
    return tuple(possibilities)

您现在可以将函数 getAvailableSlots(tags, set) 与您想要的可用插槽和一组已保留插槽的标签一起使用。请注意,此函数实际上返回所有一致的可能槽,因此无需努力寻找最大长度的槽或进行其他优化。

希望这会有所帮助! (我让它按照你在我的 pycharm 中描述的那样工作)

【讨论】:

问题:在比较方法中,您似乎没有区分可用插槽和保留插槽……我是否应该将所有插槽(可用和保留)输入Problem 而不区分?那么checkNoOverlap 应该在那里做出一些让步,因为可用的插槽也可能“重叠”,不是吗? 是的,您应该提供固定的保留槽,以及可以在问题范围内变化的可用槽。然后,Problem 类将返回保留槽的所有正确组合以及可用槽变量的域的可能性。 请记住,为每个保留槽添加一个变量很重要,其中一个大小为 1 的域仅包含该槽,并且只有一个变量用于可用槽,一个域包含所有可能的可用插槽生成 那是为了获取所有可用的插槽?我目前正在尝试一致性检查的第一步。 ——“重叠”和“包含”不应该有区别吗?预订必须由可用的插槽“包含”,仅仅重叠是不行的。 类似的东西,是的。感谢您澄清您现有的样本并没有完全涵盖它。我会继续尝试这个。显然,您会很高兴收到您身边的更新样本。 :o)【参考方案2】:

这是一个解决方案,我将包括下面的所有代码。

1。创建一个插槽表和一个预留表

2。创建一个保留 x 槽的矩阵

根据保留槽组合是否可能由真值或假值填充

3。找出允许最多保留槽组合的最佳映射

注意:我当前的解决方案不适用于非常大的数组,因为它涉及遍历列表的所有可能排列,其中大小 = 插槽数。我已经发布了another question,看看是否有人能找到更好的方法。不过这个方案是准确的,可以优化

Python 代码源码

第 1 部分

from IPython.display import display
import pandas as pd
import datetime

available_data = [
    ['SlotA', datetime.time(11, 0, 0), datetime.time(12, 30, 0), set(list('ABD'))],
    ['SlotB',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('C'))],
    ['SlotC',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('ABCD'))],
    ['SlotD',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('AD'))],
]

reservation_data = [
    ['ReservationA', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('AD'))],
    ['ReservationB', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('A'))],
    ['ReservationC', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationD', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
    ['ReservationE', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('D'))]
]

reservations = pd.DataFrame(data=reservation_data, columns=['reservations', 'begin', 'end', 'tags']).set_index('reservations')
slots = pd.DataFrame(data=available_data, columns=['slots', 'begin', 'end', 'tags']).set_index('slots')

display(slots)
display(reservations)

第 2 部分

def is_possible_combination(r):
    return (r['begin'] >= slots['begin']) & (r['end'] <= slots['end']) & (r['tags'] <= slots['tags'])

solution_matrix = reservations.apply(is_possible_combination, axis=1).astype(int)
display(solution_matrix)

第三部分

import numpy as np
from itertools import permutations

# add dummy columns to make the matrix square if it is not
sqr_matrix = solution_matrix
if sqr_matrix.shape[0] > sqr_matrix.shape[1]:
    # uhoh, there are more reservations than slots... this can't be good
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc[:,'FakeSlot' + str(i)] = [1] * sqr_matrix.shape[0]
elif sqr_matrix.shape[0] < sqr_matrix.shape[1]:
    # there are more slots than customers, why doesn't anyone like us?
    for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
        sqr_matrix.loc['FakeCustomer' + str(i)] = [1] * sqr_matrix.shape[1]

# we only want the values now
A = solution_matrix.values.astype(int)

# make an identity matrix (the perfect map)
imatrix = np.diag([1]*A.shape[0])

# randomly swap columns on the identity matrix until they match. 
n = A.shape[0]

# this will hold the map that works the best
best_map_so_far = np.zeros([1,1])

for column_order in permutations(range(n)):
    # this is an identity matrix with the columns swapped according to the permutation
    imatrix = np.zeros(A.shape)
    for row, column in enumerate(column_order):
        imatrix[row,column] = 1

    # is this map better than the previous best?
    if sum(sum(imatrix * A)) > sum(sum(best_map_so_far)):
        best_map_so_far = imatrix

    # could it be? a perfect map??
    if sum(sum(imatrix * A)) == n:
        break

if sum(sum(imatrix * A)) != n:
    print('a perfect map was not found')

output = pd.DataFrame(A*imatrix, columns=solution_matrix.columns, index=solution_matrix.index, dtype=int)
display(output)

【讨论】:

使用Arne's constraint programming… 可能会优化测试所有组合?感谢您的回答,我会深入研究它,看看它会导致什么。 是的,这行得通。解释起来很棘手,但是您可以根据可能组合的矩阵来限制组合(因为无论如何您只想注意其中包含 1 的元素)。但这实际上可能更难以使用约束库进行编码,我稍后会尝试做一些事情 是的,我也很难用 Arne 的回答表达所有适用的约束,所以我欢迎这种替代方法。我一有时间就会玩弄它。【参考方案3】:

Arne 和 tinker 建议的方法都有帮助,但最终还不够。我想出了一种混合方法,可以很好地解决这个问题。

主要的问题是它是一个三维的问题,很难一次性解决所有的维度。这不仅仅是匹配时间重叠或标签重叠,而是匹配 时间片 与标签重叠。根据时间甚至标签将插槽与其他插槽匹配很简单,但是将已经匹配的可用性插槽与另一个时间的另一个预订匹配则非常复杂。这意味着,在这种情况下,一个可用性可以涵盖不同时间的两个预订:

+---------+
| A, B    |
+---------+
xxxxx xxxxx
x A x x A x
xxxxx xxxxx

试图将其融入基于约束的编程中需要极其复杂的约束关系,这很难管理。我的解决方案是简化问题……

删除一维

它不是一次解决所有维度,而是极大地简化了问题,在很大程度上消除了时间维度。我通过使用现有的区间树并根据需要对其进行切片来做到这一点:

def __init__(self, slots):
    self.tree = IntervalTree(slots)

def timeslot_is_available(self, start: datetime, end: datetime, attributes: set):
    candidate = Slot(start.timestamp(), end.timestamp(), dict(type=SlotType.RESERVED, attributes=attributes))
    slots = list(self.tree[start.timestamp():end.timestamp()])
    return self.model_is_consistent(slots + [candidate])

为了查询一个特定的槽是否可用,我只取在那个特定时间相关的槽(self.tree[..:..]),这将计算的复杂性降低到一个本地化的子集:

  |      |             +-+ = availability
+-|------|-+           xxx = reservation
  |  +---|------+
xx|x  xxx|x
  |  xxxx|
  |      |

然后我确认那个窄片内的一致性:

@staticmethod
def model_is_consistent(slots):
    def can_handle(r):
        return lambda a: r.attributes <= a.attributes and a.contains_interval(r)

    av = [s for s in slots if s.type == SlotType.AVAILABLE]
    rs = [s for s in slots if s.type == SlotType.RESERVED]

    p = Problem()
    p.addConstraint(AllDifferentConstraint())
    p.addVariables(range(len(rs)), av)

    for i, r in enumerate(rs):
        p.addConstraint(can_handle(r), (i,))

    return p.getSolution() is not None

(我在这里省略了一些优化和其他代码。)

这部分是 Arne 和 tinker 建议的混合方法。它使用 tinker 建议的矩阵算法,使用基于约束的编程来查找匹配槽。基本上:如果有任何解决方案可以将所有预留分配到不同的可用时隙,则此时间片处于一致状态。由于我传入了所需的预留槽,如果模型包括该槽在内仍然是一致的,这意味着预留该槽是安全的。

如果在这个狭窄的窗口内有两个短期预订可分配给相同的可用性,这仍然存在问题,但这种情况的可能性很低,结果仅仅是可用性查询的假阴性;误报会更成问题。

寻找可用的插槽

找到所有可用的插槽是一个更复杂的问题,因此再次进行一些简化是必要的。首先,只能查询模型以获取特定标签集的可用性(没有“给我所有全局可用槽”),其次只能以特定粒度(所需槽长度)查询。这非常适合我的特定用例,在这种情况下,我只需要向用户提供他们可以保留的插槽列表,例如 9:15-9:30、9:30-9:45 等。时间>。通过重用上面的代码,这使得算法非常简单:

def free_slots(self, start: datetime, end: datetime, attributes: set, granularity: timedelta):
    slots = []
    while start < end:
        slot_end = start + granularity
        if self.timeslot_is_available(start, slot_end, attributes):
            slots.append((start, slot_end))
        start += granularity

    return slots

换句话说,它只是在给定的时间间隔内遍历所有可能的插槽,并逐字检查该插槽是否可用。这是一种蛮力解决方案,但效果很好。

【讨论】:

以上是关于增加了子集匹配维度的区间树?的主要内容,如果未能解决你的问题,请参考以下文章

作为事实表子集的维度有啥用?

Greenplum 实时数据仓库实践——维度表技术

Greenplum 实时数据仓库实践——维度表技术

Greenplum 实时数据仓库实践——维度表技术

python 如何减少一个维度

CountVectorizer MultinomialNB ValueError:维度不匹配