将 AWS Lambda 数据推送到 Kinesis Stream

Posted

技术标签:

【中文标题】将 AWS Lambda 数据推送到 Kinesis Stream【英文标题】:Pushing AWS Lambda data to Kinesis Stream 【发布时间】:2016-10-31 21:44:19 【问题描述】:

有没有办法将数据从 Lambda 函数推送到 Kinesis 流?我搜索了互联网,但没有找到任何与之相关的示例。

谢谢。

【问题讨论】:

【参考方案1】:

是的,您可以将信息从 Lambda 发送到 Kinesis Stream,这非常简单。确保您以正确的权限运行 Lambda。

    创建一个名为 kinesis.js 的文件,该文件将提供一个“保存”功能,用于接收有效负载并将其发送到 Kinesis Stream。我们希望能够在任何我们想向流中发送数据的地方包含这个“保存”功能。代码:

const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis(
  apiVersion: kinesisConstant.API_VERSION, //optional
  //accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
  //secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
  region: kinesisConstant.REGION
);

const savePayload = (payload) => 
//We can only save strings into the streams
  if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) 
    try 
      payload = JSON.stringify(payload);
     catch (e) 
      console.log(e);
    
  

  let params = 
    Data: payload,
    PartitionKey: kinesisConstant.PARTITION_KEY,
    StreamName: kinesisConstant.STREAM_NAME
  ;

  kinesis.putRecord(params, function(err, data) 
    if (err) console.log(err, err.stack);
    else     console.log('Record added:',data);
  );
;

exports.save = (payload) => 
  const params = 
    StreamName: kinesisConstant.STREAM_NAME,
  ;

  kinesis.describeStream(params, function(err, data) 
    if (err) console.log(err, err.stack);
    else 
      //Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
      if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
        || data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) 
        savePayload(payload);
       else 
        console.log(`Kinesis stream $kinesisConstant.STREAM_NAME is $data.StreamDescription.StreamStatus.`);
        console.log(`Record Lost`, JSON.parse(payload));
      
    
  );
;
    创建一个 kinesisConstant.js 文件以保持一致:)

module.exports = 
  STATE: 
    ACTIVE: 'ACTIVE',
    UPDATING: 'UPDATING',
    CREATING: 'CREATING',
    DELETING: 'DELETING'
  ,
  STREAM_NAME: '<your-stream-name>',
  PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
  PAYLOAD_TYPE: 'String',
  REGION: '<the-region-where-you-have-lambda-and-kinesis>',
  API_VERSION: '2013-12-02'
    您的处理程序文件:我们添加了“完成”函数来向想要将数据发送到流的任何人发送响应,但“kinesis.save(event)”完成了所有工作。

const kinesis = require('./kinesis');

exports.handler = (event, context, callback) => 
  console.log('LOADING handler');
  
  const done = (err, res) => callback(null, 
    statusCode: err ? '400' : '200',
    body: err || res,
    headers: 
      'Content-Type': 'application/json',
    ,
  );
  
  kinesis.save(event); // here we send it to the stream
  done(null, event);

【讨论】:

很好的例子,对我帮助很大。请纠正错字(小) - kinesisConstants.js - 你错过了文件名中的最后一个“s” 这是一个很好的例子。但我不禁想知道,在这里使用 Kinesis 有什么好处。如果您将数据传递给 Lambda,为什么不直接推送到存储(例如:Elastic Search)?当大多数 AWS 存储工具都有自己的 Node 库时,为什么还要使用第三方工具?性能更高?【参考方案2】:

这应该完全像在您的计算机上那样完成。

这是nodejs中的一个例子:

let aws = require('aws');
let kinesis = new aws.Kinesis();

// data that you'd like to send
let data_object =  "some": "properties" ;
let data = JSON.stringify(data_object);

// push data to kinesis
const params = 
  Data: data,
  PartitionKey: "1",
  StreamName: "stream name"


kinesis.putRecord(params, (err, data) => 
  if (err) console.error(err);
  else console.log("data sent");

请注意,这段代码不会工作,因为Lambda 对您的信息流没有任何权限。 通过Lambda访问AWS资源时,最好使用IAM角色;

    配置新Lambda时,可以选择现有/创建角色。 转到IAM,然后转到角色,然后选择您分配给Lambda 函数的角色名称。 添加相关权限(putRecordputRecords)。

然后,测试Lambda

【讨论】:

【参考方案3】:

是的,这可以做到,我试图完成同样的事情,并且能够在 Lambda 中使用 Node.js 4.3 运行时做到这一点,并且它也适用于 6.10 版。

代码如下:

在您的 Lambda 函数顶部声明以下内容:

var AWS = require("aws-sdk");
var kinesis = new AWS.Kinesis();
function writeKinesis(rawdata)
    data = JSON.stringify(rawdata);
    params = Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>";
    kinesis.putRecord(params, (err, data) => 
    if (err) console.error(err);
    else console.log("data sent");
    );  

现在,在exports.handler 中调用函数:

writeKinesis(<YOUR_DATA>);

需要注意的几点... Kinesis 要摄取数据,必须对其进行编码。在下面的示例中,我具有从 CloudWatch 获取日志并将它们发送到 Kinesis 流的函数。

请注意,我将 buffer.toString('utf8') 的内容插入到 writeKinesis 函数中:

exports.handler = function(input, context) 
    ...
    var zippedInput = new Buffer(input.awslogs.data, 'base64');
    zlib.gunzip(zippedInput, function(error, buffer) 
        ...
        writeKinesis(buffer.toString('utf8'));
        ...
    
    ...

最后,在 IAM 中配置适当的权限。您的 Lambda 函数必须在包含以下以下权限的 IAM 角色的上下文中运行。就我而言,我只是修改了默认的 lambda_elasticsearch_execution 角色,以包含一个名为“lambda_kinesis_execution”的策略,代码如下:

"Effect": "Allow",
"Action": [
    "kinesis:*"
],
"Resource": [
    "<YOUR_STREAM_ARN>"
]

【讨论】:

以上是关于将 AWS Lambda 数据推送到 Kinesis Stream的主要内容,如果未能解决你的问题,请参考以下文章

AWS Amplify CLI:将资源推送到云时出错

如何使用 AWS Kinesis Firehose 将嵌套结构推送到 Redshift

如何将数据从本地 SQL Server 推送到 AWS 上的 Tableau Server

我可以使用 AWS Glue 将 S3 上的 json 数据转换为列格式并将其推送到 Redshift 吗?

我可以使用AWS Glue将S3上的json数据转换为柱状格式并将其推送到Redshift吗?

如何使 AWS ALB 将请求源发送到 lambda