Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据
Posted
技术标签:
【中文标题】Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据【英文标题】:Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data 【发布时间】:2019-12-16 09:32:21 【问题描述】:场景
我有一个传感器节点,它发布有关特定 MQTT 主题的信息(发送到 Mosquitto 代理)。发送的数据是纯字符串。
后端
目前我正在使用apollo-server-express
构建一个 GraphQL 服务器。我希望使用`graphql-mqtt-subscriptions 来:
graphiql
UI
dependencies
"dependencies":
"apollo-server-express": "^2.8.1",
"express": "^4.17.1",
"graphql": "^14.4.2",
"graphql-mqtt-subscriptions": "^1.1.0",
"graphql-subscriptions": "^1.1.0",
"graphql-tools": "^4.0.5",
"mqtt": "^3.0.0",
"subscriptions-transport-ws": "^0.9.16"
,
代码片段
入口点server.js
代码:
import express from 'express';
import ApolloServer from 'apollo-server-express';
import typeDefs from './graphql/schema';
import resolvers from './graphql/resolvers';
import createServer from 'http';
const server = new ApolloServer( typeDefs, resolvers);
const app = express();
server.applyMiddleware( app );
const httpServer = createServer(app);
server.installSubscriptionHandlers(httpServer);
httpServer.listen(port: 4000, () =>
console.log(`???? Server ready at http://localhost:4000/$server.graphqlPath`)
console.log(`???? Subscriptions ready at ws://localhost:4000/$server.subscriptionsPath`)
);
GraphQL 的 typeDefs
架构如下:
type Result
data: String
type Subscription
siteAdded(topic: String): Result
schema
query: Query
mutation: Mutation
subscription: Subscription
siteAdded(topic: String)
将获取 MQTT 需要订阅的主题。示例:
subscription
siteAdded(topic: "test/1/env")
data
resolvers.js
如下所示(如 5 月文档中所述):
import MQTTPubSub from 'graphql-mqtt-subscriptions';
import connect from 'mqtt';
const client = connect('mqtt://my.mqtt.broker.ip.address',
reconnectPeriod: 1000,
);
const pubsub = new MQTTPubSub(
client
);
export const resolvers:
Subscription:
siteAdded:
subscribe: (_, args) =>
console.log(args.topic); // to check if this gets called or not.
pubsub.asyncIterator([args.topic]);
;
推理
args.topic
上的 console.log
被调用,但之后graphiql
出现以下错误:
"error":
"message": "Subscription field must return Async Iterable. Received: undefined"
如果我执行return pubsub.asyncIterator()
:
它提供来自 Broker 的及时数据,但输出为null
:
"data":
"siteAdded": null
我已经根据Apollo Docs在上面提到的server.js
添加了Websockets中间件
我在哪里出错以及如何将来自订阅主题的数据添加到graphiql
?
【问题讨论】:
您忘记在订阅解析器中调用return
。 return pubsub.asyncIterator([args.topic]);
@Dom 我试过它不起作用。 UI 不断发送 HTTP POST 但没有数据
你肯定需要return
像@Dom 说的那样调用pubsub.asyncIterator()
的结果。您如何测试订阅?在 Playground 中订阅后,您需要以某种方式触发 publish
调用(例如,通过在另一个选项卡中打开 Playground 并发送调用 publish
的突变)
@DanielRearden 但由于传感器已经在该主题上发布,这不会为我提供一些东西吗?我实际上没有触发publish
的突变。我已经有一个传感器发布主题test/1/env
,由于我使用test/+/env
通配符,数据应该可用
啊,我明白了。我会安装一个调用publish
的虚拟突变,至少看看它是否按预期工作。假设确实如此,那么您可以从那里进行调试。也许主题名称不匹配?
【参考方案1】:
总结
更新
graphql-mqtt-subscriptions
: v1.2.0
现在提供通配符解析支持
Documented a practical usage of MQTT and GraphQL
注意事项
NPM Registry 上的graphql-mqtt-subscriptions
v1.1.0 中的+
和#
等通配符不可用。但是,存储库已经有了实现。存储库的所有者需要更新注册表。见Open Issue for graphql-mqtt-subscriptions
我目前正在使用 MQTT 订阅的完整主题,以便从传感器获取数据,例如test/1/env
而不是 test/+/env
开发更新
之前我以原始字符串格式(纯文本)从传感器发送数据,因此我更新了固件以 JSON 字符串发送数据,如下所示:
"data": "temp=23,humid=56 1500394302"
解决方案
正如@Dom 和@DanielRearden 在cmets 中提到的,如果我使用大括号,我最初忘记添加
return
。例如:
Subscription:
siteAdded:
subscribe: (_, args) =>
console.log(args.topic); // to check if this gets called or not.
return pubsub.asyncIterator([args.topic]);
或者我只是通过编写解析器删除了括号和return
,如下所示:
Subscription:
siteAdded:
subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
正如查询中提到的,这仍然返回给我null
。
我能够按照 Apollo 的 Payload Transformation 文档从订阅中获取数据,在我的解析器中,我执行了以下操作:
Subscription:
siteAdded:
resolve: (payload) =>
return
data: payload.data,
;
,
subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
必须为 Schema 相应地解析有效负载。
结果
现在订阅如下所示:
subscription
siteAdded(topic: "test/1/env")
data
提供以下结果:
"data":
"siteAdded":
"data": "temp=27.13,humid=43.33 1565345004"
【讨论】:
以上是关于Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据的主要内容,如果未能解决你的问题,请参考以下文章