AWS Athena 创建表和分区

Posted

技术标签:

【中文标题】AWS Athena 创建表和分区【英文标题】:AWS Athena create table and partition 【发布时间】:2019-05-03 08:55:33 【问题描述】:

我将传感器数据存储在 S3 中(每 5 分钟写入一次数据):

farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443

1541252701443 是一个包含测量值的 json 文件:

  "temperature": 14.78,  "pressure": 961.70,  "humidity": 68.32

我肯定缺少一些蜂巢技能。不幸的是,我没有找到一个提取时间序列 json 数据的示例来帮助我入门。我也不确定 Hive / Athena 是否支持这种数据抓取。

我正在努力为这些数据创建一个 Athena 表...

CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  device string,
  sensor string,
  data_point string,
  value double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/farm0001/sensor01/'
PARTITIONED BY (timestamp string)
TBLPROPERTIES ('has_encrypted_data'='false')

我正在考虑的另一条路是将数据存储在更易于处理的结构中/也许我没有对数据进行足够的分区??!

所以也许我应该像这样将 dt 添加到结构中:

farm_iot/sensor_data/2018-11-03-02-45-02/farm/farm0001/sensor01/1541252701443

仍然没有让我到达我想去的地方:

+---------------+----------+----------+-------------+--------+
| timestamp     | device   | sensor   | data_point  | value  |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | temperature |  14.78 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | humidity    |  68.32 |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | pressure    | 961.70 |
+---------------+----------+----------+-------------+--------+

任何指向此目标的指针将不胜感激。谢谢!

请注意:我不想使用胶水,并且想了解如何手动操作。除了胶水昨天已经创建了~16.000张桌子:)

【问题讨论】:

您在写入数据或创建表时是否遇到错误?请发布错误消息。 @VamsiPrabhala 抱歉,这不是“我有一个错误问题” 刚刚编辑了一个替代方案以将数据保持在当前格式,但是性能不如使用分区 刚刚添加了一个帖子,其中详细介绍了我正在使用该mark-fink.de/2018-12-09-query-aws-athena-from-jupyter-notebook 【参考方案1】:

让我试着解释一下我在前面看到的一些问题。

看起来您想要的输出需要一些数据,这些数据是路径文件位置、设备和传感器的一部分,但它没有定义为表定义的一部分,只有表定义或 virtual columns 中的列可用. 几个小文件可能会影响查询的性能(但这不会影响您想要的结果) Hive 分区用于提高查询的性能,避免扫描 所有的数据。分区指向文件夹,在这种情况下,您正在尝试访问特定文件 您想要的输出基本上是在几条记录中爆炸 1 条记录,这不应该在表定义中处理,可以通过您的 select 语句来完成 Hive 分区具有partitionname=partitionvalue 的命名约定,这不是强制性的,但如果您希望提前执行命令以根据文件夹结构自动添加分区,这很有用。

如果您主要通过传感器或设备进行查询,这就是我将如何解决您的问题

更改数据结构

理想情况下,您的文件夹结构应该来自

farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443

到farm_iot/sensor_data/farm/device=farm0001/sensor=sensor01/1541252701443

更改表定义

