将每一行的值汇总为布尔值(PySpark)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将每一行的值汇总为布尔值(PySpark)相关的知识,希望对你有一定的参考价值。

我目前有一个PySpark数据框,其中有许多由整数计数填充的列。其中许多列的计数为零。 我想找到一种方法来汇总多少列的计数大于零的方法

换句话说,我想要一种对一行中的值求和的方法,其中给定行的所有列实际上都是布尔值(尽管可能不需要数据类型转换)。表中的几列是日期时间或字符串,因此理想情况下,我会采用一种首先选择数字列的方法。

当前数据框示例和所需的输出

+---+---------- +----------+------------            
|USER|   DATE   |COUNT_COL1| COUNT_COL2|...     DESIRED COLUMN
+---+---------- +----------+------------ 
| b | 7/1/2019 |  12      |     1     |              2        (2 columns are non-zero)
| a | 6/9/2019 |  0       |     5     |              1
| c | 1/1/2019 |  0       |     0     |              0

Pandas:例如,在熊猫中,这可以通过选择数字列,转换为bool并与axis = 1求和来实现。我正在寻找等效的PySpark。

test_cols=list(pandas_df.select_dtypes(include=[np.number]).columns.values)
pandas_df[test_cols].astype(bool).sum(axis=1)
答案

对于数字,可以通过使用array(使用integer values)为所有列创建一个df.dtypes来实现,然后使用higher order functions。在这种情况下,我使用filter消除了所有0,然后使用size获得了每行所有非零元素的数量。(spark2.4+)

from pyspark.sql import functions as F
df.withColumn("arr", F.array(*[F.col(i[0]) for i in df.dtypes if i[1] in ['int','bigint']]))
  .withColumn("DESIRED COLUMN", F.expr("""size(filter(arr,x->x!=0))""")).drop("arr").show()

#+----+--------+----------+----------+--------------+
#|USER|    DATE|COUNT_COL1|COUNT_COL2|DESIRED COLUMN|
#+----+--------+----------+----------+--------------+
#|   b|7/1/2019|        12|         1|             2|
#|   a|6/9/2019|         0|         5|             1|
#|   c|1/1/2019|         0|         0|             0|
#+----+--------+----------+----------+--------------+
另一答案

假设您的df以下:

df.show()
df.printSchema()

+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|  a|  1|  2|  3|
|  a|  0|  2|  1|
|  a|  0|  0|  1|
|  a|  0|  0|  0|
+---+---+---+---+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

使用when语句时,可以检查列是否为数字,然后是否大于0。在下一步中,由于f.size仅将cols保留为f.array_remove值,因此True将返回计数。

from pyspark.sql import functions as f
cols = [f.when(f.length(f.regexp_replace(f.col(x), '\d+', '')) > 0, False).otherwise(f.col(x).cast('int') > 0) for x in df2.columns]
df.select("*", f.size(f.array_remove(f.array(*cols), False)).alias("count")).show()

+---+---+---+---+-----+
|_c0|_c1|_c2|_c3|count|
+---+---+---+---+-----+
|  a|  1|  2|  3|    3|
|  a|  0|  2|  1|    2|
|  a|  0|  0|  1|    1|
|  a|  0|  0|  0|    0|
+---+---+---+---+-----+

以上是关于将每一行的值汇总为布尔值(PySpark)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Laravel 的 Eloquent/Fluent 将每一行设置为相同的值?

PySpark - ValueError:无法将列转换为布尔值

PYSPARK:根据条件用另一个行值更新一行中的值?

使用 PySpark 将每一行的每一列作为单独的文件写入 S3

Spark中多列的窗口聚合

PySpark/Hive:如何使用 LazySimpleSerDe 创建表以转换布尔值“t”/“f”?