Hadoop社区检测问题之相似用户问题
Posted Dodo·D·Caster
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop社区检测问题之相似用户问题相关的知识,希望对你有一定的参考价值。
问题描述:
在twitter中,用户可以follow其他用户。此时,该用户为follower,而被关注的用户则是followee。该关系为单向关系,即 A follow B 的同时 B 不一定会 follow A。
我们需要找出与每个用户最相似的top K个用户,相似度用共同关注的用户数量来衡量。例如,A关注了C D F,而B关注了A C D,则AB的共同关注为C D, A与B之间的相似度为2。
数据集:
地址:https://www.dropbox.com/s/g1nthvj98iaikly/twitter_combined.txt.gz?dl=0
该数据集中的数据有两列,含义如下:
1 2 //表示用户2关注了用户1
1 3 //表示用户3关注了用户1
思路:
可以用hadoop的mapreduce来做,我们分为3个mapreduce。
第一个mapreduce:数据合并
用来把数据合并,如
1 2
1 3
变成
1 2 3
表示用户2和用户3关注了用户1。也就是说,key为followee,value为followers。
所以实际的思路很简单,map阶段output (1,2)(1,3)
reduce会收到(1,2,3),把2,3变成text传入value即可。
第二个mapreduce:计算相似度
用来计算不同用户之间的相似度。我们知道,对于同一个followee的不同followers,他们彼此都有共同的followee,所以相似度+1.
所以map阶段,我们对这些followers做笛卡尔积,将笛卡尔积的结果作为key,value赋1,例如
1 2 3 4
则变成(”2 3”, 1),(”2 4”, 1),(”3 2”, 1),(”3 4”, 1),(”4 2”, 1),(”4 3”,1)。注意要去掉”2 2”,“3 3”,“4 4”这种无效数据。
如果用户2和用户3同时关注了3个用户,则reduce会收到(”2 3”, 1,1,1)累加则可得到相似度3,作为reduce的outputvalue
第三个mapreduce:排序top K
用来计算每个用户的最相似的K个用户。上一个mapreduce之后,我们得到了每对用户的相似度,则这次只需要按相似度进行排序,并输出前K个即可。
map阶段,key为用户A,value为用户B+用户AB的相似度,则reduce会得到和用户A有共同关注的所有用户及其相似度。降序排序并将value设置为前K个相似用户的text即可。
代码:
第一个mapreduce
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class App
// step 1 : map class
public static class mrMapper extends Mapper<LongWritable, Text, Text, Text>
private Text mapOutputKey = new Text();
// private final static Text mapOutputValue = new Text();
private Text mapOutputValue = new Text();
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException
// line value
String lineValue = value.toString();
// split
// String[] strs = lineValue.split(" ");
StringTokenizer stringTokenizer = new StringTokenizer(lineValue); //按空格分割句子成单词
// iterator
while(stringTokenizer.hasMoreTokens())
// get word value
String followee = stringTokenizer.nextToken();
String follower = stringTokenizer.nextToken();
// set value
mapOutputKey.set(followee);
mapOutputValue.set(follower);
// output
context.write(mapOutputKey, mapOutputValue);
// step 2 : reduce class
public static class mrReducer extends Reducer<Text, Text, Text, Text>
private Text outputValue = new Text();
@Override
public void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException
// sum tmp
String sum = "";
// iterator
for(Text value: values)
// total
sum = sum + value.toString() + " ";
// set value
outputValue.set(sum);
// output
context.write(key, outputValue);
// step 3 : Driver
/**
* @param args
* @throws IOException
*/
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException
// 1 : get configuration
Configuration configuration = new Configuration();
// 2 : create job
org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance(configuration, this.getClass().getSimpleName());
// run jar
job.setJarByClass(this.getClass());
// 3 : set job
// input -> map -> reduce -> output
// 3.1 input
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// 3.2 : map
job.setMapperClass(mrMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 3.3 : reduce
job.setReducerClass(mrReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 3.4 : output
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// 4 : submit job
boolean isSucces = job.waitForCompletion(true);
return isSucces?0:1;
// step 4 : run program
public static void main(String[] args) throws Exception
int status = new App().run(args);
System.exit(status);
第二个mapreduce
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class App2
// step 1 : map class
public static class mrMapper extends Mapper<LongWritable, Text, Text, IntWritable>
private Text mapOutputKey = new Text();
private final static IntWritable mapOutputValue = new IntWritable(1);
// private Text mapOutputValue = new Text();
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException
// line value
String lineValue = value.toString();
// split
// String[] strs = lineValue.split(" ");
StringTokenizer stringTokenizer = new StringTokenizer(lineValue); // 按空格分割句子成单词
stringTokenizer.nextToken(); // skip followee
Set<String> set = new HashSet<String>();
while(stringTokenizer.hasMoreTokens())
set.add(stringTokenizer.nextToken());
// 笛卡尔积 different followers add similarity 1
if (set.size() > 1)
for (Iterator<String> i = set.iterator(); i.hasNext();)
String follower1 = i.next();
for (Iterator<String> j = set.iterator(); j.hasNext();)
String follower2 = j.next();
if (!follower1.equals(follower2))
mapOutputKey.set(follower1+" "+follower2);
context.write(mapOutputKey, mapOutputValue);
// step 2 : reduce class
public static class mrReducer extends Reducer<Text, IntWritable, Text, IntWritable>
private IntWritable outputValue = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException
// sum tmp
int sum = 0;
// iterator
for (IntWritable value : values)
// total
sum += value.get();
// set value
outputValue.set(sum);
// output
context.write(key, outputValue);
// step 3 : Driver
/**
* @param args
* @throws IOException
*/
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException
// 1 : get configuration
Configuration configuration = new Configuration();
// 2 : create job
org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance(configuration,
this.getClass().getSimpleName());
// run jar
job.setJarByClass(this.getClass());
// 3 : set job
// input -> map -> reduce -> output
// 3.1 input
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
// 3.2 : map
job.setMapperClass(mrMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 3.3 : reduce
job.setReducerClass(mrReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 3.4 : output
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
// 4 : submit job
boolean isSucces = job.waitForCompletion(true);
return isSucces ? 0 : 1;
// step 4 : run program
public static void main(String[] args) throws Exception
int status = new App2().run(args);
System.exit(status);
第三个mapreduce
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class App3
// step 1 : map class
public static class mrMapper extends Mapper<LongWritable, Text, Text, Text>
private Text mapOutputKey = new Text();
private Text mapOutputValue = new Text();
// private Text mapOutputValue = new Text();
@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException
// line value
String lineValue = value.toString();
// split
// String[] strs = lineValue.split(" ");
StringTokenizer stringTokenizer = new StringTokenizer(lineValue); // 按空格分割句子成单词
//
while (stringTokenizer.hasMoreTokens())
String user = stringTokenizer.nextToken();
String similarUser = stringTokenizer.nextToken();
String similarity = stringTokenizer.nextToken();
mapOutputKey.set(user);
mapOutputValue.setNLP之句子相似度之入门篇
Java之词义相似度计算(语义识别词语情感趋势词林相似度拼音相似度概念相似度字面相似度)