Azure Databricks Scala:如何替换相应层次结构之后的行

Posted

技术标签:

【中文标题】Azure Databricks Scala:如何替换相应层次结构之后的行【英文标题】:Azure Databricks Scala : How to replace rows following a respective hirarchy 【发布时间】:2020-07-05 18:22:55 【问题描述】:

记住以下数据集:

我想获得

如您所见,基本上这个想法是遵循 ACTUAL_ID 列指示的路径,直到它为空(如果它还没有)

我尝试使用 udf 来传递完整的初始 Dataframe,并且递归会找到我想要的内容,但似乎无法将 Dataframes 传递给 UDF。我也考虑过替换一行的值,但似乎这是不可能的。

我最近的尝试:

def calculateLatestImdate(df: DataFrame, lookupId: String) : String = 
  var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ID");
  if (foundId == "" || foundId == null)
  
    lookupId
  
  else
  
    calculateLatestImdate(df, foundId);
  


val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => 
  calculateLatestImdate(df,s)
)

val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")

val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))

【问题讨论】:

【参考方案1】:

这对我来说有点像一个图形问题,所以我使用 Scala 和图形框架想出了一个答案。它利用了connectedComponents 算法和图框的outDegrees 方法。我假设根据您的样本数据,每棵树的末端都是唯一的,但需要检查这个假设。我有兴趣了解更多数据的性能如何,但请告诉我您对解决方案的看法。

完整的脚本:

// NB graphframes had to be installed separately with the right Scala version 
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


// Create the test data

// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
  ( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
    ( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")

// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
  (2, 3, "is linked to"),
  (3, 6, "is linked to"),
  (4, 5, "is linked to")
)).toDF("src", "dst", "relationship")


// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)


// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")

val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)




// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
  .select("component", "data")
  .where("outDegree is null")

endOfTree.show()


components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
  .select($"c.id", $"c.component", $"t.data")
  .orderBy("id")
  .show()

我的结果:

如果您的数据已经在数据框中,只需使用selectwhere 过滤器就可以轻松地从原始数据框生成边缘数据框,例如

// Create the GraphFrame from the dataframe
val v2 = df

val e2 = df
  .select("id", "actual_id")
  .withColumn("rel", lit("is linked to"))
  .where("actual_id > 0")
  .toDF("src", "dst", "rel")

val g2 = GraphFrame(v2, e2)
print(g2)

g2.vertices.show()
g2.edges.show()

【讨论】:

您展示的内容似乎有效,我唯一不喜欢的是我需要将连接放在单独的数据框中(您的变量 e2),这对于一般情况是未知的但我想它可以通过一个简单的 select 来计算,其中 actual_id 为空。 我已经用一个例子更新了我的答案,但你是对的 - 在创建 GraphFrame 之前生成边缘数据框很容易,只需 selectwhere。我有兴趣用一些有意义的音量来尝试这两种方法——你在看什么样的音量?如果需要,我可能会创建一些虚拟数据。 这将用于 80MB(大约几十万行)的 csv。我还没有尝试过你的方法,但会尝试检查它。 嗨,你是怎么处理这个音量的?我很想知道它是如何执行的,以及图形方法是否最终成为解决这个特定问题的好方法?【参考方案2】:

相信我已经找到了问题的答案。

def calculateLatestId(df: DataFrame) : DataFrame = 
  var joinedDf = df.as("df1").join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer").withColumn("FINAL_ID", when($"df2.ID".isNull, $"df1.ID").when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID").otherwise($"df2.ID")).select($"df1.*", $"FINAL_ID").filter($"df1.ID".isNotNull)

  val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")

  joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
  
  if(differentIds.count > 0)
  
    calculateLatestId(joinedDf)
  
  else
  
    joinedDf = joinedDf.as("df1").join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner").select($"df1.ID", $"df2.*").drop($"df2.ID")
    joinedDf
  

我相信可以通过某种方式提高性能,可能是通过减少每次迭代后的行数并在最后进行某种连接 + 清理。

【讨论】:

很高兴知道您的问题已解决。您可以接受它作为答案(单击答案旁边的复选标记以将其从灰色切换为已填充。)。这对其他社区成员可能是有益的。谢谢。

以上是关于Azure Databricks Scala:如何替换相应层次结构之后的行的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Azure 数据工厂的 Databricks 上运行 .Net spark 作业?

使用 6.4 版扩展支持(包括 Apache Spark 2.4.5、Scala 2.11)在 azure databricks 上启动集群时出现问题

Azure Databricks 上的最大消息大小

Azure Databricks:如何在 Databricks 群集中添加 Spark 配置

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

如何强制 Azure 数据工厂数据流使用 Databricks