拆分特定的 PySpark df 列并创建另一个 DF

Posted

技术标签:

【中文标题】拆分特定的 PySpark df 列并创建另一个 DF【英文标题】:Splitting a specific PySpark df column and create another DF 【发布时间】:2018-10-31 10:02:22 【问题描述】:

我有一个 dataframe(比如 ac_df),它有 32 个不同的列。我想获得一个特定的列并将值拆分为 3 的块作为一个新值,并从中创建另一个 df 。

ac_df['payment_history_1'] 给出以下结果

我想要一个具有以下结构的新 df。

例如:如果我取第一行'000000000000',它将被分组为

“000”、“000”、“000”、“000”

这将创建新 df 的第一行。

执行此任务的 Python 等效代码如下:

temp1 = ac_df['payment_history_1'].str.split(r'(...)', expand=True)

在 spark 中,我尝试了以下方法:

temp1 = ac_df.select(ac_df['payment_history_1']).rdd.map(lambda each_row: str(each_row[0])).map(lambda y: y.split(r'(...)')).collect()

输出:

 [['000000000000'], ['000000000003000000000'], ['000000000003000000000000000']]

但是,我无法继续前进并获得预期的结果。有人可以推荐吗?

【问题讨论】:

你可以在数据框列上使用pyspark.sql.functions.split,就像在python中使用str.split一样。 ***.com/questions/39235704/… 【参考方案1】:

试试这个,你将能够建立在这个之上:

df = spark.createDataFrame(
        [
            [1, '000000000000'], 
            [2, '000000000003000000000'], 
            [3, '000000000003000000000000000']
        ]
        , ["id", "numbers"]
        )


df.show()

应该产生类似于你开始的数据框的东西:

+---+--------------------+
| id|             numbers|
+---+--------------------+
|  1|        000000000000|
|  2|00000000000300000...|
|  3|00000000000300000...|
+---+--------------------+

获取数字列,您将能够将其解析为“,”分隔的字符串,我们可以在其中应用: posexplode(expr) - 将数组 expr 的元素分隔为多行位置,或者将 map expr 的元素分成多行多列的位置。

from pyspark.sql.functions import posexplode

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
).show()

这应该导致:

+---+--------------------+---+---+
| id|             numbers|pos|val|
+---+--------------------+---+---+
|  1|[000, 000, 000, 000]|  0|000|
|  1|[000, 000, 000, 000]|  1|000|
|  1|[000, 000, 000, 000]|  2|000|
|  1|[000, 000, 000, 000]|  3|000|
|  2|[000, 000, 000, 0...|  0|000|
|  2|[000, 000, 000, 0...|  1|000|
|  2|[000, 000, 000, 0...|  2|000|
|  2|[000, 000, 000, 0...|  3|003|
|  2|[000, 000, 000, 0...|  4|000|
|  2|[000, 000, 000, 0...|  5|000|
|  2|[000, 000, 000, 0...|  6|000|
|  3|[000, 000, 000, 0...|  0|000|
|  3|[000, 000, 000, 0...|  1|000|
|  3|[000, 000, 000, 0...|  2|000|
|  3|[000, 000, 000, 0...|  3|003|
|  3|[000, 000, 000, 0...|  4|000|
|  3|[000, 000, 000, 0...|  5|000|
|  3|[000, 000, 000, 0...|  6|000|
|  3|[000, 000, 000, 0...|  7|000|
|  3|[000, 000, 000, 0...|  8|000|
+---+--------------------+---+---+

接下来,我们使用 :pyspark.sql.functions.expr 来抓取这个数组中索引 pos 处的元素。

第一个是我们新列的名称,它将是数字和数组中索引的连接。第二列将是数组中相应索引处的值。我们通过利用 pyspark.sql.functions.expr 的功能得到后者,它允许我们使用列值作为参数。

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
    "id",
    f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
    f.expr("numbers[pos]").alias("val")
)\
.show()

结果:

+---+--------+---+
| id|  number|val|
+---+--------+---+
|  1|numbers0|000|
|  1|numbers1|000|
|  1|numbers2|000|
|  1|numbers3|000|
|  2|numbers0|000|
|  2|numbers1|000|
|  2|numbers2|000|
|  2|numbers3|003|
|  2|numbers4|000|
|  2|numbers5|000|
|  2|numbers6|000|
|  3|numbers0|000|
|  3|numbers1|000|
|  3|numbers2|000|
|  3|numbers3|003|
|  3|numbers4|000|
|  3|numbers5|000|
|  3|numbers6|000|
|  3|numbers7|000|
|  3|numbers8|000|
+---+--------+---+

最后我们可以通过 id 分组并旋转 DataFrame

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
    "id",
    f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
    f.expr("numbers[pos]").alias("val")
)\
.groupBy("id").pivot("number").agg(f.first("val"))\
.show()

给出最终的数据框:

从以下位置获取详细信息: Split Spark Dataframe string column into multiple columns

【讨论】:

以上是关于拆分特定的 PySpark df 列并创建另一个 DF的主要内容,如果未能解决你的问题,请参考以下文章

R:拆分字符列并创建两个新的

如何使用逗号分隔值拆分列并存储在 PySpark Dataframe 中的数组中?如下所示

如何从pyspark数据框列值中删除方括号

Pyspark 使用 udf 处理数组列并返回另一个数组

我们可以从另一个数据框向数据框添加新列吗

Pandas:通过分隔符拆分列并根据其他列重新排列