使用流式数据帧进行内部连接

Posted

技术标签:

【中文标题】使用流式数据帧进行内部连接【英文标题】:Inner Join with streaming dataframes 【发布时间】:2020-02-19 09:02:18 【问题描述】:

所以我在 pyspark 中有这个流数据帧 (gps_messages)-

并且我希望生成的数据帧具有相同的(所有)列,但每个 device_unique_id 具有最高时间戳值的记录/行,所以基本上类似于 -

                                                              (MAX)
+----------------+-----------+--------+---------+---------+----------+
|device_unique_id|signal_type|latitude|longitude|elevation| Timestamp|
+----------------+-----------+--------+---------+---------+----------+
|       TR1      |loc_update |-35.5484|149.61684|666.47164|   12345  |  <-- *NOTE - please check below
|       TR2      |loc_update |-35.5484|149.61684|666.47164|   87251  |
|       TR3      |loc_update |-35.5484|149.61684|666.47164|   32458  |
|       TR4      |loc_update |-35.5484|149.61684|666.47164|   98274  |
+----------------+-----------+--------+---------+---------+----------+

*Note = only 1 record for TR1 from previous dataframe which had max value of timeframe among all records having 'device_unique_id'=='TR1'

到目前为止,我已经写了这段代码,

gps_messages.createOrReplaceTempView('gps_table')
SQL_QUERY = 'SELECT device_unique_id, max(timestamp) as timestamp ' \
            'FROM gps_table ' \
            'GROUP BY device_unique_id'

# SQL_QUERY1 = 'SELECT * ' \
#              'FROM gps_table t2 ' \
#              'JOIN (SELECT device_unique_id AS unique_id, max(timestamp) AS time ' \
#              'FROM gps_table t1 ' \
#              'GROUP BY unique_id) t1 ' \
#              'ON t2.device_unique_id = t1.unique_id ' \
#              'AND t2.timestamp = t1.time'

filtered_gps_messages = spark.sql(SQL_QUERY)

filtered_gps_messages.createOrReplaceTempView('table_max_ts')
SQL_QUERY = 'SELECT a.device_unique_id, a.signal_type, a.longitude, a.latitude, a.timestamp ' \
            'FROM table_max_ts b, gps_table a ' \
            'WHERE b.timestamp==a.timestamp AND b.device_unique_id==a.device_unique_id'

latest_data_df = spark.sql(SQL_QUERY)

query = latest_data_df \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()

它抛出了这个错误 -

raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nProject [device_unique_id#25, signal_type#26, latitude#27, longitude#28, elevation#29, timestamp#30, unique_id#43, time#44]\n+- Join Inner, ((device_unique_id#25 = unique_id#43) && (timestamp#30 = time#44))\n   :- SubqueryAlias `t2`\n   :  +- SubqueryAlias `gps_table`\n   :     +- Project [json#23.device_unique_id AS device_unique_id#25, json#23.signal_type AS signal_type#26, json#23.latitude AS latitude#27, json#23.longitude AS longitude#28, json#23.elevation AS elevation#29, json#23.timestamp AS timestamp#30]\n   :        +- Project [jsontostructs(StructField(device_unique_id,StringType,true), StructField(signal_type,StringType,true), StructField(latitude,StringType,true), StructField(longitude,StringType,true), StructField(elevation,StringType,true), StructField(timestamp,StringType,true), value#21, Some(Asia/Kolkata)) AS json#23]\n   :           +- Project [cast(value#8 as string) AS value#21]\n   :              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49a5cdc2, kafka, Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@611544,kafka,List(),None,List(),None,Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n   +- SubqueryAlias `t1`\n      +- Aggregate [device_unique_id#25], [device_unique_id#25 AS unique_id#43, max(timestamp#30) AS time#44]\n         +- SubqueryAlias `t1`\n            +- SubqueryAlias `gps_table`\n               +- Project [json#23.device_unique_id AS device_unique_id#25, json#23.signal_type AS signal_type#26, json#23.latitude AS latitude#27, json#23.longitude AS longitude#28, json#23.elevation AS elevation#29, json#23.timestamp AS timestamp#30]\n                  +- Project [jsontostructs(StructField(device_unique_id,StringType,true), StructField(signal_type,StringType,true), StructField(latitude,StringType,true), StructField(longitude,StringType,true), StructField(elevation,StringType,true), StructField(timestamp,StringType,true), value#21, Some(Asia/Kolkata)) AS json#23]\n                     +- Project [cast(value#8 as string) AS value#21]\n                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@49a5cdc2, kafka, Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@611544,kafka,List(),None,List(),None,Map(subscribe -> gpx_points_input, kafka.bootstrap.servers -> 172.17.9.26:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n'

Process finished with exit code 1

如果我尝试使用“完整”输出模式,它会显示 -

分析异常:完整模式不支持两个流式数据帧/数据集之间的内连接,仅在附加模式下支持。

我在这里做错了什么?有没有替代方法或解决方法? 为问题类型道歉,我是新手。 谢谢。

【问题讨论】:

not supported in Complete mode, only in append mode.without watermark 有什么不清楚的地方? 嗨@cricket_007,先生,我非常感谢您回答我在 pyspark 上的每个问题并帮助我,尽管我是这方面的一个巨大的初学者,我什至不太熟悉中级 sql 概念。我阅读了错误并尝试了理解水印和窗口功能和概念,但我不明白如何在我的案例中使用这个概念。恕我直言,我并没有要求这个社区的其他人为我编写代码,我只是在寻找如何解决错误和/或完成所需任务的正确方法的指导。 另外,在一些文章中,我发现流数据帧(根本)不支持连接,这就是我不确定的原因。您在这个社区中更有经验会知道陷入某些错误并且无法解决它或有解决方法是多么令人沮丧。我想再次为我的问题的不成熟表示歉意,我只是想了解如何完成工作。 好吧,我对 Spark Streaming 没有太多实际经验,所以祝你好运! 【参考方案1】:

看看这里 => http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries 流模式不支持某些连接。

也许使用左外连接。

在追加模式下写作应该可以解决问题

SQL_QUERY = 'SELECT a.device_unique_id, a.signal_type, a.longitude, a.latitude, a.timestamp ' \
        'FROM table_max_ts b
         LEFT JOIN gps_table a ' \
        'ON b.timestamp==a.timestamp AND b.device_unique_id==a.device_unique_id'

编辑:需要水印以确保及时查看版权数据。对于外连接

    filtered_gps_messagesW = filtered_gps_messages.withWatermark("timestamp", "2 hours")
    gps_messagesW= gps_messages.withWatermark("timestamp", "3 hours")

然后将带水印的 DS 注册为 tmpTables,你应该没问题。根据需要调整时间间隔。

【讨论】:

感谢您的回复,非常感谢。上述带有附加模式的查询会引发相同的错误:>>“当流式 DataFrames/DataSets 上存在流式聚合时,不支持附加输出模式而没有水印”。我要研究水印概念吗? 我面临着类似的情况。 append 模式有效,而 complete 无效。

以上是关于使用流式数据帧进行内部连接的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花流中刷新加载的数据帧内容?

旋转流式数据帧 pyspark

将视频帧转换为流式视频

如何在流式传输期间每 n 秒获取特定帧?

Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列

带有广播连接的 Spark 流式传输