如何使用 Boto3 get_query_results 方法从 AWS Athena 创建数据框

Posted

技术标签:

【中文标题】如何使用 Boto3 get_query_results 方法从 AWS Athena 创建数据框【英文标题】:How to Create Dataframe from AWS Athena using Boto3 get_query_results method 【发布时间】:2019-02-01 05:31:13 【问题描述】:

我正在使用 AWS Athena 从 S3 查询原始数据。由于 Athena 将查询输出写入 S3 输出存储桶,因此我曾经这样做:

df = pd.read_csv(OutputLocation)

但这似乎是一种昂贵的方式。最近我注意到boto3get_query_results 方法返回一个复杂的结果字典。

client = boto3.client('athena')
response = client.get_query_results(
        QueryExecutionId=res['QueryExecutionId']
        )

我面临两个主要问题:

    如何将get_query_results 的结果格式化为pandas 数据框? get_query_results 只返回 1000 行。我怎样才能用它来获得两百万行?

【问题讨论】:

我认为,如果您提供返回的“复杂字典”样本,可能会对回答您的问题的人有所帮助。任何敏感数据都可以被编辑,因为它主要是重要的数据结构。此外,pandas 还提供DataFrame.from_dict()DataFrame.from_records()pandas.read_json()。还有其他的,但是在不知道数据结构的情况下很难确定使用哪个。此外,查看get_query_results() 的文档可能对您有所帮助。也许它需要参数,这意味着可以增加 1000 行的默认值。 谢谢@chillin 我明白你的意思了。放在这里的文字真的很长。让我们使用link 的基本结构作为参考并保留,但数据列表中的一个字段是 varchar,看起来像另一个字典。例如temperature=41.1 试试response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000),看看这次你能不能得到2000行。此外,假设可以通过单个请求返回的行数存在上限可能是合理的(尽管我在文档中找不到任何提及)。如果有上限,您需要做的就是解析 JSON 以响应 'NextToken' 键,并在下次调用 client.get_query_results() 时将其包含在内,您将有效地获得下一个 1000(或任何限制是)行。 文档状态 get_query_results() 返回一个 Python 字典,所以尝试 d = response['ResultSet']['Rows'],然后 df = pd.DataFrame.from_dict(d)。但是,如果 d 包含元数据(您不希望在最终 DataFrame 中出现的东西),您可能不会得到预期的 DataFrame。如果是这种情况,您可能需要从/变异d(使用 for 循环或其他逻辑)中提取/变异,以便它包含您想要的内容。此链接可能会有所帮助:pandas.pydata.org/pandas-docs/stable/generated/… 谢谢@chillin。至于最大限制,您可以在此link 中看到 1000 的限制。但我想您是对的,我必须找到一种使用 Next Token 的方法。 【参考方案1】:

我使用了 while 循环方法来解决这个问题,如果 NextToken 存在,我扩展 que 数据帧:

# Receive Query Results
# Method get_query_results() limits to max 1000, handled with while, and called NextToken.
query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'])
results = query_results['ResultSet']['Rows']
while 'NextToken' in query_results:
    query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'], NextToken = query_results['NextToken'])
    results.extend(query_results['ResultSet']['Rows'])
    return results
return query_results['ResultSet']['Rows']

【讨论】:

请在您的代码 cmets 中使用英文。 SO是一个英语社区 欢迎,胡里奥。这个问题有六个现有的答案,包括一个接受了 24 票的答案。你确定你的方法还没有被建议吗?如果不是,为什么有人会更喜欢您的方法而不是提议的现有方法?您是否正在利用新功能?是否存在更适合您的方法的场景? 大家好,感谢您的提示,抱歉打扰了。【参考方案2】:

我的第一个问题有一个解决方案,使用以下函数

def results_to_df(results):
 
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    listed_results = []
    for res in results['ResultSet']['Rows'][1:]:
         values = []
         for field in res['Data']:
            try:
                values.append(list(field.values())[0]) 
            except:
                values.append(list(' '))
 
        listed_results.append(
            dict(zip(columns, values))
        )
 
    return listed_results

然后:

t = results_to_df(response)
pd.DataFrame(t)

