SparkSql 查询以从 cassandra 获取定义值的上一行和下一行
Posted
技术标签:
【中文标题】SparkSql 查询以从 cassandra 获取定义值的上一行和下一行【英文标题】:SparkSql query to get just previous and next row from cassandra for a defined value 【发布时间】:2018-04-28 07:10:35 【问题描述】:我们必须编写一个 SparkSQL 查询来获取特定值的上一行和下一行。 假设我们在 Cassandra 中的表结构如下所示
id, timestamp
1, 100
2,200
3,300
4,400
现在我必须编写一个 Spark Query 以仅获取两行,在这两行中,第一行的值应小于 300,即 (2,200),第二行的值应大于 300,即 (4,400 )。而且由于数据量大,我不想按操作执行顺序。在数据量大的情况下,按操作排序会很慢。 我们可以理解这样的要求,假设我想要两个从时间戳值为的表中获取前一行和下一行: - 对于第一行:应该小于 300 所以预期的行是 (2, 200) 对于第二行:应该大于 300 所以预期的行是 (4, 400) 输出应该如下所示
2,200
4,400
但这应该不按操作顺序执行。
【问题讨论】:
“上一个”和“下一个”是什么意思?是基于id
上的订单还是timestamp
上的订单?还是基于 DF 中的当前记录顺序(可能无法预测)?
行按时间戳字段升序排列。
我想得到:1,时间戳值小于 300 的一行,所以在输出结果中它应该返回 (2, 200) 2。另一行时间戳值更大大于 300,所以在输出中它应该返回 (4, 400) 这应该在没有操作顺序的情况下完成。因为 cassandra 表中的数据量太大。所以我们不能按操作执行订单。
【参考方案1】:
您可以使用 RDD API,制作一个上下移动的索引列来模拟滑动操作:
#Obtain an index for each element
df_id = df.rdd.zipWithIndex()\
.map(lambda row: Row(id=row[0].id, timestamp=row[0].timestamp, idx=row[1]))\
.toDF()
previousDF = df_id.rdd\
.map(lambda row: Row(previous_id=row.id, previous_timestamp=row.timestamp, idx=row.idx+1))\
.toDF()
nextDF = df_id.rdd\
.map(lambda row: Row(next_id=row.id, next_timestamp=row.timestamp, idx=row.idx-1))\
.toDF()
现在在idx
列上执行连接以将原始 DF 与其他列连接:
df_id.join(previousDF, on='idx')\
.join(nextDF, on='idx')\
.show()
结果如下:
+---+---+---------+-----------+------------------+-------+--------------+
|idx| id|timestamp|previous_id|previous_timestamp|next_id|next_timestamp|
+---+---+---------+-----------+------------------+-------+--------------+
| 1| 2| 200| 1| 100| 3| 300|
| 2| 3| 300| 2| 200| 4| 400|
+---+---+---------+-----------+------------------+-------+--------------+
所有这些 DF 上的内部连接会导致“上一个”和“下一个”侧的一些条目丢失。但是,如果您有兴趣查看前一个或下一个记录,您可以执行一个接一个的连接。
【讨论】:
感谢您的回答...但是它对 Cassandra 表中的大数据有好处吗?因为在我们的例子中,我们将从 kafka 获取 1 lac 行,然后我们必须执行相同的操作(从 Cassandra 获取从 kafka 传入的每个行的上一行和下一行) @RajendraJangir 如果这是在如此精细的水平上完成的,那么你为什么要一开始就这样做呢?使用根据需要获取记录的普通 Java/Scala/etc 程序可以更容易地完成......对吗?另外,数据来源于 Kafka,难道不能使用 Spark Streaming 并使用其内置的滑动窗口功能吗? 是的,你是对的。非常感谢您提供的所有宝贵答案和建议。以上是关于SparkSql 查询以从 cassandra 获取定义值的上一行和下一行的主要内容,如果未能解决你的问题,请参考以下文章
使用 JDBC(例如 Squirrel SQL)用 Spark SQL 查询 Cassandra
如何使用 PySpark、SparkSQL 和 Cassandra?