Python中可变数据的重复数据删除/合并

Posted

技术标签:

【中文标题】Python中可变数据的重复数据删除/合并【英文标题】:Deduplication/merging of mutable data in Python 【发布时间】:2021-12-06 16:59:52 【问题描述】:

问题的高级视图

我有 X 个源,其中包含有关我们环境中资产(主机名、IP、MAC、操作系统等)的信息。源包含从 1500 到 150k 的条目(至少是我现在使用的条目)。我的脚本应该查询它们中的每一个,收集这些数据,通过合并来自不同来源的相同资产的信息来消除重复数据,并返回所有条目的统一列表。我当前的实现确实有效,但对于更大的数据集来说速度很慢。我很好奇是否有更好的方法来完成我想做的事情。

普遍问题 通过合并相似条目来删除重复数据,但需要注意的是,合并两个资产可能会改变生成的资产是否与合并前与前两个相似的第三个资产相似。 示例: ~ 相似性,+ 合并 (之前)A~B~C (之后) (A+B) ~ C 或 (A+B) !~ C

我试着寻找有同样问题的人,我只找到了What is an elegant way to remove duplicate mutable objects in a list in Python?,但它不包括对我来说至关重要的数据合并。

使用的类

为了便于阅读和理解而进行了简化,删除了不需要的部分 - 常规功能完好无损。

class Entry:

    def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = ):
        # SO: Sorting and sanitization removed for simplicity
        self.source = source
        self.mac = mac
        self.ip = ip
        self.hostname = hostname
        self.os = os
        self.details = details

    def __eq__(self, other):
        if isinstance(other, Entry):
            return (self.source == other.source and
                    self.os == other.os and
                    self.hostname == other.hostname and
                    self.mac == other.mac and
                    self.ip == other.ip)
        return NotImplemented

    def is_similar(self, other) -> bool:
        def same_entry(l1: list, l2: list) -> bool:
            return not set(l1).isdisjoint(l2)

        if isinstance(other, Entry):
            if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
                empty_hostnames = self.hostname == [] or other.hostname == []
                empty_macs = self.mac == [] or other.mac == []

                return (same_entry(self.hostname, other.hostname) or
                        (empty_hostnames and same_entry(self.mac, other.mac)) or
                        (empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))

        return False

    def merge(self, other: 'Entry'):
        self.source = _merge_lists(self.source, other.source)
        self.hostname = _merge_lists(self.hostname, other.hostname)
        self.mac = _merge_lists(self.mac, other.mac)
        self.ip = _merge_lists(self.ip, other.ip)
        self.os = self.os if self.os != OS.UNKNOWN else other.os
        self.details = _merge_dicts(self.details, other.details)

    def representation(self) -> str:
        # Might be useful if anyone wishes to run the code
        return f'<Entry from self.source: hostname=self.hostname, MAC=self.mac, IP=self.ip, OS=self.os.value, details=self.details>'

def _merge_lists(l1: list, l2: list):
    return list(set(l1) | set(l2))


def _merge_dicts(d1: dict, d2: dict):
    """
    Merge two dicts without overwriting any data.
    """
    # If either is empty, return the other one
    if not d1:
        return d2
    if not d2:
        return d1
    if d1 == d2:
        return d1

    result = d1
    for k, v in d2.items():
        if k in result:
            result[k + '_'] = v
        else:
            result[k] = v

    return result
class OS(Enum):
    '''
    Enum specifying the operating system of the asset.
    '''
    UNKNOWN = 'Unknown'
    WINDOWS = 'Windows'
    LINUX = 'Linux'
    MACOS = 'MacOS'

算法

每个算法都获取来自不同来源的条目列表,例如: entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]

主要去重功能

它是每个算法中使用的主要函数。它获取来自 2 个不同来源的条目列表,并将其组合到包含资产的列表中,并在需要时合并信息。


这可能是我最需要帮助的部分。这是我能想到的唯一方法。正因为如此,我专注于如何将这个函数的运行速度提高几倍,但是从缩短运行时间的角度来看,让这个函数更快是最好的。


