pyspark dataframe foreach to fill a list

Posted: 2018-02-26 11:00:00

Question:

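I am using Spark 1.6.1 and Python 2.7, and I have this problem to solve:

- Take a dataframe A with X rows
- For each row in A, depending on a field, create one or more rows of a new dataframe B
- Save the new dataframe B

The solution I have right now is to collect dataframe A, go over it, append the rows of B to a list, and then create dataframe B from that list.

With this solution I obviously lose all the benefits of working with dataframes. I would like to use foreach, but I can't find a way to make it work. So far I have tried:

- Passing an empty list to the foreach function (this just ignores the foreach function and does nothing)
- Creating a global variable to be used inside the foreach function (it complains that it can't find the list)

Does anyone have any idea?

Thanks

----------EDIT:

Examples of what I have tried: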

def f(row, list):
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

list = []
dfA.foreach(lambda x : f(x, list))

As I mentioned, this does nothing; it doesn't execute the function.

I have also tried (with list defined at the beginning of the class):

global list
def f(row):
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

dfA.foreach(list)
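
A likely reason both attempts leave the list empty: Spark serializes the function passed to foreach, together with the objects it references, and runs it in worker processes, so the appends land on copies rather than on the driver's list. The sketch below imitates that round trip with pickle instead of Spark; `collected`, `append_row` and `worker_copy` are illustrative names, not part of the original code. It also shows why the second attempt complains about the list: a module-level `global` statement does not apply inside the function body, so `+=` makes the name local.

```python
import pickle

collected = []

def append_row(row, target):
    # In-place extend of whichever list object was passed in
    target += [row]

# Imitate shipping the captured list to a worker: serialize, then deserialize.
worker_copy = pickle.loads(pickle.dumps(collected))
append_row(("one", "ok"), worker_copy)

print(collected)    # the "driver" list is untouched
print(worker_copy)  # only the worker's copy was modified

# Second attempt: without `global collected` inside the function,
# `+=` treats the name as a local variable that has no value yet.
def f(row):
    collected += [row]

try:
    f(("one", "ok"))
    error_name = None
except UnboundLocalError as exc:
    error_name = type(exc).__name__
print(error_name)
```

This is why results normally have to come back through a transformation (map, flatMap, a udf) rather than through side effects on driver variables.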

---------EDIT 2:

What I am doing right now is:

    list = []
    for row in dfA.collect():
        string = re.search(a_regex, row['raw'])
        if string:
            dates = re.findall(date_regex, string.group())
            for date in dates:
                date_string = datetime.strptime(date, '%Y-%m-%d').date()
                list += [Row(event_type='1', event_date=date_string)]

        b_string = re.search(b_regex, row['raw'])
        if b_string:
            dates = re.findall(date_regex, b_string.group())
            for date in dates:
                date_string = datetime.strptime(date, '%Y-%m-%d').date()
                list += [Row(event_type='2', event_date=date_string)]

And then:

dfB = self._sql_context.createDataFrame(list)

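dfA is provided by another process and I can't change it. I know this is a very silly way to use dataframes, but there is nothing I can do about it.

--------EDIT 3: sample of dfA.raw: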

"new":[],"removed":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null
"new":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null],"removed
"new":["start":"2017-01-28","end":"2017-02-03","start":"2017-02-04","end":"2017-02-10","start":"2017-02-11","end":"2017-02-17","start":"2017-02-18","end":"2017-02-24","start":"2017-03-04","end":"2017-03-10","start":"2017-03-11","end":"2017-03-17","start":"2017-03-18","end":"2017-03-24","start":"2017-09-02","end":"2017-09-08","start":"2017-09-16","end":"2017-09-22","start":"2017-09-23","end":"2017-09-29","start":"2017-09-30","end":"2017-10-06","start":"2017-10-07","end":"2017-10-13","start":"2017-12-02","end":"2017-12-08","start":"2017-12-09","end":"2017-12-15","start":"2017-12-16","end":"2017-12-22","start":"2017-12-23","end":"2017-12-29","start":"2018-01-06","end":"2018-01-12"],"removed":["start":"2017-02-04","end":"2017-02-10","start":"2017-02-11","end":"2017-02-17","start":"2017-02-18","end":"2017-02-24","start":"2017-03-04","end":"2017-03-10","start":"2017-03-11","end":"2017-03-17","start":"2017-03-18","end":"2017-03-24","start":"2017-01-28","end":"2017-02-03","start":"2017-09-16","end":"2017-09-22","start":"2017-09-02","end":"2017-09-08","start":"2017-09-30","end":"2017-10-06","start":"2017-10-07","end":"2017-10-13","start":"2017-09-23","end":"2017-09-29","start":"2017-12-16","end":"2017-12-22","start":"2017-12-23","end":"2017-12-29","start":"2018-01-06","end":"2018-01-12","start":"2017-12-09","end":"2017-12-15","start":"2017-12-02","end":"2017-12-08","start":"2018-02-10","end":"2018-02-16"]|

And the regexes:

a_regex = r'\"new\":(.*?)2|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":(.*?)2|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'

dfA.select('raw').show(2,False)

+-------------------------------------------------------------------------------------------------------+
|raw                                                                                                    |
+-------------------------------------------------------------------------------------------------------+
|"new":["start":"2018-03-24","end":"2018-03-30","scheduled_by_system":null],"removed":[]|
|"new":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null],"removed":[]|
+-------------------------------------------------------------------------------------------------------+
only showing top 2 rows

df.select('raw').printSchema()

root
 |-- raw: string (nullable = true)

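Comments:

Please share what you have tried, sample input and expected output.

Done; the expected output is obviously the filled list.

What is the condition for taking X rows from A? Can you post sample input and expected output?

Also explain "For each row in A, depending on a field, create one or more rows of a new dataframe B".

Hi Ramesh, first of all, let me paste what I have right now so you can see what I am trying to do:

Answer 1:

After selecting the required raw column, you need to write a udf function that returns the event_date values for each event_type.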

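import re
from datetime import datetime

def searchUdf(regex, dateRegex, x):
    # Collect every date found inside the part of x matched by regex
    list_return = []
    string = re.search(regex, x)
    if string:
        dates = re.findall(dateRegex, string.group())
        for date in dates:
            # Convert the 'YYYY-MM-DD' text into a datetime.date
            date_string = datetime.strptime(date, '%Y-%m-%d').date()
            list_return.append(date_string)
    return list_return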

from pyspark.sql import functions as F
from pyspark.sql import types as T

udfFunctionCall = F.udf(searchUdf, T.ArrayType(T.DateType()))

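The udf function parses the raw column string, taking regex and dateRegex as arguments, and returns the parsed dates as an ArrayType of DateType.

You should then call the udf function, filter out the rows whose result is empty, and keep event_type and event_date as separate columns.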

df = df.select("raw")

# event_type 1: dates found in the "new" section
adf = df.select(F.lit(1).alias("event_type"), udfFunctionCall(F.lit(a_regex), F.lit(date_regex), df.raw).alias("event_date"))\
    .filter(F.size(F.col("event_date")) > 0)

# event_type 2: dates found in the "removed" section (uses b_regex)
bdf = df.select(F.lit(2).alias("event_type"), udfFunctionCall(F.lit(b_regex), F.lit(date_regex), df.raw).alias("event_date")) \
    .filter(F.size(F.col("event_date")) > 0)
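
To see what the udf body returns and why the size filter is needed, the parsing logic can be tried outside Spark. This is only a minimal sketch under assumptions: `extract_dates` is a hypothetical stand-in for searchUdf, `new_regex` uses just the bracketed alternative of a_regex, and the date pattern is written with explicit `{4}` and `{2}` quantifiers.

```python
import re
from datetime import datetime

# Assumed patterns for illustration (not copied verbatim from the question)
new_regex = r'"new":\[(.*?)\]'
date_regex = r'"start":"(\d{4}-\d{2}-\d{2})"'

def extract_dates(regex, date_regex, raw):
    """Return the start dates found in the section of raw matched by regex."""
    match = re.search(regex, raw)
    if not match:
        return []
    return [datetime.strptime(d, '%Y-%m-%d').date()
            for d in re.findall(date_regex, match.group())]

with_new = '"new":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null],"removed":[]'
without_new = '"new":[],"removed":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null]'

dates_found = extract_dates(new_regex, date_regex, with_new)
dates_empty = extract_dates(new_regex, date_regex, without_new)
print(dates_found)  # one date parsed from the "new" section
print(dates_empty)  # empty list: this is the kind of row the size filter drops
```

A row whose section is empty still produces a result (an empty array), which is why the filter on F.size is applied after the udf call.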

The regexes used are the ones provided in the question:

a_regex = r'\"new\":(.*?)2|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":(.*?)2|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'

Now you have two dataframes, one per event_type. The final step is to merge them together:

adf.unionAll(bdf)

That's it. That should clear up your confusion.

With the following raw column


|raw|

|"new":[],"removed":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null]|
|"new":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null],"removed|
|"new":["start":"2017-01-28","end":"2017-02-03","start":"2017-02-04","end":"2017-02-10","start":"2017-02-11","end":"2017-02-17","start":"2017-02-18","end":"2017-02-24","start":"2017-03-04","end":"2017-03-10","start":"2017-03-11","end":"2017-03-17","start":"2017-03-18","end":"2017-03-24","start":"2017-09-02","end":"2017-09-08","start":"2017-09-16","end":"2017-09-22","start":"2017-09-23","end":"2017-09-29","start":"2017-09-30","end":"2017-10-06","start":"2017-10-07","end":"2017-10-13","start":"2017-12-02","end":"2017-12-08","start":"2017-12-09","end":"2017-12-15","start":"2017-12-16","end":"2017-12-22","start":"2017-12-23","end":"2017-12-29","start":"2018-01-06","end":"2018-01-12"],"removed":["start":"2017-02-04","end":"2017-02-10","start":"2017-02-11","end":"2017-02-17","start":"2017-02-18","end":"2017-02-24","start":"2017-03-04","end":"2017-03-10","start":"2017-03-11","end":"2017-03-17","start":"2017-03-18","end":"2017-03-24","start":"2017-01-28","end":"2017-02-03","start":"2017-09-16","end":"2017-09-22","start":"2017-09-02","end":"2017-09-08","start":"2017-09-30","end":"2017-10-06","start":"2017-10-07","end":"2017-10-13","start":"2017-09-23","end":"2017-09-29","start":"2017-12-16","end":"2017-12-22","start":"2017-12-23","end":"2017-12-29","start":"2018-01-06","end":"2018-01-12","start":"2017-12-09","end":"2017-12-15","start":"2017-12-02","end":"2017-12-08","start":"2018-02-10","end":"2018-02-16"]|


you should get output along these lines:

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_type|event_date                                                                                                                                                                                                  |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1         |[2018-03-10]                                                                                                                                                                                                |
|1         |[2017-01-28, 2017-02-04, 2017-02-11, 2017-02-18, 2017-03-04, 2017-03-11, 2017-03-18, 2017-09-02, 2017-09-16, 2017-09-23, 2017-09-30, 2017-10-07, 2017-12-02, 2017-12-09, 2017-12-16, 2017-12-23, 2018-01-06]|
|2         |[2018-03-10]                                                                                                                                                                                                |
|2         |[2017-01-28, 2017-02-04, 2017-02-11, 2017-02-18, 2017-03-04, 2017-03-11, 2017-03-18, 2017-09-02, 2017-09-16, 2017-09-23, 2017-09-30, 2017-10-07, 2017-12-02, 2017-12-09, 2017-12-16, 2017-12-23, 2018-01-06]|
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
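
The result above keeps all dates of a row in one array, so a row with several dates still yields a single output row. If one row per date is wanted, as in the loop from the question, one option is to apply `F.explode` to the array column; another is to expand each row with a plain function and pass it to `rdd.flatMap`. Below is a sketch of the second option. `raw_to_events`, `NEW_REGEX` and `REMOVED_REGEX` are hypothetical names, the patterns are simplified assumptions, and the Spark calls appear only as comments because they have not been run here.

```python
import re
from datetime import datetime

# Simplified patterns, assumed for illustration
NEW_REGEX = r'"new":\[(.*?)\]'
REMOVED_REGEX = r'"removed":\[(.*?)\]'
DATE_REGEX = r'"start":"(\d{4}-\d{2}-\d{2})"'

def raw_to_events(raw):
    """Expand one raw string into a list of (event_type, event_date) tuples."""
    events = []
    for event_type, section_regex in ((1, NEW_REGEX), (2, REMOVED_REGEX)):
        section = re.search(section_regex, raw)
        if section:
            for text in re.findall(DATE_REGEX, section.group()):
                events.append((event_type, datetime.strptime(text, '%Y-%m-%d').date()))
    return events

sample = ('"new":["start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null],'
          '"removed":["start":"2018-02-24","end":"2018-03-02","start":"2018-03-03","end":"2018-03-09"]')
events = raw_to_events(sample)
for event in events:
    print(event)

# Hypothetical use with Spark (untested here, sqlContext is an assumed name):
# rows = dfA.rdd.flatMap(lambda r: raw_to_events(r['raw']))
# dfB = sqlContext.createDataFrame(rows, ['event_type', 'event_date'])
```

Because the function returns a list per input row, flatMap flattens those lists into one output row per date, and nothing has to be collected on the driver.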

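Comments:

Hey Ramesh, this looks really good... I will try all the cases (new but nothing removed, removed but nothing new, many new and many removed) to see how it works and I will let you know (if everything goes well I will accept the answer :)). Thank you very much for your huge help.

Hi Ramesh, sorry, it is not working u_u. For this input: "new":[],"removed":["start":"2018-02-24","end":"2018-03-02","start":"2018-03-03","end":"2018-03-09","start":"2018-03-10","end":"2018-03-16","start":"2018-03-17","end":"2018-03-23","start":"2018-03-24","end":"2018-03-30"] it only gets the first one. As far as I understand, the filter keeps the rows that contain at least one entry, but looking at the code you sent, it only gets the first one... Am I missing something? Why is it not working? Thanks!

Can you explain "it only gets the first one"?

Because I only get a df with one row, with event_type = 2 and event_date=2018-02-24 (which is the first one).

Not only did you help me, you also taught me some tricks for the future :)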
