Hadoop社区检测问题之相似用户问题

Posted Dodo·D·Caster

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop社区检测问题之相似用户问题相关的知识,希望对你有一定的参考价值。

问题描述:

在twitter中,用户可以follow其他用户。此时,该用户为follower,而被关注的用户则是followee。该关系为单向关系,即 A follow B 的同时 B 不一定会 follow A。

我们需要找出与每个用户最相似的top K个用户,相似度用共同关注的用户数量来衡量。例如,A关注了C D F,而B关注了A C D,则AB的共同关注为C D, A与B之间的相似度为2。

数据集:

地址:https://www.dropbox.com/s/g1nthvj98iaikly/twitter_combined.txt.gz?dl=0

该数据集中的数据有两列,含义如下:

1 2 //表示用户2关注了用户1

1 3 //表示用户3关注了用户1

思路:

可以用hadoop的mapreduce来做,我们分为3个mapreduce。

第一个mapreduce:数据合并

用来把数据合并,如

1 2

1 3

变成

1 2 3

表示用户2和用户3关注了用户1。也就是说,key为followee,value为followers。

所以实际的思路很简单,map阶段output (1,2)(1,3)

reduce会收到(1,2,3),把2,3变成text传入value即可。

第二个mapreduce:计算相似度

用来计算不同用户之间的相似度。我们知道,对于同一个followee的不同followers,他们彼此都有共同的followee,所以相似度+1.

所以map阶段,我们对这些followers做笛卡尔积,将笛卡尔积的结果作为key,value赋1,例如

1 2 3 4

则变成(”2 3”, 1),(”2 4”, 1),(”3 2”, 1),(”3 4”, 1),(”4 2”, 1),(”4 3”,1)。注意要去掉”2 2”,“3 3”,“4 4”这种无效数据。

如果用户2和用户3同时关注了3个用户,则reduce会收到(”2 3”, 1,1,1)累加则可得到相似度3,作为reduce的outputvalue

第三个mapreduce:排序top K

用来计算每个用户的最相似的K个用户。上一个mapreduce之后,我们得到了每对用户的相似度,则这次只需要按相似度进行排序,并输出前K个即可。

map阶段,key为用户A,value为用户B+用户AB的相似度,则reduce会得到和用户A有共同关注的所有用户及其相似度。降序排序并将value设置为前K个相似用户的text即可。

代码:

第一个mapreduce

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App 
    // step 1 : map class
    public static class mrMapper extends Mapper<LongWritable, Text, Text, Text> 
        private Text mapOutputKey = new Text();
        // private final static Text mapOutputValue = new Text();
        private Text mapOutputValue = new Text();
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException 
            // line value
            String lineValue = value.toString();

            // split
            // String[] strs = lineValue.split(" ");
            StringTokenizer stringTokenizer = new StringTokenizer(lineValue);   //按空格分割句子成单词

            // iterator
            while(stringTokenizer.hasMoreTokens()) 
                // get word value
                String followee = stringTokenizer.nextToken();
                String follower = stringTokenizer.nextToken();
                // set value
                mapOutputKey.set(followee);
                mapOutputValue.set(follower);
                // output
                context.write(mapOutputKey, mapOutputValue);
            
            
        
    

    // step 2 : reduce class
    public static class mrReducer extends Reducer<Text, Text, Text, Text> 
        private Text outputValue = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException 
            // sum tmp
            String sum = "";
            // iterator
            for(Text value: values) 
                // total
                sum = sum + value.toString() + " ";
            
            // set value
            outputValue.set(sum);

            // output
            context.write(key, outputValue);
        

    

    // step 3 : Driver
    /**
     * @param args
     * @throws IOException
     */
    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
        // 1 : get configuration
        Configuration configuration = new Configuration();

        // 2 : create job
        org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance(configuration, this.getClass().getSimpleName());
        
        // run jar
        job.setJarByClass(this.getClass());

        // 3 : set job
        // input -> map -> reduce -> output
        // 3.1 input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : map
        job.setMapperClass(mrMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 3.3 : reduce
        job.setReducerClass(mrReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSucces = job.waitForCompletion(true);
        return isSucces?0:1;

    

    // step 4 : run program
    public static void main(String[] args) throws Exception 
        int status = new App().run(args);
        System.exit(status);
    

第二个mapreduce

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App2 
    // step 1 : map class
    public static class mrMapper extends Mapper<LongWritable, Text, Text, IntWritable> 
        private Text mapOutputKey = new Text();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        // private Text mapOutputValue = new Text();
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException 
            // line value
            String lineValue = value.toString();

            // split
            // String[] strs = lineValue.split(" ");
            StringTokenizer stringTokenizer = new StringTokenizer(lineValue); // 按空格分割句子成单词
            stringTokenizer.nextToken(); // skip followee

            Set<String> set = new HashSet<String>();
            while(stringTokenizer.hasMoreTokens()) 
                set.add(stringTokenizer.nextToken());
            

            // 笛卡尔积 different followers add similarity 1
            if (set.size() > 1) 
                for (Iterator<String> i = set.iterator(); i.hasNext();) 
                    String follower1 = i.next();
                    for (Iterator<String> j = set.iterator(); j.hasNext();) 
                        String follower2 = j.next();
                        if (!follower1.equals(follower2)) 
                            mapOutputKey.set(follower1+" "+follower2);
                            context.write(mapOutputKey, mapOutputValue);
                        
                    
                
            

        
    

    // step 2 : reduce class
    public static class mrReducer extends Reducer<Text, IntWritable, Text, IntWritable> 
        private IntWritable outputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException 
            // sum tmp
            int sum = 0;
            // iterator
            for (IntWritable value : values) 
                // total
                sum += value.get();
            
            // set value
            outputValue.set(sum);

            // output
            context.write(key, outputValue);
        

    

    // step 3 : Driver
    /**
     * @param args
     * @throws IOException
     */
    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
        // 1 : get configuration
        Configuration configuration = new Configuration();

        // 2 : create job
        org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance(configuration,
                this.getClass().getSimpleName());

        // run jar
        job.setJarByClass(this.getClass());

        // 3 : set job
        // input -> map -> reduce -> output
        // 3.1 input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : map
        job.setMapperClass(mrMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3.3 : reduce
        job.setReducerClass(mrReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSucces = job.waitForCompletion(true);
        return isSucces ? 0 : 1;

    

    // step 4 : run program
    public static void main(String[] args) throws Exception 
        int status = new App2().run(args);
        System.exit(status);
    

第三个mapreduce

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App3 
    // step 1 : map class
    public static class mrMapper extends Mapper<LongWritable, Text, Text, Text> 
        private Text mapOutputKey = new Text();
        private Text mapOutputValue = new Text();

        // private Text mapOutputValue = new Text();
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException 
            // line value
            String lineValue = value.toString();

            // split
            // String[] strs = lineValue.split(" ");
            StringTokenizer stringTokenizer = new StringTokenizer(lineValue); // 按空格分割句子成单词

            //
            while (stringTokenizer.hasMoreTokens()) 
                String user = stringTokenizer.nextToken();
                String similarUser = stringTokenizer.nextToken();
                String similarity = stringTokenizer.nextToken();

                mapOutputKey.set(user);
                mapOutputValue.setNLP之句子相似度之入门篇

自然语言处理之比较两个句子的相似度 余弦相似度

Java之词义相似度计算(语义识别词语情感趋势词林相似度拼音相似度概念相似度字面相似度)

Java之词义相似度计算(语义识别词语情感趋势词林相似度拼音相似度概念相似度字面相似度)

中文句子相似度之計算與應用

如何使用聚类对具有相似意图的句子进行分组?