至于我的第二个问题和@EricBellet 的请求,我还添加了我的分页方法,与在 S3 中加载 Athena 输出的结果相比,我发现这种方法效率低且时间更长:

def run_query(query, database, s3_output):
    ''' 
    Function for executing Athena queries and return the query ID 
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext=
            'Database': database
            ,
        ResultConfiguration=
            'OutputLocation': s3_output,
            
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response



def format_result(results):
    '''
    This function format the results toward append in the needed format.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    formatted_results = []
 
    for result in results['ResultSet']['Rows'][0:]:
        values = []
        for field in result['Data']:
            try:
                values.append(list(field.values())[0]) 
            except:
                values.append(list(' '))
 
        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results



res = run_query(query_2, database, s3_ouput) #query Athena



import sys
import boto3

marker = None
formatted_results = []
query_id = res['QueryExecutionId']
i = 0
start_time = time.time()

while True:
    paginator = client.get_paginator('get_query_results')
    response_iterator = paginator.paginate( 
        QueryExecutionId=query_id,
        PaginationConfig=
            'MaxItems': 1000,
            'PageSize': 1000,
            'StartingToken': marker)

    for page in response_iterator:
        i = i + 1
        format_page = format_result(page)
        if i == 1:
            formatted_results = pd.DataFrame(format_page)
        elif i > 1:
            formatted_results = formatted_results.append(pd.DataFrame(format_page))

    try:
        marker = page['NextToken']
    except KeyError:
        break

print ("My program took", time.time() - start_time, "to run")

它的格式不是很好,但我认为它可以完成工作......

2021 年更新

今天我使用 aws-data-wrangler 的自定义包装作为我几年前提出的原始问题的最佳解决方案。

import awswrangler as wr

def run_athena_query(query, database, s3_output, boto3_session=None, categories=None, chunksize=None, ctas_approach=None, profile=None, workgroup='myTeamName', region_name='us-east-1', keep_files=False, max_cache_seconds=0):
    """
    An end 2 end Athena query method, based on the AWS Wrangler package. 
    The method will execute a query and will return a pandas dataframe as an output.
    you can read more in https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html

    Args:
        - query: SQL query.

        - database (str): AWS Glue/Athena database name - It is only the original database from where the query will be launched. You can still using and mixing several databases writing the full table name within the sql (e.g. database.table).

        - ctas_approach (bool): Wraps the query using a CTAS, and read the resulted parquet data on S3. If false, read the regular CSV on S3.

        - categories (List[str], optional): List of columns names that should be returned as pandas.Categorical. Recommended for memory restricted environments.

        - chunksize (Union[int, bool], optional): If passed will split the data in a Iterable of DataFrames (Memory friendly). If True wrangler will iterate on the data by files in the most efficient way without guarantee of chunksize. If an INTEGER is passed Wrangler will iterate on the data by number of rows igual the received INTEGER.

        - s3_output (str, optional): Amazon S3 path.

        - workgroup (str, optional): Athena workgroup. 

        - keep_files (bool): Should Wrangler delete or keep the staging files produced by Athena? default is False

        - profile (str, optional): aws account profile. if boto3_session profile will be ignored.

        - boto3_session (boto3.Session(), optional): Boto3 Session. The default boto3 session will be used if boto3_session receive None. if profilename is provided a session will automatically be created.

        - max_cache_seconds (int): Wrangler can look up in Athena’s history if this query has been run before. If so, and its completion time is less than max_cache_seconds before now, wrangler skips query execution and just returns the same results as last time. If reading cached data fails for any reason, execution falls back to the usual query run path. by default is = 0

    Returns:
        - Pandas DataFrame

    """
    # test for boto3 session and profile.
    if ((boto3_session == None) & (profile != None)):
        boto3_session = boto3.Session(profile_name=profile, region_name=region_name)

    print("Quering AWS Athena...")

    try:
        # Retrieving the data from Amazon Athena
        athena_results_df = wr.athena.read_sql_query(
            query,
            database=database,
            boto3_session=boto3_session,
            categories=categories,
            chunksize=chunksize,
            ctas_approach=ctas_approach,
            s3_output=s3_output,
            workgroup=workgroup,
            keep_files=keep_files,
            max_cache_seconds=max_cache_seconds
        )

        print("Query completed, data retrieved successfully!")
    except Exception as e:
        print(f"Something went wrong... the error is:e")
        raise Exception(e)

    return athena_results_df

