AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称

Posted

技术标签:

【中文标题】AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称【英文标题】:AWS Lambda - How to get the topic name of data coming from AWS IOT 【发布时间】:2017-09-21 07:15:25 【问题描述】:

我正在使用 AWS IOT 源测试 AWS Lambda。我的 mqtt 客户端在不同的主题中发布:设备 A 将数据发布到 streaming/A,设备 B 将数据发布到 streaming/B 所以在 AWS Lambda 中我定义了一个 SQL 规则,选择来自主题 streaming/+ 的所有设备。问题是现在我没有设备来源的信息,因为我只有一个带有额外信息的Array[Byte]]。如果有人有解决方案可以访问带有主题信息的mqtt有效负载,我会接受!

import java.io.ByteArrayOutputStream, InputStream, OutputStream
import com.amazonaws.services.lambda.runtime.Context, RequestStreamHandler
/**
  * Created by alifirat on 24/04/17.
  */
class IOTConsumer extends RequestStreamHandler 

  val BUFFER_SIZE = 1024 * 4

  override def handleRequest(input: InputStream, output: OutputStream, context: Context): Unit = 
    val bytes = toByteArray(input)
    val logger= context.getLogger
    logger.log("Receive following thing :"  + new String(bytes))
    output.write(bytes)
  

   /**
     * Reads and returns the rest of the given input stream as a byte array.
     * Caller is responsible for closing the given input stream.
     */
   def toByteArray(is : InputStream) : Array[Byte] = 
     val output = new ByteArrayOutputStream()
     try 
       val b = new Array[Byte](BUFFER_SIZE);
       var n = 0
       var flag = true
       while(flag) 
         n = is.read(b)
         if(n == -1) flag = false
         else 
           output.write(b, 0, n)
         
       
       output.toByteArray();
      finally 
       output.close();
       Array[Byte]()
     
   

【问题讨论】:

您遵循什么指示? github.com/aws/aws-iot-device-sdk-java 似乎相关 我关注这个:docs.aws.amazon.com/iot/latest/developerguide/… 但在文档中,未指定事件类型(通常是 node.js),但我使用的是 Scala/Java,所以我需要一个类型。此类型必须允许访问数据源 mqtt 主题。 啊,好的。但是为什么你认为你需要处理java.io.InputStream?那个代码是从哪里来的?该文档似乎表明 mqtt 消息是 JSON。 因为我在 AWS IOT 中发送字节数组,所以我希望检索这个字节数组作为 AWS Lambda 的输入。 哦,好的。我不知道你能做到这一点。我对 IoT 或 mqtt 一无所知。听起来您在低级别使用 Lambda 和 SNS 来传递字节。我想这就是为什么您无法访问任何元信息的原因。仅当您使用 JSON 或 POJO 时才可用。 【参考方案1】:

我一直在寻找同样的东西,有一种方法可以实现。在构建 SQL 时,您可以使用 topic() 函数来获取消息发送到的主题。这样你就可以放入属性部分

*, topic() as topic

所以您的最终 SQL 将如下所示:

SELECT *, topic() as topic FROM one/of/my/+/topics

然后,您的有效负载将包含一个新的属性主题,您可以在 lambda 函数中对其进行解析。更多关于这个https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html

【讨论】:

感谢您的解决方案,它确实有效,如果您可以发布指向您找到的文档的链接,非常感谢。 @v1shva 在我的回答中添加了指向文档的链接【参考方案2】:

如果您的触发器是 SNS 消息,那么我只会读取 JSON。这将在 Scala 中工作:

import com.amazonaws.services.lambda.runtime.events.SNSEvent

import scala.collection.JavaConverters._

object Example extends LambdaApp  

  /** Convert Java lists (or nulls!) to Scala lists */
  def safeList[A](xs: java.util.List[A]) =
    Option(xs).map(_.asScala).getOrElse(List.empty[A])

  /** Install the handler in AWS Lambda as `Example::handler`. */
  def handler(e: SNSEvent)  = 

    val rs = for 
      r <- safeList(e.getRecords)
     yield 
      r.getSNS.getMessage
    
    rs.asJava // Convert Scala list to Java.
  

您需要在 build.sbt 中有以下依赖项:

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
  "com.amazonaws" % "aws-lambda-java-events" % "1.3.0"
)

如果您对 SNS 主题名称感兴趣,可以从以下网址获取:

r.EventSubscriptionArn

AWS Lambda JDK 库使用 jackson-core 为您解析 SNS 消息的 JSON。

【讨论】:

我的触发器是一条 AWS IOT 规则:我定义了一条来自 AWS IOT 并应用 AWS Lambda 的规则转发消息,因此它不是 SNS 消息。

以上是关于AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称的主要内容,如果未能解决你的问题,请参考以下文章

来自 Lambda 的 AWS Cognito adminCreateUser,使用 Amplify CLI 创建

当从 s3 获取对象时,aws lambda 函数被拒绝访问

当 AWS AppSync 使用 Cognito 时如何验证 Lambda 生成的突变

如何在放大中捕获来自 aws cognito pre signup lambda 的错误?

AWS Lambda、API 网关和 Cognito:如何在 lambda 函数中获取身份对象?

AWS CloudFormation:如何为 Lambda 代码指定来自另一个 AWS 账户的存储桶?