将镶木地板从 AWS Kinesis firehose 写入 AWS S3
Posted
技术标签:
【中文标题】将镶木地板从 AWS Kinesis firehose 写入 AWS S3【英文标题】:Write parquet from AWS Kinesis firehose to AWS S3 【发布时间】:2018-01-07 21:22:16 【问题描述】:我想将 kinesis firehose 中的数据提取到 s3 中,格式为镶木地板。到目前为止,我刚刚找到了一个暗示创建 EMR 的解决方案,但我正在寻找更便宜、更快的方法,比如直接从 firehose 将接收到的 json 存储为镶木地板或使用 Lambda 函数。
非常感谢, 哈维。
【问题讨论】:
【参考方案1】:Amazon Kinesis Firehose 接收流式记录并可以将它们存储在 Amazon S3(或 Amazon Redshift 或 Amazon Elasticsearch Service)中。
每条记录最大可达 1000KB。
但是,记录会一起附加到文本文件中,并根据时间或大小进行批处理。传统上,记录是 JSON 格式。
您将无法发送镶木地板文件,因为它不符合此文件格式。
可以触发 Lambda 数据转换函数,但这也不能输出 parquet 文件。
事实上,鉴于 parquet 文件的性质,您不太可能一次构建一条记录。作为一种列式存储格式,我怀疑它们确实需要批量创建,而不是为每条记录附加数据。
底线:不。
【讨论】:
嗨@Javi,如果这个或任何答案解决了您的问题,请点击复选标记考虑accepting it。这向更广泛的社区表明您已经找到了解决方案,并为回答者和您自己提供了一些声誉。没有义务这样做。 @JohnRotenstein 您能否让 lambda 对 Firehose 中的每个缓冲时间/大小批次进行转换,然后每隔几个小时左右将 Parquet 文件连接到更大的大小?这使您可以通过 Firehose 将 JSON 流式传输到 Parquet,以在 Athena 中获取近乎实时的数据,同时仍然获得 Parquet 的性能优势。 @cmclen,Parquet 是一种柱状文件格式。我认为你不能一次只追加一行——这会破坏使用 Parquet 的目的。 @JohnRotenstein 您不能(直到 12 天前:cf Vlad 的回答)依赖 Firehose 将转换后的数据为您转储到 S3,但您可以使用 S3FS 或就像布拉卡纳指出的那样。如果您希望 Firehose 显示为已成功,则只需为 Firehose 返回格式正确的行(通常只需添加一个 processes_at 时间戳并按原样返回输入行)。如果您不依赖pandas,也可以直接在lambda中执行此操作,因为pandas的库太大而无法将其打包到Lambda中(最大50MB)。【参考方案2】:在处理了 AWS 支持服务和一百种不同的实施之后,我想解释一下我所取得的成就。
最后,我创建了一个 Lambda 函数,它处理 Kinesis Firehose 生成的每个文件,根据负载对我的事件进行分类,并将结果存储在 S3 中的 Parquet 文件中。
做到这一点并不容易:
首先,您应该创建一个 Python 虚拟环境,包括所有必需的库(在我的例子中是 Pandas、NumPy、Fastparquet 等)。 由于生成的文件(包括所有库和我的 Lambda 函数很重,需要启动一个 EC2 实例,我使用了免费套餐中包含的那个)。要创建虚拟环境,请按照以下步骤操作:
在 EC2 中登录 创建一个名为 lambda(或任何其他名称)的文件夹 须藤 yum -y 更新 须藤 yum -y 升级 sudo yum -y groupinstall "开发工具" sudo yum -y install blas sudo yum -y install lapack sudo yum -y install atlas-sse3-devel sudo yum install python27-devel python27-pip gcc 虚拟环境 源环境/bin/激活 pip install boto3 pip install fastparquet 点安装熊猫 pip install thriftpy 点安装 s3fs pip install(任何其他需要的库) 找到 ~/lambda/env/lib*/python2.7/site-packages/ -name "*.so" | xargs 条 推送 env/lib/python2.7/site-packages/ zip -r -9 -q ~/lambda.zip * 流行音乐 推送 env/lib64/python2.7/site-packages/ zip -r -9 -q ~/lambda.zip * 流行音乐正确创建 lambda_function:
import json
import boto3
import datetime as dt
import urllib
import zlib
import s3fs
from fastparquet import write
import pandas as pd
import numpy as np
import time
def _send_to_s3_parquet(df):
s3_fs = s3fs.S3FileSystem()
s3_fs_open = s3_fs.open
# FIXME add something else to the key or it will overwrite the file
key = 'mybeautifullfile.parquet.gzip'
# Include partitions! key1 and key2
write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
compression='GZIP',open_with=s3_fs_open)
def lambda_handler(event, context):
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
try:
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
data = response['Body'].read()
decoded = data.decode('utf-8')
lines = decoded.split('\n')
# Do anything you like with the dataframe (Here what I do is to classify them
# and write to different folders in S3 according to the values of
# the columns that I want
df = pd.DataFrame(lines)
_send_to_s3_parquet(df)
except Exception as e:
print('Error getting object from bucket .'.format(key, bucket))
raise e
将 lambda 函数复制到 lambda.zip 并部署 lambda_function:
返回您的 EC2 实例并将所需的 lambda 函数添加到 zip:zip -9 lambda.zip lambda_function.py(lambda_function.py 是第 2 步中生成的文件) 将生成的 zip 文件复制到 S3,因为不通过 S3 进行部署就非常繁重。 aws s3 cp lambda.zip s3://support-bucket/lambda_packages/ 部署 lambda 函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip在你喜欢的时候触发要执行的,例如,每次在 S3 中创建一个新文件时,甚至你可以将 lambda 函数关联到 Firehose。 (我没有选择这个选项,因为 'lambda' 限制低于 Firehose 限制,您可以将 Firehose 配置为每 128Mb 或 15 分钟写入一个文件,但如果您将此 lambda 函数关联到 Firehose,则会执行 lambda 函数每 3 分钟或 5MB,在我的情况下,我遇到了生成很多小拼花文件的问题,因为每次启动 lambda 函数时,我都会生成至少 10 个文件)。
【讨论】:
我是否正确理解此管道每条记录创建一个镶木地板文件? Parquet 是一种列式存储,那么是否需要某种单独的压缩作业来将这些小 Parquet 文件协调为一个更大的文件? 嗨@Tagar,每次调用lamba_handler时它都会写入一个parquet文件并且可以配置,例如,您可以将其配置为每15分钟启动一次,每个文件都会创建一个文件15 分钟内收到的所有事件。【参考方案3】:好消息,这个功能今天发布了!
Amazon Kinesis Data Firehose 可以转换您输入数据的格式 从 JSON 到 Apache Parquet 或 Apache ORC,然后将数据存储在 亚马逊 S3。 Parquet 和 ORC 是节省空间的柱状数据格式 并启用更快的查询
要启用,请转到您的 Firehose 流并单击 编辑。您应该会看到 记录格式转换 部分,如下图所示:
详见文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html
【讨论】:
以上是关于将镶木地板从 AWS Kinesis firehose 写入 AWS S3的主要内容,如果未能解决你的问题,请参考以下文章
如何将镶木地板文件从 s3 导入到 postgresql rds
如何将镶木地板格式的特定列加载到 Redshift 光谱中?