将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式

Posted

技术标签:

【中文标题】将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式【英文标题】:Streaming events to Big Query - Dataflow - Best way to insert **epoch timestamp** (int) to timestamp column 【发布时间】:2021-11-16 00:20:29 【问题描述】:

我正在尝试使用数据流 Apache Beam (python) 将流中的事件写入 Big Query 表,但时间戳格式存在问题。

我有一个带有 epoch timestamp (int) 值的事件 (json),我想将其插入到带有 timestamp 列的 BIG QUERY 表中。

最好的方法是什么? 我可以在不解析每个事件的情况下做到这一点吗?我可以声明收到的时间戳的格式吗?

例如:

event= 'ts' : 1630494181342 ,'user' : 'anat'

进入表格:

ts: timestamp , user: string

【问题讨论】:

嗨@anats,如果您觉得我的回答对您的问题有帮助,请考虑按照Stack Overflow guidelines 接受并投票。 【参考方案1】:

Apache Beam 将转换应用于通过管道传递的所有元素。因此,定义的管道步骤将在将纪元时间提取到 BigQuery 之前,将每个元素的纪元时间纪元时间转换为日期时间格式。

以下 Apache Beam 管道代码将以秒为单位的纪元时间转换为日期时间,即。时间戳

示例代码:

import apache_beam as beam
 
class GetTimestamp(beam.DoFn):
 def process(self, mytime, timestamp=beam.DoFn.TimestampParam):
   yield ''.format(timestamp.to_utc_datetime())
 
with beam.Pipeline() as pipeline:
 plant_timestamps = (
     pipeline
     | 'My Time' >> beam.Create([
         'ts': 1633013727, 'user': 'anant',
         'ts': 1590969600, 'user':'samwilliam',   
     ])
     | 'With timestamps' >> beam.Map(
         lambda mytime: beam.window.TimestampedValue(mytime, mytime['ts']))
     | 'Get timestamp' >> beam.ParDo(GetTimestamp())
     | beam.Map(print)
 )

【讨论】:

以上是关于将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式的主要内容,如果未能解决你的问题,请参考以下文章

将数据从 MySQL 二进制日志流式传输到 Kinesis

将原始“事件”数据从 Firebase 导出到大查询?

使用 GET 方法将数据流式传输到 Google BigQuery?

未找到任何事件 - Azure 事件中心

Spark:在流式查询中使用事件时间滑动窗口时出现问题

结构化流式传输:具有流式传输源的查询必须使用 writeStream.start() 执行