def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
    """
    Deduplicates entries from provided lists by merging similar entries.
    Entries in the lists are supposed to be already deduplicated.
    """
    # If either is empty, return the other one
    if not en1:
        return en2
    if not en2:
        return en1

    result = []

    # Iterate over longer and check for similar in shorter
    if len(en2) > len(en1):
        en1, en2 = en2, en1

    for e in en1:
        # walrus operator in Python 3.8 or newer
        while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
            e.merge(similar)
            en2.remove(similar)
            del similar
        result.append(e)
    result.extend(en2)

    return result

此处不适用常规重复数据删除(例如使用集合)的原因是因为将一个条目与另一个新条目合并可能会变得相似,例如:

In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True

第一种方法 - 顺序

我的第一个想法是最简单的,就是简单的递归。

def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
    """Deduplication helper allowing for providing more than 2 sources."""
    if len(lists) == 1:
        return lists[0]

    return deduplicate(lists[0], dedup_multiple(lists[1:]))

第二种方法 - 使用池的多线程

这就是我目前使用的方法。到目前为止,它是最快的并且相当简单。

def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
    """Asynchronous deduplication helper allowing for providing more than 2 sources."""
    with mp.Pool() as pool:
        while len(lists) > 1:
            if len(lists) % 2 == 1:
                lists.append([])
            data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
            lists = pool.map_async(_internal_deduplication, data).get()
        return lists[0]

def _internal_deduplication(en):
    return deduplicate(*en)

但我很快意识到,如果一项任务比其他任务花费的时间长得多(例如,因为对最大的源进行重复数据删除),那么其他所有任务都会等待而不是工作。

第三种方法 - 使用队列和进程的多线程

当我试图加快第二种方法时,我遇到了How to use python multiprocessing pool in continuous loop 和Filling a queue and managing multiprocessing in python,我想出了以下解决方案。

