通过 AWS 上的 API 在胶表上添加分区?

Posted

技术标签:

【中文标题】通过 AWS 上的 API 在胶表上添加分区?【英文标题】:Add a partition on glue table via API on AWS? 【发布时间】:2018-11-11 07:50:35 【问题描述】:

我有一个 S3 存储桶,它不断被新数据填充,我正在使用 Athena 和 Glue 来查询该数据,问题是如果胶水不知道创建了一个新分区,它就不会搜索它需要在那里搜索。如果我每次需要一个新分区时都调用 API 来运行 Glue 爬虫,这太昂贵了,所以最好的解决方案是告诉胶水添加了一个新分区,即在它的属性表中创建一个新分区。我查看了 AWS 文档,但没有运气,我将 Java 与 AWS 一起使用。有什么帮助吗?

【问题讨论】:

太贵了在计算或金钱方面? 从金钱上来说,这么多占用CPU并不是什么难操作。 那么如果您知道何时添加新分区,请尝试我的答案中的#3 选项。 @Gudzo - 如果有帮助,你能接受我的回答吗? @Gudzo 你好?检查您是否可以接受我的解决方案 【参考方案1】:

    您可以将您的胶水爬虫配置为每 5 分钟触发一次

    您可以创建一个 lambda 函数,该函数将按计划运行,或者由存储桶中的事件(例如 putObject 事件)触发,并且该函数可以调用 athena 来发现分区

     import boto3
    
     athena = boto3.client('athena')
    
     def lambda_handler(event, context):
         athena.start_query_execution(
             QueryString = "MSCK REPAIR TABLE mytable",
             ResultConfiguration = 
                 'OutputLocation': "s3://some-bucket/_athena_results"
             
    

    使用 Athena 手动添加分区。您还可以像我的 lambda 示例一样通过 API 运行 sql 查询。

    来自Athena manual的示例:

     ALTER TABLE orders ADD
       PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
       PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
    

【讨论】:

“您可以将您的胶水目录配置为每 5 分钟触发一次”...请问这是什么意思? 哦,对不起 - 我的意思是写“你可以配置你的胶水爬虫每 5 分钟触发一次”。我会更新我的答案。【参考方案2】:

您可能想使用batch_create_partition()glue api 来注册新分区。它不需要任何昂贵的操作,例如 MSCK REPAIR TABLE 或重新爬网。

我有一个类似的用例,我为此编写了一个 python 脚本,它执行以下操作 -

第 1 步 - 获取表信息并从中解析注册分区所需的必要信息。

# Fetching table information from glue catalog
logger.info("Fetching table info for .".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for . - "
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']

第 2 步 - 生成列表字典,其中每个列表包含创建单个分区的信息。所有列表都将具有相同的结构,但它们的分区特定值会发生变化(年、月、日、小时)

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from  to ".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str(':02d'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "////".format(table_location, year, month, day, hour)
            input_dict = 
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': 
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                
            
            input_list.append(input_dict.copy())
    return input_list

第 3 步 - 调用 batch_create_partition() API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )

单个 api 调用中的分区限制为 100 个,因此如果您创建的分区超过 100 个,则需要将列表分成块并对其进行迭代。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition

【讨论】:

嘿@conetfun,我试图找到单个 API 中分区数量的限制。有没有提到它的官方文档?【参考方案3】:

这个问题很老,但我想把它说出来,有人可以让s3:ObjectCreated:Put 通知触发一个 Lambda 函数,该函数在数据到达 S3 时注册新分区。我什至会扩展这个函数来处理基于对象删除等的弃用。这是 AWS 的一篇博文,其中详细介绍了 S3 事件通知:https://aws.amazon.com/blogs/aws/s3-event-notification/

【讨论】:

【参考方案4】:

AWS Glue 最近添加了一个 RecrawlPolicy,它只抓取您添加到 S3 存储桶的新文件夹/分区。

https://docs.aws.amazon.com/glue/latest/dg/incremental-crawls.html

这应该可以帮助您最大限度地减少再次抓取所有数据。根据我的阅读,您可以在设置爬虫或编辑现有爬虫时定义增量爬网。不过需要注意的一点是,增量爬网要求新数据的架构与现有架构大致相同。

【讨论】:

这可能适用于一些非常简单的模式......但对于更复杂的模式忘记爬虫并使用上面列出的策略。

以上是关于通过 AWS 上的 API 在胶表上添加分区?的主要内容,如果未能解决你的问题,请参考以下文章

将数据添加到主表上的多个记录的链接表中

AWS Glue 数据目录,具有 S3 文件上的分区表和分区中的不同架构

通过 AWS EC2 上的 Docker 容器发布时,Auth0 OWIN API 未验证 JWT 令牌

尚不支持 BigQuery、非分区表上的通配符表和基于字段的分区表

具有集群和分区的表上的 Bigquery SQL 性能问题

Prestodb (AWS EMR) 加载分区元数据