不支持没有相等谓词的流流连接

Posted

技术标签:

【中文标题】不支持没有相等谓词的流流连接【英文标题】:Stream stream joins without equality predicate is not supported 【发布时间】:2018-07-04 01:34:25 【问题描述】:

我正在使用 Spark 2.3 并尝试加入两个数据流。我的左右流都有一个数组。只有当右流数组是左流数组的子集时,我才想加入这两个流。

例如,我的 streamA 如下所示:

StreamA:
|---|------|---------------------|-----------|
|id | dept | employeesInMeetings | DateTime  |
|---|------|---------------------|-----------|
| 1 | sales| [John]              | 7/2 14:00 |
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |
|---|------|---------------------|-----------|

我的 streamB 如下所示:

StreamB:
|--------------|--------------|----------|
|employees     | confRooms    | DateTime |
|--------------|--------------|----------|
| [John, Jane] |      A       | 7/2 14:00|
| [Adam, Mike] |      C       | 7/2 12:30| 
| [Jill, Andy] |      B       | 7/2 14:00|
|--------------|--------------|----------|

我只关心参加同一会议的同一部门的员工。因此,作为交集的结果,我的结果流需要如下所示:

|---|------|---------------------|-----------|----------|
|id | dept | employeesInMeetings | DateTime  | confRoom |
|---|------|---------------------|-----------|----------|
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |    C     |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |    B     |
|---|------|---------------------|-----------|----------|

我创建了一个 UDF 来进行相交:

val arrayIntersect = udf((leftArr: Array[String], rightArr: Array[String]) => 
  import spark.implicits._
  if(leftArr.intersect(rightArr.toSeq).length == rightArr.size)
    true
   else 
    false
  
)

并尝试如下使用:

streamA.joinWith(streamB, expr("arrayIntersect(leftArr, rightArr) AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))

但是,我得到了错误:

org.apache.spark.sql.AnalysisException: Stream stream joins without equality predicate is not supported;

有人知道这里是否有解决方法吗?任何帮助将不胜感激!谢谢!

【问题讨论】:

【参考方案1】:

不幸的是,在流-流连接中没有解决方法:(

我们确实需要一个相等谓词,因为我们使用它来使用流式对称哈希连接算法执行连接——两个流都使用公共密钥进行分区,以便来自两个流的相关记录最终在同一个分区中。

【讨论】:

谢谢!是否有在 Stream-stream 连接中加入不等式谓词的计划?【参考方案2】:

首先将你的数组转换成字符串,然后在左边的数组字符串中搜索右边的数组字符串。

val arrayToString = udfarr: Seq[String] => arr.sorted.map(_.trim.toLowerCase).mkString(",")

streamA.withColumn("leftArrStr", arrayToString(col("leftArr"))).joinWith(
  streamB.withColumn("rightArrStr", arrayToString(col("rightArr")))
  , expr("instr(leftArrStr, rightArrStr) != 0 " +
    "AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))

【讨论】:

您好@Kaushal,进行字符串比较可能会产生不正确的结果。例如,如果我的左数组(已排序)是 `[Andy, Jade, Jill, Rick] 而我的右数组(已排序)是 [Andy, Rick],则连接条件将失败,不是吗?想法? 我只是尝试实施您建议的解决方案,但仍然收到相同的错误。您是否测试过您的解决方案? @DataGeek DateTime 列的值是否同步?如果是然后爆炸你的数组,然后加入个人员工也与 DateTime 条件,因为同一个人不能在两个或多个地方。 谢谢@Kaushal - 这就是我现在采用的方法。一旦我有解决方案,将更新帖子。

以上是关于不支持没有相等谓词的流流连接的主要内容,如果未能解决你的问题,请参考以下文章

BIGQUERY:连接谓词中的表不受支持的子查询

Flink 流流关联( Interval Join)总结

如何在 Spark 中创建 UDF 以支持自定义谓词

CHAR类型列上具有相等谓词的查询中没有返回数据[关闭]

Hive中的Join总结

Hive不支持非相等的join