我们如何使用 SQL 风格的“LIKE”标准连接两个 Spark SQL 数据帧?

Posted

技术标签:

【中文标题】我们如何使用 SQL 风格的“LIKE”标准连接两个 Spark SQL 数据帧?【英文标题】:How can we JOIN two Spark SQL dataframes using a SQL-esque "LIKE" criterion? 【发布时间】:2015-10-16 11:06:03 【问题描述】:

我们正在使用与 Spark 1.3.1 交互的 PySpark 库。

我们有两个数据框,documents_df := document_id, document_textkeywords_df := keyword。我们想加入这两个数据帧,并返回一个带有document_id, keyword 对的结果数据帧,使用关键字_df.keyword 出现在document_df.document_text 字符串中的条件。

例如,在 PostgreSQL 中,我们可以使用以下形式的 ON 子句来实现这一点:

document_df.document_text ilike '%' || keyword_df.keyword || '%'

但是,在 PySpark 中,我无法使用任何形式的连接语法。以前有没有人取得过这样的成绩?

致以诚挚的问候,

【问题讨论】:

您可以通过接受答案来结束问题,这将鼓励其他人回答问题!如果您还有问题,您也可以保持开放并更新问题:) 【参考方案1】:

可能有两种不同的方式,但一般来说不推荐。首先让我们创建一个虚拟数据:

from pyspark.sql import Row

document_row = Row("document_id", "document_text")
keyword_row = Row("keyword") 

documents_df = sc.parallelize([
    document_row(1L, "apache spark is the best"),
    document_row(2L, "erlang rocks"),
    document_row(3L, "but haskell is better")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("erlang"),
    keyword_row("haskell"),
    keyword_row("spark")
]).toDF()

    Hive UDF

    documents_df.registerTempTable("documents")
    keywords_df.registerTempTable("keywords")
    
    query = """SELECT document_id, keyword
        FROM documents JOIN keywords
        ON document_text LIKE CONCAT('%', keyword, '%')"""
    
    like_with_hive_udf = sqlContext.sql(query)
    like_with_hive_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

    Python UDF

    from pyspark.sql.functions import udf, col 
    from pyspark.sql.types import BooleanType
    
    # Of you can replace `in` with a regular expression
    contains = udf(lambda s, q: q in s, BooleanType())
    
    like_with_python_udf = (documents_df.join(keywords_df)
        .where(contains(col("document_text"), col("keyword")))
        .select(col("document_id"), col("keyword")))
    like_with_python_udf.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

为什么不推荐?因为在这两种情况下都需要笛卡尔积:

like_with_hive_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter document_text#3 LIKE concat(%,keyword#4,%)
##   CartesianProduct
##    Scan PhysicalRDD[document_id#2L,document_text#3]
##    Scan PhysicalRDD[keyword#4]

like_with_python_udf.explain()

## TungstenProject [document_id#2L,keyword#4]
##  Filter pythonUDF#13
##   !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
##    CartesianProduct
##     Scan PhysicalRDD[document_id#2L,document_text#3]
##     Scan PhysicalRDD[keyword#4]

