PySpark查找另一列中是否存在一列中的模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark查找另一列中是否存在一列中的模式相关的知识,希望对你有一定的参考价值。

我有两个pyspark数据帧。一个包含FullAddress字段(比如col1),另一个数据框包含其中一个列中的city / town / suburb的名称(比如col2)。我想比较col2和col1,如果有匹配则返回col2。

此外,郊区名称可以是郊区名称列表。

包含完整地址的Dataframe1

+--------+--------+----------------------------------------------------------+
|Postcode|District|City/ Town/ Suburb                                        |
+--------+--------+----------------------------------------------------------+
|2000    |Sydney  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks  |
|2001    |Sydney  |Sydney                                                    |
|2113    |Sydney  |North Ryde                                                |
+--------+--------+----------------------------------------------------------+



+-----------------------------------------------------------+
|FullAddress                                                |
+-----------------------------------------------------------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia               |
| HAY STREET HAYMARKET 2000, NSW, Australia                 |
| SMART STREET FAIRFIELD 2165, NSW, Australia               |
|CLARENCE STREET SYDNEY 2000, NSW, Australia                |
+-----------------------------------------------------------+

我想要这样的东西

+-----------------------------------------------------------++-----------+
|FullAddress                                                |suburb      |
+-----------------------------------------------------------++-----------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia               |NORTH RYDE  |
| HAY STREET HAYMARKET 2000, NSW, Australia                 |HAYMARKET   |
| SMART STREET FAIRFIELD 2165, NSW, Australia               |NULL        |
|CLARENCE STREET SYDNEY 2000, NSW, Australia                |SYDNEY      |
+-----------------------------------------------------------++-----------+
答案

有两个DataFrames -

DataFrame 1:包含完整地址的DataFrame

DataFrame 2:包含基础数据的DataFrame - PostcodeDistrictCity / Town / Suburb

问题的目的是从suburb中为DataFrame 1提取适当的DataFrame 2。尽管OP尚未明确指定我们可以加入两个DataFrame的key,但Postcode似乎只是合理的选择。

# Importing requisite functions
from pyspark.sql.functions import col,regexp_extract,split,udf
from pyspark.sql.types import StringType

让我们创建DataFrame 1作为df。在这个DataFrame我们需要提取Postcode。在澳大利亚,所有邮政编码都是4 digit long,因此我们使用regexp_extract()string列中提取4位数字。

df = sqlContext.createDataFrame([('BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia ',),
                                 ('HAY STREET HAYMARKET 2000, NSW, Australia',),
                                 ('SMART STREET FAIRFIELD 2165, NSW, Australia',),
                                 ('CLARENCE STREET SYDNEY 2000, NSW, Australia',)],
                                 ('FullAddress',))
df = df.withColumn('Postcode', regexp_extract('FullAddress', "(\d{4})" , 1 ))
df.show(truncate=False)
+---------------------------------------------+--------+
|FullAddress                                  |Postcode|
+---------------------------------------------+--------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |2113    |
|HAY STREET HAYMARKET 2000, NSW, Australia    |2000    |
|SMART STREET FAIRFIELD 2165, NSW, Australia  |2165    |
|CLARENCE STREET SYDNEY 2000, NSW, Australia  |2000    |
+---------------------------------------------+--------+

现在,我们已经提取了Postcode,我们创造了key加入两个DataFrames。让我们创建DataFrame 2,我们需要从中提取各自的suburb

df_City_Town_Suburb = sqlContext.createDataFrame([(2000,'Sydney','Dawes Point, Haymarket, Millers Point, Sydney, The Rocks'),
                                             (2001,'Sydney','Sydney'),(2113,'Sydney','North Ryde')],
                                             ('Postcode','District','City_Town_Suburb'))
df_City_Town_Suburb.show(truncate=False)

+--------+--------+--------------------------------------------------------+
|Postcode|District|City_Town_Suburb                                        |
+--------+--------+--------------------------------------------------------+
|2000    |Sydney  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2001    |Sydney  |Sydney                                                  |
|2113    |Sydney  |North Ryde                                              |
+--------+--------+--------------------------------------------------------+

加入两个DataFramesleft加入 -

df = df.join(df_City_Town_Suburb.select('Postcode','City_Town_Suburb'), ['Postcode'],how='left')
df.show(truncate=False)
+--------+---------------------------------------------+--------------------------------------------------------+
|Postcode|FullAddress                                  |City_Town_Suburb                                        |
+--------+---------------------------------------------+--------------------------------------------------------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |North Ryde                                              |
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null                                                    |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
+--------+---------------------------------------------+--------------------------------------------------------+

使用City_Town_Suburb函数将列split()拆分为数组 -

df = df.select('Postcode','FullAddress',split(col("City_Town_Suburb"), ",s*").alias("City_Town_Suburb"))
df.show(truncate=False)
+--------+---------------------------------------------+----------------------------------------------------------+
|Postcode|FullAddress                                  |City_Town_Suburb                                          |
+--------+---------------------------------------------+----------------------------------------------------------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |[North Ryde]                                              |
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null                                                      |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
+--------+---------------------------------------------+----------------------------------------------------------+

最后创建一个UDF来检查数组City_Town_Suburb中的每个元素(如果它存在于FullAddress列中)。如果存在一个,我们立即返回,否则返回None

def suburb(FullAddress,City_Town_Suburb):
   # Check for the case where there is no Array, otherwise we will get an Error
   if City_Town_Suburb == None:
      return None
   # Checking each and every Array element if it exists in 'FullAddress',
   # and if a match is found, it's immediately returned.
   for sub in City_Town_Suburb:
      if sub.strip().upper() in FullAddress:
         return sub.upper()
   return None
suburb_udf = udf(suburb,StringType())

应用此UDF -

df = df.withColumn('suburb', suburb_udf(col('FullAddress'),col('City_Town_Suburb'))).drop('City_Town_Suburb')
df.show(truncate=False)
+--------+---------------------------------------------+----------+
|Postcode|FullAddress                                  |suburb    |
+--------+---------------------------------------------+----------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |NORTH RYDE|
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null      |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |HAYMARKET |
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |SYDNEY    |
+--------+---------------------------------------------+----------+

以上是关于PySpark查找另一列中是否存在一列中的模式的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串

在另一列上查找最近的时间戳并在新列中添加值 PySpark

Pyspark:如何根据另一列中的匹配值从数组中的第一次出现中选择直到最后的值

检查一列中的值是不是存在于另一列中,如果存在,则将另一列中的值复制到新列中

基于另一列中的值的一列上的pyspark滞后函数

利用 PySpark,确定数组列中有多少元素包含在另一列中的数组数组中