将 AWS Lambda 数据推送到 Kinesis Stream
Posted
技术标签:
【中文标题】将 AWS Lambda 数据推送到 Kinesis Stream【英文标题】:Pushing AWS Lambda data to Kinesis Stream 【发布时间】:2016-10-31 21:44:19 【问题描述】:有没有办法将数据从 Lambda 函数推送到 Kinesis 流?我搜索了互联网,但没有找到任何与之相关的示例。
谢谢。
【问题讨论】:
【参考方案1】:是的,您可以将信息从 Lambda 发送到 Kinesis Stream,这非常简单。确保您以正确的权限运行 Lambda。
-
创建一个名为 kinesis.js 的文件,该文件将提供一个“保存”功能,用于接收有效负载并将其发送到 Kinesis Stream。我们希望能够在任何我们想向流中发送数据的地方包含这个“保存”功能。代码:
const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis(
apiVersion: kinesisConstant.API_VERSION, //optional
//accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
//secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
region: kinesisConstant.REGION
);
const savePayload = (payload) =>
//We can only save strings into the streams
if( typeof payload !== kinesisConstant.PAYLOAD_TYPE)
try
payload = JSON.stringify(payload);
catch (e)
console.log(e);
let params =
Data: payload,
PartitionKey: kinesisConstant.PARTITION_KEY,
StreamName: kinesisConstant.STREAM_NAME
;
kinesis.putRecord(params, function(err, data)
if (err) console.log(err, err.stack);
else console.log('Record added:',data);
);
;
exports.save = (payload) =>
const params =
StreamName: kinesisConstant.STREAM_NAME,
;
kinesis.describeStream(params, function(err, data)
if (err) console.log(err, err.stack);
else
//Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
|| data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING )
savePayload(payload);
else
console.log(`Kinesis stream $kinesisConstant.STREAM_NAME is $data.StreamDescription.StreamStatus.`);
console.log(`Record Lost`, JSON.parse(payload));
);
;
-
创建一个 kinesisConstant.js 文件以保持一致:)
module.exports =
STATE:
ACTIVE: 'ACTIVE',
UPDATING: 'UPDATING',
CREATING: 'CREATING',
DELETING: 'DELETING'
,
STREAM_NAME: '<your-stream-name>',
PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
PAYLOAD_TYPE: 'String',
REGION: '<the-region-where-you-have-lambda-and-kinesis>',
API_VERSION: '2013-12-02'
-
您的处理程序文件:我们添加了“完成”函数来向想要将数据发送到流的任何人发送响应,但“kinesis.save(event)”完成了所有工作。
const kinesis = require('./kinesis');
exports.handler = (event, context, callback) =>
console.log('LOADING handler');
const done = (err, res) => callback(null,
statusCode: err ? '400' : '200',
body: err || res,
headers:
'Content-Type': 'application/json',
,
);
kinesis.save(event); // here we send it to the stream
done(null, event);
【讨论】:
很好的例子,对我帮助很大。请纠正错字(小) - kinesisConstants.js - 你错过了文件名中的最后一个“s” 这是一个很好的例子。但我不禁想知道,在这里使用 Kinesis 有什么好处。如果您将数据传递给 Lambda,为什么不直接推送到存储(例如:Elastic Search)?当大多数 AWS 存储工具都有自己的 Node 库时,为什么还要使用第三方工具?性能更高?【参考方案2】:这应该完全像在您的计算机上那样完成。
这是nodejs
中的一个例子:
let aws = require('aws');
let kinesis = new aws.Kinesis();
// data that you'd like to send
let data_object = "some": "properties" ;
let data = JSON.stringify(data_object);
// push data to kinesis
const params =
Data: data,
PartitionKey: "1",
StreamName: "stream name"
kinesis.putRecord(params, (err, data) =>
if (err) console.error(err);
else console.log("data sent");
请注意,这段代码不会工作,因为Lambda
对您的信息流没有任何权限。
通过Lambda
访问AWS
资源时,最好使用IAM
角色;
-
配置新
Lambda
时,可以选择现有/创建角色。
转到IAM
,然后转到角色,然后选择您分配给Lambda
函数的角色名称。
添加相关权限(putRecord
、putRecords
)。
然后,测试Lambda
。
【讨论】:
【参考方案3】:是的,这可以做到,我试图完成同样的事情,并且能够在 Lambda 中使用 Node.js 4.3 运行时做到这一点,并且它也适用于 6.10 版。
代码如下:
在您的 Lambda 函数顶部声明以下内容:
var AWS = require("aws-sdk");
var kinesis = new AWS.Kinesis();
function writeKinesis(rawdata)
data = JSON.stringify(rawdata);
params = Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>";
kinesis.putRecord(params, (err, data) =>
if (err) console.error(err);
else console.log("data sent");
);
现在,在exports.handler 中调用函数:
writeKinesis(<YOUR_DATA>);
需要注意的几点... Kinesis 要摄取数据,必须对其进行编码。在下面的示例中,我具有从 CloudWatch 获取日志并将它们发送到 Kinesis 流的函数。
请注意,我将 buffer.toString('utf8') 的内容插入到 writeKinesis 函数中:
exports.handler = function(input, context)
...
var zippedInput = new Buffer(input.awslogs.data, 'base64');
zlib.gunzip(zippedInput, function(error, buffer)
...
writeKinesis(buffer.toString('utf8'));
...
...
最后,在 IAM 中配置适当的权限。您的 Lambda 函数必须在包含以下以下权限的 IAM 角色的上下文中运行。就我而言,我只是修改了默认的 lambda_elasticsearch_execution 角色,以包含一个名为“lambda_kinesis_execution”的策略,代码如下:
"Effect": "Allow",
"Action": [
"kinesis:*"
],
"Resource": [
"<YOUR_STREAM_ARN>"
]
【讨论】:
以上是关于将 AWS Lambda 数据推送到 Kinesis Stream的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 AWS Kinesis Firehose 将嵌套结构推送到 Redshift
如何将数据从本地 SQL Server 推送到 AWS 上的 Tableau Server
我可以使用 AWS Glue 将 S3 上的 json 数据转换为列格式并将其推送到 Redshift 吗?