Azure Databricks Scala:如何替换相应层次结构之后的行
Posted
技术标签:
【中文标题】Azure Databricks Scala:如何替换相应层次结构之后的行【英文标题】:Azure Databricks Scala : How to replace rows following a respective hirarchy 【发布时间】:2020-07-05 18:22:55 【问题描述】:记住以下数据集:
我想获得
如您所见,基本上这个想法是遵循 ACTUAL_ID 列指示的路径,直到它为空(如果它还没有)
我尝试使用 udf 来传递完整的初始 Dataframe,并且递归会找到我想要的内容,但似乎无法将 Dataframes 传递给 UDF。我也考虑过替换一行的值,但似乎这是不可能的。
我最近的尝试:
def calculateLatestImdate(df: DataFrame, lookupId: String) : String =
var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ID");
if (foundId == "" || foundId == null)
lookupId
else
calculateLatestImdate(df, foundId);
val calculateLatestImdateUdf = udf((df:DataFrame, s:String) =>
calculateLatestImdate(df,s)
)
val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")
val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))
【问题讨论】:
【参考方案1】:这对我来说有点像一个图形问题,所以我使用 Scala 和图形框架想出了一个答案。它利用了connectedComponents 算法和图框的outDegrees
方法。我假设根据您的样本数据,每棵树的末端都是唯一的,但需要检查这个假设。我有兴趣了解更多数据的性能如何,但请告诉我您对解决方案的看法。
完整的脚本:
// NB graphframes had to be installed separately with the right Scala version
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
// Create the test data
// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")
// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
(2, 3, "is linked to"),
(3, 6, "is linked to"),
(4, 5, "is linked to")
)).toDF("src", "dst", "relationship")
// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)
// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)
// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
.select("component", "data")
.where("outDegree is null")
endOfTree.show()
components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
.select($"c.id", $"c.component", $"t.data")
.orderBy("id")
.show()
我的结果:
如果您的数据已经在数据框中,只需使用select
和where
过滤器就可以轻松地从原始数据框生成边缘数据框,例如
// Create the GraphFrame from the dataframe
val v2 = df
val e2 = df
.select("id", "actual_id")
.withColumn("rel", lit("is linked to"))
.where("actual_id > 0")
.toDF("src", "dst", "rel")
val g2 = GraphFrame(v2, e2)
print(g2)
g2.vertices.show()
g2.edges.show()
【讨论】:
您展示的内容似乎有效,我唯一不喜欢的是我需要将连接放在单独的数据框中(您的变量 e2),这对于一般情况是未知的但我想它可以通过一个简单的 select 来计算,其中 actual_id 为空。 我已经用一个例子更新了我的答案,但你是对的 - 在创建 GraphFrame 之前生成边缘数据框很容易,只需select
和 where
。我有兴趣用一些有意义的音量来尝试这两种方法——你在看什么样的音量?如果需要,我可能会创建一些虚拟数据。
这将用于 80MB(大约几十万行)的 csv。我还没有尝试过你的方法,但会尝试检查它。
嗨,你是怎么处理这个音量的?我很想知道它是如何执行的,以及图形方法是否最终成为解决这个特定问题的好方法?【参考方案2】:
相信我已经找到了问题的答案。
def calculateLatestId(df: DataFrame) : DataFrame =
var joinedDf = df.as("df1").join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer").withColumn("FINAL_ID", when($"df2.ID".isNull, $"df1.ID").when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID").otherwise($"df2.ID")).select($"df1.*", $"FINAL_ID").filter($"df1.ID".isNotNull)
val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")
joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
if(differentIds.count > 0)
calculateLatestId(joinedDf)
else
joinedDf = joinedDf.as("df1").join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner").select($"df1.ID", $"df2.*").drop($"df2.ID")
joinedDf
我相信可以通过某种方式提高性能,可能是通过减少每次迭代后的行数并在最后进行某种连接 + 清理。
【讨论】:
很高兴知道您的问题已解决。您可以接受它作为答案(单击答案旁边的复选标记以将其从灰色切换为已填充。)。这对其他社区成员可能是有益的。谢谢。以上是关于Azure Databricks Scala:如何替换相应层次结构之后的行的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Azure 数据工厂的 Databricks 上运行 .Net spark 作业?
使用 6.4 版扩展支持(包括 Apache Spark 2.4.5、Scala 2.11)在 azure databricks 上启动集群时出现问题
Azure Databricks:如何在 Databricks 群集中添加 Spark 配置