pyspark rdd map 没有调用函数

Posted

技术标签:

【中文标题】pyspark rdd map 没有调用函数【英文标题】:pyspark rdd map is not calling function 【发布时间】:2018-03-27 17:34:18 【问题描述】:

我正在尝试对我的 RDD 进行一些转换,为此,我正在使用 map 调用一个函数。但是,这个函数没有被调用。有人请让我知道我在这里做错了什么?

我可以看到 test 函数被调用但不是 store_past_info

def store_past_info(row):
    print "------------------- store_past_info  ------------------------------"

    if row["transactiontype"] == "Return":
        global prv_transaction_number
        prv_transaction_number = row["transnumber"]
        global return_occured
        return_occured = True
        global group_id
        group_id.append(row["transnumber"])

    if row["transactiontype"] == "Purchase":
            if return_occured:
                global group_id
                group_id.append(prv_transaction_number)
            else:
                global group_id
                group_id.append(row["transnumber"])

    print group_id


def test(rdd):
    print "------------------- test  ------------------------------"
    rdd.map(store_past_info).collect()
    print group_id

这是它在商店中的运作方式:

    如果购买了某些商品,则会生成一个 ID。

    如果您想退回购买的几件商品,则输入了两个条目

      使用新 id 退回所有产品的退货条目,org_id 作为您要退回的采购订单的 id 新的购买条目与您上次购买的 ID 相同的 id 用于您想要保留的东西

输入

Date        Type        Id      org_id
25-03-2018  Purchase    111 
25-03-2018  Purchase    112 
26-03-2018  Return      113     111    
26-03-2018  Purchase    111 

输出 我想添加一个新列 group_id,它将显示退货和退货后发生的相应购买的相同 id(客户不进行此购买,这是系统为每次退货保留条目的方式)步骤 2.1

Date        Type        Id      org_id  group_id
25-03-2018  Purchase    111             111 
25-03-2018  Purchase    112             112
26-03-2018  Return      113     111     113
26-03-2018  Purchase    111             113

【问题讨论】:

我认为你不能像在 pySpark 中那样使用全局变量。你想做什么?你能提供一些示例输入/所需的输出吗? @pault 我已经更新了这个问题。但是,我也没有看到函数被调用。 您能否详细说明如何从输入到输出?我认为你更适合使用数据框和窗口函数来解决这个问题,但我仍然不是 100% 清楚你的逻辑。 @pault 请检查问题现在是否更清楚。 【参考方案1】:

IIUC,我相信您可以使用DataFrames、pyspark.sql.Window 函数和crossJoin() 获得输出

首先使用

将您的 rdd 转换为 DataFrame
df = rdd.toDF()  # you may have to specify the column names
df.show()
#+----------+--------+---+------+
#|      Date|    Type| Id|org_id|
#+----------+--------+---+------+
#|25-03-2018|Purchase|111|  null|
#|25-03-2018|Purchase|112|  null|
#|26-03-2018|  Return|113|   111|
#|26-03-2018|Purchase|111|  null|
#+----------+--------+---+------+

然后我们需要添加一个索引列来跟踪行的顺序。我们可以使用pyspark.sql.functions.monotonically_increasing_id()。这将保证值会增加(因此它们可以被排序),但并不意味着它们将是连续的。

import pyspark.sql.functions as f
df = df.withColumn('Index', f.monotonically_increasing_id())
df.show()
#+----------+--------+---+------+-----------+
#|      Date|    Type| Id|org_id|      Index|
#+----------+--------+---+------+-----------+
#|25-03-2018|Purchase|111|  null| 8589934592|
#|25-03-2018|Purchase|112|  null|17179869184|
#|26-03-2018|  Return|113|   111|34359738368|
#|26-03-2018|Purchase|111|  null|42949672960|
#+----------+--------+---+------+-----------+

排序很重要,因为您想查找在 Return 之后出现的行。

接下来使用crossJoinDataFrame 加入自身。

由于这会返回笛卡尔积,我们会将其过滤到仅满足以下条件任一的行:

l.Index = r.Index(本质上是连接一行) (l.Id = r.org_id) AND (l.Index > r.Index)Id 等于前一行中的 org_id - 这是索引列有用的地方)

然后我们为group_id 添加一列,如果满足第二个条件,则将其设置为等于r.Id。否则我们将此列设置为None

df1 = df.alias('l').crossJoin(df.alias('r'))\
    .where('(l.Index = r.Index) OR ((l.Id = r.org_id) AND (l.Index > r.Index))')\
    .select(
        'l.Index',
        'l.Date',
        'l.Type',
        'l.Id',
        'l.org_id',
        f.when(
            (f.col('l.Id') == f.col('r.org_id')) & (f.col('l.Index') > f.col('r.Index')),
            f.col('r.Id')
        ).otherwise(f.lit(None)).alias('group_id')
    )
df1.show()
#+-----------+----------+--------+---+------+--------+
#|      Index|      Date|    Type| Id|org_id|group_id|
#+-----------+----------+--------+---+------+--------+
#| 8589934592|25-03-2018|Purchase|111|  null|    null|
#|17179869184|25-03-2018|Purchase|112|  null|    null|
#|34359738368|26-03-2018|  Return|113|   111|    null|
#|42949672960|26-03-2018|Purchase|111|  null|     113|
#|42949672960|26-03-2018|Purchase|111|  null|    null|
#+-----------+----------+--------+---+------+--------+

我们快到了,但正如您所见,仍有两件事需要完成。

    我们需要消除Index = 42949672960 的重复行 我们需要使用Id 中的值填写group_id 中的null 行。

第一步,我们将使用Window 函数创建一个名为rowNum 的临时列。这将是按布尔条件group_id IS NULL 排序的每个Indexpyspark.sql.functions.row_number()

对于有多行的索引值,已经设置了group_id 的索引值将首先排序。因此,我们只需要选择rowNum 等于 1 的行(row_number() 从 1 开始,而不是 0)。

完成后,第二步就很简单了——只需将剩余的null 值替换为Id 中的值即可。

from pyspark.sql import Window
w = Window.partitionBy(f.col('Index')).orderBy(f.isnull('group_id'))
df2 = df1.withColumn('rowNum', f.row_number().over(w))\
    .where(f.col('rowNum')==1)\
    .sort('Index')\
    .select(
        'Date',
        'Type',
        'Id',
        'org_id',
        f.when(
            f.isnull('group_id'),
            f.col('Id')
        ).otherwise(f.col('group_id')).alias('group_id')
    )

df2.show()
#+----------+--------+---+------+--------+
#|      Date|    Type| Id|org_id|group_id|
#+----------+--------+---+------+--------+
#|25-03-2018|Purchase|111|  null|     111|
#|25-03-2018|Purchase|112|  null|     112|
#|26-03-2018|  Return|113|   111|     113|
#|26-03-2018|Purchase|111|  null|     113|
#+----------+--------+---+------+--------+

【讨论】:

在 spark 1.6 crossJoin 中不可用,所以我使用了`join(how='cross)

以上是关于pyspark rdd map 没有调用函数的主要内容,如果未能解决你的问题,请参考以下文章

熊猫数据帧的 PySpark rdd

Pyspark:使用 map 函数而不是 collect 来迭代 RDD

使用 map/filter 在 Pyspark 中的 RDD 中查找最大元素

Pyspark 在元组列表上设置

PySpark理解wordcount.py

从 Pyspark 中的 RDD 中提取字典