附加唯一 ID 的 Spark 数据集
Posted
技术标签:
【中文标题】附加唯一 ID 的 Spark 数据集【英文标题】:Spark Dataset appending unique ID 【发布时间】:2018-01-29 21:13:55 【问题描述】:我正在查看 spark 数据集上的 append
唯一 ID 是否有“已实现的替代方案”。
我的场景:
我有一个增量工作,每天运行处理一批信息。在这项工作中,我创建了一个something
的维度表,并使用monotonically_increasing_id()
为每一行分配唯一的ID。第二天,我想在 something
表中追加一些行,并希望为这些行生成唯一 ID。
例子:
第 1 天:
something_table
uniqueID name
100001 A
100002 B
第 2 天:
something_table
uniqueId name
100001 A
100002 B
100003 C -- new data that must be created on day 2
第 1 天的代码截图:
case class BasicSomething(name: String)
case class SomethingTable(id: Long, name: String)
val ds: Dataset[BasicSomething] = spark.createDataset(Seq(BasicSomething("A"), BasicSomething("B")))
ds.withColumn("uniqueId", monotonically_increasing_id())
.as[SomethingTable]
.write.csv("something")
我不知道如何保持monotonically_increasing_id()
的状态,以便在第二天它会知道来自something_table
唯一ID 的现有ID。
【问题讨论】:
有官方的“已经实施的替代方案”,如果有任何其他的,建议是题外话。此外,“保持状态”听起来也不是一个好主意,因为高位已经被使用,因此您可以保留的信息量是有限的。如果您设置在Long
上,则可以使用zipWithIndex
和基于现有数据的偏移量。 Better us ID,可以像元组一样就地创建(batch, monotonically_increasing_id)。
@user6910411 “保持状态听起来不是个好主意”是什么意思?如果没有固定 ID,引用将如何工作?
@MehdiB。估计不是很清楚。因此,如前所述,主要问题是分区号已经使用了高 31 位。因此,实际上只能使用 64 位中的一小部分,或者换句话说,如果您有多个分区,那么您已经从非常大的数字开始,并且向其中添加任何内容并不是一个好主意。有许多强大的方案允许您在分布式系统中生成唯一标识符,而无需保持状态 - 参见例如 MongoDB 对象 ID。
感谢@user6910411的回复
【参考方案1】:
您始终可以获得您创建的数据集的last uniqueId。因此,您可以将 uniqueId 与 monotically_increasing_id() 一起使用并创建新的 uniqueIds。
ds.withColumn("uniqueId", monotonically_increasing_id()+last uniqueId of previous dataframe)
【讨论】:
以上是关于附加唯一 ID 的 Spark 数据集的主要内容,如果未能解决你的问题,请参考以下文章
将浮点数组写入和附加到 C++ 中 hdf5 文件中的唯一数据集