你可以阅读更多here

【讨论】:

【参考方案3】:

您可以使用 AWS Data Wrangler 创建 pandas 数据框,直接通过 Athena 进行查询。

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

您可以找到更多信息here

【讨论】:

优秀的图书馆,这确实是2021年最好的答案 这个的 MaxResults 大小是多少?【参考方案4】:

尝试这种方法,使用 columnMetadata 将响应 ['records'] 转换为数据框:

def results_to_df(response):
    columns = [
        col['label']
        for col in response['columnMetadata']
    ]

    listed_results = [[list(col.values())[0] if list(col.values())[0] else '' for col in 
    record] for record in response['records']]
    df = pd.DataFrame(listed_results, columns=columns)
    return df

【讨论】:

【参考方案5】:

也许你可以尝试使用 pandas read_sql 和 pyathena:

from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir='s3://bucket/folder',region_name='region')
df = pd.read_sql('select * from database.table', conn) #don't change the "database.table"

【讨论】:

【参考方案6】:

一个非常简单的解决方案是使用带有 boto3 Athena 分页器的列表推导式。然后可以简单地将列表理解传递到pd.DataFrame() 以创建一个这样的DataFrame,

pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])

Boto3 Athena 到 Pandas 数据帧

import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result) :
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a 
    2D array for further processing

    Parameters
    ----------
    result dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row is the column name.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))

数以百万计的结果

这需要一个分页器对象https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators

作为提示,以下是在每一页之后附加的方法

df.append(pd.DataFrame(cleanQueryResult(next_page), ignore_index = True))

【讨论】:

要正确设置标题,你可以这样做clean_result = cleanQueryResult(result) df = pd.Dataframe(clean_result[1:], columns=clean_result[0]) 【参考方案7】:

get_query_results 只返回 1000 行。如何使用它将 200 万行放入 Pandas 数据框中?

如果你尝试添加:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

你会得到下一个错误:

调用时发生错误(InvalidRequestException) GetQueryResults 操作:MaxResults 超过允许的最大值 长度 1000。

如果您直接从存储桶 s3 获取文件(在下一个示例中为 Pandas Dataframe),则可以获得数百万行:

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

self.filename 可以是:

self.filename = response['QueryExecutionId'] + ".csv"

因为 Athena 将文件命名为 QueryExecutionId。我将为您编写所有接受查询并返回包含所有行和列的数据框的代码。

import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString = q,
                    QueryExecutionContext=
                    'Database': self.database
                    ,
                    ResultConfiguration=
                    'OutputLocation': self.s3_output,
                    
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
        return response                

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "" finished.'.format(self.query))

            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)      

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  


if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()

【讨论】:

嗨@EricBellet,“如果您直接从存储桶s3(在下一个示例中为Pandas Dataframe)获取文件,您可以获得数百万行” - 这正是我最终所做的。我还可以说,当涉及到 200 万行时,我比较了这种方法 VS 分页和从 S3 加载结果之间的性能,就像我的情况一样...... 嗨@NivCohen,你有200万行分页吗?你有代码示例要分享吗? 嗨@EricBellet,我已经用分页示例更新了我上面的答案,该示例在200万行的情况下对我有用。我必须恢复它并适合它,所以它的格式不太好。我希望这会有所帮助... 这绝对是最好的答案,这对我来说非常有用,并且可以用更简单的代码处理任何数量的数据。感谢您将其放入一个类中,以便轻松整合解决方案。 @EricBellet:顺便说一句很好的答案

以上是关于如何使用 Boto3 get_query_results 方法从 AWS Athena 创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

AWS:如何使用 Boto3 从实例创建公共 AMI?

如何使用 boto3 从连接到 ECS 任务的网络接口获取公共 IP

如何使用 boto3 获取我拥有的 EBS 快照列表?

如何在 Python 中使用 boto3 模块检查 Redshift 的集群状态?

如何使用 boto3 将 S3 对象保存到文件中

如何在 boto3 中设置 executionRoleArn 的值?