PySpark - Losing String values when Creating Key Value Pairs
【Posted】: 2018-03-19 10:14:13
【Question】: I need to create a key value pair for every row in my DataFrame/RDD. That is, each person becomes the key for each row, and a list of their associated transaction details becomes the value.
Here is an example that illustrates my problem:
a = [
    ('Bob', 562, 'Food', '12 May 2018'),
    ('Bob', 880, 'Food', '01 June 2018'),
    ('Bob', 380, 'Household', ' 16 June 2018'),
    ('Sue', 85, 'Household', ' 16 July 2018'),
    ('Sue', 963, 'Household', ' 16 Sept 2018')
]
df = spark.createDataFrame(a, ["Person", "Amount", "Budget", "Date"])
Then I create a function that builds the key value pair for each row:
def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs
    """
    return (row["Person"], [[row["Amount"], row["Budget"], row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
However, when I display the result, Budget and Date come out as null. I think this is because they are string values.
person_summarries_rdd.toDF().show(5,False)
+---+-------------------------------+
|_1 |_2 |
+---+-------------------------------+
|Bob|[WrappedArray(562, null, null)]|
|Bob|[WrappedArray(880, null, null)]|
|Bob|[WrappedArray(380, null, null)]|
|Sue|[WrappedArray(85, null, null)] |
|Sue|[WrappedArray(963, null, null)]|
+---+-------------------------------+
I need to keep the string values while still using this approach.
【Answer 1】: There is no need to serialize to an rdd. You can use pyspark.sql.functions.struct():
import pyspark.sql.functions as f
df.withColumn('values', f.struct(f.col('Amount'), f.col('Budget'), f.col('Date')))\
.select('Person', 'values').show(truncate=False)
#+------+-----------------------------+
#|Person|values |
#+------+-----------------------------+
#|Bob |[562,Food,12 May 2018] |
#|Bob |[880,Food,01 June 2018] |
#|Bob |[380,Household, 16 June 2018]|
#|Sue |[85,Household, 16 July 2018] |
#|Sue |[963,Household, 16 Sept 2018]|
#+------+-----------------------------+
Or, using a list comprehension:
array_columns = [c for c in df.columns if c != 'Person']
df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
.select('Person', 'values').show(truncate=False)
#+------+-----------------------------+
#|Person|values |
#+------+-----------------------------+
#|Bob |[562,Food,12 May 2018] |
#|Bob |[880,Food,01 June 2018] |
#|Bob |[380,Household, 16 June 2018]|
#|Sue |[85,Household, 16 July 2018] |
#|Sue |[963,Household, 16 Sept 2018]|
#+------+-----------------------------+
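Since the original goal was one key per person with their transactions as a list, the struct column can also be aggregated per person. A minimal sketch, assuming pyspark.sql.functions.collect_list is the aggregation you want (the column name transactions is just illustrative):

df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
    .groupBy('Person')\
    .agg(f.collect_list('values').alias('transactions'))\
    .show(truncate=False)
# one row per person; 'transactions' is an array of the [Amount, Budget, Date] structs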
Your code does not work because you cannot mix types inside a WrappedArray(). Spark infers the element type from the first item (Amount).
You can convert Amount to str:
def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs
    """
    return (row["Person"], [[str(row["Amount"]), row["Budget"], row["Date"]]])
person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
person_summarries_rdd.toDF().show(truncate=False)
#+---+---------------------------------------------+
#|_1 |_2 |
#+---+---------------------------------------------+
#|Bob|[WrappedArray(562, Food, 12 May 2018)] |
#|Bob|[WrappedArray(880, Food, 01 June 2018)] |
#|Bob|[WrappedArray(380, Household, 16 June 2018)]|
#|Sue|[WrappedArray(85, Household, 16 July 2018)] |
#|Sue|[WrappedArray(963, Household, 16 Sept 2018)]|
#+---+---------------------------------------------+
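If you stay with the RDD approach and also want to combine each person's rows into a single list of transactions, the nested [[...]] wrapper makes that a plain list concatenation. A sketch, assuming reduceByKey is acceptable here (grouped_rdd is just an illustrative name):

# concatenating the one-element [[...]] lists collects all transactions per person
grouped_rdd = person_summarries_rdd.reduceByKey(lambda a, b: a + b)
# e.g. ('Bob', [['562', 'Food', '12 May 2018'], ['880', 'Food', '01 June 2018'], ...])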
Or use a tuple instead of a list:
def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs
    """
    return (row["Person"], (row["Amount"], row["Budget"], row["Date"]))

person_summarries_rdd = df.rdd.map(lambda row : make_keys_and_value(row))
person_summarries_rdd.toDF().show(truncate=False)
#+---+-----------------------------+
#|_1 |_2 |
#+---+-----------------------------+
#|Bob|[562,Food,12 May 2018] |
#|Bob|[880,Food,01 June 2018] |
#|Bob|[380,Household, 16 June 2018]|
#|Sue|[85,Household, 16 July 2018] |
#|Sue|[963,Household, 16 Sept 2018]|
#+---+-----------------------------+
Here I removed the nested [], but if you want the output to look like [[562,Food,12 May 2018]] instead of [562,Food,12 May 2018], it is easy to add back, as sketched below.
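For example, wrapping the tuple in a one-element list restores that extra level (a minimal sketch of the same function):

def make_keys_and_value(row):
    """ Convert the dataframe rows into key value pairs
    """
    return (row["Person"], [(row["Amount"], row["Budget"], row["Date"])])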
Another option is to build a map with pyspark.sql.functions.create_map():
from functools import reduce

df.withColumn(
    'values',
    f.create_map(
        *reduce(
            list.__add__,
            [[f.lit(c), f.col(c)] for c in array_columns]
        )
    )
).select('Person', 'values').show(truncate=False)
#+------+--------------------------------------------------------------+
#|Person|values |
#+------+--------------------------------------------------------------+
#|Bob |Map(Amount -> 562, Budget -> Food, Date -> 12 May 2018) |
#|Bob |Map(Amount -> 880, Budget -> Food, Date -> 01 June 2018) |
#|Bob |Map(Amount -> 380, Budget -> Household, Date -> 16 June 2018)|
#|Sue |Map(Amount -> 85, Budget -> Household, Date -> 16 July 2018) |
#|Sue |Map(Amount -> 963, Budget -> Household, Date -> 16 Sept 2018)|
#+------+--------------------------------------------------------------+
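One advantage of the map column is that individual fields can then be looked up by key. A short sketch, assuming the map column from above is kept on a DataFrame (df_with_map is just an illustrative name):

df_with_map = df.withColumn(
    'values',
    f.create_map(*reduce(list.__add__, [[f.lit(c), f.col(c)] for c in array_columns]))
)
# pull a single entry out of the map by its key
df_with_map.select('Person', f.col('values').getItem('Budget').alias('Budget')).show()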
Or, if you want to go straight to a Person->array map:
df.withColumn('values', f.struct(*[f.col(c) for c in array_columns]))\
    .withColumn('map', f.create_map(f.col('Person'), f.col('values')))\
    .select('map')\
    .show(truncate=False)
#+-----------------------------------------+
#|map |
#+-----------------------------------------+
#|Map(Bob -> [562,Food,12 May 2018]) |
#|Map(Bob -> [880,Food,01 June 2018]) |
#|Map(Bob -> [380,Household, 16 June 2018])|
#|Map(Sue -> [85,Household, 16 July 2018]) |
#|Map(Sue -> [963,Household, 16 Sept 2018])|
#+-----------------------------------------+