How do I map values from a dictionary to a new column in Pyspark
Posted: 2021-10-21 11:16:12

I am trying to map values from a dictionary to a new column in my pyspark df:
dict = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}
I am reading a CSV which has the following data -
+--------------------+----------------+---------+------------+-------------+----------+---------+
| Region| Country| ItemType|SalesChannel|OrderPriority| OrderDate| OrderID|
+--------------------+----------------+---------+------------+-------------+----------+---------+
| Sub-Saharan Africa| South Africa| Fruits| Offline| M| 7/27/2012|443368995|
|Middle East and N...| Morocco| Clothes| Online| M| 9/14/2013|667593514|
|Australia and Oce...|Papua New Guinea| Meat| Offline| M| 5/15/2015|940995585|
| Sub-Saharan Africa| Djibouti| Clothes| Offline| H| 5/17/2017|880811536|
| Europe| Slovakia|Beverages| Offline| L|10/26/2016|174590194|
+--------------------+----------------+---------+------------+-------------+----------+---------+
Below is the additional column I want to add, based on a decision driven by the dictionary above -
+---------+
| SomeFlag|
+---------+
| Y|
| N|
| Y|
| Y|
| Y|
+---------+
This is the code I have tried ==>
df = spark.read.option("header", True).csv("sample.csv")
def my_mapp_fn(checkcol, dict1):
    print(col(checkcol))
    print(key)
    return coalesce(*[when(col(checkcol) == key, lit(value)) for key, value in d.items()])

new_df = df.withColumn("SomeFlag", my_mapp_fn(dict, col('OrderId')))
I am getting the following error ==>
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in my_mapp_fn
File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 106, in col
return _invoke_function("col", col)
File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 58, in _invoke_function
return Column(jf(*args))
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1296, in __call__
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1260, in _build_args
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1247, in _get_args
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_collections.py", line 510, in convert
File "C:\BigData\Spark\python\pyspark\sql\column.py", line 460, in __iter__
raise TypeError("Column is not iterable")
TypeError: Column is not iterable
Any suggestions? Thanks in advance.
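The function in the question expects (column name, dict) but is called with (dict, Column), and its body refers to key and d, which are never defined. A minimal corrected sketch of the same coalesce/when idea (the my_dict and MappedID names here are illustrative, and df is assumed to be the DataFrame read above):

from pyspark.sql.functions import coalesce, when, col, lit

# hypothetical corrected version: take the column *name* and the dict,
# and build one when() per dictionary entry, coalesced together
def my_mapp_fn(checkcol, dict1):
    return coalesce(*[when(col(checkcol) == key, lit(value))
                      for key, value in dict1.items()])

my_dict = {'443368995': 0, '667593514': 1, '940995585': 2,
           '880811536': 3, '174590194': 4}

new_df = df.withColumn("MappedID", my_mapp_fn('OrderID', my_dict))

IDs that are not present in the dict end up as null, since no when() branch matches and there is no otherwise().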
【Answer 1】: To map the dict and create a column from it, you can write:
from pyspark.sql import functions as F
dict_data = {'443368995': '0', '667593514': '1', '940995585': '2', '880811536': '3', '174590194': '4'}
d = [
    ("M", '443368995'),
    ("M", '667593514'),
    ("M", '940995585'),
    ("H", '880811536'),
    ("L", '174590194'),
]
df = spark.createDataFrame(d, ['OrderPriority', 'OrderID'])
df.show()
# output
+-------------+---------+
|OrderPriority| OrderID|
+-------------+---------+
| M|443368995|
| M|667593514|
| M|940995585|
| H|880811536|
| L|174590194|
+-------------+---------+
(
    df
    .withColumn("MapOrderID", F.col("OrderID"))
    .replace(to_replace=dict_data, subset=["MapOrderID"])
    .show()
)
# output
+-------------+---------+----------+
|OrderPriority| OrderID|MapOrderID|
+-------------+---------+----------+
| M|443368995| 0|
| M|667593514| 1|
| M|940995585| 2|
| H|880811536| 3|
| L|174590194| 4|
+-------------+---------+----------+
You can then apply when/otherwise on the newly created column:
(
    df
    .withColumn("MapOrderID", F.col("OrderID"))
    .replace(to_replace=dict_data, subset=["MapOrderID"])
    .withColumn("MapOrderID",
                F.when(F.col("MapOrderID") == "2", "Ok").otherwise("Not Ok"))
    .show()
)
# output
+-------------+---------+----------+
|OrderPriority| OrderID|MapOrderID|
+-------------+---------+----------+
| M|443368995| Not Ok|
| M|667593514| Not Ok|
| M|940995585| Ok|
| H|880811536| Not Ok|
| L|174590194| Not Ok|
+-------------+---------+----------+
Important: the keys and values in the dict must be of the same type. This means that if the IDs you are mapping are strings, the values 1/2/3 should also be strings; otherwise you will get the error:
ValueError: Mixed type replacements are not supported
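If you start from the integer-valued dict in the question, one way to satisfy this constraint (a sketch reusing df and F from above) is to cast the values to strings before calling replace:

# sketch: reuse the integer-valued dict from the question, but cast the
# values to strings so keys and replacement values share the same type
my_dict = {'443368995': 0, '667593514': 1, '940995585': 2,
           '880811536': 3, '174590194': 4}
dict_data = {k: str(v) for k, v in my_dict.items()}

(
    df
    .withColumn("MapOrderID", F.col("OrderID"))
    .replace(to_replace=dict_data, subset=["MapOrderID"])
    .show()
)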
【Comments】:
However, I could do this and then create a new df with OrderID and SomeFlag and join it back to the original. I am looking for a udf or pyspark-function-level solution, so that I don't have to create a new df.
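One common way to get such a function-level solution, without building a second DataFrame or joining, is to turn the dict into a literal map column with create_map and index it by the key column. A minimal sketch, assuming a dict re-keyed to hold the desired 'Y'/'N' flags from the question (the flag values and flag_dict name are illustrative, and df is assumed to exist):

from itertools import chain
from pyspark.sql import functions as F

# assumed flag mapping, mirroring the SomeFlag column shown in the question
flag_dict = {'443368995': 'Y', '667593514': 'N', '940995585': 'Y',
             '880811536': 'Y', '174590194': 'Y'}

# build a literal map<string,string> column from the python dict
mapping_expr = F.create_map([F.lit(x) for x in chain(*flag_dict.items())])

# look up each OrderID in the map; no extra DataFrame or join is needed
new_df = df.withColumn("SomeFlag", mapping_expr[F.col("OrderID")])
new_df.show()

OrderIDs missing from the dict come back as null; wrap the lookup in F.coalesce(..., F.lit('N')) if you want a default flag instead.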