根据来自其他数据帧的位置条件在数据帧上编写选择查询,scala

Posted

技术标签:

【中文标题】根据来自其他数据帧的位置条件在数据帧上编写选择查询,scala【英文标题】:Writing select queries on dataframe based on where condition from other dataframe, scala 【发布时间】:2019-08-21 22:57:58 【问题描述】:

我有两个包含以下列的数据框..

DF1 - partitionNum、lowerBound、upperBound

DF2- ID,累积计数

我想要一个具有 - ID、partitionNum 的结果 Frame

我做了一个交叉连接,它的性能如下所示

DF2.crossJoin(DF1).where(col("cumulativeCount").between(col("lowerBound"), col("upperBound"))).orderBy("cumulativeCount") .select("ID", "partitionNum")

由于 DF2 有 500 万行,而 DF1 有 50 行,因此此交叉连接产生 2.5 亿行,此任务即将终止。我如何将其作为选择,其中结果帧应具有来自 DF2 的 ID 和来自 DF1 的 partitionNum,条件是从 DF1 选择分区编号 WHERE DF2 的累积计数在 DF1 的下限和上限之间

我正在寻找类似下面这样的东西

sparkSession.sqlContext.sql("SELECT ID,cumulativeCount,A.partitionNum FROM CumulativeCountViewById WHEREcumulativeCount IN" + "(SELECT partitionNum FROM CumulativeRangeView WHERE concurrentCount BETWEEN lowerBound 和 upperBound) AS A")

【问题讨论】:

【参考方案1】:

试试这个。

解决方案是 - 您不需要进行交叉连接。由于您的 DF1 只有 50 行,因此将其转换为键映射:partitionNum,值:Tuple2(lowerBound, UppperBound)。 创建一个 UDF,它接受一个数字(您的累积计数)并在 lowerBound

如果您愿意,您可以编辑 UDF 以仅返回 partitionNumbers 并在最后分解“partNums”数组列。

scala> DF1.show
+------------+----------+----------+
|partitionNum|lowerBound|upperBound|
+------------+----------+----------+
|           1|        10|        20|
|           2|         5|        10|
|           3|         6|        15|
|           4|         8|        20|
+------------+----------+----------+


scala> DF2.show
+---+---------------+
| ID|cumulativeCount|
+---+---------------+
|100|              5|
|100|             10|
|100|             15|
|100|             20|
|100|             25|
|100|             30|
|100|              6|
|100|             12|
|100|             18|
|100|             24|
|101|              1|
|101|              2|
|101|              3|
|101|              4|
|101|              5|
|101|              6|
|101|              7|
|101|              8|
|101|              9|
|101|             10|
+---+---------------+


scala> val smallData = DF1.collect.map(row => row.getInt(0) -> (row.getInt(1), row.getInt(2))).toMap
smallData: scala.collection.immutable.Map[Int,(Int, Int)] = Map(1 -> (10,20), 2 -> (5,10), 3 -> (6,15), 4 -> (8,20))

scala> val myUdf = udf((num:Int) => smallData.filter((v) => v._2._2 > num && num > v._2._1))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false)),true),Some(List(IntegerType)))

scala> DF2.withColumn("partNums", myUdf($"cumulativeCount")).show(false)
+---+---------------+-------------------------------------------+
|ID |cumulativeCount|partNums                                   |
+---+---------------+-------------------------------------------+
|100|5              |[]                                         |
|100|10             |[3 -> [6, 15], 4 -> [8, 20]]               |
|100|15             |[1 -> [10, 20], 4 -> [8, 20]]              |
|100|20             |[]                                         |
|100|25             |[]                                         |
|100|30             |[]                                         |
|100|6              |[2 -> [5, 10]]                             |
|100|12             |[1 -> [10, 20], 3 -> [6, 15], 4 -> [8, 20]]|
|100|18             |[1 -> [10, 20], 4 -> [8, 20]]              |
|100|24             |[]                                         |
|101|1              |[]                                         |
|101|2              |[]                                         |
|101|3              |[]                                         |
|101|4              |[]                                         |
|101|5              |[]                                         |
|101|6              |[2 -> [5, 10]]                             |
|101|7              |[2 -> [5, 10], 3 -> [6, 15]]               |
|101|8              |[2 -> [5, 10], 3 -> [6, 15]]               |
|101|9              |[2 -> [5, 10], 3 -> [6, 15], 4 -> [8, 20]] |
|101|10             |[3 -> [6, 15], 4 -> [8, 20]]               |
+---+---------------+-------------------------------------------+

【讨论】:

谢谢,试一试。当数据框有 500 万行时,调用 UDF 是一种好方法 我的第一选择始终是使用现有函数的惯用方法。如果操作正确,UDF 也不错。在这种情况下,避免交叉连接(昂贵)会更好。另一种解决方案是将较小的 DF 广播到每个节点并进行交叉连接。比较性能。 谢谢 这工作,我无法让 SQL 工作,虽然 SQL 在 SQL 服务器中工作,但不是作为数据框查询。我使用了UDF。我将尝试比较性能。

以上是关于根据来自其他数据帧的位置条件在数据帧上编写选择查询,scala的主要内容,如果未能解决你的问题,请参考以下文章

如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?

(运行的干净代码)根据来自另一个数据帧的日期间隔和字符串条件获取一个数据帧中的值的平均值

在多个条件下合并来自多个数据帧的数据

在多个条件下合并来自多个数据帧的数据

如何比较来自 PySpark 数据帧的记录

根据条件转换数据帧的列