附加唯一 ID 的 Spark 数据集

Posted

技术标签:

【中文标题】附加唯一 ID 的 Spark 数据集【英文标题】:Spark Dataset appending unique ID 【发布时间】:2018-01-29 21:13:55 【问题描述】:

我正在查看 spark 数据集上的 append 唯一 ID 是否有“已实现的替代方案”。

我的场景: 我有一个增量工作,每天运行处理一批信息。在这项工作中,我创建了一个something 的维度表,并使用monotonically_increasing_id() 为每一行分配唯一的ID。第二天,我想在 something 表中追加一些行,并希望为这些行生成唯一 ID。

例子:

第 1 天:

something_table    
uniqueID   name
100001     A
100002     B

第 2 天:

something_table
uniqueId   name
100001     A
100002     B
100003     C -- new data that must be created on day 2

第 1 天的代码截图:

case class BasicSomething(name: String)
case class SomethingTable(id: Long, name: String)

val ds: Dataset[BasicSomething] = spark.createDataset(Seq(BasicSomething("A"), BasicSomething("B")))

ds.withColumn("uniqueId", monotonically_increasing_id())
.as[SomethingTable]
.write.csv("something")

我不知道如何保持monotonically_increasing_id() 的状态,以便在第二天它会知道来自something_table 唯一ID 的现有ID。

【问题讨论】:

有官方的“已经实施的替代方案”,如果有任何其他的,建议是题外话。此外,“保持状态”听起来也不是一个好主意,因为高位已经被使用,因此您可以保留的信息量是有限的。如果您设置在Long 上,则可以使用zipWithIndex 和基于现有数据的偏移量。 Better us ID,可以像元组一样就地创建(batch, monotonically_increasing_id)。 @user6910411 “保持状态听起来不是个好主意”是什么意思?如果没有固定 ID,引用将如何工作? @MehdiB。估计不是很清楚。因此,如前所述,主要问题是分区号已经使用了高 31 位。因此,实际上只能使用 64 位中的一小部分,或者换句话说,如果您有多个分区,那么您已经从非常大的数字开始,并且向其中添加任何内容并不是一个好主意。有许多强大的方案允许您在分布式系统中生成唯一标识符,而无需保持状态 - 参见例如 MongoDB 对象 ID。 感谢@user6910411的回复 【参考方案1】:

您始终可以获得您创建的数据集last uniqueId。因此,您可以将 uniqueId 与 monotically_increasing_id() 一起使用并创建新的 uniqueIds

ds.withColumn("uniqueId", monotonically_increasing_id()+last uniqueId of previous dataframe)

【讨论】:

以上是关于附加唯一 ID 的 Spark 数据集的主要内容,如果未能解决你的问题,请参考以下文章

Spark向数据框添加索引并附加其他没有索引的数据集

将字段附加到 JSON 数据集 Java-Spark

将浮点数组写入和附加到 C++ 中 hdf5 文件中的唯一数据集

无法使用 jdbc 将 spark 数据集写入数据库

编写并将float数组附加到C ++中hdf5文件中的唯一数据集

Spark 连接数据框和数据集