如何使用 BigQuery 和 Apache Beam 将 SQL 表转换为行序列列表?

Posted

技术标签:

【中文标题】如何使用 BigQuery 和 Apache Beam 将 SQL 表转换为行序列列表?【英文标题】:How to transform an SQL table into a list of row sequences using BigQuery and Apache Beam? 【发布时间】:2019-11-04 19:10:06 【问题描述】:

我有一个非常大的表,其中每一行代表一个称为 Trip 的抽象。行程由数字列组成,例如车辆 ID、行程 ID、开始时间、停止时间、行驶距离、行驶时长等。因此每个行程都是浮点值的一维向量。

我想将此表或向量列表转换为行程序列列表,其中行程按车辆 ID 分组为序列,并根据开始时间排序。序列长度需要限制为特定大小,例如 256,但可以/应该有多个具有相同 VehicleId 的序列。

示例: (序列长度 = 4)

[  
(Vehicle1, [Trip1, Trip2, Trip3, Trip4]),  
(Vehicle1, [Trip5, Trip6, Trip7]),  
(Vehicle2, [Trip1, Trip2, Trip3, Trip4])  
]

我正在尝试使用基于序列的模型(例如 LSTM / Transformer)基于这些行程对驾驶模式进行建模。将每个行程想象成一个词嵌入,每个行程序列作为一个句子。不知何故,我需要通过 BigQuery / Apache Beam 函数(或任何其他推荐的工具)的组合来构建这些句子,因为我们正在谈论数百 GB 的数据。我对这两种工具都很陌生,因此我们将不胜感激。

【问题讨论】:

【参考方案1】:

以下是 BigQuery 标准 SQL

#standardSQL
SELECT trip.vehicle_id, ARRAY_AGG(trip ORDER BY trip.start_time) trips
FROM (
  SELECT trip, DIV(ROW_NUMBER() OVER(PARTITION BY vehicle_id ORDER BY start_time) - 1, 4) grp   
  FROM `project.dataset.table` trip
)
GROUP BY trip.vehicle_id, grp

以上假设按 start_time 和序列长度 = 4 对行程进行排序 此外,它返回 vehicle_id 作为数组中行程信息的一部分 - 如下例所示

Row vehicle_id  trips.vehicle_id    trips.trip_id   trips.start_time    trips.stop_time  
1   Vehicle1    Vehicle1            Trip1           1                   2    
                Vehicle1            Trip2           2                   3    
                Vehicle1            Trip3           3                   4    
                Vehicle1            Trip4           4                   5    
2   Vehicle1    Vehicle1            Trip5           5                   6    
                Vehicle1            Trip6           6                   6    
                Vehicle1            Trip7           7                   6    
3   Vehicle2    Vehicle2            Trip1           2                   3    
                Vehicle2            Trip2           3                   4    
                Vehicle2            Trip3           4                   5    
                Vehicle2            Trip4           5                   6    

要消除这个 - 试试下面

#standardSQL
SELECT vehicle_id,
  ARRAY( 
    SELECT AS STRUCT * EXCEPT(vehicle_id)
    FROM UNNEST(trips)
    ORDER BY start_time
  ) trips
FROM (
  SELECT trip.vehicle_id, ARRAY_AGG(trip ORDER BY trip.start_time) trips
  FROM (
    SELECT trip, DIV(ROW_NUMBER() OVER(PARTITION BY vehicle_id ORDER BY start_time) - 1, 4) grp   
    FROM `project.dataset.table` trip
  )
  GROUP BY trip.vehicle_id, grp
)


Row vehicle_id  trips.trip_id   trips.start_time    trips.stop_time  
1   Vehicle1    Trip1           1                   2    
                Trip2           2                   3    
                Trip3           3                   4    
                Trip4           4                   5    
2   Vehicle1    Trip5           5                   6    
                Trip6           6                   6    
                Trip7           7                   6    
3   Vehicle2    Trip1           2                   3    
                Trip2           3                   4    
                Trip3           4                   5    
                Trip4           5                   6    

【讨论】:

我试过了,正是我需要的,谢谢!只是想知道是否有任何方法可以按天数而不是特定的序列长度来窗口化。即行程根据时间上不重叠的滑动窗口聚合,结果是可变长度的行程序列。 当然。我认为这是可行的 - 请发布带有这些新细节的新问题,以便我或其他人可以回答:o)

以上是关于如何使用 BigQuery 和 Apache Beam 将 SQL 表转换为行序列列表?的主要内容,如果未能解决你的问题,请参考以下文章

如何配置 apache 在同一台机器上使用 FE 和 BE?

使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId

在Apache Spark中使用Bigquery Connector时如何设置分区数?

Apache Beam 数据流 BigQuery

如何使用 Apache BEAM 在 BigQuery 中执行快速联接

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表