如何让 MSCK REPAIR TABLE 在 AWS Athena 中自动执行
Posted
技术标签:
【中文标题】如何让 MSCK REPAIR TABLE 在 AWS Athena 中自动执行【英文标题】:How to make MSCK REPAIR TABLE execute automatically in AWS Athena 【发布时间】:2018-05-12 19:13:45 【问题描述】:我有一个每小时执行一次的 Spark 批处理作业。每次运行都会生成新数据并将其存储在 S3
中,目录命名模式为 DATA/YEAR=?/MONTH=?/DATE=?/datafile
。
将数据上传到S3
后,我想使用Athena
进行调查。另外,我想通过连接到 Athena 作为数据源在 QuickSight
中可视化它们。
问题是每次运行我的 Spark 批处理后,Athena 不会发现存储在 S3
中的新生成的数据,除非我手动运行查询 MSCK REPAIR TABLE
。
有没有办法让 Athena 自动更新数据,这样我就可以创建一个全自动的数据可视化管道?
【问题讨论】:
@samuel_liew 这个问题并不广泛,它只是为围绕问题的上下文提供了一些额外的信息。 OP 想要一个完全自动化的数据即管道在技术上可能并不重要,但是上下文对于允许人们提供指导以解决潜在挑战很重要。该特定挑战是在 Athena 中管理分区,因为它们是需要创建的不同元数据对象。它们不是自动创建或发现的,这是非常出乎意料的,这一点的赞成票数量很明显。 【参考方案1】:有多种方法可以安排此任务。您如何安排工作流程?你是使用Airflow、Luigi、Azkaban、cron 之类的系统,还是使用AWS Data pipeline?
从其中任何一个中,您都应该能够触发以下 CLI 命令。
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
另一个选项是AWS Lambda。您可以有一个函数调用MSCK REPAIR TABLE some_database.some_table
以响应新上传到 S3。
一个示例 Lambda 函数可以这样编写:
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config =
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': 'EncryptionOption': 'SSE_S3'
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = 'Database': 'some_database'
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
然后,您将配置一个触发器以在您的存储桶中的 DATA/
前缀下添加新数据时执行您的 Lambda 函数。
最终,在使用作业调度程序运行 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 对于这样的工作很方便。
【讨论】:
我认为另一种可能性是在您的 Glue 脚本中使用 boto。您应该可以使用this 来执行相关的MSCK REPAIR TABLE
命令
我使用预定的 AWS Crawler 来爬取数据库以更新表。您对此解决方案有何看法?
可以的。使用 Lambda 函数的好处是 Lambda 可以动态响应事件,例如在 Athena 的情况下将文件添加到 S3。 Lambda 函数的坏处在于,从持续集成和版本控制的角度来看,它们管理起来有些混乱。
在这种情况下,Lambda 的另一个坏处是它的执行必须在 5 分钟内完成,这对于 REPAIR TABLE 来说可能很短(但对于 ADD PARTITION 来说已经足够了)
关于这个问题的任何想法***.com/questions/63149782/…【参考方案2】:
您应该改为运行ADD PARTITION
:
aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."
这会从您的S3
位置添加一个新创建的分区
Athena 利用 Hive 对数据进行分区。
要创建带有分区的表,您必须在 CREATE TABLE
语句期间定义它。使用PARTITIONED BY
定义用于分区数据的键。
【讨论】:
不确定如果您使用 Firehose 将数据放入 Athena 存储桶中是否可行。即使使用“动态”分区,您仍然需要指定分区:-( @RalphBolton 使用 Firehose 登录时,您还可以使用分区投影。看我的回答。【参考方案3】:有多种方法可以解决问题并更新表格:
致电MSCK REPAIR TABLE
。这将扫描所有数据。这是昂贵的,因为每个文件都被完整读取(至少它是由 AWS 完全收费的)。它也非常缓慢。简而言之:不要这样做!
通过调用ALTER TABLE ADD PARTITION abc ...
自行创建分区。从某种意义上说,这很好,无需扫描数据并且成本低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱而没有任何通用模式(在您看来并非如此,因为它是一个组织良好的 S3 密钥模式),这也是一个不错的选择。 这种方法也有缺点: A) 难以维护 B) 所有分区都将存储在 GLUE 目录中。当您有很多分区时,这可能会成为一个问题,因为它们需要被读取并传递给 Athenas 和 EMR Hadoop 基础架构。
使用分区投影。您可能想要评估两种不同的风格。这是在查询时为 Hadoop 创建分区的变体。这意味着没有 GLUE 目录条目通过网络发送,因此可以更快地处理大量分区。缺点是您可能会“碰到”一些可能不存在的分区。这些当然会被忽略,但在内部将生成所有 COULD 与您的查询匹配的分区 - 无论它们是否在 S3 上(因此请始终将分区过滤器添加到您的查询中!)。如果操作正确,此选项是一种一劳永逸的方法,因为不需要更新。
CREATE EXTERNAL TABLE `mydb`.`mytable`
(
...
)
PARTITIONED BY (
`YEAR` int,
`MONTH` int,
`DATE` int)
...
LOCATION
's3://DATA/'
TBLPROPERTIES(
"projection.enabled" = "true",
"projection.account.type" = "integer",
"projection.account.range" = "1,50",
"projection.YEAR.type" = "integer",
"projection.YEAR.range" = "2020,2025",
"projection.MONTH.type" = "integer",
"projection.MONTH.range" = "1,12",
"projection.DATE.type" = "integer",
"projection.DATE.range" = "1,31",
"storage.location.template" = "s3://DATA/YEAR=$YEAR/MONTH=$MONTH/DATE=$DATE/"
);
https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
仅列出所有选项:您也可以使用GLUE crawlers
。但这似乎不是一个有利的方法,因为它不像宣传的那样灵活。
您可以直接使用Glue Data Catalog API
对 GLUE 进行更多控制,如果您有很多自动化脚本,这可能是方法 #2 的替代方法
做准备工作来设置你的桌子。
简而言之:
如果您的应用程序以 SQL 为中心,您喜欢没有脚本的最精简方法,请使用分区投影 如果您有很多分区,请使用分区投影 如果您有几个分区或分区没有通用模式,请使用方法 #2 如果您的脚本繁重,并且脚本完成了大部分工作并且更容易为您处理,请考虑方法 #5 如果您感到困惑并且不知道从哪里开始 - 请先尝试分区投影!它应该适合 95% 的用例。【讨论】:
关于投影的注意事项:阅读 AWS 文档中的注意事项和限制部分。 RTFM 总是一个好主意。您能否详细说明您特别关注的是什么(因为我自己正在使用这种方法,所以我会对任何陷阱都非常感兴趣)?顺便说一句,官方的 AWS 预测文档已经链接在我的回答中。谢谢! 当然!非常好的答案顺便说一句。关于“如果太多分区为空”的部分。我只是重读它,我可能会读错......这意味着如果分区内没有任何内容,而不是如果分区不存在。我将它与 Firehose 一起使用到 S3 两种投影机制在这里不同。示例一个创建所有可能的分区。例如如果您仅定义 "projection.YEAR.range" = "2000,3000" 并且不对查询应用过滤器,则该机制将创建 1000 个分区(如果未过滤,则具有多个分区键,它将创建一个笛卡尔积)。分区将被传递给执行(到集群)。 Presto 会跳过空分区,但您会陷入与 GLUE 相同的陷阱:数据传输正在杀死您。我的经历(偶然)创建数万个分区会很慢。 @Tanmay 我一开始也是这么想的。正确的是只创建了新的分区。但它确实读取数据并收费(相信我 - 我非常确定这一点,因为它出乎意料地打击了我们)。运行时间也会增加。有没有想过,为什么 2021 年的文件列表需要将近 9 秒才能读完? Presto 上有说明文件需要打开的信息。 Presto 有一个特定的模式/驱动程序/fs 层补丁/任何东西都可以解决这个问题,但在 Athena 1 和 2 中却不行。使用投影,你永远不会回头。以上是关于如何让 MSCK REPAIR TABLE 在 AWS Athena 中自动执行的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 中是不是有等效于 SQL 的 MSCK REPAIR TABLE 的方法
对于小型数据集,AWS Athena MSCK REPAIR TABLE 花费的时间太长
MySQL错误修复:Table crashed repair
Incorrect key file for table './xx_db/xx_table.MYI'; try to repair it