Clustering (fkmeans) with Mahout using Clojure

Posted: 2011-11-03 10:33:09

Question:

I am trying to write a short script to cluster my data from Clojure (by calling Mahout classes). My input data is in the following format (it is the output of a PHP script):

format: (tag) (image) (frequency)
tag_sit image_a 0
tag_sit image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit image_a 1
tag_amit image_b 0
... (more)

I then use this Clojure script to write the data into a SequenceFile:

#!./bin/clj
(ns sensei.sequence.core)

(require 'clojure.string)
(require 'clojure.java.io)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.hadoop.io.SequenceFile)
(import org.apache.hadoop.io.Text)

(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)

(with-open [reader (clojure.java.io/reader *in*)]
  (let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
        hadoop_fs (FileSystem/get hadoop_configuration)]
    (reduce
      (fn [writer [index value]]
        (. writer append index value)
        writer)
      (SequenceFile/createWriter
        hadoop_fs
        hadoop_configuration
        (new Path "test/sensei")
        Text
        VectorWritable)
      (map
        (fn [[tag row_vector]]
          (let [input_index (new Text tag)
                input_vector (new VectorWritable)]
            (. input_vector set row_vector)
            [input_index input_vector]))
        (map
          (fn [[tag photo_list]]
            (let [photo_map (apply hash-map photo_list)
                  input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
              (loop [frequency_list (vals photo_map)]
                (if (zero? (count frequency_list))
                  [tag input_vector]
                  (when-not (zero? (count frequency_list))
                    (. input_vector set
                       (mod (count frequency_list) (count (vals photo_map)))
                       (Integer/parseInt (first frequency_list)))
                    (recur (rest frequency_list)))))))
          (reduce
            (fn [result next_line]
              (let [[tag photo frequency] (clojure.string/split next_line #" ")]
                (update-in result [tag]
                  #(if (nil? %)
                     [photo frequency]
                     (conj % photo frequency)))))
            {}
            (line-seq reader)))))))

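Basically, it turns the input into a sequence file in this format:

key (Text): $tag_uri
value (VectorWritable): a vector (cardinality = number of documents) with numeric indices and the corresponding frequencies <0:1 1:0 2:0 3:1 4:0 ...>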
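The "one vector per tag, one index per image" layout described above can be sketched in plain Clojure as follows. This is only an illustration: the names `parse-line` and `tag-vectors` and the choice to fix the column order by sorting the image names are assumptions, not part of the original script, and the sketch stops short of writing a SequenceFile.

```clojure
(require '[clojure.string :as str])

;; A few sample lines in the "(tag) (image) (frequency)" format.
(def sample-lines
  ["tag_sit image_a 0"
   "tag_sit image_b 1"
   "tag_lorem image_a 1"
   "tag_lorem image_b 0"])

(defn parse-line
  "Splits one input line into [tag image frequency]."
  [line]
  (let [[tag image freq] (str/split line #" ")]
    [tag image (Integer/parseInt freq)]))

(defn tag-vectors
  "Returns a map of tag -> frequency vector. Column i always refers to the
  i-th image in sorted order, so every tag shares the same layout."
  [lines]
  (let [rows   (map parse-line lines)
        images (vec (sort (distinct (map second rows))))
        by-tag (reduce (fn [acc [tag image freq]]
                         (assoc-in acc [tag image] freq))
                       {}
                       rows)]
    (into {}
          (for [[tag freqs] by-tag]
            ;; images missing for a tag default to frequency 0
            [tag (mapv #(get freqs % 0) images)]))))

(tag-vectors sample-lines)
```

Sorting the image names once and reusing that order for every tag keeps index i tied to the same image in all vectors, which is what a distance measure over these vectors relies on.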

Then I go on to do the actual clustering with this script (written by referring to this blog post):

#!./bin/clj

(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/")
                                  conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    (RandomSeedGenerator/buildRandom
      hadoop_configuration
      input_path
      clusters_in_path
      (int 2)
      (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)
    (float 5.0)
    true
    false
    (double 0.0)
    false)) ; runSequential

However, I get output like this:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
11/08/25 15:20:18 INFO mapred.JobClient:  map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
        at clojure.lang.Util.runtimeException(Util.java:153)
        at clojure.lang.Compiler.eval(Compiler.java:6417)
        at clojure.lang.Compiler.load(Compiler.java:6843)
        at clojure.lang.Compiler.loadFile(Compiler.java:6804)
        at clojure.main$load_script.invoke(main.clj:282)
        at clojure.main$script_opt.invoke(main.clj:342)
        at clojure.main$main.doInvoke(main.clj:426)
        at clojure.lang.RestFn.invoke(RestFn.java:436)
        at clojure.lang.Var.invoke(Var.java:409)
        at clojure.lang.AFn.applyToHelper(AFn.java:167)
        at clojure.lang.Var.applyTo(Var.java:518)
        at clojure.main.main(main.java:37)
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
        at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
        at clojure.lang.Compiler.eval(Compiler.java:6406)
        ... 10 more

When runSequential is set to true:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty!
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
        at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
        at clojure.lang.Compiler.eval(Compiler.java:6465)
        at clojure.lang.Compiler.load(Compiler.java:6902)
        at clojure.lang.Compiler.loadFile(Compiler.java:6863)
        at clojure.main$load_script.invoke(main.clj:282)
        at clojure.main$script_opt.invoke(main.clj:342)
        at clojure.main$main.doInvoke(main.clj:426)
        at clojure.lang.RestFn.invoke(RestFn.java:436)
        at clojure.lang.Var.invoke(Var.java:409)
        at clojure.lang.AFn.applyToHelper(AFn.java:167)
        at clojure.lang.Var.applyTo(Var.java:518)
        at clojure.main.main(main.java:37)

I also rewrote the fkmeans script into this form:

#!./bin/clj

(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
      driver (new FuzzyKMeansDriver)]
  (. driver setConf hadoop_configuration)
  (. driver
     run
     (into-array String ["--input" "test/sensei"
                         "--output" "test/clusters"
                         "--clusters" "test/clusters/clusters-0"
                         "--clustering"
                         "--overwrite"
                         "--emitMostLikely" "false"
                         "--numClusters" "3"
                         "--maxIter" "10"
                         "--m" "5"])))

But I still get the same error as with the very first version :/

The command-line tool runs fine:

$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5

However, clusterdumper does not return the points when I try it, even though the --clustering option is present in the previous command and --pointsDir is defined here:

$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt

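Mahout version used: 0.6-snapshot, Clojure 1.3.0-snapshot

Please let me know if I have missed anything.

Comments:

Could you check whether the initial clusters were actually generated?
Yes, was this ever solved? If so, you could post an answer.
OOOOOOOps, uh, I actually changed my script to use graphlab, which is easier to set up and uses all 4 cores of my CPU :)

Answer 1: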

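My guess is that the Mahout implementation of fuzzy-c-means needs initial clusters to start from, which you may not have supplied?

It also sounds a bit as if you are running on a single node? Note that on a single-node system you should avoid all the Mahout/Hadoop overhead and just use a regular clustering algorithm. Hadoop/Mahout comes at a considerable cost that only pays off once you can no longer process the data on a single system. It is not "map reduce" unless you do it on a large number of systems.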
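To illustrate what "a regular clustering algorithm" on a single machine might look like, here is a minimal in-memory fuzzy c-means sketch in plain Clojure. It is not Mahout's implementation: the function names, the Euclidean distance, the sample points and the explicitly supplied initial centers are all assumptions made for illustration. The parameter `m` is the fuzziness exponent (it must be greater than 1); that it corresponds exactly to Mahout's `--m` option is likewise an assumption.

```clojure
(defn distance
  "Euclidean distance between two numeric vectors of equal length."
  [a b]
  (Math/sqrt (reduce + (map (fn [x y] (let [d (- x y)] (* d d))) a b))))

(defn memberships
  "Degree of membership of point in each center, for fuzziness m > 1.
  The returned values sum to 1."
  [m centers point]
  (let [ds    (mapv #(distance point %) centers)
        zeros (count (filter zero? ds))]
    (if (pos? zeros)
      ;; The point coincides with one or more centers: share membership among them.
      (mapv #(if (zero? %) (/ 1.0 zeros) 0.0) ds)
      (let [p (/ 2.0 (- m 1.0))]
        (mapv (fn [dj]
                (/ 1.0 (reduce + (map #(Math/pow (/ dj %) p) ds))))
              ds)))))

(defn update-centers
  "Recomputes each center as the mean of all points weighted by membership^m.
  A center that receives no weight at all is left unchanged."
  [m points us centers]
  (vec
    (for [j (range (count centers))]
      (let [ws    (map #(Math/pow (nth % j) m) us)
            total (reduce + ws)]
        (if (zero? total)
          (nth centers j)
          (apply mapv
                 (fn [& xs] (/ (reduce + (map * ws xs)) total))
                 points))))))

(defn fuzzy-c-means
  "Runs max-iter update steps starting from initial-centers."
  [points initial-centers m max-iter]
  (loop [centers initial-centers, i 0]
    (let [us (mapv #(memberships m centers %) points)]
      (if (= i max-iter)
        {:centers centers :memberships us}
        (recur (update-centers m points us centers) (inc i))))))

;; Hypothetical data: two obvious groups, seeded with one point from each.
(def points [[0.0 0.0] [0.0 1.0] [10.0 10.0] [10.0 11.0]])
(def result (fuzzy-c-means points [[0.0 0.0] [10.0 10.0]] 2.0 10))
```

Note that the algorithm cannot start without `initial-centers`, which mirrors the guess above that the missing initial clusters are what the Mahout driver is complaining about.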
