使用 MapReduce 结交指定度数内的朋友
Posted
技术标签:
【中文标题】使用 MapReduce 结交指定度数内的朋友【英文标题】:Getting friends within a specified degree with MapReduce 【发布时间】:2013-07-25 20:43:33 【问题描述】:您知道如何使用 MapReduce 范例来实现此算法吗?
def getFriends(self, degree):
friendList = []
self._getFriends(degree, friendList)
return friendList
def _getFriends(self, degree, friendList):
friendList.append(self)
if degree:
for friend in self.friends:
friend._getFriends(degree-1, friendList)
假设我们有以下双向友谊:
(1,2), (1,3), (1,4), (4,5), (4,6), (5,7), (5,8)
例如,如何获取用户 1 的 1、2 和 3 度连接?答案必须是 1 -> 2, 3, 4, 5, 7, 8
谢谢
【问题讨论】:
【参考方案1】:也许你可以使用支持类似sql查询的hive!
【讨论】:
感谢您的回复。我只需要在 MapReduce 上这样做:-(【参考方案2】:据我了解,您想在社交图谱中收集某个人的第 n 个圈子中的所有朋友。大多数图算法都是递归的,递归不太适合 MapReduce 解决任务的方式。
我可以建议你使用Apache Giraph 来解决这个问题(实际上它在后台使用了 MapReduce)。它主要是异步的,您编写描述单个节点行为的作业,例如:
1. Send a message from root node to all friends to get their friendlist.
2.1. Each friend sends a message with friendlist to root node.
2.2. Each friend sends a message to all it's sub-friends to get their friendlist.
3.1. Each sub-friend sends a message with friendlist to root node.
3.2. Each sub-friend sends a message to all it's sub-sub-friends to get their friendlist.
...
N. Root node collects all these messages and merges them in a single list.
您也可以使用级联的 map-reduce 作业来收集圆圈,但这不是解决任务的有效方法:
-
将root用户好友导出到文件
circle-001
使用circle-001
作为作业的输入,将每个用户朋友从circle-001
导出到circle-002
做同样的事情,但使用 circle-002
作为输入
...
重复N次
如果你有很多用户来计算他们的圈子,第一种方法更适合。第二种方法在启动多个 MR 作业时开销很大,但它要简单得多,并且对于少量输入用户来说是可以的。
【讨论】:
感谢您的回复。我只需要在 MapReduce 上这样做:-( 您能否更详细地解释一下如何级联 map-reduce 作业来收集圈子? 更新了原始答案以描述第二种方法。请注意,Giraph 实际上是一个 MapReduce 工作。它只是一个抽象层,用于处理 Hadoop 上的大型图。 非常感谢您的回复。检查我新更新的初始问题,以更清楚地了解我需要实现的目标【参考方案3】:我是这个领域的新手,但这是我的想法。
您可以按照以下伪代码使用传统的 BFS 算法。
在每次迭代中,您都会启动一个 Hadoop 作业,该作业会发现当前工作集的所有尚未访问的子节点。
BFS (list curNodes, list visited, int depth)
if (depth <= 0)
return visited;
//run Hadoop job on the current working set curNodes restricted by visited
//the job will populate some result list with the list of child nodes of the current working set
//then,
visited.addAll(result);
curNodes.empty();
curNodes.addAll(result);
BFS(curNodes, visited, depth-1);
这个job的mapper和reducer如下所示。
在这个例子中,我只是使用静态成员来保存工作集、访问集和结果集。
它应该使用临时文件来实现。可能有一些方法可以优化从一次迭代到下一次迭代累积的临时数据的持久性。
我用于该工作的输入文件包含每行一个倒塌的倒塌列表,例如 1,2 2,3 5,4 ... ...
public static class VertexMapper extends
Mapper<Object, Text, IntWritable, IntWritable>
private static Set<IntWritable> curVertex = null;
private static IntWritable curLevel = null;
private static Set<IntWritable> visited = null;
private IntWritable key = new IntWritable();
private IntWritable value = new IntWritable();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
StringTokenizer itr = new StringTokenizer(value.toString(), ",");
if (itr.countTokens() == 2)
String keyStr = itr.nextToken();
String valueStr = itr.nextToken();
try
this.key.set(Integer.parseInt(keyStr));
this.value.set(Integer.parseInt(valueStr));
if (VertexMapper.curVertex.contains(this.key)
&& !VertexMapper.visited.contains(this.value)
&& !key.equals(value))
context.write(VertexMapper.curLevel, this.value);
catch (NumberFormatException e)
System.err.println("Found key,value <" + keyStr + "," + valueStr
+ "> which cannot be parsed as int");
else
System.err.println("Found malformed line: " + value.toString());
public static class UniqueReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
private static Set<IntWritable> result = new HashSet<IntWritable>();
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException
for (IntWritable val : values)
UniqueReducer.result.add(new IntWritable(val.get()));
// context.write(key, key);
运行工作将是这样的
UniqueReducer.result.clear();
VertexMapper.curLevel = new IntWritable(1);
VertexMapper.curVertex = new HashSet<IntWritable>(1);
VertexMapper.curVertex.add(new IntWritable(1));
VertexMapper.visited = new HashSet<IntWritable>(1);
VertexMapper.visited.add(new IntWritable(1));
Configuration conf = getConf();
Job job = new Job(conf, "BFS");
job.setJarByClass(BFSExample.class);
job.setMapperClass(VertexMapper.class);
job.setCombinerClass(UniqueReducer.class);
job.setReducerClass(UniqueReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(NullOutputFormat.class);
boolean result = job.waitForCompletion(true);
BFSExample bfs = new BFSExample();
ToolRunner.run(new Configuration(), bfs, args);
【讨论】:
非常感谢!这很有帮助!以上是关于使用 MapReduce 结交指定度数内的朋友的主要内容,如果未能解决你的问题,请参考以下文章