将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式
Posted
技术标签:
【中文标题】将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式【英文标题】:Streaming events to Big Query - Dataflow - Best way to insert **epoch timestamp** (int) to timestamp column 【发布时间】:2021-11-16 00:20:29 【问题描述】:我正在尝试使用数据流 Apache Beam (python) 将流中的事件写入 Big Query 表,但时间戳格式存在问题。
我有一个带有 epoch timestamp (int) 值的事件 (json),我想将其插入到带有 timestamp 列的 BIG QUERY 表中。
最好的方法是什么? 我可以在不解析每个事件的情况下做到这一点吗?我可以声明收到的时间戳的格式吗?
例如:
event= 'ts' : 1630494181342 ,'user' : 'anat'
进入表格:
ts: timestamp , user: string
【问题讨论】:
嗨@anats,如果您觉得我的回答对您的问题有帮助,请考虑按照Stack Overflow guidelines 接受并投票。 【参考方案1】:Apache Beam 将转换应用于通过管道传递的所有元素。因此,定义的管道步骤将在将纪元时间提取到 BigQuery 之前,将每个元素的纪元时间纪元时间转换为日期时间格式。
以下 Apache Beam 管道代码将以秒为单位的纪元时间转换为日期时间,即。时间戳
示例代码:
import apache_beam as beam
class GetTimestamp(beam.DoFn):
def process(self, mytime, timestamp=beam.DoFn.TimestampParam):
yield ''.format(timestamp.to_utc_datetime())
with beam.Pipeline() as pipeline:
plant_timestamps = (
pipeline
| 'My Time' >> beam.Create([
'ts': 1633013727, 'user': 'anant',
'ts': 1590969600, 'user':'samwilliam',
])
| 'With timestamps' >> beam.Map(
lambda mytime: beam.window.TimestampedValue(mytime, mytime['ts']))
| 'Get timestamp' >> beam.ParDo(GetTimestamp())
| beam.Map(print)
)
【讨论】:
以上是关于将事件流式传输到大查询 - 数据流 - 将 **epoch 时间戳** (int) 插入时间戳列的最佳方式的主要内容,如果未能解决你的问题,请参考以下文章