已解决:Spark 使非唯一字段按出现顺序具有 ID

Posted

技术标签:

【中文标题】已解决:Spark 使非唯一字段按出现顺序具有 ID【英文标题】:Solved: Spark make nonunique fields have an ID by order of occurance 【发布时间】:2020-06-27 00:02:23 【问题描述】:

我有一堆 CSV 文件,我读入这些文件来激发(使用 pyspark),然后我想将它们在特定字段中加入到一个大表中。

问题是,这个字段不是唯一的,但是关联的属性是唯一的。数据的来源是独一无二的,但是在我将它们作为 csv 获取之前,这些信息就被删除了。我无法为我的联接查询使用附加属性来说明文件之间的连接。但是所有文件中的出现顺序都说明了结构。 因此,如果我可以制作一个具有 ID 和文件中出现次数的人工 ID,它将起作用。

我的问题是,我可以定义一个 SparkSQL 查询(或另一种 pyspark 方式),我可以使用它为每个文件中的非唯一行添加连续计数,以便我可以将其用于我的联接?

我想要的是:

ID| ct(ID) | generated_number
A | 2      | 1 
A | 2      | 2
A | 2      | 3
B | 1      | 1
C | 2      | 1
C | 2      | 2
D | 1      | 1
E | 3      | 1
E | 3      | 2
E | 3      | 3

基于此,我可以创建一个新 ID 作为 conc(ID, '_', generated_number) - 至少我会为非唯一的行做。

有没有聪明的 SparkNative 版本,我真的不想在 shell 脚本中修改源数据(我会想到 awk)

非常感谢

解决方案:

两个答案都适合解决方案,非常感谢。我现在的做法如下:

SELECT  ID,
   row_number() OVER (
        PARTITION BY ID
        ORDER BY ID ) as row_count,
   count(ID) OVER (
        PARTITION BY ID
         ORDER BY ID ) as count
FROM TB_TEMP AS main
WHERE cellname_s = "A"

不使用 WHERE 子句,但为了显示它更容易;)

这给了我想要的输出:

+----------+---------+-----+
|        ID|row_count|count|
+----------+---------+-----+
|         A|        1|    4|
|         A|        2|    4|
|         A|        3|    4|
|         A|        4|    4|
+----------+---------+-----+

为了获得我的唯一 ID,我将制作一个

CASE WHEN count > 1 THEN concact(ID, "_", row_count) ELSE ID END AS ID

因此给了我独特的字段,而我没有但不操纵已经独特的字段,因为这对处理数据的人来说更好。

【问题讨论】:

您尝试过以下解决方案吗?您可以将其中任何一个标记为已接受的答案吗? 您好,非常感谢您迄今为止的帮助!我会在明天再次工作时尝试并在此处发布有帮助的详细信息(并标记已接受的答案)。抱歉耽搁了,周末我休息 没问题..祝你好运:-) 【参考方案1】:

我想你在这里要求一个 row_number

使用类似的东西

 select id,CT(id),row_number() over(partition by id,CT(id) order  by id) from ** your ** table

如果你想在 Dataframe 中使用它,你可以使用:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

【讨论】:

这里的问题是对于像 [A 1] 这样的每次出现,id 都必须更改。在这种方法中,所有的 A 都会被连续编号 select id,CT(id),row_number() over(partition by id,CT(id) order by id) from ** your ** table 这行得通。像这样,同一列也可以用于分区和排序【参考方案2】:

关键是非唯一值。您可以使用 monotonically_increasing_id() 函数。

tst=sqlContext.createDataFrame([('A',2),('B',2),('A',2),('A',3),('B',4),('A',2),('B',2)],schema=("id","count"))
tst_id = tst.withColumn("inc_id",monotonically_increasing_id())

这将确保为非唯一值分配唯一 ID。对于您的情况,您还可以使用它与 id 连接。但是,如果您想要连续且更好看的 id 编号,那么我们可以在此生成的单调递增 id 上使用行号(在性能方面代价高昂):

w=Window.partitionBy('id','count').orderBy('inc_id')
tst_row = tst_id.withColumn("uniq_id",F.row_number().over(w))

所以,终于

tst_row.sort('id','count','uniq_id').show()
+---+-----+----------+-------+
| id|count|    inc_id|uniq_id|
+---+-----+----------+-------+
|  A|    2|         0|      1|
|  A|    2|         2|      2|
|  A|    2|8589934594|      3|
|  A|    3|8589934592|      1|
|  B|    2|         1|      1|
|  B|    2|8589934595|      2|
|  B|    4|8589934593|      1|
+---+-----+----------+-------+

【讨论】:

以上是关于已解决:Spark 使非唯一字段按出现顺序具有 ID的主要内容,如果未能解决你的问题,请参考以下文章

spark dataframe reducebykey(具有非唯一键值)和自定义值操作

找到 2 个具有非唯一 ID 的元素

合并具有非唯一索引的多个熊猫数据集

SQL:查找具有非唯一特征 ID 的两个表之间的差异?

使数据库中的数据按照一定的顺序查出来

在具有非唯一索引列日期的 Dask 数据框中提取最新值