从流中替换采样

Posted

技术标签:

【中文标题】从流中替换采样【英文标题】:Sampling with replacement from streams 【发布时间】:2021-04-28 16:37:48 【问题描述】:

我正在寻找有效的算法来采样数据并从非常大的文件(或流)中替换。具体来说:

有一个足够大的文件,我无法将其加载到内存中,但可以遍历行并将选定的行写入流; 我需要对 k 行进行替换并从中取样,总行数 n 是预先知道的(k 可能大于n); 采样必须是统一的(这意味着获得任何可能的 k 大小的样本的概率相同)。

我知道无需替换的情况下的解决方案 - 跟踪 k_in_i(采样和迭代的行数,直到行 i em>) 并以概率 (k - k_i) / (n - n_i) 取第 i 行;但是对带有替换的采样应用相同的逻辑是行不通的——我得到了非常复杂的概率公式来计算第 i 行的概率(或者我做错了)。

有没有有效的方法来做到这一点?

更新

好的,看来我找到了一个半体面的方法来做到这一点。

binom(n, k)为二项式系数。用 n 个元素替换的 k 大小的样本数是 binom(n + k - 1, n - 1)。因此

选择第一个元素 0 次的概率是 p0 = binom(n + k - 2, n - 2) / binom(n + k - 1, n - 1) = (n - 1) / (n + k - 1); 选择第一个元素 1 次的概率是 p1 = binom(n + k - 3, n - 2) / binom(n + k - 1, n - 1) = p0 * k / (n + k - 2); 选择第一个元素 2 次的概率是 p2 = binom(n + k - 4, n - 2) / binom(n + k - 1, n - 1) = p1 * (k - 1) / (n + k - 3); 等

通过这个很好的递归公式,我们可以遍历行,滚动多少次以获取第 i 行,同时跟踪已经获取了多少行。我用下面的python函数模拟了这个过程:

import random


def sample_wr(xs, k0):
    res = []
    n, k = len(xs), k0
    ix = 0
    
    while len(res) < k0:
        if n == 1 and k > 0:
            res.extend([xs[-1]] * k)
        else:
            m = 0
            u = random.random()
            p = (n - 1.0) / (n + k - 1.0)
            sump = p

            while sump < u:
                m += 1
                p *= (k - m + 1.0) / (n + k - m - 1.0)
                sump += p
                
            if m:
                res.extend([xs[ix]] * m)
                k -= m
            
            n -= 1
            ix += 1
        
    return tuple(res)

在列表 [1, 2, 3] 上进行 10M 次模拟后,所有 4 元素样本似乎都是一致的。

感谢大家的见解。

【问题讨论】:

对于替换抽样,您只需迭代 k 次,每次,以概率 1/n 对每一行进行抽样。抽样概率永远不会随着替换而改变。如果您不想在文件中循环多次,请首先生成您将要采样的索引并对其进行排序,这样您最多只需要迭代一次。 既然你好像提前知道了n,就从0到n-1中选择k个数,排序,然后挑出迭代时具有这些索引的行。 多个循环不起作用 - 我说的是数十亿行和可比较的 k。提前选择索引是一个有趣的想法,它可能会奏效;尽管我必须将它们保存在内存中,这具有挑战性。或者我也可以将它们保存在一个文件中并按顺序读取,这可能会很慢。我会测试一下,谢谢。 【参考方案1】:

在没有替换的情况下,我们仅检查是否选择了概率为1/(remaining items) 的当前项目。我们只需要检查我们将选择当前项目的次数(从 0 到 k)来替换它。这是通过使用二项式随机变量来实现的,该变量模拟给定成功概率(在我们的例子中,仍然是1/(remaining items))和试验次数(这里是 k)的成功次数。这是一些python代码:

from numpy import random


def select_k(l, k):
    ans = []
    for i in range(len(l) - 1):
        cnt = random.binomial(k, 1. / (len(l) - i), 1)[0]
        k -= cnt
        ans += [l[i]] * cnt
    return ans + [l[-1]] * k


print(select_k([1, 2, 3], 4))

【讨论】:

感谢您的回答;但是,它似乎没有提供统一的采样。我运行你的函数 10M 次,从 (1, 2, 3) 绘制长度为 4 的样本 - 分布不均匀(脚本:pastebin.com/fvzfNghv,结果:pastebin.com/wjasnJLU)。 这是因为我们正在对列表进行排序!请注意,接收到的概率,例如 [1, 1, 1, 2],等于接收到 [1, 1, 2, 1] 的概率。但是,我的代码有效地对输出进行了排序。你可以简单地打乱输出,你会发现样本确实是均匀分布的。 但是,如果您希望它均匀分布在所有多集上,这将更加困难 - 我不确定您将如何处理。 是的,也许我不够清楚,我需要在所有多集上均匀分布。似乎这是一种方法,我已经用详细信息更新了我的问题。现在考虑是否可以在 Map-Reduce 集群上并行完成。 @fanvacoolt 如果您有 t 个线程,您可以将流拆分为 t 个,并随机选择从每个 t 个块中抽取多少 k 个元素。请注意,实际上这是递归的! - 你有同样的问题,除了现在有 t 个东西,你仍然会从它们中选择 k 个。

以上是关于从流中替换采样的主要内容,如果未能解决你的问题,请参考以下文章

使用 offlineAudioContext 重新采样捕获的音频流

从流中读取失败 - MySqlException

连续从流中读取?

从流中播放波形文件

如何使用 Java 8 lambda 从流中获取一系列项目?

WPF中的MediaElement从流中播放视频?