Pyspark:如何编写复杂的 Dataframe 计算

Posted

技术标签:

【中文标题】Pyspark:如何编写复杂的 Dataframe 计算【英文标题】:Pyspark :How to code complicated Dataframe calculation 【发布时间】:2020-08-06 19:35:05 【问题描述】:

数据框已经按日期排序,

col1 ==1 值是唯一的,

并且 col1==1 被传递,它将增加 1 的增量(例如 1,2,3,4,5,6,7...) 并且只有 -1 是重复的。

我有一个看起来像这样的数据框,称之为 df

TEST_schema = StructType([StructField("date", StringType(), True),\
                          StructField("col1", IntegerType(), True),\
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',-1,-1),('2020-08-02',-1,-1),('2020-08-03',-1,3),('2020-08-04',-1,2),('2020-08-05',1,4),\
             ('2020-08-06',2,1),('2020-08-07',3,2),('2020-08-08',4,3),('2020-08-09',5,-1)]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
TEST_df.show()



+--------+----+----+
    date |col1|col2|
+--------+----+----+
2020-08-01| -1|  -1|
2020-08-02| -1|  -1|
2020-08-03| -1|   3|
2020-08-04| -1|   2|
2020-08-05| 1 |   4|
2020-08-06| 2 |   1|
2020-08-07| 3 |   2|
2020-08-08| 4 |   3|
2020-08-09| 5 |  -1|
+--------+----+----+

条件是当 col1 == 1 时,我们从 col2 ==4 开始向后添加,(例如 4,5,6,7,8,...),然后 col2 == 4 返回 0 all方式(例如 4,0,0,0,0...)

所以,我得到的 df 看起来像这样。

   +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| -1|  -1|  8 |
    2020-08-02| -1|  -1|  7 |
    2020-08-03| -1|   3|  6 |
    2020-08-04| -1|   2|  5 |
    2020-08-05| 1 |   4|  4 |
    2020-08-06| 2 |   1|  0 |
    2020-08-07| 3 |   2|  0 |
    2020-08-08| 4 |   3|  0 |
    2020-08-09| 5 |  -1|  0 |
   +---------+----+----+----+  

增强功能:我想在 col2 == -1 时添加其他条件 col1 == 1(在 2020-08-05),并且 col2 == -1 连续.. 然后我想连续计算 -1,然后在连续中断 col2 == 的位置添加?价值。所以这里有一个例子来清除。

    +--------+----+----+----+
        date |col1|col2|want
    +--------+----+----+----+
    2020-08-01| -1|  -1|  11|
    2020-08-02| -1|  -1|  10|
    2020-08-03| -1|   3|  9 |
    2020-08-04| -1|   2|  8 |
    2020-08-05| 1 |  -1|  7*|
    2020-08-06| 2 |  -1|  0 |
    2020-08-07| 3 |  -1|  0 |
    2020-08-08| 4 |  4*|  0 |
    2020-08-09| 5 |  -1|  0 |
   +---------+----+----+----+  

所以,我们看到 3 个连续的 -1(从 2020 年 8 月 5 日开始,我们只关心第一个连续的 -1),连续之后我们有 4 个(在 2020 年 8 月 8 日表示为 *),然后我们将在 col1 ==1 行有 4+ 3 =7。有可能吗?

** 我的第一次尝试**

TEST_df = TEST_df.withColumn('cumsum', sum(when( col('col1') < 1, col('col1') ) \
                 .otherwise( when( col('col1') == 1, 1).otherwise(0))).over(Window.partitionBy('col1').orderBy().rowsBetween(-sys.maxsize, 0)))
TEST_df.show()

+----------+----+----+------+
|      date|col1|col2|cumsum|
+----------+----+----+------+
|2020-08-01|  -1|  -1|    -1|
|2020-08-02|  -1|  -1|    -2|
|2020-08-03|  -1|   3|    -3|
|2020-08-04|  -1|   2|    -4|
|2020-08-05|   1|   4|     1|
|2020-08-07|   3|   2|     0|
|2020-08-09|   5|  -1|     0|
|2020-08-08|   4|   3|     0|
|2020-08-06|   2|   1|     0|
+----------+----+----+------+

w1 = Window.orderBy(desc('date'))
w2 =Window.partitionBy('case').orderBy(desc('cumsum'))

TEST_df.withColumn('case', sum(when( (col('cumsum') == 1) & (col('col2') != -1) , col('col2')) \
       .otherwise(0)).over(w1)) \
  .withColumn('rank', when(col('case') != 0, rank().over(w2)-1).otherwise(0)) \
  .withColumn('want', col('case') + col('rank')) \
  .orderBy('date') \
+----------+----+----+------+----+----+----+
|date      |col1|col2|cumsum|case|rank|want|
+----------+----+----+------+----+----+----+
|2020-08-01|-1  |-1  |-1    |4   |1   |5   |
|2020-08-02|-1  |-1  |-2    |4   |2   |6   |
|2020-08-03|-1  |3   |-3    |4   |3   |7   |
|2020-08-04|-1  |2   |-4    |4   |4   |8   |
|2020-08-05|1   |4   |1     |4   |0   |4   |
|2020-08-06|2   |1   |0     |0   |0   |0   |
|2020-08-07|3   |2   |0     |0   |0   |0   |
|2020-08-08|4   |3   |0     |0   |0   |0   |
|2020-08-09|5   |-1  |0     |0   |0   |0   |
+----------+----+----+------+----+----+----+

您会看到排名 1,2,3,4 如果我可以将其设为 4,3,2,1,它将看起来像我的结果数据框....如何反转它?我尝试了orderby asc和desc ... 当然这是在增强

之前

【问题讨论】:

你的 spark 版本是什么? Spark 版本:2.4.6 【参考方案1】:

IIUC,您可以尝试以下方法:

    groupby 并创建所有相关行的 collect_list(vals 在下面的代码中),按日期按降序对列表进行排序(注意:groupby(lit(1)) 更改为您可以使用的任何列将您的数据分成独立的子集。

    找到具有col1 == 1的数组索引idx

    如果col2==-1idx,则找到从idx到列表开头的偏移量,第一行有col2 != -1注意:在当前代码中,偏移量可能是如果idx 之前的所有col2 都是-1,则为NULL,你必须决定你想要什么。例如使用coalesce(IF(...),0))

    在我们有了 offset 和 idx 之后,want 列可以通过以下方式计算:

    IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
    

    使用 SparkSQL 函数inline 来分解结构数组。

注意:可以使用 Window 函数应用相同的逻辑,以防生产数据框中存在太多列。

代码如下:

from pyspark.sql.functions import sort_array, collect_list, struct, expr, lit

TEST_df = spark.createDataFrame([
  ('2020-08-01', -1, -1), ('2020-08-02', -1, -1), ('2020-08-03', -1, 3),
  ('2020-08-04', -1, 2), ('2020-08-05', 1, -1), ('2020-08-06', 2, -1),
  ('2020-08-07', 3, -1), ('2020-08-08', 4, 4), ('2020-08-09', 5, -1)
], ['date', 'col1', 'col2'])

# list of column used in calculation
cols = ["date", "col1", "col2"]

df_new = TEST_df \
    .groupby(lit(1)) \
    .agg(sort_array(collect_list(struct(*cols)),False).alias('vals')) \
    .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
    .withColumn('offset', expr("""
        coalesce(IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0),0)
     """)).selectExpr("""
       inline(
         transform(vals, (x,i) -> named_struct(
             'dta', x,
             'want', IF(i<idx, 0, vals[idx-offset].col2 + offset + i - idx)
           )
         )
    )""").select('dta.*', 'want')

输出:

df_new.orderBy('date').show()
+----------+----+----+----+
|      date|col1|col2|want|
+----------+----+----+----+
|2020-08-01|  -1|  -1|  11|
|2020-08-02|  -1|  -1|  10|
|2020-08-03|  -1|   3|   9|
|2020-08-04|  -1|   2|   8|
|2020-08-05|   1|  -1|   7|
|2020-08-06|   2|  -1|   0|
|2020-08-07|   3|  -1|   0|
|2020-08-08|   4|   4|   0|
|2020-08-09|   5|  -1|   0|
+----------+----+----+----+

编辑:每个 cmets,添加了使用 Window 聚合函数而不是 groupby 的替代方法:

from pyspark.sql import Window

# WindowSpec to cover all related Rows in the same partition
w1 = Window.partitionBy().orderBy('date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

cols = ["date", "col1", "col2"]

# below `cur_idx` is the index for the current Row in array `vals`
df_new = TEST_df.withColumn('vals', sort_array(collect_list(struct(*cols)).over(w1),False)) \
    .withColumn('idx', expr("filter(sequence(0,size(vals)-1), i -> vals[i].col1=1)[0]")) \
    .withColumn('offset', expr("IF(vals[idx].col2=-1, filter(sequence(1,idx), i -> vals[idx-i].col2 != -1)[0],0)")) \
    .withColumn("cur_idx", expr("array_position(vals, struct(date,col1,col2))-1")) \
    .selectExpr(*TEST_df.columns, "IF(cur_idx<idx, 0, vals[idx-offset].col2 + offset + cur_idx - idx) as want")

【讨论】:

感谢您的辛勤工作 jxc... 示例使用 coalesce(IF(...),0) ,我在哪里放置合并条件?是的,如果 idx 之前的 col2 是 -1,我确实希望它像 offset =0 @hellotherebj 理论上应该用于filter语句的输出,但最终结果应该是一样的:IF(vals[idx].col2=-1, coalesce(filter(sequence(1,idx), i -&gt; vals[idx-i].col2 != -1)[0],0),0) 谢谢。这说得通。你能看看我的新问题吗?有点像这个..***.com/questions/63308490/… 嘿,jxc,我发布了另一个难题...我自己似乎无法做到这一点,所以我需要您的帮助..***.com/questions/63384238/… 如果要保留原始方法,我们必须将named_struct()中的所有列包含在内联函数中。只需使用 Python 格式即可:.selectExpr(""" inline(transform(vals, (x,i) -&gt; named_struct(, 'want', IF(i&lt;idx, 0, vals[idx-offset].col2 + offset + i - idx)))) """.format(','.join("'0', x['0']".format(c) for c in cols)))

以上是关于Pyspark:如何编写复杂的 Dataframe 计算的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

通过 pyspark.sql.dataframe 将 XML 数据转换为 pandas 数据帧

用列表 Pyspark Dataframe 中的值替换 NA

使用 PySpark 删除 Dataframe 的嵌套列

Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame

Pyspark 从具有不同列的行/数据创建 DataFrame