还有其他方法可以在没有完整笛卡尔坐标的情况下实现类似的效果。

    加入标记化文档 - 如果关键字列表太大而无法在单台机器的内存中处理,则很有用

    from pyspark.ml.feature import Tokenizer
    from pyspark.sql.functions import explode
    
    tokenizer = Tokenizer(inputCol="document_text", outputCol="words")
    
    tokenized = (tokenizer.transform(documents_df)
        .select(col("document_id"), explode(col("words")).alias("token")))
    
    like_with_tokenizer = (tokenized
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          3|haskell|
    ## |          1|  spark|
    ## |          2| erlang|
    ## +-----------+-------+
    

    这需要洗牌而不是笛卡尔:

    like_with_tokenizer.explain()
    
    ## TungstenProject [document_id#2L,keyword#4]
    ##  SortMergeJoin [token#29], [keyword#4]
    ##   TungstenSort [token#29 ASC], false, 0
    ##    TungstenExchange hashpartitioning(token#29)
    ##     TungstenProject [document_id#2L,token#29]
    ##      !Generate explode(words#27), true, false, [document_id#2L, ...
    ##       ConvertToSafe
    ##        TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
    ##         Scan PhysicalRDD[document_id#2L,document_text#3]
    ##   TungstenSort [keyword#4 ASC], false, 0
    ##    TungstenExchange hashpartitioning(keyword#4)
    ##     ConvertToUnsafe
    ##      Scan PhysicalRDD[keyword#4]
    

    Python UDF 和广播变量 - 如果关键字列表相对较小

    from pyspark.sql.types import ArrayType, StringType
    
    keywords = sc.broadcast(set(
        keywords_df.map(lambda row: row[0]).collect()))
    
    bd_contains = udf(
        lambda s: list(set(s.split()) & keywords.value), 
        ArrayType(StringType()))
    
    
    like_with_bd = (documents_df.select(
        col("document_id"), 
        explode(bd_contains(col("document_text"))).alias("keyword")))
    
    like_with_bd.show()
    
    ## +-----------+-------+
    ## |document_id|keyword|
    ## +-----------+-------+
    ## |          1|  spark|
    ## |          2| erlang|
    ## |          3|haskell|
    ## +-----------+-------+
    

    它既不需要 shuffle 也不需要笛卡尔,但您仍然需要将广播变量传输到每个工作节点。

    like_with_bd.explain()
    
    ## TungstenProject [document_id#2L,keyword#46]
    ##  !Generate explode(pythonUDF#47), true, false, ...
    ##   ConvertToSafe
    ##    TungstenProject [document_id#2L,pythonUDF#47]
    ##     !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
    ##      Scan PhysicalRDD[document_id#2L,document_text#3]
    

    从 Spark 1.6.0 开始,您可以使用 sql.functions.broadcast 标记一个小数据帧,以获得与上述类似的效果,而无需使用 UDF 和显式广播变量。重用标记化数据:

    from pyspark.sql.functions import broadcast
    
    like_with_tokenizer_and_bd = (broadcast(tokenized)
        .join(keywords_df, col("token") == col("keyword"))
        .drop("token"))
    
    like_with_tokenizer.explain()
    
    ## TungstenProject [document_id#3L,keyword#5]
    ##  BroadcastHashJoin [token#10], [keyword#5], BuildLeft
    ##   TungstenProject [document_id#3L,token#10]
    ##    !Generate explode(words#8), true, false, ...
    ##     ConvertToSafe
    ##      TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
    ##       Scan PhysicalRDD[document_id#3L,document_text#4]
    ##   ConvertToUnsafe
    ##    Scan PhysicalRDD[keyword#5]
    

相关

对于近似匹配,请参阅Efficient string matching in Apache Spark。

【讨论】:

这是一个非常有用的回复。谢谢你花时间写这么全面的东西。你不仅回答了我的问题,而且我还学到了很多其他我不知道你能做到的事情。我将使用可变广播方法,因为关键字列表会很小。问题解决了! 好的。一个问题,@ zero323。 explode() 函数仅在 Spark 1.4 中引入。我(现在)被 1.3.1 卡住了。是否可以将 UDF 嵌入到 map() 函数中,以便为每一行输入返回多行(即每个匹配关键字一个)? 解决了!供参考:like_with_bd= documents_df.select( col("document_id"), bd_contains(col("document_text")).alias("keyword")).flatMap(lambda row: [(kw, row[0]) for kw in row[1]])【参考方案2】:

精确的方法如下:(有点慢但准确)

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

from pyspark.sql import Row


def string_match_percentage(col_1, col_2, confidence):
    s = col_1.lower()
    t = col_2.lower()

    global row, col
    rows = len(s) + 1
    cols = len(t) + 1
    array_diffrence = np.zeros((rows, cols), dtype=int)

    for i in range(1, rows):
        for k in range(1, cols):
            array_diffrence[i][0] = i
            array_diffrence[0][k] = k

    for col in range(1, cols):
        for row in range(1, rows):
            if s[row - 1] == t[col - 1]:
                cost = 0
            else:
                cost = 2
            array_diffrence[row][col] = min(array_diffrence[row - 1][col] + 1,
                                            array_diffrence[row][col - 1] + 1,
                                            array_diffrence[row - 1][col - 1] + cost)
    match_percentage = ((len(s) + len(t)) - array_diffrence[row][col]) / (len(s) + len(t)) * 100
    if match_percentage >= confidence:
        return True
    else:
        return False


document_row = Row("document_id", "document_text")
keyword_row = Row("keyword")

documents_df = sc.parallelize([
    document_row(1, "google llc"),
    document_row(2, "blackfiled llc"),
    document_row(3, "yahoo llc")
]).toDF()

keywords_df = sc.parallelize([
    keyword_row("yahoo"),
    keyword_row("google"),
    keyword_row("apple")
]).toDF()

conditional_contains = udf(lambda s, q: string_match_percentage(s, q, confidence=70), BooleanType())

like_joined_df = (documents_df.crossJoin(keywords_df)
                        .where(conditional_contains(col("document_text"), col("keyword")))
                        .select(col("document_id"), col("keyword"), col("document_text")))
like_joined_df.show()

输出:

# +-----------+-------+-------------+
# |document_id|keyword|document_text|
# +-----------+-------+-------------+
# |          1| google|   google llc|
# |          3|  yahoo|    yahoo llc|
# +-----------+-------+-------------+

【讨论】:

以上是关于我们如何使用 SQL 风格的“LIKE”标准连接两个 Spark SQL 数据帧?的主要内容,如果未能解决你的问题,请参考以下文章

VB连接ACCESS数据库,使用 LIKE 通配符问题

如何在具有多种搜索模式的SQL中使用LIKE?

PLSQL连接Oracle使用like模糊查询中文时返回结果为空

SQL SELECT 名称 LIKE $name 但不完全相同的名称两次

LIKE 语句 SQL

FlutterUnit 更新 | 拓展样式风格切换 - 标准风格