原创大叔案例分享id打通

Posted barneywill

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了原创大叔案例分享id打通相关的知识,希望对你有一定的参考价值。

经常有一些需要做id打通的场景,比如用户id打通等,

问题抽象是每条数据都可以解析出一个或多个kv pair:(id_type,id),然后需要将某一个kv pair匹配的多条数据进行merge;

比如:

data1: Array((‘type1‘, ‘id1‘), (‘type2‘, ‘id2‘))

data2: Array((‘type1‘, ‘id1‘), (‘type3‘, ‘id3‘))

data3: Array((‘type2‘, ‘id2‘), (‘type4‘, ‘id4‘))

其中data1和data2通过(‘type1‘, ‘id1‘)打通,data1和data3通过(‘type2‘, ‘id2‘)打通,最终data1、data2、data3打通成一条数据

data_union: Array((‘type1‘, ‘id1‘), (‘type2‘, ‘id2‘), , (‘type3‘, ‘id3‘), , (‘type4‘, ‘id4‘))

先定义基础类和方法

  class Data 
    def getId : String = ""
  

  def merge(dataArr : Array[(Map[Byte, String], Data)]) : (Map[Byte, String], Data) = dataArr.head
  def generateUUID : String = ""

其中

1)Data表示数据抽象,每条数据都有一个id;

2)Map[Byte, String]表示数据中的kv pair,即 Map[id_type, id]

3)merge将多条数据打通成一条数据;

先看最简单的递归实现

  def unionDataRDD1(rdd : RDD[(Map[Byte, String], Data)]) : RDD[(Map[Byte, String], Data)] = 
    var result = rdd.keyBy(_._2.getId).groupByKey.map(item => merge(item._2.toArray)).cache
    //Array[id_type]
    val idTypes = result.flatMap(item => item._1.keys).distinct.collect
    idTypes.foreach(item => result = result.filter(_._1.contains(item)).keyBy(_._1.get(item).get).groupByKey.map(item => merge(item._2.toArray)).union(result.filter(!_._1.contains(item))))
    result
  

性能不太好,再看优化后的非递归实现

  def unionDataRDD2(rdd : RDD[(Map[Byte, String], Data)]) : RDD[(Map[Byte, String], Data)] = 
    val result = rdd.keyBy(_._2.getId).groupByKey.map(item => merge(item._2.toArray)).cache

    //((id_type, id), group)
    val idGroupRDD = result.flatMap(item => val uuid = generateUUID; item._1.toArray.map(entry => (entry, uuid))).cache
    //Array(Array(group))
    val unionMap = idGroupRDD.groupByKey.map(_._2.toArray.distinct).filter(_.length > 1).collect
      //Map(group -> union_group)
      .foldLeft(Map[String, String]())((resultUnion, arr) => 
      val existingGroupMap = arr.collect(case group : String if resultUnion.contains(group) => (group, resultUnion.get(group).get)).toMap
      if (existingGroupMap == null || existingGroupMap.isEmpty) resultUnion ++ arr.collect(case group : String => (group -> arr.head)).toMap
      else if (existingGroupMap.size == 1) resultUnion ++ arr.collect(case group : String => (group -> existingGroupMap.head._2)).toMap
      else 
        val newUnionMap = existingGroupMap.map(_._2).collect(case group : String => (group -> existingGroupMap.head._2)).toMap
        resultUnion.collect(case entry : (String, String) => if (newUnionMap.contains(entry._2)) (entry._1, newUnionMap.get(entry._2).get) else entry) ++ arr.collect(case group : String => (group -> newUnionMap.head._2)).toMap
      
    )

over了

 

以上是关于原创大叔案例分享id打通的主要内容,如果未能解决你的问题,请参考以下文章

原创大叔经验分享(96)docker启动MySQL报错

原创大叔问题定位分享(38)impala报错内存不足

原创大叔经验分享(52)ClouderaManager修改配置报错

原创大叔问题定位分享(34)Spring的RestTemplate请求json数据后内容被修改

原创大叔经验分享(99)Atlas简介安装

原创大叔经验分享(35)lzo格式支持