PySpark: searching keywords with regex and then joining with another DataFrame

Posted: 2020-07-07 02:34:08

[Question]:

I have two DataFrames.

DataFrame A

name       groceries 
Mike       apple, orange, banana, noodle, red wine
Kate       white wine, green beans, extra pineapple hawaiian pizza
Leah       red wine, juice, rice, grapes, green beans
Ben        water, spaghetti

DataFrame B

id       item
0001     red wine
0002     green beans

I iterate over B row by row and use a regex to check whether each item appears in the groceries column of DataFrame A:

import re
import pyspark.sql.functions as F

df = None
# Collect the items from B on the driver, then filter A once per keyword.
for keyword in B.select('item').rdd.flatMap(lambda x: x).collect():
    if keyword is None:
        continue
    # One lookahead per word, so the words may appear in any order.
    pattern = '(?i)^'
    start = '(?=.*\\b'
    end = '\\b)'
    for word in re.split('\\s+', keyword):
        pattern = pattern + start + word + end
    pattern = pattern + '.*$'

    # Rows of A whose groceries match, tagged with the keyword that matched.
    matched = A.filter(A['groceries'].rlike(pattern)).withColumn('item', F.lit(keyword))
    df = matched if df is None else df.union(matched)

The output I want is the rows of A that contain an item from B, with the item keyword added as a new column:

name       groceries                                                     item
Mike       apple, orange, banana, noodle, red wine                       red wine
Leah       red wine, juice, rice, grapes, green beans                    red wine
Kate       white wine, green beans, extra pineapple hawaiian pizza       green beans
Leah       red wine, juice, rice, grapes, green beans                    green beans

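The actual output is not what I want, and I do not understand what is wrong with this approach.

I would also like to know whether there is a way to join A and B directly with rlike, so that rows are joined only when an item from B appears in the groceries of A. Thanks!

A more complex dataset: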

test1 = spark.createDataFrame([("Mike","apple, oranges, red wine"),("Kate","Whitewine, green beans waterrr, pineapple, red wine"), ("Leah", "red wine, juice, rice, grapes, green beans"),("Ben","Water,Spaghetti, the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["name","groceries"])
test2 = spark.createDataFrame([("001","red wine"),("002","green beans waterrr"), ("003", "the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["id","item"])
#%%
test_join = test1.join(test2, F.expr("""groceries rlike item"""), how='inner')
test_join.show(truncate=False)  # show() returns None, so keep it separate from the assignment
+----+---------------------------------------------------+---+-------------------+
|name|groceries                                          |id |item               |
+----+---------------------------------------------------+---+-------------------+
|Mike|apple, oranges, red wine                           |001|red wine           |
|Kate|Whitewine, green beans waterrr, pineapple, red wine|001|red wine           |
|Kate|Whitewine, green beans waterrr, pineapple, red wine|002|green beans waterrr|
|Leah|red wine, juice, rice, grapes, green beans         |001|red wine           |
+----+---------------------------------------------------+---+-------------------+

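Even though "the little prince 70th anniversary gift set (book/cd/downloadable audio)" appears verbatim in both groceries and item, it is still not matched.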
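One likely reason for the miss, offered as an assumption rather than a confirmed diagnosis: rlike treats item as a regular expression, so the parentheses in the item define a group instead of matching a literal "(" and ")". The sketch below reproduces this with Python's re module as a stand-in for Spark's Java regex engine, which handles parentheses the same way. The variable names are illustrative only.

```python
import re

groceries = ("Water,Spaghetti, the little prince 70th anniversary "
             "gift set (book/cd/downloadable audio)")
item = "the little prince 70th anniversary gift set (book/cd/downloadable audio)"

# Used as a raw pattern, "(...)" is a group, so the regex expects
# "gift set book/cd/..." with no literal parentheses and the search fails.
raw_match = re.search(item, groceries)

# Escaping the metacharacters makes the parentheses literal, so the search succeeds.
escaped_match = re.search(re.escape(item), groceries)

print(raw_match is not None)      # False
print(escaped_match is not None)  # True
```

If the items are meant as literal text rather than patterns, escaping them before they reach rlike, or using a literal substring test, avoids this class of mismatch.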

--------------- If I do an rlike with a regex that looks for "red apple", as follows ---------------

test1 = spark.createDataFrame([("Mike","apple, oranges, red wine"),("Kate","Whitewine, green beans waterrr, pineapple, red wine"), ("Leah", "red wine, juice, rice, grapes, green beans"),("Ben","Water,Spaghetti, the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["name","groceries"])
test2 = spark.createDataFrame([("001","red apple"),("002","green beans waterrr"), ("003", "the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["id","item"])

test_join = test1.filter(test1['groceries'].rlike('(?i)^(?=.*\\bred\\b)(?=.*\\bapple\\b).*$'))
+----+------------------------+
|name|groceries               |
+----+------------------------+
|Mike|apple, oranges, red wine|
+----+------------------------+

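This gives me what I want, because I only need to confirm that every word of the item is present in groceries, even if the words appear out of order. However, neither of the following gives me the match above: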

# Attempt 1: treat item as a regex
test_join = test1.join(test2, F.expr("""groceries rlike item"""), how='inner')
test_join.show(truncate=False)
# Attempt 2: treat item as a literal substring
test_join = test1.join(test2, F.col('groceries').contains(F.col('item')), how='inner')
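A plausible explanation, assuming both join conditions behave like their plain-Python counterparts: `groceries rlike item` and `contains` each look for the item as one contiguous run of text, so "red apple" is found only where "red" is immediately followed by " apple". The lookahead pattern checks each word on its own. The helper names below are hypothetical, and Python's re stands in for Spark's regex engine.

```python
import re

def contiguous_regex(groceries, item):
    # Analogue of `groceries rlike item`: the whole item is searched as one regex.
    return re.search(item, groceries) is not None

def substring(groceries, item):
    # Analogue of contains(): case-sensitive literal substring test.
    return item in groceries

def all_words(groceries, item):
    # One lookahead per word, so the order of the words does not matter.
    lookaheads = "".join(r"(?=.*\b" + word + r"\b)" for word in item.split())
    return re.search("(?i)^" + lookaheads + ".*$", groceries) is not None

row = "apple, oranges, red wine"
print(contiguous_regex(row, "red apple"))  # False
print(substring(row, "red apple"))         # False
print(all_words(row, "red apple"))         # True
```

Only the third predicate reproduces the single-row result shown for Mike above, which is why the join needs a per-item pattern column rather than the raw item.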

Solution:

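import re
import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_udf(keyword):
    # Return null for a missing keyword: rlike then yields null and the inner join
    # drops the row. An empty pattern would match every row of A.
    if keyword is None:
        return None
    # One lookahead per word, so the words may appear in any order.
    pattern = '(?i)^'
    start = '(?=.*\\b'
    end = '\\b)'
    for word in re.split('\\s+', keyword):
        pattern = pattern + start + word + end
    pattern = pattern + '.*$'
    return pattern

regex_udf = F.udf(my_udf, T.StringType())
B = B.withColumn('regex', regex_udf(B['item']))

# Join each row of A to every row of B whose generated regex matches its groceries.
regex_join = A.join(B, F.expr("""groceries rlike regex"""), how='inner')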

This does what I want, but it still runs slowly, probably because of the join and the UDF.
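The pattern built above inserts each word unescaped and relies on \b, so an item word such as "(book/cd/downloadable" would not match. A minimal sketch of one possible variant follows, assuming the items are literal text: `build_pattern` is a hypothetical helper (not part of the original solution) that escapes each word and swaps \b for lookarounds. It is checked here with Python's re only; Spark's Java regex engine is assumed to accept the same syntax.

```python
import re

def build_pattern(keyword):
    """Return a regex requiring every word of keyword, in any order."""
    if keyword is None:
        return None
    parts = []
    for word in keyword.split():
        # re.escape makes characters such as "(" and ")" literal.
        # (?<!\w) and (?!\w) replace \b, which needs a word character on one
        # side and therefore fails between a space and "(".
        parts.append(r"(?=.*(?<!\w)" + re.escape(word) + r"(?!\w))")
    return "(?i)^" + "".join(parts) + ".*$"

item = "the little prince 70th anniversary gift set (book/cd/downloadable audio)"
row = "Water,Spaghetti, " + item
print(re.search(build_pattern(item), row) is not None)  # True
print(re.search(build_pattern("red apple"), "pineapple, red wine") is not None)  # False
```

Because the pattern depends only on item, it could also be computed on the driver when B is small enough to collect, which would avoid the Python UDF; whether that helps with the slow join has not been measured here.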

[Answer 1]:

An rlike join can be written with F.expr(). In your case you need to combine it with an inner join. Try this:

#%%
import pyspark.sql.functions as F
test1 =sqlContext.createDataFrame([("Mike","apple,greenbeans,redwine,the little prince 70th anniversary gift set (book/cd/downloadable audio)" ),("kate","Whitewine,greenbeans,pineapple"),("Ben","Water,Spaghetti")],schema=["name","groceries"])
test2 = sqlContext.createDataFrame([("001","redwine"),("002","greenbeans"),("003","cd")],schema=["id","item"])
#%%
test_join =test1.join(test2,F.expr("""groceries rlike item"""),how='inner')

Result:

test_join.show(truncate=False)
+----+-------------------------------------------------------------------------------------------------+---+----------+
|name|groceries                                                                                        |id |item      |
+----+-------------------------------------------------------------------------------------------------+---+----------+
|Mike|apple,greenbeans,redwine,the little prince 70th anniversary gift set (book/cd/downloadable audio)|001|redwine   |
|Mike|apple,greenbeans,redwine,the little prince 70th anniversary gift set (book/cd/downloadable audio)|002|greenbeans|
|Mike|apple,greenbeans,redwine,the little prince 70th anniversary gift set (book/cd/downloadable audio)|003|cd        |
|kate|Whitewine,greenbeans,pineapple                                                                   |002|greenbeans|
+----+-------------------------------------------------------------------------------------------------+---+----------+

For the more complex dataset, the contains() function should work:

import pyspark.sql.functions as F
test1 = spark.createDataFrame([("Mike","apple, oranges, red wine,green beans"),("Kate","Whitewine, green beans waterrr, pineapple, red wine"), ("Leah", "red wine, juice, rice, grapes, green beans"),("Ben","Water,Spaghetti, the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["name","groceries"])
test2 = spark.createDataFrame([("001","red wine"),("002","green beans waterrr"), ("003", "the little prince 70th anniversary gift set (book/cd/downloadable audio)")],schema=["id","item"])
#%%
test_join =test1.join(test2,F.col('groceries').contains(F.col('item')),how='inner')

Result:

+----+-----------------------------------------------------------------------------------------+---+------------------------------------------------------------------------+
|name|groceries                                                                                |id |item                                                                    |
+----+-----------------------------------------------------------------------------------------+---+------------------------------------------------------------------------+
|Mike|apple, oranges, red wine,green beans                                                     |001|red wine                                                                |
|Kate|Whitewine, green beans waterrr, pineapple, red wine                                      |001|red wine                                                                |
|Kate|Whitewine, green beans waterrr, pineapple, red wine                                      |002|green beans waterrr                                                     |
|Leah|red wine, juice, rice, grapes, green beans                                               |001|red wine                                                                |
|Ben |Water,Spaghetti, the little prince 70th anniversary gift set (book/cd/downloadable audio)|003|the little prince 70th anniversary gift set (book/cd/downloadable audio)|
+----+-----------------------------------------------------------------------------------------+---+------------------------------------------------------------------------+

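[Comments]:

The answer you gave runs slowly, even though I am using an AWS EMR cluster with 20 nodes. It also does not seem to work when the keywords in groceries get complex. For example, when groceries contains the keyword "the little prince 70th anniversary gift set (book/cd/downloadable audio)", it is not matched even though the exact keyword exists in item.

A join is an expensive operation in PySpark, so depending on the size of your data it will be slow. If table 2 is small, consider broadcasting it. As for the matching: we cannot assume what your full data looks like, and the question specifically mentions an rlike join. If you need something else, please do more research or improve your question.

Sorry for the confusion. The rlike join was more of a second question. The original question, shown in the code snippet, is that I take the "item" values, turn each one into a regex, use .filter to get the rows of the table whose groceries match that regex, and union all the results together. But it does not do what I want.

Could you update your question to show a complex case where my answer does not work? Special characters like (book/cd) work for me. See the updated answer.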
