How do I map values from a dictionary to a new column in Pyspark

Posted: 2021-10-21 11:16:12

I am trying to map values from a dictionary to a new column in my pyspark df:

dict = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}

I am reading a csv which has the following data:
+--------------------+----------------+---------+------------+-------------+----------+---------+
|              Region|         Country| ItemType|SalesChannel|OrderPriority| OrderDate|  OrderID|
+--------------------+----------------+---------+------------+-------------+----------+---------+
|  Sub-Saharan Africa|    South Africa|   Fruits|     Offline|            M| 7/27/2012|443368995|
|Middle East and N...|         Morocco|  Clothes|      Online|            M| 9/14/2013|667593514|
|Australia and Oce...|Papua New Guinea|     Meat|     Offline|            M| 5/15/2015|940995585|
|  Sub-Saharan Africa|        Djibouti|  Clothes|     Offline|            H| 5/17/2017|880811536|
|              Europe|        Slovakia|Beverages|     Offline|            L|10/26/2016|174590194|
+--------------------+----------------+---------+------------+-------------+----------+---------+

Below is the additional column I want to add, based on some decision derived from the dictionary above:

+---------+
| SomeFlag|
+---------+
|        Y|
|        N|
|        Y|
|        Y|
|        Y|
+---------+

Here is the code I have tried:

from pyspark.sql.functions import col, when, lit, coalesce

df = spark.read.option("header", True).csv("sample.csv")
def my_mapp_fn(checkcol, dict1):
     print(col(checkcol))
     print(key)
     return coalesce(*[when(col(checkcol) == key, lit(value)) for key, value in d.items()])

new_df = df.withColumn("SomeFlag", my_mapp_fn(dict, col('OrderId')))

I am getting this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in my_mapp_fn
  File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 106, in col
    return _invoke_function("col", col)
  File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 58, in _invoke_function
    return Column(jf(*args))
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1296, in __call__
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1260, in _build_args
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1247, in _get_args
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_collections.py", line 510, in convert
  File "C:\BigData\Spark\python\pyspark\sql\column.py", line 460, in __iter__
    raise TypeError("Column is not iterable")
TypeError: Column is not iterable

Any suggestions? Thanks in advance.


Answer 1:

To map the dict and create a column from it, you can write:

from pyspark.sql import functions as F

dict_data = {'443368995': '0', '667593514': '1', '940995585': '2', '880811536': '3', '174590194': '4'}

d = [
    ("M", '443368995'),
    ("M", '667593514'),
    ("M", '940995585'),
    ("H", '880811536'),
    ("L", '174590194'),
    
]
df = spark.createDataFrame(d,['OrderPriority','OrderID'])
df.show()

# output
+-------------+---------+
|OrderPriority|  OrderID|
+-------------+---------+
|            M|443368995|
|            M|667593514|
|            M|940995585|
|            H|880811536|
|            L|174590194|
+-------------+---------+


(
    df
    .withColumn("MapOrderID", F.col("OrderID"))
    .replace(to_replace=dict_data, subset=["MapOrderID"])
    .show()
)

# output
+-------------+---------+----------+
|OrderPriority|  OrderID|MapOrderID|
+-------------+---------+----------+
|            M|443368995|         0|
|            M|667593514|         1|
|            M|940995585|         2|
|            H|880811536|         3|
|            L|174590194|         4|
+-------------+---------+----------+

You can then apply when/otherwise on the newly created column:

(
    df
    .withColumn("MapOrderID", F.col("OrderID"))
    .replace(to_replace=dict_data, subset=["MapOrderID"])
    .withColumn("MapOrderID", 
                F.when(F.col("MapOrderID") == "2", "Ok").otherwise("Not Ok")
               )
    .show()
)

# output
+-------------+---------+----------+
|OrderPriority|  OrderID|MapOrderID|
+-------------+---------+----------+
|            M|443368995|    Not Ok|
|            M|667593514|    Not Ok|
|            M|940995585|        Ok|
|            H|880811536|    Not Ok|
|            L|174590194|    Not Ok|
+-------------+---------+----------+

Importantly, the keys and values in the dict must be of the same type. This means that if the IDs you are mapping are strings, the values 1/2/3 should also be strings, otherwise you will get an error:

ValueError: Mixed type replacements are not supported
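If the mapping values are integers (as in the dict from the question), one way to satisfy this is to cast them to strings before calling replace. A small sketch, assuming the original dict from the question:

# cast the int flag values to strings so keys and replacement values share a type
raw_map = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}
dict_data = {k: str(v) for k, v in raw_map.items()}

df.withColumn("MapOrderID", F.col("OrderID")).replace(to_replace=dict_data, subset=["MapOrderID"])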

Comments:

However, I could do that, then create a new df with OrderID and SomeFlag and join it back to the original. I am looking for a udf or pyspark function-level solution so that I don't have to create a new df.
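For the expression-level solution the comment asks for (no extra df, no join), here is a minimal sketch of the coalesce/when approach the question attempts, with the argument order fixed. The names map_from_dict and id_map are illustrative, and it assumes the lookup column is OrderID:

from pyspark.sql.functions import coalesce, col, lit, when

id_map = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}

def map_from_dict(checkcol, mapping):
    # one WHEN per dict entry, coalesced into a single column expression;
    # OrderIDs not present in the dict fall through to NULL
    return coalesce(*[when(col(checkcol) == key, lit(value))
                      for key, value in mapping.items()])

# pass the column name and the dict -- no new df or join is needed
new_df = df.withColumn("SomeFlag", map_from_dict("OrderID", id_map))
new_df.show()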
