pyspark rdd map 没有调用函数
Posted
技术标签:
【中文标题】pyspark rdd map 没有调用函数【英文标题】:pyspark rdd map is not calling function 【发布时间】:2018-03-27 17:34:18 【问题描述】:我正在尝试对我的 RDD 进行一些转换,为此,我正在使用 map 调用一个函数。但是,这个函数没有被调用。有人请让我知道我在这里做错了什么?
我可以看到 test
函数被调用但不是 store_past_info
def store_past_info(row):
print "------------------- store_past_info ------------------------------"
if row["transactiontype"] == "Return":
global prv_transaction_number
prv_transaction_number = row["transnumber"]
global return_occured
return_occured = True
global group_id
group_id.append(row["transnumber"])
if row["transactiontype"] == "Purchase":
if return_occured:
global group_id
group_id.append(prv_transaction_number)
else:
global group_id
group_id.append(row["transnumber"])
print group_id
def test(rdd):
print "------------------- test ------------------------------"
rdd.map(store_past_info).collect()
print group_id
这是它在商店中的运作方式:
-
如果购买了某些商品,则会生成一个 ID。
如果您想退回购买的几件商品,则输入了两个条目
-
使用新 id 退回所有产品的退货条目,
org_id
作为您要退回的采购订单的 id
新的购买条目与您上次购买的 ID 相同的 id
用于您想要保留的东西
输入
Date Type Id org_id
25-03-2018 Purchase 111
25-03-2018 Purchase 112
26-03-2018 Return 113 111
26-03-2018 Purchase 111
输出 我想添加一个新列 group_id,它将显示退货和退货后发生的相应购买的相同 id(客户不进行此购买,这是系统为每次退货保留条目的方式)步骤 2.1
Date Type Id org_id group_id
25-03-2018 Purchase 111 111
25-03-2018 Purchase 112 112
26-03-2018 Return 113 111 113
26-03-2018 Purchase 111 113
【问题讨论】:
我认为你不能像在 pySpark 中那样使用全局变量。你想做什么?你能提供一些示例输入/所需的输出吗? @pault 我已经更新了这个问题。但是,我也没有看到函数被调用。 您能否详细说明如何从输入到输出?我认为你更适合使用数据框和窗口函数来解决这个问题,但我仍然不是 100% 清楚你的逻辑。 @pault 请检查问题现在是否更清楚。 【参考方案1】:IIUC,我相信您可以使用DataFrame
s、pyspark.sql.Window
函数和crossJoin()
获得输出
首先使用
将您的rdd
转换为 DataFrame
df = rdd.toDF() # you may have to specify the column names
df.show()
#+----------+--------+---+------+
#| Date| Type| Id|org_id|
#+----------+--------+---+------+
#|25-03-2018|Purchase|111| null|
#|25-03-2018|Purchase|112| null|
#|26-03-2018| Return|113| 111|
#|26-03-2018|Purchase|111| null|
#+----------+--------+---+------+
然后我们需要添加一个索引列来跟踪行的顺序。我们可以使用pyspark.sql.functions.monotonically_increasing_id()
。这将保证值会增加(因此它们可以被排序),但并不意味着它们将是连续的。
import pyspark.sql.functions as f
df = df.withColumn('Index', f.monotonically_increasing_id())
df.show()
#+----------+--------+---+------+-----------+
#| Date| Type| Id|org_id| Index|
#+----------+--------+---+------+-----------+
#|25-03-2018|Purchase|111| null| 8589934592|
#|25-03-2018|Purchase|112| null|17179869184|
#|26-03-2018| Return|113| 111|34359738368|
#|26-03-2018|Purchase|111| null|42949672960|
#+----------+--------+---+------+-----------+
排序很重要,因为您想查找在 Return 之后出现的行。
接下来使用crossJoin
将DataFrame
加入自身。
由于这会返回笛卡尔积,我们会将其过滤到仅满足以下条件任一的行:
l.Index = r.Index
(本质上是连接一行)
(l.Id = r.org_id) AND (l.Index > r.Index)
(Id
等于前一行中的 org_id
- 这是索引列有用的地方)
然后我们为group_id
添加一列,如果满足第二个条件,则将其设置为等于r.Id
。否则我们将此列设置为None
。
df1 = df.alias('l').crossJoin(df.alias('r'))\
.where('(l.Index = r.Index) OR ((l.Id = r.org_id) AND (l.Index > r.Index))')\
.select(
'l.Index',
'l.Date',
'l.Type',
'l.Id',
'l.org_id',
f.when(
(f.col('l.Id') == f.col('r.org_id')) & (f.col('l.Index') > f.col('r.Index')),
f.col('r.Id')
).otherwise(f.lit(None)).alias('group_id')
)
df1.show()
#+-----------+----------+--------+---+------+--------+
#| Index| Date| Type| Id|org_id|group_id|
#+-----------+----------+--------+---+------+--------+
#| 8589934592|25-03-2018|Purchase|111| null| null|
#|17179869184|25-03-2018|Purchase|112| null| null|
#|34359738368|26-03-2018| Return|113| 111| null|
#|42949672960|26-03-2018|Purchase|111| null| 113|
#|42949672960|26-03-2018|Purchase|111| null| null|
#+-----------+----------+--------+---+------+--------+
我们快到了,但正如您所见,仍有两件事需要完成。
-
我们需要消除
Index = 42949672960
的重复行
我们需要使用Id
中的值填写group_id
中的null
行。
第一步,我们将使用Window
函数创建一个名为rowNum
的临时列。这将是按布尔条件group_id IS NULL
排序的每个Index
的pyspark.sql.functions.row_number()
。
对于有多行的索引值,已经设置了group_id
的索引值将首先排序。因此,我们只需要选择rowNum
等于 1 的行(row_number()
从 1 开始,而不是 0)。
完成后,第二步就很简单了——只需将剩余的null
值替换为Id
中的值即可。
from pyspark.sql import Window
w = Window.partitionBy(f.col('Index')).orderBy(f.isnull('group_id'))
df2 = df1.withColumn('rowNum', f.row_number().over(w))\
.where(f.col('rowNum')==1)\
.sort('Index')\
.select(
'Date',
'Type',
'Id',
'org_id',
f.when(
f.isnull('group_id'),
f.col('Id')
).otherwise(f.col('group_id')).alias('group_id')
)
df2.show()
#+----------+--------+---+------+--------+
#| Date| Type| Id|org_id|group_id|
#+----------+--------+---+------+--------+
#|25-03-2018|Purchase|111| null| 111|
#|25-03-2018|Purchase|112| null| 112|
#|26-03-2018| Return|113| 111| 113|
#|26-03-2018|Purchase|111| null| 113|
#+----------+--------+---+------+--------+
【讨论】:
在 spark 1.6crossJoin
中不可用,所以我使用了`join(how='cross)以上是关于pyspark rdd map 没有调用函数的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:使用 map 函数而不是 collect 来迭代 RDD