您的表定义应该包含您的分区位置,以便能够在没有正则表达式的情况下选择它并利用它的性能改进(我猜一个常见的查询将按设备或传感器过滤。除此之外,您还需要添加所有作为文件一部分的 json 列

CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  temperature double,
  preassure double,
  humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')

查询您的数据

我们缺少时间戳,它本质上是带有 json 输入的文件名的一部分。我们可以在 select 语句中使用虚拟列 INPUT__FILE__NAME 包含文件名,如下所示

select device, sensor, temperature, preassure, humidity, INPUT__FILE__NAME as mytimestamp from farm.sensor_data

如果你想要压力、温度和湿度以及不同的行,我建议用这三个创建一个数组并分解它,使用 UNION ALL 运行 3 个查询来附加结果应该会非常有效

添加新分区

如果您遵循 Hive 约定,则可以利用命令 msck repair table 在包含新设备/传感器后自动添加新分区。在最坏的情况下,如果您想保留文件夹结构,您可以按如下方式添加分区

ALTER TABLE test ADD PARTITION (device='farm0001', sensor='sensor01') location 's3://farm_iot/sensor_data/farm/farm0001/sensor01'

注意:新分区不会自动添加,您始终需要添加它们

我尝试添加尽可能多的细节。如果有不清楚的地方,请告诉我。

编辑: 如果您的查询主要基于时间序列(例如日期范围),我建议在日级别(不小于此)添加一个分区以提高查询的性能。所以你的表定义看起来像

CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  temperature double,
  preassure double,
  humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://farm-iot/sensor_data/farm/'
PARTITIONED BY (dt=long, device string, sensor string)
TBLPROPERTIES ('has_encrypted_data'='false')

你的文件夹结构看起来像

farm_iot/sensor_data/farm/dt=20191204/device=farm0001/sensor=sensor01/1541252701443

作为澄清,您不需要为每个新分区修改表,只需将此分区添加到表中,这本质上是 Hive 将如何知道创建了一个新分区。如果你决定使用分区,这是唯一的方法,如果你不这样做(这会影响性能),还有一些其他的方法可以让它工作

EDIT2:

如果您想保持数据结构不变并且不使用分区,则可以获得如下预期结果

CREATE EXTERNAL TABLE IF NOT EXISTS yourdb.sensordata (
  temperature double,
  pressure double,
  humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) 
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');

SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;
select * from yourdb.sensordata;

select 
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'temperature' as data_point,
temperature as value
from yourdb.sensordata
union all
select 
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'pressure' as data_point,
pressure as value
from yourdb.sensordata
union all
select 
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'humidity' as data_point,
humidity as value
from yourdb.sensordata;

如您所见,我从文件路径中获取了大部分信息,但是需要设置一些标志来告诉 Hive 递归读取文件夹

ts,device,sensor,_data_point,value
1541252701443,farm0001,sensor01,temperature,14.78
1541252701443,farm0001,sensor01,pressure,961.7
1541252701443,farm0001,sensor01,humidity,68.32

【讨论】:

感谢您的回答。我不太喜欢您建议的表格更改,因为我必须使用我引入的每种新传感器类型来更改表格定义。如果我理解正确,那么不可能将 json 数据设置为我想要的格式。这意味着我最好以 csv 格式('device'、'sensor'、'data_point'、'value')存储传感器数据。设备分区可能会变得有用。我之前没有提到这一点,但常见的查询会从两年的数据中按时间片(例如一天或一周)过滤时间序列。 如果我使用 dt 和设备分区,例如“farm_iot/sensor_data/dt=2018-11-03-02-45-02/farm/device=farm0001/sensor01/1541252701443”,“查询工作中 dt 和 mytimestamp 之间的相关性”?这可能是一些我还没有理解的蜂巢魔法。顺便提一句。使用分区是否意味着我必须在每次查询之前重新创建表才能获取最新数据?不知何故让我想起了几年前的 CouchDB…… 不需要重新创建表,您只需要运行 msck repair table 命令来添加任何新分区,在这种情况下,仅当您在设备或传感器级别添加新文件夹时,没有需要餐桌娱乐。你应该能够毫无问题地加载你得到的 json,从这个角度来看, csv 或 json 不会有任何区别。关于您将 dt 添加到分区的建议,这对我来说很有意义,如果这将是一个常见的查询,但是我建议仅在文件夹级别添加,仅在日级别,您将结束 分区过多也会影响性能。让我更新我对这一点的回答 完全没有问题,很乐意提供帮助。我尽量不要触摸或更改您存储数据的方式(您并不总是可以控制它)。我很高兴找到一个更简单的解决方案很有帮助【参考方案2】:

首先非常感谢@hlagos 的帮助。

AWS Athena 无法按照我需要的方式转换 json 传感器数据(我们在 cmets 中对 @hlagos 的回答进行了讨论)。因此,处理这种情况的“最简单”方法是将数据格式从 json 更改为 CSV,以更接近我需要的格式。

我现在将传感器数据以 CSV 格式存储在 S3 中(每 5 分钟写入一次数据),另外我还添加了我们讨论过的日期和设备分区。

生成的文件夹结构:

farm_iot/sensor_data/farm/day=20181129/device=farm0001/1543535738493

CSV文件的数据内容:

sensor01,temperature,2.82
sensor01,pressure,952.83
sensor01,humidity,83.64
sensor02,temperature,2.61
sensor02,pressure,952.74
sensor02,humidity,82.41

AWS Athena 表定义:

CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  `sensor` string,
  `data_point` string,
  `value` double 
) 
PARTITIONED BY (day string, device string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');

我添加的分区是这样的(后面我会有一个脚本来提前创建分区):

msck repair table farm.sensor_data

现在我可以查询数据了:

select regexp_extract("$path", '[^/]+$') as timestamp, device, sensor, 
    data_point, value from farm.sensor_data where day='20181104'

Results
    timestamp       device      sensor      data_point  value
1   1541310040278   farm0001    sensor01    temperature 21.61
2   1541310040278   farm0001    sensor01    pressure    643.65
3   1541310040278   farm0001    sensor01    humidity    74.84
4   1541310040278   farm0001    sensor02    temperature 9.14
5   1541310040278   farm0001    sensor02    pressure    956.04
6   1541310040278   farm0001    sensor02    humidity    88.01
7   1541311840309   farm0001    sensor01    temperature 21.61
8   ...

【讨论】:

以上是关于AWS Athena 创建表和分区的主要内容,如果未能解决你的问题,请参考以下文章

正则表达式创建 AWS Athena 表 (RegexSerDe)

aws glue / pyspark - 如何使用 Glue 以编程方式创建 Athena 表

AWS Athena 使用填充错误数据的创建表从 Epoch 转换为时间戳

AWS Athena 分区获取所有路径

amazon athena 创建带分区的请求

AWS Athena 从 S3 的 GLUE Crawler 输入 csv 创建的表中返回零记录