如何通过 Kinesis 将数据从 Lambda (Python) 发送到 Redshift

Posted

技术标签:

【中文标题】如何通过 Kinesis 将数据从 Lambda (Python) 发送到 Redshift【英文标题】:How to send data from Lambda (Python) to Redshift through Kinesis 【发布时间】:2019-11-13 07:47:12 【问题描述】:

我在 Python 中有一个 lambda 函数,它可以生成一些东西并返回一些需要插入到 Redshift 中的值。在 lambda 中,我将值推送到 Kinesis,后者将它们复制到 S3 中,然后复制到 Redshift 中。

lambda 中的值是以字符串形式获取的,如下所示:

final_string = 'a;b;d;c'

每个字母是 Redshift 表中不同列的值,因此分隔符是“;”。然后我将数据推送到 Kinesis Stream:

put_response = kinesis_client.put_record(StreamName = 'PixelTrack',
                                            Data=json.dumps(final_string),
                                           PartitionKey='first')

然后,Kinesis 流向 Kinesis Firehose 流提供数据。使用 Kinesis Firehose 在 S3 中生成的文件类似于(包括文件中的引号):

"a;b;c;d;c" 

最后,我使用以下语句(在 Kinesis firehose 中配置)将数据复制到 redshift:

copy table
from blabla
BLANKSASNULL 
DELIMITER ';' 
EMPTYASNULL 
NULL AS 'null' 
ESCAPE 
FILLRECORD;

当 Kinesis 中仅缓冲一个结果时,我已设法使其工作并获取 Redshift 中的值(不过,在 Redshift 中创建了一个新列)。因此,当在缓冲时间内只执行了一个 lambda 时,Redshift 表如下所示:

  A        B         C         D     no_info_column
  "a       b         c         d"        <null>

当我多次执行 lambda 时出现问题,因为我在 S3 中得到一个文件,其中包含以下文本:

"a,b,c,d" "a1,b1,c1,d1"

我在 Redshift 中收到错误 Extra column(s) found,因为复制语句无法找到行分隔符。

我尝试了以下方法但没有成功:

在 lambda 中返回字符串 搜索如何在副本中设置行分隔符 (SO question) 将列表转换为 json 而不是字符串。然后我在打开列表时遇到了括号问题 在复制语句中使用 REMOVEQUOTES

我最初的问题是:“如何从 s3 复制到 redshift,不同的行用双引号分隔”,但问题可能出在我的第一种方法中,所以我决定稍微提一下这个问题更广泛一点。

那么,我该如何解决呢?

【问题讨论】:

为什么不去掉 kinesis 直接从你的 lambda 插入 Redshift 呢?您可以在 python 中以编程方式轻松连接到 Redshift,这将为您节省大量时间和金钱 您使用的是 Kinesis Streams 还是 Kinesis Firehose?如果您要发送到 Redshift,那么更简单的方法是使用 Kinesis Firehose,因为它会为您处理整个过程,自动缓冲并将内容发送到 Redshift。 我创建了一个 Kinesis Stream,它将信息发送到 Kinesis Firehose(也许这就是问题所在?)。我不是直接从 lambda 插入,因为我问过并且被告知插入语句效率非常低,我们谈论的是每天需要插入 400 万行,这最终可能会大大降低数据库速度 @John Rotenstein 你给了我使用 Kinesis Firehose 而不是两者的想法,所以如果你愿意,你可以回答这个问题 【参考方案1】:

如果您希望将流数据发送到 Amazon Redshift,您可以使用 Amazon Kinesis Data Firehose。它具有基于大小 (MB) 或时间(秒)的内置数据缓冲,用于批量写入 Amazon Redshift。

您说得对,在 Redshift 上执行小型 INSERT 操作并不理想。批量加载数据要好得多。因此,如果您需要连续加载数据,Kinesis Data Firehose 可以提供最佳的性能组合。

您提到“kinesis 流为 Kinesis Firehose 流提供食物”。随意直接从 AWS Lambda 函数写入 Kinesis Data Firehose。

【讨论】:

以上是关于如何通过 Kinesis 将数据从 Lambda (Python) 发送到 Redshift的主要内容,如果未能解决你的问题,请参考以下文章

通过 Cloudformation 模板将 Lambda 函数添​​加到 Kinesis Firehose

kinesis代理到lambda,如何获取原始文件和服务器

来自 API 网关 VS kinesis Streams 的 Lambda

任何人在使用 AWS kinesis 流、lambda 和 firehose 时都遇到过数据丢失的情况?

Node.js中的代码AWS Lambda Package不会调用putRecord()来将数据添加到AWS Kinesis Firehose Stream中

如何暂停/恢复aws lambda函数