在 Hadoop 上实现 Apriori 算法

Posted

技术标签:

【中文标题】在 Hadoop 上实现 Apriori 算法【英文标题】:Implementing Apriori Algorithm on Hadoop 【发布时间】:2018-08-09 18:09:40 【问题描述】:

我正在尝试使用 Hadoop 实现 Apriori 算法。我已经实现了 Apriori 算法的非分布式版本,但是我对 Hadoop 和 MapReduce 的不熟悉导致了一些问题。

我想实现算法的方式分为两个阶段:

1) 在第一阶段,map reduce 作业将对原始事务数据集进行操作。此阶段的输出是一个包含所有 1 项集及其对 1 的支持的文件。

2) 在第二阶段,我想读入前一阶段的输出,然后构造新的项集。重要的是,我想在映射器中确定是否仍然在数据集中找到任何新项集。我想如果我将原始数据集作为输入发送到映射器,它将对原始文件进行分区,以便每个映射器只扫描部分数据集。然而,候选列表需要从前一阶段的所有输出中构建。然后,这将在循环中迭代固定次数。

我的问题是弄清楚如何具体确保我可以访问每个映射器中的完整项集,以及能够访问原始数据集以计算每个阶段的新支持。

感谢任何建议、cmets、建议或答案。

编辑:根据反馈,我只想更具体地说明我在这里要问的内容。

【问题讨论】:

我知道你的问题是关于先验算法的。但是,我强烈建议更好地应用FP Growth Algorithm,因为先验算法在此过程中必须重复多次。这种算法不推荐用于长的高数据处理流水线。 【参考方案1】:

在开始之前,我建议你阅读Hadoop Map-Reduce Tutorial。

第 1 步: 将数据文件加载到 HDFS。假设您的数据是 txt 文件,每组是一行。

a b c
a c d e
a e f
a f z
...

第 2 步: 按照 Map-Reduce 教程构建您自己的 Apriori 类。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException 
  // Seprate the line into tokens by space
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) 
    // Add the token into a writable set
    ... put the element into a writable set ...
  
  context.write(word, one);

第 3 步: 运行 mapreduce jar 文件。输出将在 HDFS 中的一个文件中。 你会得到类似的东西:

a b 3 (number of occurrence)
a b c 5
a d 2
...

根据输出文件,你可以计算出关系。

On a related note, you might want to consider using a higher level abstraction than map-reduce like Cascading or Apache Spark.

【讨论】:

感谢您的 cmets。不幸的是,作业要求我使用 MapReduce。【参考方案2】:

我使用 Hadoop Streaming 在 Apache Spark 和 Hadoop MapReduce 中实现了 AES 算法。 我知道它与 Apriori 不一样,但您可以尝试使用我的方法。

使用 Hadoop Streming MapReduce 实现的 AES 简单示例。

Project structure for AES Hadoop Streaming

1n_reducer.py / 1n_combiner 是相同的代码,但没有约束。

import sys

CONSTRAINT = 1000

def do_reduce(word, _values):
    return word, sum(_values)


prev_key = None
values = []

for line in sys.stdin:
    key, value = line.split("\t")
    if key != prev_key and prev_key is not None:
        result_key, result_value = do_reduce(prev_key, values)
        if result_value > CONSTRAINT:
            print(result_key + "\t" + str(result_value))
        values = []
    prev_key = key
    values.append(int(value))

if prev_key is not None:
    result_key, result_value = do_reduce(prev_key, values)
    if result_value > CONSTRAINT:
        print(result_key + "\t" + str(result_value))

base_mapper.py:

import sys


def count_usage():
    for line in sys.stdin:
        elements = line.rstrip("\n").rsplit(",")
        for item in elements:
            print("item\tcount".format(item=item, count=1))


if __name__ == "__main__":
    count_usage()

2n_mapper.py 使用上一次迭代的结果。 在回答您的问题时,您可以读取先前迭代的输出以形成项集。

import itertools
import sys

sys.path.append('.')
N_DIM = 2


def get_2n_items():
    items = set()
    with open("part-00000") as inf:
        for line in inf:
            parts = line.split('\t')
            if len(parts) > 1:
                items.add(parts[0])

    return items


def count_usage_of_2n_items():
    all_items_set = get_2n_items()
    for line in sys.stdin:
        items = line.rstrip("\n").rsplit(",")  # 74743 43355 53554
        exist_in_items = set()
        for item in items:
            if item in all_items_set:
                exist_in_items.add(item)
        for combination in itertools.combinations(exist_in_items, N_DIM):
            combination = sorted(combination)
            print("el1,el2\tcount".format(el1=combination[0], el2=combination[1], count=1))


if __name__ == "__main__":
    count_usage_of_2n_items()

根据我的经验,如果唯一组合(项目集)的数量太大(100K+),Apriori 算法不适合 Hadoop。 如果您找到了使用 Hadoop MapReduce(流式处理或 Java MapReduce 实现)实现 Apriori 算法的优雅解决方案,请与社区分享。

PS。如果您需要更多代码 sn-ps 请索取。

【讨论】:

以上是关于在 Hadoop 上实现 Apriori 算法的主要内容,如果未能解决你的问题,请参考以下文章

如何在python中为Apriori算法组合列表的字符串元素?

[ML&DL] 频繁项集Apriori算法

使用 Hadoop 的机器学习框架 [关闭]

hadoop学习之----------IntelliJ IDEA上实现MapReduce中最简单的单词统计的程序(本地 和 hadoop 两种实现方式)

如何在加载数据时在 AsyncTask 上实现旋转 ProgressBar?

资源 | 在TensorFlow 1.0上实现快速图像生成算法Fast PixelCNN++