通过 AWS 上的 API 在胶表上添加分区?
Posted
技术标签:
【中文标题】通过 AWS 上的 API 在胶表上添加分区?【英文标题】:Add a partition on glue table via API on AWS? 【发布时间】:2018-11-11 07:50:35 【问题描述】:我有一个 S3 存储桶,它不断被新数据填充,我正在使用 Athena 和 Glue 来查询该数据,问题是如果胶水不知道创建了一个新分区,它就不会搜索它需要在那里搜索。如果我每次需要一个新分区时都调用 API 来运行 Glue 爬虫,这太昂贵了,所以最好的解决方案是告诉胶水添加了一个新分区,即在它的属性表中创建一个新分区。我查看了 AWS 文档,但没有运气,我将 Java 与 AWS 一起使用。有什么帮助吗?
【问题讨论】:
太贵了在计算或金钱方面? 从金钱上来说,这么多占用CPU并不是什么难操作。 那么如果您知道何时添加新分区,请尝试我的答案中的#3 选项。 @Gudzo - 如果有帮助,你能接受我的回答吗? @Gudzo 你好?检查您是否可以接受我的解决方案 【参考方案1】:您可以将您的胶水爬虫配置为每 5 分钟触发一次
您可以创建一个 lambda 函数,该函数将按计划运行,或者由存储桶中的事件(例如 putObject 事件)触发,并且该函数可以调用 athena 来发现分区:
import boto3
athena = boto3.client('athena')
def lambda_handler(event, context):
athena.start_query_execution(
QueryString = "MSCK REPAIR TABLE mytable",
ResultConfiguration =
'OutputLocation': "s3://some-bucket/_athena_results"
使用 Athena 手动添加分区。您还可以像我的 lambda 示例一样通过 API 运行 sql 查询。
来自Athena manual的示例:
ALTER TABLE orders ADD
PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
【讨论】:
“您可以将您的胶水目录配置为每 5 分钟触发一次”...请问这是什么意思? 哦,对不起 - 我的意思是写“你可以配置你的胶水爬虫每 5 分钟触发一次”。我会更新我的答案。【参考方案2】:您可能想使用batch_create_partition()
glue api 来注册新分区。它不需要任何昂贵的操作,例如 MSCK REPAIR TABLE 或重新爬网。
我有一个类似的用例,我为此编写了一个 python 脚本,它执行以下操作 -
第 1 步 - 获取表信息并从中解析注册分区所需的必要信息。
# Fetching table information from glue catalog
logger.info("Fetching table info for .".format(l_database, l_table))
try:
response = l_client.get_table(
CatalogId=l_catalog_id,
DatabaseName=l_database,
Name=l_table
)
except Exception as error:
logger.error("Exception while fetching table info for . - "
.format(l_database, l_table, error))
sys.exit(-1)
# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']
第 2 步 - 生成列表字典,其中每个列表包含创建单个分区的信息。所有列表都将具有相同的结构,但它们的分区特定值会发生变化(年、月、日、小时)
def generate_partition_input_list(start_date, num_of_days, table_location,
input_format, output_format, serde_info):
input_list = [] # Initializing empty list
today = datetime.utcnow().date()
if start_date > today: # To handle scenarios if any future partitions are created manually
start_date = today
end_date = today + timedelta(days=num_of_days) # Getting end date till which partitions needs to be created
logger.info("Partitions to be created from to ".format(start_date, end_date))
for input_date in date_range(start_date, end_date):
# Formatting partition values by padding required zeroes and converting into string
year = str(input_date)[0:4].zfill(4)
month = str(input_date)[5:7].zfill(2)
day = str(input_date)[8:10].zfill(2)
for hour in range(24): # Looping over 24 hours to generate partition input for 24 hours for a day
hour = str(':02d'.format(hour)) # Padding zero to make sure that hour is in two digits
part_location = "////".format(table_location, year, month, day, hour)
input_dict =
'Values': [
year, month, day, hour
],
'StorageDescriptor':
'Location': part_location,
'InputFormat': input_format,
'OutputFormat': output_format,
'SerdeInfo': serde_info
input_list.append(input_dict.copy())
return input_list
第 3 步 - 调用 batch_create_partition() API
for each_input in break_list_into_chunks(partition_input_list, 100):
create_partition_response = client.batch_create_partition(
CatalogId=catalog_id,
DatabaseName=l_database,
TableName=l_table,
PartitionInputList=each_input
)
单个 api 调用中的分区限制为 100 个,因此如果您创建的分区超过 100 个,则需要将列表分成块并对其进行迭代。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition
【讨论】:
嘿@conetfun,我试图找到单个 API 中分区数量的限制。有没有提到它的官方文档?【参考方案3】:这个问题很老,但我想把它说出来,有人可以让s3:ObjectCreated:Put
通知触发一个 Lambda 函数,该函数在数据到达 S3 时注册新分区。我什至会扩展这个函数来处理基于对象删除等的弃用。这是 AWS 的一篇博文,其中详细介绍了 S3 事件通知:https://aws.amazon.com/blogs/aws/s3-event-notification/
【讨论】:
【参考方案4】:AWS Glue 最近添加了一个 RecrawlPolicy,它只抓取您添加到 S3 存储桶的新文件夹/分区。
https://docs.aws.amazon.com/glue/latest/dg/incremental-crawls.html
这应该可以帮助您最大限度地减少再次抓取所有数据。根据我的阅读,您可以在设置爬虫或编辑现有爬虫时定义增量爬网。不过需要注意的一点是,增量爬网要求新数据的架构与现有架构大致相同。
【讨论】:
这可能适用于一些非常简单的模式......但对于更复杂的模式忘记爬虫并使用上面列出的策略。以上是关于通过 AWS 上的 API 在胶表上添加分区?的主要内容,如果未能解决你的问题,请参考以下文章
AWS Glue 数据目录,具有 S3 文件上的分区表和分区中的不同架构
通过 AWS EC2 上的 Docker 容器发布时,Auth0 OWIN API 未验证 JWT 令牌