AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称
Posted
技术标签:
【中文标题】AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称【英文标题】:AWS Lambda - How to get the topic name of data coming from AWS IOT 【发布时间】:2017-09-21 07:15:25 【问题描述】:我正在使用 AWS IOT 源测试 AWS Lambda。我的 mqtt 客户端在不同的主题中发布:设备 A 将数据发布到 streaming/A
,设备 B 将数据发布到 streaming/B
所以在 AWS Lambda 中我定义了一个 SQL 规则,选择来自主题 streaming/+
的所有设备。问题是现在我没有设备来源的信息,因为我只有一个带有额外信息的Array[Byte]]
。如果有人有解决方案可以访问带有主题信息的mqtt有效负载,我会接受!
import java.io.ByteArrayOutputStream, InputStream, OutputStream
import com.amazonaws.services.lambda.runtime.Context, RequestStreamHandler
/**
* Created by alifirat on 24/04/17.
*/
class IOTConsumer extends RequestStreamHandler
val BUFFER_SIZE = 1024 * 4
override def handleRequest(input: InputStream, output: OutputStream, context: Context): Unit =
val bytes = toByteArray(input)
val logger= context.getLogger
logger.log("Receive following thing :" + new String(bytes))
output.write(bytes)
/**
* Reads and returns the rest of the given input stream as a byte array.
* Caller is responsible for closing the given input stream.
*/
def toByteArray(is : InputStream) : Array[Byte] =
val output = new ByteArrayOutputStream()
try
val b = new Array[Byte](BUFFER_SIZE);
var n = 0
var flag = true
while(flag)
n = is.read(b)
if(n == -1) flag = false
else
output.write(b, 0, n)
output.toByteArray();
finally
output.close();
Array[Byte]()
【问题讨论】:
您遵循什么指示? github.com/aws/aws-iot-device-sdk-java 似乎相关 我关注这个:docs.aws.amazon.com/iot/latest/developerguide/… 但在文档中,未指定事件类型(通常是 node.js),但我使用的是 Scala/Java,所以我需要一个类型。此类型必须允许访问数据源 mqtt 主题。 啊,好的。但是为什么你认为你需要处理java.io.InputStream
?那个代码是从哪里来的?该文档似乎表明 mqtt 消息是 JSON。
因为我在 AWS IOT 中发送字节数组,所以我希望检索这个字节数组作为 AWS Lambda 的输入。
哦,好的。我不知道你能做到这一点。我对 IoT 或 mqtt 一无所知。听起来您在低级别使用 Lambda 和 SNS 来传递字节。我想这就是为什么您无法访问任何元信息的原因。仅当您使用 JSON 或 POJO 时才可用。
【参考方案1】:
我一直在寻找同样的东西,有一种方法可以实现。在构建 SQL 时,您可以使用 topic() 函数来获取消息发送到的主题。这样你就可以放入属性部分
*, topic() as topic
所以您的最终 SQL 将如下所示:
SELECT *, topic() as topic FROM one/of/my/+/topics
然后,您的有效负载将包含一个新的属性主题,您可以在 lambda 函数中对其进行解析。更多关于这个https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html
【讨论】:
感谢您的解决方案,它确实有效,如果您可以发布指向您找到的文档的链接,非常感谢。 @v1shva 在我的回答中添加了指向文档的链接【参考方案2】:如果您的触发器是 SNS 消息,那么我只会读取 JSON。这将在 Scala 中工作:
import com.amazonaws.services.lambda.runtime.events.SNSEvent
import scala.collection.JavaConverters._
object Example extends LambdaApp
/** Convert Java lists (or nulls!) to Scala lists */
def safeList[A](xs: java.util.List[A]) =
Option(xs).map(_.asScala).getOrElse(List.empty[A])
/** Install the handler in AWS Lambda as `Example::handler`. */
def handler(e: SNSEvent) =
val rs = for
r <- safeList(e.getRecords)
yield
r.getSNS.getMessage
rs.asJava // Convert Scala list to Java.
您需要在 build.sbt 中有以下依赖项:
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
"com.amazonaws" % "aws-lambda-java-events" % "1.3.0"
)
如果您对 SNS 主题名称感兴趣,可以从以下网址获取:
r.EventSubscriptionArn
AWS Lambda JDK 库使用 jackson-core 为您解析 SNS 消息的 JSON。
【讨论】:
我的触发器是一条 AWS IOT 规则:我定义了一条来自 AWS IOT 并应用 AWS Lambda 的规则转发消息,因此它不是 SNS 消息。以上是关于AWS Lambda - 如何获取来自 AWS IOT 的数据的主题名称的主要内容,如果未能解决你的问题,请参考以下文章
来自 Lambda 的 AWS Cognito adminCreateUser,使用 Amplify CLI 创建
当从 s3 获取对象时,aws lambda 函数被拒绝访问
当 AWS AppSync 使用 Cognito 时如何验证 Lambda 生成的突变
如何在放大中捕获来自 aws cognito pre signup lambda 的错误?