def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
    tasks_number = min(os.cpu_count(), len(lists) // 2)
    args = lists[:tasks_number]

    with mp.Manager() as manager:
        queue = manager.Queue()

        for l in lists[tasks_number:]:
            queue.put(l)

        processes = []
        for arg in args:
            proc = mp.Process(target=test, args=(queue, arg, ))
            proc.start()
            processes.append(proc)

        for proc in processes:
            proc.join()

        return queue.get()


def test(queue: mp.Queue, arg: List[Entry]):
    while not queue.empty():
        try:
            arg2: List[Entry] = queue.get()
        except Empty:
            continue
        arg = deduplicate(arg, arg2)

    queue.put(arg)

我认为这将是最好的解决方案,因为如果可能的话,不会有不处理数据的时刻,但经过测试,它几乎总是比第二种方法稍慢。

运行时比较

Source A    1510
Source B    1509
Source C    5000
Source D    4460
Source E    5000
Source F    2084

Deduplicating.....
SYNC   - Execution time: 188.6127771000 - Count: 13540
ASYNC  - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A    1510
Source B    1509
Source C    11821
Source D    13871
Source E    5001
Source F    2333

Deduplicating.....
ASYNC  - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405

【问题讨论】:

您是否知道两个相似的条目在合并后会变得不相似?假设A、B、C有相同的mac地址,A没有主机名,B和C有不同的。那么 A 与两者相似。但是将 B 合并到 A 中,现在 A 和 C 不一样了。 是的,我知道这可能会发生;但是,我不确定如何正确解决该问题。如果我决定不合并此类案例,我首先需要发现它们(这将需要大量额外的计算来临时合并并检查相似条目的数量是否已更改)并留下更大的相似条目集。如果我想合并它们,我可以保持原样并希望它只是可忽略的子集,或者我需要在合并时实现某种类型的优先级,这也需要前面解释过的某种类型的发现。 【参考方案1】:

总结:我们定义了两个草图函数 f 和 g 从条目到集合 “草图”使得两个条目 e 和 e' 相似当且仅当 f(e) ∩ g(e′) ≠ ∅。然后我们可以有效地识别合并(参见 算法结束)。

我实际上要定义四个草图函数,fos, faddr、gos 和 gaddr,我们从中 构造

f(e) = (x, y) | x ∈ fos(e), y ∈ faddr(e) g(e) = (x, y) | x ∈ gos(e), y ∈ gaddr(e).

fos 和 gos 是四个中较简单的一个。 fos(e) 包括

(1, e.os),如果已知 e.os (2,),如果已知 e.os (3,),如果 e.os 未知。

gos(e) 包括

(1, e.os),如果已知 e.os (2,),如果 e.os 未知 (3,).

faddr 和 gaddr 更复杂,因为 是优先属性,它们可以有多个值。 尽管如此,同样的技巧也可以发挥作用。 f地址(e) 包括

(1, h) 对于 e.hostname 中的每个 h (2, m) 对于 e.mac 中的每个 m,如果 e.hostname 非空 (3, m) 对于 e.mac 中的每个 m,如果 e.hostname 为空 (4, i) 对于 e.ip 中的每个 i,如果 e.hostname 和 e.mac 是 非空 (5, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空且 e.mac 非空 (6, i) 对于 e.ip 中的每个 i,如果 e.hostname 非空且 e.mac 为空 (7, i) 对于 e.ip 中的每个 i,如果 e.hostname 和 e.mac 是 空。

gaddr(e) 包括

(1, h) 对于 e.hostname 中的每个 h (2, m) 对于 e.mac 中的每个 m,如果 e.hostname 为空 (3, m) 对于 e.mac 中的每个 m (4, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空且 e.mac 为空 (5, i) 对于 e.ip 中的每个 i,如果 e.mac 为空 (6, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空 (7, i) 对于 e.ip 中的每个 i

算法的其余部分如下。

初始化一个defaultdict(list) 将一个草图映射到一个条目列表 标识符。

对于每个条目,对于每个条目的 f 草图,添加条目的 defaultdict 中相应列表的标识符。

初始化一个set 的边。

对于每个条目,对于每个条目的 g 草图,查找 defaultdict 中的 g-sketch 并从条目的 列表中每个其他标识符的标识符。

现在我们有了一组边,我们遇到了@btilly 的问题 著名的。作为一名计算机科学家,我的第一直觉是找到连接 组件,但是当然,合并两个条目可能会导致一些事件 边缘消失。相反,您可以使用边缘作为候选 合并,并重复直到上面的算法没有返回边。

import collections
import itertools

Entry = collections.namedtuple("Entry", ("os", "hostname", "mac", "ip"))

UNKNOWN = "UNKNOWN"
WINDOWS = "WINDOWS"
LINUX = "LINUX"


def f_os(e):
    if e.os != UNKNOWN:
        yield (1, e.os)
    if e.os != UNKNOWN:
        yield (2,)
    if e.os == UNKNOWN:
        yield (3,)


def g_os(e):
    if e.os != UNKNOWN:
        yield (1, e.os)
    if e.os == UNKNOWN:
        yield (2,)
    yield (3,)


def f_addr(e):
    for h in e.hostname:
        yield (1, h)
    if e.hostname:
        for m in e.mac:
            yield (2, m)
    if not e.hostname:
        for m in e.mac:
            yield (3, m)
    if e.hostname and e.mac:
        for i in e.ip:
            yield (4, i)
    if not e.hostname and e.mac:
        for i in e.ip:
            yield (5, i)
    if e.hostname and not e.mac:
        for i in e.ip:
            yield (6, i)
    if not e.hostname and not e.mac:
        for i in e.ip:
            yield (7, i)


def g_addr(e):
    for h in e.hostname:
        yield (1, h)
    if not e.hostname:
        for m in e.mac:
            yield (2, m)
    for m in e.mac:
        yield (3, m)
    if not e.hostname and not e.mac:
        for i in e.ip:
            yield (4, i)
    if not e.mac:
        for i in e.ip:
            yield (5, i)
    if not e.hostname:
        for i in e.ip:
            yield (6, i)
    for i in e.ip:
        yield (7, i)


def f(e):
    return set(itertools.product(f_os(e), f_addr(e)))


def g(e):
    return set(itertools.product(g_os(e), g_addr(e)))


def is_similar(e, e_prime):
    return not f(e).isdisjoint(g(e_prime))


# Begin testing code for is_similar


def original_is_similar(e, e_prime):
    if e.os != UNKNOWN and e_prime.os != UNKNOWN and e.os != e_prime.os:
        return False
    if e.hostname and e_prime.hostname:
        return not set(e.hostname).isdisjoint(set(e_prime.hostname))
    if e.mac and e_prime.mac:
        return not set(e.mac).isdisjoint(set(e_prime.mac))
    return not set(e.ip).isdisjoint(set(e_prime.ip))


import random


def random_os():
    return random.choice([UNKNOWN, WINDOWS, LINUX])


def random_names(prefix):
    return [
        "".format(prefix, random.randrange(10)) for n in range(random.randrange(3))
    ]


def random_entry():
    return Entry(random_os(), random_names("H"), random_names("M"), random_names("I"))


def test_is_similar():
    print("Testing is_similar()")
    for rep in range(100000):
        e = random_entry()
        e_prime = random_entry()
        got = is_similar(e, e_prime)
        expected = original_is_similar(e, e_prime)
        if got != expected:
            print(e)
            print(e_prime)
            print("got", got)
            print("expected", expected)
            break


if __name__ == "__main__":
    test_is_similar()


# End testing code


def find_edges(entries):
    entries = list(entries)
    posting_lists = collections.defaultdict(list)
    for i, e in enumerate(entries):
        for sketch in f(e):
            posting_lists[sketch].append(i)
    edges = set()
    for i, e in enumerate(entries):
        for sketch in g(e):
            for j in posting_lists[sketch]:
                if i < j:
                    edges.add((i, j))
    return edges


# Begin testing code for find_edges


def test_find_edges():
    print("Testing find_edges()")
    entries = [random_entry() for i in range(1000)]
    got = find_edges(entries)
    expected = 
        (i, j)
        for (i, e) in enumerate(entries)
        for (j, e_prime) in enumerate(entries)
        if i < j and is_similar(e, e_prime)
    
    print(len(expected))
    assert got == expected


if __name__ == "__main__":
    test_find_edges()
    find_edges([random_entry() for i in range(10000)])

# End testing code for find_edges

【讨论】:

看了几遍,还是没看懂几个地方。为什么 fos 有 2 个已知 os 的案例,而 gos 有空案例? defaultdict(list) 到底应该包含什么?你能举个例子吗?我无法从描述中理解这个想法。 @seqre 今天晚些时候我会尝试发布一些代码。 @seqre 已发布代码。操作系统草图背后的想法是,我们需要在相似关系中涵盖三种情况:1)左侧条目已知 OS,右侧条目已知 OS,相同 OS 2)左侧条目已知 OS,右侧条目未知 OS 3)左侧条目的操作系统未知,右侧条目的操作系统已知或未知。 感谢您的代码。我测试了代码,它就像一个魅力。在花了几个小时分析和运行代码之后,我掌握了大部分算法(太聪明了!),但我仍然无法理解一些事情。不应该用e.hostname and not e.mac 替换not e.hostname and e.mac - 基本上改变这两个if 的顺序?另外,我们为什么要在find_edges() 中检查i &lt; j @seqre 1. 不,当我们知道两个主机名时,这会导致我们有时比较 IP。 2. 我想通过不检查对称边缘将慢速版本加速两倍。

以上是关于Python中可变数据的重复数据删除/合并的主要内容,如果未能解决你的问题,请参考以下文章

Python:合并文件并删除重复项

怎么将两个EXCEL表格合并后去除重复数据

Python数据类型 ——— 字典

DataTable中怎样将重复的数据合并

怎么将两个EXCEL表格合并后去除重复数据

在 Python 中合并数据框时出现重复的行