Giraph Source Code Analysis: Adding Message Statistics
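Author: Bai Song (白松)

1. Add a class that writes the message volume (the number of messages) sent in each superstep to a Hadoop Counter. Create a new class, GiraphMessages, in the org.apache.giraph.counters package to track the message count.
The source code is as follows: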
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Messages" for counting the messages
 * sent in every superstep.
 */
public class GiraphMessages extends HadoopCountersBase {
  /** Counter group name for the Giraph messages */
  public static final String GROUP_NAME = "Giraph Messages";
  /** Singleton instance for everyone to use */
  private static GiraphMessages INSTANCE;
  /** Message counter for each superstep, keyed by superstep number */
  private final Map<Long, GiraphHadoopCounter> superstepMessages;

  private GiraphMessages(Context context) {
    super(context, GROUP_NAME);
    superstepMessages = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphMessages(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphMessages instance.
   */
  public static GiraphMessages getInstance() {
    return INSTANCE;
  }

  /**
   * Get counter for superstep messages, creating it on first use.
   *
   * @param superstep superstep number
   * @return counter holding the message count of that superstep
   */
  public GiraphHadoopCounter getSuperstepMessages(long superstep) {
    GiraphHadoopCounter counter = superstepMessages.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep- " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepMessages.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepMessages.values().iterator();
  }
}
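The get-or-create lookup in getSuperstepMessages can be tried outside Hadoop with a small stand-in. The sketch below is only an illustration: SuperstepCounterSketch is a hypothetical class, and AtomicLong stands in for GiraphHadoopCounter, so nothing here is part of Giraph itself. It shows that repeated lookups for the same superstep return the same counter, so increments accumulate.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the per-superstep counter registry; not a Giraph class. */
class SuperstepCounterSketch implements Iterable<AtomicLong> {
  /** One counter per superstep, created on first access (AtomicLong replaces GiraphHadoopCounter). */
  private final Map<Long, AtomicLong> superstepMessages = new TreeMap<>();

  /** Return the counter for a superstep, creating it if it does not exist yet. */
  AtomicLong getSuperstepMessages(long superstep) {
    return superstepMessages.computeIfAbsent(superstep, s -> new AtomicLong());
  }

  @Override
  public Iterator<AtomicLong> iterator() {
    return superstepMessages.values().iterator();
  }

  public static void main(String[] args) {
    SuperstepCounterSketch counters = new SuperstepCounterSketch();
    // Two increments for superstep 0 land in the same counter.
    counters.getSuperstepMessages(0).addAndGet(120);
    counters.getSuperstepMessages(0).addAndGet(30);
    counters.getSuperstepMessages(1).addAndGet(75);
    // Iterates in superstep order because the sketch uses a TreeMap.
    for (AtomicLong counter : counters) {
      System.out.println(counter.get());
    }
  }
}
```

The sketch uses a TreeMap only so that iteration order is predictable; the class above uses a HashMap, whose iteration order is unspecified.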
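2. Add the statistics to the BspServiceMaster class. At every synchronization, the master aggregates (sums) the number of messages sent by each worker and stores the result in GlobalStats. It is therefore enough to read the total message count from the GlobalStats object after each synchronization and write it into GiraphMessages. Each entry has the form <SuperStep-Number, TotalMessagesCount> and is stored in the Map<Long, GiraphHadoopCounter> superstepMessages object defined in the GiraphMessages class from the previous step.
At the very end of the BspServiceMaster constructor, append one line of code to initialize GiraphMessages: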
GiraphMessages.init(context);
In the SuperstepState coordinateSuperstep() method of the BspServiceMaster class, add the recording logic. The relevant code fragment is as follows:
……
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
LOG.info("D-globalStats: "+globalStats+"\n\n");
// Add the following statement. Recording starts from superstep 0.
if (getSuperstep() != INPUT_SUPERSTEP) {
  GiraphMessages.getInstance().getSuperstepMessages(getSuperstep()).increment(globalStats.getMessageCount());
}
……
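As a rough illustration of the flow described in step 2, the following sketch sums per-worker message counts into one total and records it under the superstep number, skipping the input superstep. MasterMessageStatsSketch and its methods are hypothetical names, a plain map replaces the Hadoop counters, and the value -1 for INPUT_SUPERSTEP is an assumption about Giraph's constant rather than something stated in this article.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the master-side bookkeeping; not Giraph code. */
class MasterMessageStatsSketch {
  /** Assumed value of the input superstep, which runs before superstep 0. */
  static final long INPUT_SUPERSTEP = -1;

  /** Total message count per superstep (stands in for the Hadoop counters). */
  private final Map<Long, Long> totals = new LinkedHashMap<>();

  /** Sum the message counts reported by all workers, as the master does when aggregating. */
  static long aggregate(List<Long> perWorkerCounts) {
    long total = 0;
    for (long count : perWorkerCounts) {
      total += count;
    }
    return total;
  }

  /** Record the aggregated total, ignoring the input superstep. */
  void record(long superstep, List<Long> perWorkerCounts) {
    long total = aggregate(perWorkerCounts);
    if (superstep != INPUT_SUPERSTEP) {
      totals.merge(superstep, total, Long::sum);
    }
  }

  Map<Long, Long> totals() {
    return totals;
  }

  public static void main(String[] args) {
    MasterMessageStatsSketch master = new MasterMessageStatsSketch();
    master.record(INPUT_SUPERSTEP, List.of(0L, 0L, 0L)); // skipped by the guard
    master.record(0, List.of(10L, 20L, 30L));
    master.record(1, List.of(4L, 0L, 6L));
    System.out.println(master.totals());
  }
}
```

The guard on INPUT_SUPERSTEP plays the same role as the if statement added to coordinateSuperstep(): only supersteps from 0 onward get an entry.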
3. The experimental results are as follows:
The end.