已解决:Spark 使非唯一字段按出现顺序具有 ID
Posted
技术标签:
【中文标题】已解决:Spark 使非唯一字段按出现顺序具有 ID【英文标题】:Solved: Spark make nonunique fields have an ID by order of occurance 【发布时间】:2020-06-27 00:02:23 【问题描述】:我有一堆 CSV 文件,我读入这些文件来激发(使用 pyspark),然后我想将它们在特定字段中加入到一个大表中。
问题是,这个字段不是唯一的,但是关联的属性是唯一的。数据的来源是独一无二的,但是在我将它们作为 csv 获取之前,这些信息就被删除了。我无法为我的联接查询使用附加属性来说明文件之间的连接。但是所有文件中的出现顺序都说明了结构。 因此,如果我可以制作一个具有 ID 和文件中出现次数的人工 ID,它将起作用。
我的问题是,我可以定义一个 SparkSQL 查询(或另一种 pyspark 方式),我可以使用它为每个文件中的非唯一行添加连续计数,以便我可以将其用于我的联接?
我想要的是:
ID| ct(ID) | generated_number
A | 2 | 1
A | 2 | 2
A | 2 | 3
B | 1 | 1
C | 2 | 1
C | 2 | 2
D | 1 | 1
E | 3 | 1
E | 3 | 2
E | 3 | 3
基于此,我可以创建一个新 ID 作为 conc(ID, '_', generated_number) - 至少我会为非唯一的行做。
有没有聪明的 SparkNative 版本,我真的不想在 shell 脚本中修改源数据(我会想到 awk)
非常感谢
解决方案:
两个答案都适合解决方案,非常感谢。我现在的做法如下:
SELECT ID,
row_number() OVER (
PARTITION BY ID
ORDER BY ID ) as row_count,
count(ID) OVER (
PARTITION BY ID
ORDER BY ID ) as count
FROM TB_TEMP AS main
WHERE cellname_s = "A"
不使用 WHERE 子句,但为了显示它更容易;)
这给了我想要的输出:
+----------+---------+-----+
| ID|row_count|count|
+----------+---------+-----+
| A| 1| 4|
| A| 2| 4|
| A| 3| 4|
| A| 4| 4|
+----------+---------+-----+
为了获得我的唯一 ID,我将制作一个
CASE WHEN count > 1 THEN concact(ID, "_", row_count) ELSE ID END AS ID
因此给了我独特的字段,而我没有但不操纵已经独特的字段,因为这对处理数据的人来说更好。
【问题讨论】:
您尝试过以下解决方案吗?您可以将其中任何一个标记为已接受的答案吗? 您好,非常感谢您迄今为止的帮助!我会在明天再次工作时尝试并在此处发布有帮助的详细信息(并标记已接受的答案)。抱歉耽搁了,周末我休息 没问题..祝你好运:-) 【参考方案1】:我想你在这里要求一个 row_number
使用类似的东西
select id,CT(id),row_number() over(partition by id,CT(id) order by id) from ** your ** table
如果你想在 Dataframe 中使用它,你可以使用:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
【讨论】:
这里的问题是对于像 [A 1] 这样的每次出现,id 都必须更改。在这种方法中,所有的 A 都会被连续编号 select id,CT(id),row_number() over(partition by id,CT(id) order by id) from ** your ** table 这行得通。像这样,同一列也可以用于分区和排序【参考方案2】:关键是非唯一值。您可以使用 monotonically_increasing_id() 函数。
tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2)],schema=("id","count"))
tst_id = tst.withColumn("inc_id",monotonically_increasing_id())
这将确保为非唯一值分配唯一 ID。对于您的情况,您还可以使用它与 id 连接。但是,如果您想要连续且更好看的 id 编号,那么我们可以在此生成的单调递增 id 上使用行号(在性能方面代价高昂):
w=Window.partitionBy('id','count').orderBy('inc_id')
tst_row = tst_id.withColumn("uniq_id",F.row_number().over(w))
所以,终于
tst_row.sort('id','count','uniq_id').show()
+---+-----+----------+-------+
| id|count| inc_id|uniq_id|
+---+-----+----------+-------+
| A| 2| 0| 1|
| A| 2| 2| 2|
| A| 2|8589934594| 3|
| A| 3|8589934592| 1|
| B| 2| 1| 1|
| B| 2|8589934595| 2|
| B| 4|8589934593| 1|
+---+-----+----------+-------+
【讨论】:
以上是关于已解决:Spark 使非唯一字段按出现顺序具有 ID的主要内容,如果未能解决你的问题,请参考以下文章