如何在 Spark SQL 表达式中使用字符串变量?

Posted

技术标签:

【中文标题】如何在 Spark SQL 表达式中使用字符串变量?【英文标题】:How to use string Variable in Spark SQL expression? 【发布时间】:2018-04-06 04:11:01 【问题描述】:

我发现了类似的帖子here,但是当我将它应用于字符串变量时会出现一些额外的问题。让我解释一下我想要做什么。 我有一个包含一些地点信息的单列 DataFrame df1:

+-------+
|place  |
+-------+
|Place A|
|Place B|
|Place C|
+-------+ 

另一个DataFrame df2如下:

+--+-------+
|id|place  |
+--+-------+
|1| Place A|
|2| Place C|
|3| Place C|
|4| Place B|

我需要遍历 df2 来检查每个 id 匹配的位置,并在匹配的 id 上做一些事情。代码sn-p如下:

  val places = df1.distinct.map(_.toString).collect
  for (place <- places)
    val students = df2.where(s"place = '$place'").select("id","place")
    // do something on students (add some unique columns depending the place)
    students.show(2)
 

我得到的错误是 SQL ParseException:

extraneous input '[' expecting '(', ....
== SQL ==
academic_college = [Place A]
-------------------^^^

我现在的理解是,这个 Parse Exception 来自我执行 collect 操作后的 places 数组。它固有地包含“[]”:

places = Array([Place A], [Place B], [Place C])

我的问题有两个方面:

    我只知道如何将 df1 收集到 Array 中并循环它以实现我想要的,因为每个地方的操作都不同。如果我们继续使用这种方法,删除“[]”或将其更改为“()”或执行其他操作来解决 Parse Exception 的最佳方法是什么?

    有没有更好的方法来实现这一点,而无需收集(实现)df1 并将所有内容保存在 DataFrame 中?

【问题讨论】:

您错过了报价。应该是where(s"place = '$place'") 感谢您指出这一点。我更新了帖子。 【参考方案1】:

你可以从 df1 获取Array[String]

val places = df1.distinct().collect().map(_.getString(0))

现在你可以从数组中选择每个

places.foreach(place => 
  val student = df2.where($"place" === place).select("id","place")
  student.show()
)

但请确保这不会影响您的原始数据帧。

如果 df1 很小并且可以放入您的内存中,您可以将其收集到驱动程序中,否则不建议使用。

如果您提供一些输入和预期输出,您可以轻松获得更多帮助。

【讨论】:

感谢 Shankar,这几乎解决了问题。我得到了一个不同的 Parse Exception:extraneous input 'Designated' expecting &lt;EOF&gt;(line 1, pos 30) == SQL == place = No Place Designated ------------------------^^^ 知道为什么吗? (没有指定地点是 df1 中的另一个选项,我之前没有列出,这恰好是收集后的第一个选项) for (place 你可以像我上面那样使用 foreach 并使用 === 进行比较 使用“===”进行比较似乎确实有效。我不知道确切的原因。是因为 SQL 表达式总是在寻找 吗?您介意再解释一下为什么一个有效而另一个无效吗?谢谢。 === 是两列之间的相等检查,相当于equalTo 函数,= 是赋值运算符,不用于检查相等。【参考方案2】:

我需要遍历 df2 来检查每个 id 匹配的位置,并在匹配的 id 上做一些事情。

collect() 并且迭代收集的数据非常昂贵,因为所有处理都发生在驱动程序节点中。

我建议你使用 join

假设你有

df1
+-------+
|place  |
+-------+
|Place A|
|Place B|
+-------+

df2
+---+-------+
|id |place  |
+---+-------+
|1  |Place A|
|2  |Place C|
|3  |Place C|
|4  |Place B|
+---+-------+

您可以使用 join 获取具有 id 的匹配地点 as

df2.join(df1, Seq("place"))

这应该给你

+-------+---+
|place  |id |
+-------+---+
|Place A|1  |
|Place B|4  |
+-------+---+

现在您可以在此数据帧上执行 do something on the matched ids

希望回答对你有帮助

【讨论】:

感谢 Ramesh,我同意收集和获取并不是最好的方法。本来我以为groupBy("place") 可能是一个解决办法,但是groupBy之后我的操作相当复杂。例如,我将使用每个组中的“id”作为 ParallelPersonalizePageRank 的 sourceIds,并为每个组添加一个关于 PPR 排名的列。 groupBy 操作后,我对 RelationalGroup 不熟悉。您认为这可以通过 groupBy 完成,然后是我描述的操作吗? 是的,你绝对可以做到。您可以对 id 进行聚合并传递给 pagerank。但为此,我必须知道 ParallelPersonalizePageRank 的详细信息,我猜这将是另一个问题。 对不起 Ramesh,我不得不接受 Shankar 对这个问题的回答,因为它与我这次尝试的收集和方法更直接相关。稍后我将尝试使用 groupBy 来完成。我相信我会面临这方面的问题。我稍后会针对该方法发布一个更直接的问题。感谢您确认 groupBy 也是一个可行的尝试方向。 @GuanghuaShu 没关系。如果有更好和有用的答案可以满足您的需求,我不希望我的答案被接受。 :)

以上是关于如何在 Spark SQL 表达式中使用字符串变量?的主要内容,如果未能解决你的问题,请参考以下文章

存储为字符串变量时如何执行spark sql多行查询?

使用Spark SQL中的regex函数从字符串中提取特定数字

spark magic - 输入sql context作为字符串

如何在 Spark SQL 中使用连字符转义列名

Spark SQL 版本的 EXEC()

如何使用 jupyter notebook 在 pyspark 中的 Hive 上使用 %sql Magic 字符串启用 spark SQL