使用 pyspark 验证不同行中同一列的数据

Posted

技术标签:

【中文标题】使用 pyspark 验证不同行中同一列的数据【英文标题】:Validate data from the same column in different rows with pyspark 【发布时间】:2019-07-03 17:29:11 【问题描述】:

如何根据某些单元格之间的某些验证来更改列的值?我需要的是比较每个客户(id)记录的公里数,以比较公里数后面的记录是否更高。

fecha      id   estado  id_cliente  error_code  kilometraje error_km
1/1/2019    1     A         1                       10  
2/1/2019    2     A                    ERROR        20  
3/1/2019    1     D         1          ERROR        30
4/1/2019    2     O                                          ERROR

error_km 列中的错误是因为客户 (id) 2 的 kmetraje 值小于 2/1/2019 的同一客户记录(如果时间过去了,使用了汽车,因此 kmetraje 增加了,所以有没有错误,里程必须更高或相同)

我知道 withColumn 我可以覆盖或创建一个不存在的列,并且当我可以设置条件时使用它。例如:这将是我用来验证 estado 和 id_cliente 列的代码,并且在适用的情况下 ERROR 会覆盖 error_code 列,但我不明白如何在同一客户端的不同行之间进行验证。

from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.functions import col

file_path = 'archive.txt'

error = 'ERROR'

df = spark.read.parquet(file_path)
df = df.persist(StorageLevel.MEMORY_AND_DISK)
df = df.select('estado', 'id_cliente')
df = df.withColumn("error_code", lit(''))

df = df.withColumn('error_code',
                            F.when((F.col('status') == 'O') &
                                    (F.col('client_id') != '') |
                                    (F.col('status') == 'D') &
                                    (F.col('client_id') != '') |
                                    (F.col('status') == 'A') &
                                    (F.col('client_id') == ''),
                                     F.concat(F.col("error_code"), F.lit(":[]".format(error)))
                                   )
                             .otherwise(F.col('error_code')))

【问题讨论】:

请不要发布截图。它们只会为回答您的questions 的人带来更多的工作。 是的,我知道,我还没有在 PC 上,一会儿我会修复它 【参考方案1】:

您可以使用lag 窗口函数来实现这一点。 lag 函数返回当前行之前的行。有了它,您可以轻松地比较 kmetraje 值。看看下面的代码:

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [('1/1/2019' , 1      , 10),
('2/1/2019', 2     , 20  ),
('3/1/2019', 1      , 30  ),
('4/1/2019', 1      , 10  ),
('5/1/2019', 1      , 30  ),
('7/1/2019', 3      , 30  ),
('4/1/2019', 2      , 5)]

columns = ['fecha', 'id', 'kilometraje']

df=spark.createDataFrame(l, columns)
df = df.withColumn('fecha',F.to_date(df.fecha,  'dd/MM/yyyy'))

w = Window.partitionBy('id').orderBy('fecha')

df = df.withColumn('error_km', F.when(F.lag('kilometraje').over(w) > df.kilometraje, F.lit('ERROR') ).otherwise(F.lit('')))

df.show()

输出:

+----------+---+-----------+--------+ 
|     fecha| id|kilometraje|error_km| 
+----------+---+-----------+--------+ 
|2019-01-01|  1|         10|        | 
|2019-01-03|  1|         30|        | 
|2019-01-04|  1|         10|   ERROR| 
|2019-01-05|  1|         30|        | 
|2019-01-07|  3|         30|        | 
|2019-01-02|  2|         20|        | 
|2019-01-04|  2|          5|   ERROR| 
+----------+---+-----------+--------+

第四行没有被标记为“错误”,因为之前的值具有较小的千公里值 (10

df.drop('error_km').join(df.filter(df.error_km == 'ERROR').groupby('id').agg(F.first(df.error_km).alias('error_km')), 'id', 'left').show()

【讨论】:

了不起的家伙!谢谢! @cronoik 不错的答案。但是,如果最新的公里数大于滞后但小于该 customerId 的最大里程,它将不起作用。例如,如果我再添加一行 ('4/1/2019', 2 , 6 ) ,这里 6 大于 5 但对于 Id 2 小于 20。所以它也应该给出一个错误。但根据 OP 当前方案,它是完美的。 我正要提出这个问题,因为我正在使用它,但我得到了错误 10、15、8、9、16、8 和 9 是错误的,因为它们小于 15 我已经找到了解决办法,这就是魔法***.com/questions/45946349/… 这完美地表明尝试回答问题的人只能使用所提供的信息。仅仅因为它们之前的值较高并不意味着所有随后的较小值都是不正确的。这也可能意味着记录的较高值不正确。这实际上取决于您 (Juan) 希望如何处理数据中的此类情况,因此在您的问题中提供此类信息非常重要。【参考方案2】:

我使用 .rangeBetween(Window.unboundedPreceding,0)。

该函数从当前值中搜索后面的附加值

import pyspark
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

error = 'This is error'

l = [('1/1/2019' , 1      , 10),
('2/1/2019', 2     , 20  ),
('3/1/2019', 1      , 30  ),
('4/1/2019', 1      , 10  ),
('5/1/2019', 1      , 22  ),
('7/1/2019', 1      , 23  ),
('22/1/2019', 2      , 5),
('11/1/2019', 2      , 24),
('13/2/2019', 1      , 16),
('14/2/2019', 2      , 18),
('5/2/2019', 1      , 19),
('6/2/2019', 2      , 23),
('7/2/2019', 1      , 14),
('8/3/2019', 1      , 50),
('8/3/2019', 2      , 50)]

columns = ['date', 'vin', 'mileage']

df=spark.createDataFrame(l, columns)
df = df.withColumn('date',F.to_date(df.date,  'dd/MM/yyyy'))
df = df.withColumn("max", lit(0))
df = df.withColumn("error_code", lit(''))

w = Window.partitionBy('vin').orderBy('date').rangeBetween(Window.unboundedPreceding,0)

df = df.withColumn('max',F.max('mileage').over(w))
df = df.withColumn('error_code', F.when(F.col('mileage') < F.col('max'), F.lit('ERROR')).otherwise(F.lit('')))

df.show()

最后,剩下的就是删除最大的列

df = df.drop('max')
df.show()

【讨论】:

以上是关于使用 pyspark 验证不同行中同一列的数据的主要内容,如果未能解决你的问题,请参考以下文章

在Oracle SQL中从同一列的不同行中选择多个变量的值

在 Oracle PL/SQL 中,当列的其余值相等时,我可以交换表中两个不同行的同一列的值吗?

如何通过在python中添加同一列的2个不同行值的值来估算特定的行值

pyspark:如果列在不同行中具有相同的值,则合并两行或多行

如何通过 Pyspark 中同一数据框中另一列的正则表达式值过滤数据框中的一列

pyspark中同一列的多个AND条件没有连接操作