腾讯高性能图计算框架Plato代码阅读 图加载
Posted archimekai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了腾讯高性能图计算框架Plato代码阅读 图加载相关的知识,希望对你有一定的参考价值。
腾讯高性能图计算框架Plato代码阅读(二) 图加载
在pagerank计算逻辑中,图加载和图切分是通过如下调用完成的:
// init graph
plato::graph_info_t graph_info(FLAGS_is_directed);
auto pdcsc = plato::create_dcsc_seqs_from_path<plato::empty_t>(
&graph_info, FLAGS_input, plato::edge_format_t::CSV,
plato::dummy_decoder<plato::empty_t>, FLAGS_alpha, FLAGS_part_by_in
);
图加载和图切分中涉及到了复杂的代码逻辑,我们今天主要介绍图加载。
入口函数create_dcsc_seqs_from_path解读
函数create_dcsc_seqs_from_path
的定义如下,可以看到外层的create_dcsc_seqs_from_path是对内层create_dcsc_seq_from_path的简单封装,封装之处在于指定SEQ_PART(切分策略)为sequence_balanced_by_source_t:
// dcsc with sequence balanced partition by source
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>> create_dcsc_seqs_from_path (
graph_info_t* pgraph_info,
const std::string& path,
edge_format_t format,
decoder_t<EDATA> decoder,
int alpha = -1,
bool use_in_degree = false,
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr)
return create_dcsc_seq_from_path<EDATA, sequence_balanced_by_source_t, VID_T, CACHE>
(pgraph_info, path, format, decoder, alpha, use_in_degree, vid_encoder);
/*
* create dcsc graph structure with sequence balanced by source partition from file system
*
* \\tparam EDATA edge data type
* \\tparam SEQ_PART sequence partition type
* \\tparam VID_T vertex id type, can be uint32_t or uint64_t
* \\tparam CACHE cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t
*
* \\param pgraph_info user should fill 'is_directed_' field of graph_info_t, this function
* will fill other fields during load process.
* \\param path input file path, 'path' can be a file or a directory.
* 'path' can be located on hdfs or posix, distinguish by its prefix.
* eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
* \\param format file format
* \\param decoder edge data decode, string => EDATA
* \\param is_directed the graph is directed or not
* \\param alpha vertex's weighted for partition, -1 means use default
* \\param use_in_degree use in-degree instead of out degree for partition
*
* \\return
* graph structure in dcsc form
**/
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,
template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (
graph_info_t* pgraph_info,
const std::string& path,
edge_format_t format,
decoder_t<EDATA> decoder,
int alpha = -1,
bool use_in_degree = false,
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr)
下面从模板参数、入参、返回值、主流程这几个方面对这个函数做解读。
模板参数
可以看到create_dcsc_seq_from_path是一个使用了template的函数,在pagerank场景下,模板参数的取值为:
EDATA=plato::empty_t
,pagerank中在边上没有存储数据
VID_T=plato::vid_t
,一般为int32或int64
CACHE=plato::edge_file_cache_t
edge_file_cache_t定义如下,其通过父类对edge_unit_t类型的数据进行管理,用于存储加载上来的图中的边:
// thread-safe edge cache implementation, fixed capacity
template <typename EDATA, typename VID_T = vid_t>
class edge_file_cache_t : public object_file_buffer_t<edge_unit_t<EDATA, VID_T>>
public:
using edata_t = EDATA;
using edge_unit_spec_t = edge_unit_t<edata_t, VID_T>;
explicit edge_file_cache_t(void) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>()
explicit edge_file_cache_t(size_t n) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>(n)
;
其父类object_file_buffer_t
的定义如下。这个类提供了线程安全的内存管理能力。
// fixed-size, object file buffer with thread-safe traversal
template <typename T, typename Enable = void>
class object_file_buffer_t
edge_file_cache_t的模板化参数为EDATA=plato::empty_t
,VID_T=plato::vid_t
,object_file_buffer_t的模板化参数为T=edge_unit_t<EDATA, VID_T>
。
edge_unit_t的定义如下。其中存储了一个边的起始节点,终止节点,边数据。
template <typename EDATA_T, typename VID_T = vid_t>
struct edge_unit_t
VID_T src_;
VID_T dst_;
EDATA_T edata_;
template<typename Ar>
void serialize(Ar &ar) // boost-style serialization when EDATA_T is non-trivial
ar & src_ & dst_ & edata_;
;// __attribute__((packed));
入参
大部分入参的作用从名称中可以领会到,我们重点介绍几个入参的类型。
edge_format_t 是一个简单的枚举,一般只能取CSV
enum class edge_format_t
UNKNOWN = 0,
CSV = 1
;
decoder_t 是一个函数类型,作用为解析边上的数据,定义如下。
/*
* \\brief decoder_t, decode edge data from string
*
* \\param pOutput output
* \\param sInput unresolve c-string, end with '\\0', you can modify it
*
* \\return true -- continue parse stage, false -- abort parse
*
**/
template <typename EdgeData>
using decoder_t = std::function<bool(EdgeData*, char*)>;
在pagerank的场景中,decoder_t decoder传入的值为plato::dummy_decoderplato::empty_t,原因是PageRank不需要使用边上的数据。
vencoder_t<EDATA, VID_T, CACHE> 是顶点ID编码器的类型,定义如下:
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
using vencoder_t = typename std::remove_reference<vid_encoder_t<EDATA,VID_T,CACHE>*>::type;
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
class vid_encoder_t
void encode(CACHE<EDATA, VID_T>& cache, encoder_callback_t callback);
可以看到,其实质上是vid_encoder_t<EDATA,VID_T,CACHE>类型,通过提供encode方法来对顶点ID进行重编码,从而让顶点ID重新分布到[0, 顶点个数)的范围内,方便后续的计算。后续的分区操作假设了顶点ID的范围是[0, 顶点个数)。
传入参数取值总结
总结一下,create_dcsc_seqs_from_path传入的参数取值为:
参数 | 取值 | 备注 |
---|---|---|
graph_info_t* pgraph_info | graph_info | |
const std::string& path | FLAGS_input | |
edge_format_t format | plato::edge_format_t::CSV | |
decoder_t<EDATA> decoder | plato::dummy_decoder<plato::empty_t> | |
int alpha=-1 | FLAGS_alpha | |
bool use_in_degree=false | FLAGS_part_by_in | |
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr | 使用默认值nullptr |
返回值
该函数的返回值类型为 std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>>
dcsc_t的定义如下。其中,dcsc是doubly compressed sparse column的简称,这种格式的详细定义请见论文On the representation and multiplication of hypersparse matrices,后面有机会单独写一篇文章介绍各种稀疏矩阵存储格式。
/*
* doubly compressed sparse column storage
* vertexId must be compacted
*
* references:
* BULUC, A., AND GILBERT, J.R. On the representation and multiplication of
* hypersparse matrices.
*
* \\tparam EDATA data type associate with edge
* \\tparam PART_IMPL partitioner's type
**/
template <typename EDATA, typename PART_IMPL, typename ALLOC = std::allocator<adj_unit_t<EDATA>>>
class dcsc_t
这里面同样涉及到模板参数,其中,EDATA=plato::empty_t
表示边上未存储数据,PART_IMPL=sequence_balanced_by_source_t
表示使用sequence_balanced_by_source_t来进行图的切分,ALLOC = std::allocator<adj_unit_t<EDATA>>
表示内存分配器的类型。下面打开介绍sequence_balanced_by_source_t 和 adj_unit_t。
图切分策略 sequence_balanced_by_source_t
plato中提供了多种图切分策略,sequence_balanced_by_source_t就是其中一种,核心定义如下
/*
* Partitioner try to keep each partitions' computation work balanced
* vertexId must be compacted
*
* references:
* Julian Shun, Guy E Blelloch. Ligra: A Lightweight Graph Processing
* Framework for Shared Memory
*
* Xiaowei Zhu, Wenguang Chen, etc. Gemini: A Computation-Centric Distributed
* Graph Processing System
**/
// edge belong to source node's partition
class sequence_balanced_by_source_t
sequence_balanced_by_source_t初始化时,会按照负载均衡的原则预先计算好每个partition负责的顶点编号范围。
plato为了实现负载均衡, 使不同分区上的工作量大致相等,减少struggler导致的任务执行时间长的问题,按照指标α · |Vi| + |Ei|
来估计分区i的工作量。这个公式中,α是一个可调节参数,用来决定更重视分区中顶点的数量|Vi|,还是更重视分区中边的数量|Ei|。α的默认值为8*(p-1)
,其中p代表集群中所运行的进程的数量(一般同集群中的机器数量相同)。
vector poffset用来存储分区的结果。poffset中有p+1个元素,对于分区i,其存储的顶点范围为poffset[i - 1] 到 poffset[i]。为了性能,poffset中存储的顶点编号,除最后一个外,都会对齐到PAGESIZE。请注意poffset中存储的顶点编号是递增的。
初始化的关键逻辑如下:
/*
* constructor
*
* \\param degrees each vertex's degrees
* \\param vertices vertex number of the graph
* \\param vertices edge number of the graph
* \\param alpha vertex's weight of computation, default: -1, means
* alpha = 8 * (partitions - 1)
**/
template <typename DT>
sequence_balanced_by_source_t(const DT* degrees, vid_t vertices, eid_t edges, int alpha = -1)
if (-1 == alpha)
auto& cluster_info = cluster_info_t::get_instance();
// 计算alpha的默认值
alpha = 8 * (cluster_info.partitions_ - 1);
__init_offset(&offset_, degrees, vertices, edges, alpha);
template <typename DT>
void __init_offset(std::vector<vid_t>* poffset, const DT* degrees, vid_t vertices, eid_t edges, int alpha)
// poffset中存储了每个partition中要管理的节点范围
auto& cluster_info = cluster_info_t::get_instance();
uint64_t remained_amount = edges + vertices * (uint64_t)alpha;
uint64_t expected_amount = 0;
poffset->clear();
poffset->resize(cluster_info.partitions_ + 1, 0);
for (int p_i = 0; p_i < cluster_info.partitions_; ++p_i)
// 总是尝试在剩下的分区中平分剩下的节点,从而得到expected_amount
expected_amount = remained_amount / (cluster_info.partitions_ - p_i);
uint64_t amount = 0;
for (vid_t v_i = poffset->at(p_i); v_i < vertices; ++v_i)
amount += (alpha + degrees[v_i]);
if (amount >= expected_amount)
// 为了性能,将顶点编号对齐到PAGESIZE
poffset->at(p_i + 1) = v_i / PAGESIZE * PAGESIZE;
break;
if ((cluster_info.partitions_ - 1) == p_i) poffset->at(cluster_info.partitions_) = vertices;
remained_amount -= amount;
初始化计算完毕后,可以直接通过get_partition_id接口查询顶点所对应的分区编号。
// get vertex's partition
inline int get_partition_id(vid_t v_i)
for (size_t p_i = 0; p_i < (offset_.size() - 1); ++p_i)
if (v_i >= offset_[p_i] && v_i < offset_[p_i + 1])
return p_i;
查询边的分区时,直接按照边的src的顶点所处的分区确定边的分区:
// get edge's partition
inline int get_partition_id(vid_t src, vid_t /*dst*/)
return get_partition_id(src);
邻接单元 adj_unit_t
adj_unit_t的定义比较简单。每个adj_unit_t存储当前顶点的一个邻居,以及从当前顶点到该邻居的边上的数据。
template <typename EDATA_T>
struct adj_unit_t
vid_t neighbour_;
EDATA_T edata_;
template<typename Ar>
void serialize(Ar &ar) // boost-style serialization when EDATA_T is non-trivial
ar & neighbour_ & edata_;
;// __attribute__((packed));
create_dcsc_seq_from_path的主流程
在create_dcsc_seq_from_path函数中,主要做了如下几件事情,请结合代码中的注释理解,请注意我只贴出了关键代码,完整代码请点击上面的链接到github上查看。
- 从文件中加载边。
- 根据用户指定的FLAGS_part_by_in参数,决定是使用顶点入度作为partition的依据,还是使用顶点出度作为partition的依据。
- 初始化分区器,计算每个分区中保存的顶点ID范围。
- 重新读取文件,按照分区器的指示将顶点和边存储到正确的分区中。
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,
template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (
graph_info_t* pgraph_info,
const std::string& path,
edge_format_t format,
decoder_t<EDATA> decoder,
int alpha = -1,
bool use_in_degree = false,
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr)
// 1. 从文件中加载边。plato的输入数据格式为edge list,因此这里使用了加载边的说法。
auto cache = load_edges_cache<EDATA, VID_T, CACHE>(pgraph_info, path, format, decoder, nullptr, vid_encoder);
if (use_in_degree)
// 2. 根据用户指定的FLAGS_part_by_in参数,决定是使用顶点入度作为partition的依据,还是使用顶点出度作为partition的依据
degrees = generate_dense_in_degrees<vid_t>(*pgraph_info, *cache);
else
degrees = generate_dense_out_degrees<vid_t>(*pgraph_info, *cache);
// 3. 初始化分区器,计算每个分区中保存的顶点ID范围。注意pagerank场景下模板参数SEQ_PART的取值为 class sequence_balanced_by_source_t,这个类实例化时会计算好每个分区中保存的顶点ID范围。reset为std::shared_ptr提供的方法,可以先不关注。
part_dcsc.reset(new SEQ_PART(degrees.data(), pgraph_info->vertices_,
__edges, alpha));
part_dcsc->check_consistency();
// 4. 重新读取文件,按照分区器的指示将顶点和边存储到正确的分区中。
pdcsc->load_from_cache(*pgraph_info, *cache)
return pdcsc;
这篇文章主要打开介绍从文件中加载边的操作,剩下的操作在后续文章中介绍。
1. 从文件中加载边 load_edges_cache
load_edges_cache函数的关键定义如下:
/*
* parallel load edges from file system to cache
*
* \\tparam EDATA data bind on edge
* \\tparam VID_T vertex id type, can be uint32_t or uint64_t
* \\tparam CACHE cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t
*
* \\param pginfo graph info
* \\param path input file path, 'path' can be a file or a directory.
* 'path' can be located on hdfs or posix, distinguish by its prefix.
* eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
* \\param format file format
* \\param decoder edge data decode, string => EDATA
* \\param callback function executed when parsing data
* \\param vid_encoder encoder for data
*
* \\return loaded cache or nullptr
**/
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<CACHE<EDATA, vid_t>> load_edges_cache(
graph_info_t* pginfo,
const std::string& path,
edge_format_t format,
decoder_t<EDATA> decoder,
data_callback_t<EDATA, vid_t> callback = nullptr, // pagerank场景下取值 nullptr
vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr)
// 1.1 从文件系统上读取edge list格式的数据
read_from_files<EDATA, vid_t>(path, format, decoder, real_callback);
// 1.2 通过MPI_Allreduce统计各个机器上加载到的边数量的和
MPI_Allreduce(MPI_IN_PLACE, &edges, 1, get_mpi_data_type<eid_t>(), MPI_SUM, MPI_COMM_WORLD);
// 1.3 由于v_bitmap大小太大,直接调用MPI_Allreduce会出错,所以通过plato自己实现的allreduce方法分段对v_bitmap进行allreduce
allreduce(MPI_IN_PLACE, v_bitmap.data_, word_offset(v_bitmap.size_) + 1, get_mpi_data_type<uint64_t>(),
MPI_BOR, MPI_COMM_WORLD);
// 1.4 将计算得到的信息保存到入参pginfo中,实质上就是保存到了graph_info中
if (pginfo)
pginfo->edges_ = edges;
pginfo->vertices_ = v_bitmap.count();
pginfo->max_v_i_ = v_bitmap.msb();
1.1 从文件系统上读取edge list格式的数据 read_from_files
read_from_files函数的关键定义如下:
/*
* parallel parse edges from file system to cache
*
* \\tparam EDATA data bind on edge
* \\tparam VID_T vertex id type, can be uint32_t or uint64_t
*
* \\param path input file path, 'path' can be a file or a directory.
* 'path' can be located on hdfs or posix, distinguish by its prefix.
* eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
* \\param format file format
* \\param decoder edge data decode, string => EDATA
* \\param callback function executed when parsing data
*
**/
template <typename EDATA, typename VID_T = vid_t>
void read_from_files(
const std::string& path,
edge_format_t format,
decoder_t<EDATA> decoder,
data_callback_t<EDATA, VID_T> callback)
parser = csv_parser<boost::iostreams::filtering_istream, EDATA, VID_T>;
// 1.1.1 获取本机的文件列表
std::vector<std::string> files = get_files(path);
// 1.1.2 通过OpenMP,以cluster_info.threads_个线程并行读取files列表指定的文件,每次通过with_file方法读取一个文件
#pragma omp parallel num_threads(cluster_info.threads_)
while (true)
std::string filename;
std::lock_guard<std::mutex> lock(files_lock);
if (files.empty()) break;
filename = std::move(files.back());
files.pop_back();
with_file(filename, [&] (boost::iostreams::filtering_istream& is)
parser(is, callback, decoder);
);
1.1.1 获取本机负责的文件列表 get_file 和 get_files_from_hdfs
get_file函数是一个很简单的封装,根据url的开头来决定调用get_files_from_hdfs还是get_files_from_posix,关键定义如下。 本例中我们假设pagerank的计算从HDFS中读取数据。
inline std::vector<std::string> get_files(const std::string& path)
if (boost::starts_with(path, "hdfs://"))
return get_files_from_hdfs(path);
else
return get_files_from_posix(path);
get_files_from_hdfs函数的关键定义如下:
腾讯高性能图计算框架Plato代码阅读 图加载
腾讯高性能图计算框架Plato代码阅读 进程启动及环境初始化
腾讯高性能图计算框架Plato代码阅读 进程启动及环境初始化
腾讯高性能图计算框架Plato代码阅读 进程启动及环境初始化