Reading the Code of Plato, Tencent's High-Performance Graph Computing Framework (Part 2): Graph Loading

Posted by archimekai


In the PageRank implementation, graph loading and graph partitioning are performed by the following call:

  // init graph
  plato::graph_info_t graph_info(FLAGS_is_directed);
  auto pdcsc = plato::create_dcsc_seqs_from_path<plato::empty_t>(
    &graph_info, FLAGS_input, plato::edge_format_t::CSV,
    plato::dummy_decoder<plato::empty_t>, FLAGS_alpha, FLAGS_part_by_in
  );

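Graph loading and graph partitioning involve fairly complex logic. This article focuses on graph loading.

Walkthrough of the entry function create_dcsc_seqs_from_path

create_dcsc_seqs_from_path is defined as follows. The outer create_dcsc_seqs_from_path is a thin wrapper around the inner create_dcsc_seq_from_path; the only thing it adds is fixing SEQ_PART (the partitioning strategy) to sequence_balanced_by_source_t: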

// dcsc with sequence balanced partition by source
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>> create_dcsc_seqs_from_path (
    graph_info_t*                   pgraph_info,
    const std::string&              path,
    edge_format_t                   format,
    decoder_t<EDATA>                decoder,
    int                             alpha = -1,
    bool                            use_in_degree = false,
    vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr) {
  return create_dcsc_seq_from_path<EDATA, sequence_balanced_by_source_t, VID_T, CACHE>
    (pgraph_info, path, format, decoder, alpha, use_in_degree, vid_encoder);
}


/*
 * create dcsc graph structure with sequence balanced by source partition from file system
 *
 * \tparam EDATA          edge data type
 * \tparam SEQ_PART       sequence partition type
 * \tparam VID_T          vertex id type, can be uint32_t or uint64_t
 * \tparam CACHE          cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t
 *
 * \param pgraph_info     user should fill 'is_directed_' field of  graph_info_t, this function
 *                        will fill other fields during load process.
 * \param path            input file path, 'path' can be a file or a directory.
 *                        'path' can be located on hdfs or posix, distinguish by its prefix.
 *                        eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
 * \param format          file format
 * \param decoder         edge data decode, string => EDATA
 * \param is_directed     the graph is directed or not
 * \param alpha           vertex's weighted for partition, -1 means use default
 * \param use_in_degree   use in-degree instead of out degree for partition
 *
 * \return
 *      graph structure in dcsc form
 **/
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,
         template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (
    graph_info_t*                       pgraph_info,
    const std::string&                  path,
    edge_format_t                       format,
    decoder_t<EDATA>                    decoder,
    int                                 alpha = -1,
    bool                                use_in_degree = false,
    vencoder_t<EDATA, VID_T, CACHE>     vid_encoder = nullptr)

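Below we look at this function in terms of its template parameters, input parameters, return value and main flow.

Template parameters

create_dcsc_seq_from_path is a function template. In the PageRank case, its template parameters take the following values:

EDATA=plato::empty_t: PageRank stores no data on edges.

VID_T=plato::vid_t, usually uint32_t or uint64_t.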

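CACHE=plato::edge_file_cache_t (note: the PageRank call above only specifies EDATA, so judging from the default template argument in the excerpts, CACHE would actually resolve to edge_block_cache_t; edge_file_cache_t is used as the example below)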

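edge_file_cache_t is defined as follows. Through its parent class it manages elements of type edge_unit_t, and it is used to store the edges loaded from the graph: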

// thread-safe edge cache implementation, fixed capacity
template <typename EDATA, typename VID_T = vid_t>
class edge_file_cache_t : public object_file_buffer_t<edge_unit_t<EDATA, VID_T>> {
public:
  using edata_t            = EDATA;
  using edge_unit_spec_t   = edge_unit_t<edata_t, VID_T>;

  explicit edge_file_cache_t(void) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>() { }
  explicit edge_file_cache_t(size_t n) : object_file_buffer_t<edge_unit_t<EDATA, VID_T>>(n) { }
};

Its parent class object_file_buffer_t is defined as follows. This class provides thread-safe memory management.

// fixed-size, object file buffer with thread-safe traversal
template <typename T, typename Enable = void>
class object_file_buffer_t { /* ... */ };

edge_file_cache_t is instantiated with EDATA=plato::empty_t and VID_T=plato::vid_t, and object_file_buffer_t with T=edge_unit_t<EDATA, VID_T>.
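To make the idea of a fixed-capacity, thread-safe buffer more concrete, here is a minimal in-memory sketch. It is not Plato's object_file_buffer_t (which, judging by its name, is file-backed), and the class name fixed_buffer_sketch and its methods are hypothetical. It only shows one common way to let many loader threads append concurrently: each push reserves a distinct slot with a single atomic counter.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical fixed-capacity buffer that many threads may append to concurrently.
// Not Plato's implementation: an in-memory illustration of the pattern only.
template <typename T>
class fixed_buffer_sketch {
 public:
  explicit fixed_buffer_sketch(std::size_t capacity) : data_(capacity), next_(0) {}

  // Each caller reserves its own slot with one atomic fetch_add, so no lock is needed
  // and no two threads ever write the same element. Returns false once full.
  bool push_back(const T& value) {
    std::size_t idx = next_.fetch_add(1, std::memory_order_relaxed);
    if (idx >= data_.size()) return false;
    data_[idx] = value;
    return true;
  }

  // Number of stored elements. Only meaningful after all writers have finished
  // (for example after the parallel region that filled the buffer has ended).
  std::size_t size() const { return std::min(next_.load(), data_.size()); }

  const T& operator[](std::size_t i) const { return data_[i]; }

 private:
  std::vector<T> data_;
  std::atomic<std::size_t> next_;
};
```

The design trades flexibility for simplicity: the capacity must be known up front, but appends never block each other.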

edge_unit_t is defined as follows. It stores an edge's source vertex, destination vertex and edge data.

template <typename EDATA_T, typename VID_T = vid_t>
struct edge_unit_t {
  VID_T   src_;
  VID_T   dst_;
  EDATA_T edata_;

  template<typename Ar>
  void serialize(Ar &ar) {  // boost-style serialization when EDATA_T is non-trivial
      ar & src_ & dst_ & edata_;
  }
};// __attribute__((packed));

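Input parameters

The purpose of most input parameters is clear from their names, so we focus on the types of a few of them.

edge_format_t is a simple enum; in practice it can only take the value CSV: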

enum class edge_format_t {
  UNKNOWN = 0,
  CSV     = 1
};

decoder_t is a function type that parses the data stored on an edge. It is defined as follows:

/*
 * \brief decoder_t, decode edge data from string
 *
 * \param pOutput   output
 * \param sInput    unresolve c-string, end with '\0', you can modify it
 *
 * \return true -- continue parse stage, false -- abort parse
 *
 **/
template <typename EdgeData>
using decoder_t = std::function<bool(EdgeData*, char*)>;

In the PageRank case, the decoder argument is plato::dummy_decoder<plato::empty_t>, because PageRank does not use edge data.
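The decoder contract (return true to continue, false to abort; the input C string may be modified) can be illustrated with a small sketch. decoder_t below mirrors the alias shown above, but empty_t, dummy_decoder, float_decoder and parse_line are hypothetical stand-ins written for this example, not Plato's own implementations; how Plato's csv_parser actually splits a line is not shown in this article.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <string>

// Same shape as Plato's alias: decode edge data from a C string.
template <typename EdgeData>
using decoder_t = std::function<bool(EdgeData*, char*)>;

struct empty_t {};  // hypothetical stand-in for plato::empty_t

// Hypothetical dummy decoder: ignores the input and lets parsing continue.
template <typename EdgeData>
bool dummy_decoder(EdgeData* /*out*/, char* /*in*/) { return true; }

// Hypothetical decoder for weighted edges: parses the data column as a float.
bool float_decoder(float* out, char* in) {
  char* end = nullptr;
  *out = std::strtof(in, &end);
  return end != in;  // abort (false) if nothing could be parsed
}

// Hypothetical helper: split "src,dst[,data]" and hand the data column to the decoder.
template <typename EdgeData>
bool parse_line(std::string line, uint32_t* src, uint32_t* dst, EdgeData* edata,
                const decoder_t<EdgeData>& decoder) {
  char* p = &line[0];
  char* end = nullptr;
  *src = static_cast<uint32_t>(std::strtoul(p, &end, 10));
  if (*end != ',') return false;
  p = end + 1;
  *dst = static_cast<uint32_t>(std::strtoul(p, &end, 10));
  char* rest = (*end == ',') ? end + 1 : end;  // empty string when there is no data column
  return decoder(edata, rest);
}
```

The decoders are wrapped in decoder_t objects before being passed to parse_line (for example `decoder_t<float> fdec = float_decoder;`), because template argument deduction does not look through the implicit conversion from a plain function to std::function.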

vencoder_t<EDATA, VID_T, CACHE> is the type of the vertex ID encoder. It is defined as follows:

template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
using vencoder_t = typename std::remove_reference<vid_encoder_t<EDATA,VID_T,CACHE>*>::type;

template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
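class vid_encoder_t {
  // ...
  void encode(CACHE<EDATA, VID_T>& cache, encoder_callback_t callback);
};

As the definition shows, vencoder_t is essentially a pointer to vid_encoder_t<EDATA,VID_T,CACHE> (std::remove_reference leaves a pointer type unchanged). The encoder's encode method re-encodes vertex IDs so that they are redistributed into the range [0, number of vertices), which simplifies later computation. The subsequent partitioning step assumes that vertex IDs lie in [0, number of vertices).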
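The effect of vertex ID re-encoding can be sketched in a few lines: collect every ID that appears in the edge list, sort and deduplicate, and use each ID's rank as its new ID. This is a single-process illustration with hypothetical names (compact_vertex_ids, compacted_t); Plato's vid_encoder_t works on the distributed edge cache and is not reproduced here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical result of compaction: both directions of the ID mapping.
struct compacted_t {
  std::vector<uint64_t> dense_to_raw;                   // new dense ID -> original ID
  std::unordered_map<uint64_t, uint32_t> raw_to_dense;  // original ID -> new dense ID
};

// Maps arbitrary (sparse) vertex IDs onto [0, number of vertices) and rewrites the
// edges in place. Sorting first makes the numbering deterministic and order-preserving.
compacted_t compact_vertex_ids(std::vector<std::pair<uint64_t, uint64_t>>& edges) {
  compacted_t c;
  for (const auto& e : edges) {
    c.dense_to_raw.push_back(e.first);
    c.dense_to_raw.push_back(e.second);
  }
  std::sort(c.dense_to_raw.begin(), c.dense_to_raw.end());
  c.dense_to_raw.erase(std::unique(c.dense_to_raw.begin(), c.dense_to_raw.end()),
                       c.dense_to_raw.end());
  for (uint32_t i = 0; i < c.dense_to_raw.size(); ++i) c.raw_to_dense[c.dense_to_raw[i]] = i;
  for (auto& e : edges) {
    e.first = c.raw_to_dense[e.first];
    e.second = c.raw_to_dense[e.second];
  }
  return c;
}
```

Keeping dense_to_raw allows results to be mapped back to the original IDs when they are written out.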

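Summary of argument values

To summarize, create_dcsc_seqs_from_path is called with the following arguments:

| Parameter | Value | Note |
|---|---|---|
| graph_info_t* pgraph_info | &graph_info | |
| const std::string& path | FLAGS_input | |
| edge_format_t format | plato::edge_format_t::CSV | |
| decoder_t<EDATA> decoder | plato::dummy_decoder<plato::empty_t> | |
| int alpha = -1 | FLAGS_alpha | |
| bool use_in_degree = false | FLAGS_part_by_in | |
| vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr | nullptr | not passed; the default value is used |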

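Return value

The function returns std::shared_ptr<dcsc_t<EDATA, sequence_balanced_by_source_t>>.

dcsc_t is defined as follows. DCSC stands for doubly compressed sparse column; for the precise definition of this format, see the paper "On the representation and multiplication of hypersparse matrices". I may write a separate article on sparse matrix storage formats later.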

/*
 * doubly compressed sparse column storage
 * vertexId must be compacted
 *
 * references:
 *  BULUC, A., AND GILBERT, J.R. On the representation and multiplication of
 *  hypersparse matrices.
 *
 * \tparam EDATA      data type associate with edge
 * \tparam PART_IMPL  partitioner's type
 **/
template <typename EDATA, typename PART_IMPL, typename ALLOC = std::allocator<adj_unit_t<EDATA>>>
class dcsc_t { /* ... */ };
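DCSC differs from plain CSC by storing column pointers only for non-empty columns. The sketch below builds the three core DCSC arrays from an edge list, following the naming in Buluç and Gilbert's paper (JC for the IDs of non-empty columns, CP for column pointers, IR for row IDs). Treating dst as the column and src as the row is an assumption made for illustration, and the field names of Plato's dcsc_t are not reproduced.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using edge_pair = std::pair<uint32_t, uint32_t>;  // (src, dst)

// Minimal DCSC sketch: an edge (src, dst) is a nonzero at row src, column dst.
struct dcsc_sketch_t {
  std::vector<uint32_t> jc;  // IDs of the non-empty columns, ascending
  std::vector<uint32_t> cp;  // ir[cp[k] .. cp[k+1]) are the rows of column jc[k]
  std::vector<uint32_t> ir;  // row IDs (neighbours), grouped by column
};

dcsc_sketch_t build_dcsc(std::vector<edge_pair> edges) {
  // Sort by column (dst), then by row (src).
  std::sort(edges.begin(), edges.end(), [](const edge_pair& a, const edge_pair& b) {
    return a.second != b.second ? a.second < b.second : a.first < b.first;
  });
  dcsc_sketch_t d;
  for (const auto& e : edges) {
    if (d.jc.empty() || d.jc.back() != e.second) {  // a new non-empty column starts
      d.jc.push_back(e.second);
      d.cp.push_back(static_cast<uint32_t>(d.ir.size()));
    }
    d.ir.push_back(e.first);
  }
  d.cp.push_back(static_cast<uint32_t>(d.ir.size()));  // end sentinel
  return d;
}
```

In the test data, the column with ID 1000000 costs a single jc entry, whereas CSC would need a pointer array with more than a million entries. This is what makes DCSC attractive for hypersparse matrices, where most columns are empty.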

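Template parameters appear here as well: EDATA=plato::empty_t means no data is stored on edges; PART_IMPL=sequence_balanced_by_source_t means the graph is partitioned with sequence_balanced_by_source_t; and ALLOC = std::allocator<adj_unit_t<EDATA>> is the memory allocator type. Next we take a closer look at sequence_balanced_by_source_t and adj_unit_t.

Graph partitioning strategy: sequence_balanced_by_source_t

Plato provides several graph partitioning strategies, and sequence_balanced_by_source_t is one of them. Its core definition is as follows: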

/*
 * Partitioner try to keep each partitions' computation work balanced
 * vertexId must be compacted
 *
 * references:
 *  Julian Shun, Guy E Blelloch. Ligra: A Lightweight Graph Processing
 *  Framework for Shared Memory
 *
 *  Xiaowei Zhu, Wenguang Chen, etc. Gemini: A Computation-Centric Distributed
 *  Graph Processing System
 **/

// edge belong to source node's partition
class sequence_balanced_by_source_t { /* ... */ };

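When sequence_balanced_by_source_t is initialized, it precomputes, following a load-balancing principle, the range of vertex IDs each partition is responsible for.

To balance load, Plato tries to give every partition roughly the same amount of work, which reduces the long job times caused by stragglers. It estimates the workload of partition i as $\alpha \cdot |V_i| + |E_i|$, where $|V_i|$ is the number of vertices in the partition and $|E_i|$ the number of edges (in the code below, the sum of the degrees of its vertices). $\alpha$ is a tunable parameter that decides whether the vertex count or the edge count weighs more. Its default is $\alpha = 8(p-1)$, where $p$ is the number of processes running in the cluster (usually equal to the number of machines).

The vector poffset stores the partitioning result. It has p+1 elements, and partition i (counting from 0) holds the vertices in the range [poffset[i], poffset[i+1]). For performance, every vertex ID in poffset except the last one is aligned to a multiple of PAGESIZE. Note that the vertex IDs in poffset are monotonically non-decreasing.

The key initialization logic is as follows: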

  /*
   * constructor
   *
   * \param degrees   each vertex's degrees
   * \param vertices  vertex number of the graph
   * \param edges     edge number of the graph
   * \param alpha     vertex's weight of computation, default: -1, means
   *                  alpha = 8 * (partitions - 1)
   **/
  template <typename DT>
  sequence_balanced_by_source_t(const DT* degrees, vid_t vertices, eid_t edges, int alpha = -1) {
    if (-1 == alpha) {
      auto& cluster_info = cluster_info_t::get_instance();
      // compute the default value of alpha
      alpha = 8 * (cluster_info.partitions_ - 1);
    }
    __init_offset(&offset_, degrees, vertices, edges, alpha);
  }

template <typename DT>
void __init_offset(std::vector<vid_t>* poffset, const DT* degrees, vid_t vertices, eid_t edges, int alpha) {
  // poffset stores the range of vertices managed by each partition
  auto& cluster_info = cluster_info_t::get_instance();

  uint64_t remained_amount = edges + vertices * (uint64_t)alpha;
  uint64_t expected_amount = 0;

  poffset->clear();
  poffset->resize(cluster_info.partitions_ + 1, 0);
  for (int p_i = 0; p_i < cluster_info.partitions_; ++p_i) {
    // always split the remaining work evenly among the remaining partitions to get expected_amount
    expected_amount = remained_amount / (cluster_info.partitions_ - p_i);

    uint64_t amount = 0;
    for (vid_t v_i = poffset->at(p_i); v_i < vertices; ++v_i) {
      amount += (alpha + degrees[v_i]);
      if (amount >= expected_amount) {
        // for performance, align the vertex ID to PAGESIZE
        poffset->at(p_i + 1) = v_i / PAGESIZE * PAGESIZE;
        break;
      }
    }
    if ((cluster_info.partitions_ - 1) == p_i) { poffset->at(cluster_info.partitions_) = vertices; }

    remained_amount -= amount;
  }
}
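To see the partitioning arithmetic in action, the function below re-implements the loop of __init_offset as a standalone function. The name balanced_offsets is hypothetical, and passing the number of partitions and the page size as parameters (instead of reading cluster_info_t and PAGESIZE) is an assumption made so the sketch runs on its own; page_size = 1 turns alignment off.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Standalone sketch of the offset computation shown above (hypothetical name).
// Partition i owns the vertex range [offset[i], offset[i+1]).
std::vector<uint32_t> balanced_offsets(const std::vector<uint32_t>& degrees, uint64_t edges,
                                       int partitions, int alpha, uint32_t page_size) {
  if (alpha == -1) alpha = 8 * (partitions - 1);  // same default as above
  const uint32_t vertices = static_cast<uint32_t>(degrees.size());
  uint64_t remained = edges + vertices * static_cast<uint64_t>(alpha);
  std::vector<uint32_t> offset(static_cast<std::size_t>(partitions) + 1, 0);
  for (int p = 0; p < partitions; ++p) {
    // Split the remaining work evenly among the remaining partitions.
    uint64_t expected = remained / static_cast<uint64_t>(partitions - p);
    uint64_t amount = 0;
    for (uint32_t v = offset[p]; v < vertices; ++v) {
      amount += static_cast<uint64_t>(alpha) + degrees[v];  // alpha * |V| + |E| per vertex
      if (amount >= expected) {
        offset[p + 1] = v / page_size * page_size;  // round the boundary down to a page
        break;
      }
    }
    if (p == partitions - 1) offset[partitions] = vertices;
    remained -= amount;
  }
  return offset;
}
```

In the first test call (alpha = 0, no alignment) the running total reaches the target at vertex 1, but because the boundary is set to that vertex, vertex 1 itself goes to partition 1. In the second call (default alpha = 8, page size 4) the target is reached at vertex 7 and page alignment moves the boundary down to 4. On real graphs with many vertices per partition, such rounding matters much less.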

Once initialization is done, the get_partition_id interface can be used directly to look up the partition a vertex belongs to.

  // get vertex's partition
  inline int get_partition_id(vid_t v_i) {
    for (size_t p_i = 0; p_i < (offset_.size() - 1); ++p_i) {
      if (v_i >= offset_[p_i] && v_i < offset_[p_i + 1]) {
        return p_i;
      }
    }
  }

When looking up an edge's partition, the edge is simply assigned to the partition of its source vertex:

  // get edge's partition
  inline int get_partition_id(vid_t src, vid_t /*dst*/) {
    return get_partition_id(src);
  }
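Because the offsets are non-decreasing, the linear scan in get_partition_id can also be written as a binary search. The sketch below is an alternative formulation, not Plato's code; partition_of and partition_of_edge are hypothetical names, and returning -1 for out-of-range IDs is a choice made here (the excerpt above does not show what Plato does in that case).

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Binary-search lookup over non-decreasing partition offsets.
// Returns -1 for IDs outside [offset.front(), offset.back()).
int partition_of(const std::vector<uint32_t>& offset, uint32_t v) {
  if (offset.size() < 2 || v < offset.front() || v >= offset.back()) return -1;
  // upper_bound finds the first boundary strictly greater than v; the owning partition
  // starts at the boundary just before it. Empty partitions (equal boundaries) are skipped.
  auto it = std::upper_bound(offset.begin(), offset.end(), v);
  return static_cast<int>(it - offset.begin()) - 1;
}

// Edges follow their source vertex, as in sequence_balanced_by_source_t.
int partition_of_edge(const std::vector<uint32_t>& offset, uint32_t src, uint32_t /*dst*/) {
  return partition_of(offset, src);
}
```

With only a handful of partitions the linear scan is perfectly adequate; the binary search only pays off when the number of partitions is large.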

Adjacency unit: adj_unit_t

adj_unit_t is simple. Each adj_unit_t stores one neighbor of the current vertex, together with the data on the edge from the current vertex to that neighbor.

template <typename EDATA_T>
struct adj_unit_t {
  vid_t    neighbour_;
  EDATA_T  edata_;

  template<typename Ar>
  void serialize(Ar &ar) {  // boost-style serialization when EDATA_T is non-trivial
      ar & neighbour_ & edata_;
  }
};// __attribute__((packed));

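Main flow of create_dcsc_seq_from_path

create_dcsc_seq_from_path does the following; please read it together with the comments in the code. Note that only the key code is shown here; the full code is available on GitHub.

  1. Load the edges from the input files.
  2. Based on the user-supplied FLAGS_part_by_in flag, decide whether to partition by vertex in-degree or by vertex out-degree.
  3. Initialize the partitioner, which computes the range of vertex IDs stored in each partition.
  4. Read the edges again from the edge cache built in step 1 and, as directed by the partitioner, store vertices and edges in the correct partitions.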
template <typename EDATA, typename SEQ_PART, typename VID_T = vid_t,
         template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<dcsc_t<EDATA, SEQ_PART>> create_dcsc_seq_from_path (
    graph_info_t*                       pgraph_info,
    const std::string&                  path,
    edge_format_t                       format,
    decoder_t<EDATA>                    decoder,
    int                                 alpha = -1,
    bool                                use_in_degree = false,
    vencoder_t<EDATA, VID_T, CACHE>     vid_encoder = nullptr) {

  // 1. Load edges from the files. Plato's input format is an edge list, hence "loading edges".
  auto cache = load_edges_cache<EDATA, VID_T, CACHE>(pgraph_info, path, format, decoder, nullptr, vid_encoder);

  // ... (declarations of part_dcsc, degrees, __edges and pdcsc omitted)
  {
    if (use_in_degree) {
      // 2. Depending on FLAGS_part_by_in, partition by vertex in-degree or by vertex out-degree
      degrees = generate_dense_in_degrees<vid_t>(*pgraph_info, *cache);
    } else {
      degrees = generate_dense_out_degrees<vid_t>(*pgraph_info, *cache);
    }

    // 3. Initialize the partitioner and compute the vertex ID range of each partition. In the
    //    PageRank case SEQ_PART is sequence_balanced_by_source_t, which computes these ranges
    //    when it is constructed. reset is a std::shared_ptr method and can be ignored for now.
    part_dcsc.reset(new SEQ_PART(degrees.data(), pgraph_info->vertices_,
      __edges, alpha));
    part_dcsc->check_consistency();
  }

  // 4. Read the edges again from the cache and store vertices and edges in the partitions
  //    chosen by the partitioner.
  pdcsc->load_from_cache(*pgraph_info, *cache);

  return pdcsc;
}
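Step 2 relies on dense degree arrays. A single-process sketch of what "dense in-degree" versus "dense out-degree" means is shown below; dense_degrees is a hypothetical name and assumes vertex IDs are already compacted into [0, vertices). Since each machine loads only part of the edges, a distributed implementation would additionally have to sum these counts across processes (for example with an allreduce) so that every process computes the same partition boundaries; that step is omitted here.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical single-process degree counting over an edge list of (src, dst) pairs.
std::vector<uint32_t> dense_degrees(const std::vector<std::pair<uint32_t, uint32_t>>& edges,
                                    uint32_t vertices, bool use_in_degree) {
  std::vector<uint32_t> degrees(vertices, 0);
  for (const auto& e : edges) {
    // In-degree counts edges ending at a vertex; out-degree counts edges leaving it.
    ++degrees[use_in_degree ? e.second : e.first];
  }
  return degrees;
}
```

The resulting array is exactly what the partitioner consumes as its degrees argument: one entry per vertex ID, indexed directly by the ID.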

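This article focuses on loading edges from files; the remaining steps will be covered in later articles.

1. Loading edges from files: load_edges_cache

The key parts of load_edges_cache are as follows: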

/*
 * parallel load edges from file system to cache
 *
 * \tparam EDATA        data bind on edge
 * \tparam VID_T        vertex id type, can be uint32_t or uint64_t
 * \tparam CACHE        cache type, can be edge_block_cache_t or edge_file_cache_t or edge_cache_t
 *
 * \param pginfo        graph info
 * \param path          input file path, 'path' can be a file or a directory.
 *                      'path' can be located on hdfs or posix, distinguish by its prefix.
 *                      eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
 * \param format        file format
 * \param decoder       edge data decode, string => EDATA
 * \param callback      function executed when parsing data
 * \param vid_encoder   encoder for data
 *
 * \return loaded cache or nullptr
 **/
template <typename EDATA, typename VID_T = vid_t, template<typename, typename> class CACHE = edge_block_cache_t>
std::shared_ptr<CACHE<EDATA, vid_t>> load_edges_cache(
    graph_info_t*                   pginfo,
    const std::string&              path,
    edge_format_t                   format,
    decoder_t<EDATA>                decoder,
    data_callback_t<EDATA, vid_t>   callback = nullptr,  // nullptr in the PageRank case
    vencoder_t<EDATA, VID_T, CACHE> vid_encoder = nullptr) {
  // ... (declarations of real_callback, edges and v_bitmap omitted)

  // 1.1 Read edge-list data from the file system
  read_from_files<EDATA, vid_t>(path, format, decoder, real_callback);

  // 1.2 Use MPI_Allreduce to sum the number of edges loaded on all machines
  MPI_Allreduce(MPI_IN_PLACE, &edges, 1, get_mpi_data_type<eid_t>(), MPI_SUM, MPI_COMM_WORLD);

  // 1.3 v_bitmap is too large for a single MPI_Allreduce call, which would fail, so Plato's own
  //     allreduce helper reduces v_bitmap segment by segment. MPI_BOR is bitwise OR, so a bit
  //     ends up set if any machine saw that vertex.
  allreduce(MPI_IN_PLACE, v_bitmap.data_, word_offset(v_bitmap.size_) + 1, get_mpi_data_type<uint64_t>(),
      MPI_BOR, MPI_COMM_WORLD);

  // 1.4 Save the results into the pginfo argument, which in practice means into graph_info
  if (pginfo) {
    pginfo->edges_    = edges;
    pginfo->vertices_ = v_bitmap.count();
    pginfo->max_v_i_  = v_bitmap.msb();
  }
  // ...
}
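Steps 1.3 and 1.4 can be simulated in a single process: each "rank" marks the vertex IDs it has seen in a local bitmap, the bitmaps are combined with bitwise OR (what MPI_BOR does inside the allreduce), and the number of distinct vertices and the largest vertex ID are read off the result. The helper names and the chunk size are hypothetical; the chunked loop only mimics the idea of reducing a large buffer segment by segment and does not call MPI.

```cpp
#include <algorithm>
#include <bitset>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using bitmap_words = std::vector<uint64_t>;  // bit v set <=> vertex v was seen

void set_bit(bitmap_words& bm, uint64_t v) { bm[v / 64] |= uint64_t(1) << (v % 64); }

// OR all ranks' bitmaps together, walking the words in chunks of chunk_words (> 0).
bitmap_words or_reduce_chunked(const std::vector<bitmap_words>& ranks, std::size_t chunk_words) {
  bitmap_words out(ranks.at(0).size(), 0);
  for (std::size_t begin = 0; begin < out.size(); begin += chunk_words) {
    std::size_t end = std::min(out.size(), begin + chunk_words);
    for (const auto& local : ranks)
      for (std::size_t w = begin; w < end; ++w) out[w] |= local[w];
  }
  return out;
}

// Number of set bits, i.e. distinct vertices (compare pginfo->vertices_).
uint64_t count_bits(const bitmap_words& bm) {
  uint64_t n = 0;
  for (uint64_t w : bm) n += std::bitset<64>(w).count();
  return n;
}

// Index of the highest set bit (compare pginfo->max_v_i_), or -1 for an empty bitmap.
int64_t msb(const bitmap_words& bm) {
  for (std::size_t i = bm.size(); i-- > 0;)
    for (int b = 63; b >= 0; --b)
      if ((bm[i] >> b) & 1) return static_cast<int64_t>(i * 64 + b);
  return -1;
}
```

In the test, the merged bitmap holds 5 distinct vertices while the largest ID is 129, so the vertex count and max_v_i_ + 1 differ. They coincide only when vertex IDs are compacted into [0, number of vertices), which is the assumption stated in the comments of dcsc_t and sequence_balanced_by_source_t.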

1.1 Reading edge-list data from the file system: read_from_files

The key parts of read_from_files are as follows:

/*
 * parallel parse edges from file system to cache
 *
 * \tparam EDATA        data bind on edge
 * \tparam VID_T        vertex id type, can be uint32_t or uint64_t
 *
 * \param path          input file path, 'path' can be a file or a directory.
 *                      'path' can be located on hdfs or posix, distinguish by its prefix.
 *                      eg: 'hdfs://' means hdfs, '/' means posix, 'wfs://' means wfs
 * \param format        file format
 * \param decoder       edge data decode, string => EDATA
 * \param callback      function executed when parsing data
 *
 **/
template <typename EDATA, typename VID_T = vid_t>
void read_from_files(
  const std::string&            path,
  edge_format_t                 format,
  decoder_t<EDATA>              decoder,
  data_callback_t<EDATA, VID_T> callback) {
  // ... (declarations of parser, cluster_info and files_lock omitted)
  parser = csv_parser<boost::iostreams::filtering_istream, EDATA, VID_T>;

  // 1.1.1 Get the list of files for this machine
  std::vector<std::string> files = get_files(path);

  // 1.1.2 Read the files in parallel with OpenMP using cluster_info.threads_ threads;
  //       each thread repeatedly takes one file from the list and reads it via with_file
  #pragma omp parallel num_threads(cluster_info.threads_)
  {
    while (true) {
      std::string filename;
      {
        std::lock_guard<std::mutex> lock(files_lock);
        if (files.empty()) break;
        filename = std::move(files.back());
        files.pop_back();
      }

      with_file(filename, [&] (boost::iostreams::filtering_istream& is) {
        parser(is, callback, decoder);
      });
    }
  }
}
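The file-claiming pattern in read_from_files (threads repeatedly pop a file name under a mutex until the list is empty) is a simple dynamic work queue. The sketch below reproduces that pattern with in-memory strings standing in for files; parallel_load and csv_edge_t are hypothetical names, and the naive "src,dst" parser is not Plato's csv_parser. If OpenMP is not enabled, the pragma is ignored and the loop simply runs on one thread, which gives the same result.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

struct csv_edge_t { uint32_t src; uint32_t dst; };

// Each element of `files` is the content of one hypothetical CSV file.
std::vector<csv_edge_t> parallel_load(std::vector<std::string> files, int threads) {
  (void)threads;  // silences an unused-parameter warning when OpenMP is disabled
  std::mutex files_lock, out_lock;
  std::vector<csv_edge_t> edges;
  #pragma omp parallel num_threads(threads)
  {
    while (true) {
      std::string content;
      {  // claim one file under the lock, exactly like the loop above
        std::lock_guard<std::mutex> lock(files_lock);
        if (files.empty()) break;
        content = std::move(files.back());
        files.pop_back();
      }
      std::vector<csv_edge_t> local;  // parse without holding any lock
      std::istringstream is(content);
      std::string line;
      while (std::getline(is, line)) {
        std::istringstream ls(line);
        uint32_t s = 0, d = 0;
        char comma = 0;
        if (ls >> s >> comma >> d && comma == ',') local.push_back({s, d});
      }
      std::lock_guard<std::mutex> lock(out_lock);  // publish this file's edges
      edges.insert(edges.end(), local.begin(), local.end());
    }
  }
  return edges;
}
```

Claiming one file at a time balances load automatically when files differ in size, at the cost of one lock acquisition per file.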

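1.1.1 Getting the list of files this machine is responsible for: get_files and get_files_from_hdfs

get_files is a very simple wrapper that calls get_files_from_hdfs or get_files_from_posix depending on the prefix of the path. Its key definition is shown below. In this example we assume the PageRank job reads its data from HDFS.

inline std::vector<std::string> get_files(const std::string& path) {
  if (boost::starts_with(path, "hdfs://")) {
    return get_files_from_hdfs(path);
  } else {
    return get_files_from_posix(path);
  }
}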
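The dispatch above uses boost::starts_with. A portable equivalent using only the standard library is sketched below (C++20 also adds std::string::starts_with). starts_with, fs_kind and classify_path are hypothetical names, and, like the excerpt, the sketch only distinguishes "hdfs://" from everything else, treating all other paths as POSIX paths.

```cpp
#include <cassert>
#include <string>

// Standard-library equivalent of boost::starts_with for std::string.
bool starts_with(const std::string& s, const std::string& prefix) {
  return s.size() >= prefix.size() && s.compare(0, prefix.size(), prefix) == 0;
}

// Hypothetical classifier mirroring the two branches of get_files above.
enum class fs_kind { hdfs, posix };

fs_kind classify_path(const std::string& path) {
  return starts_with(path, "hdfs://") ? fs_kind::hdfs : fs_kind::posix;
}
```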

The key definition of get_files_from_hdfs is as follows:
