Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据

Posted

技术标签:

【中文标题】Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据【英文标题】:Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data 【发布时间】:2019-12-16 09:32:21 【问题描述】:

场景

我有一个传感器节点,它发布有关特定 MQTT 主题的信息(发送到 Mosquitto 代理)。发送的数据是纯字符串。

后端

目前我正在使用apollo-server-express 构建一个 GraphQL 服务器。我希望使用`graphql-mqtt-subscriptions 来:

订阅 MQTT 代理 阅读特定主题的信息并将其返回到graphiql UI

dependencies

"dependencies": 
    "apollo-server-express": "^2.8.1",
    "express": "^4.17.1",
    "graphql": "^14.4.2",
    "graphql-mqtt-subscriptions": "^1.1.0",
    "graphql-subscriptions": "^1.1.0",
    "graphql-tools": "^4.0.5",
    "mqtt": "^3.0.0",
    "subscriptions-transport-ws": "^0.9.16"
  ,

代码片段

入口点server.js 代码:

import express from 'express';
import ApolloServer  from 'apollo-server-express';
import  typeDefs  from './graphql/schema';
import  resolvers  from './graphql/resolvers';
import  createServer  from 'http';


const server = new ApolloServer( typeDefs, resolvers);

const app = express();

server.applyMiddleware( app );

const httpServer = createServer(app);

server.installSubscriptionHandlers(httpServer);

httpServer.listen(port: 4000, () => 
    console.log(`???? Server ready at http://localhost:4000/$server.graphqlPath`)
    console.log(`???? Subscriptions ready at ws://localhost:4000/$server.subscriptionsPath`)
);

GraphQL 的 typeDefs 架构如下:


type Result 
        data: String


type Subscription 
        siteAdded(topic: String): Result
    

schema 
  query: Query
  mutation: Mutation
  subscription: Subscription

siteAdded(topic: String) 将获取 MQTT 需要订阅的主题。示例:

subscription 
   siteAdded(topic: "test/1/env") 
       data
   

resolvers.js 如下所示(如 5 月文档中所述):

import  MQTTPubSub  from 'graphql-mqtt-subscriptions';
import  connect  from 'mqtt';

const client = connect('mqtt://my.mqtt.broker.ip.address', 
    reconnectPeriod: 1000,
);

const pubsub = new MQTTPubSub(
    client
);

export const resolvers: 
Subscription: 
        siteAdded: 
            subscribe: (_, args) => 
                console.log(args.topic); // to check if this gets called or not.
                pubsub.asyncIterator([args.topic]);

            
        
    
;

推理

args.topic 上的 console.log 被调用,但之后graphiql 出现以下错误:


  "error": 
    "message": "Subscription field must return Async Iterable. Received: undefined"
  

如果我执行return pubsub.asyncIterator():

它提供来自 Broker 的及时数据,但输出为null


  "data": 
    "siteAdded": null
  

我已经根据Apollo Docs在上面提到的server.js添加了Websockets中间件

我在哪里出错以及如何将来自订阅主题的数据添加到graphiql

【问题讨论】:

您忘记在订阅解析器中调用returnreturn pubsub.asyncIterator([args.topic]); @Dom 我试过它不起作用。 UI 不断发送 HTTP POST 但没有数据 你肯定需要return 像@Dom 说的那样调用pubsub.asyncIterator() 的结果。您如何测试订阅?在 Playground 中订阅后,您需要以某种方式触发 publish 调用(例如,通过在另一个选项卡中打开 Playground 并发送调用 publish 的突变) @DanielRearden 但由于传感器已经在该主题上发布,这不会为我提供一些东西吗?我实际上没有触发publish 的突变。我已经有一个传感器发布主题test/1/env,由于我使用test/+/env 通配符,数据应该可用 啊,我明白了。我会安装一个调用publish 的虚拟突变,至少看看它是否按预期工作。假设确实如此,那么您可以从那里进行调试。也许主题名称不匹配? 【参考方案1】:

总结

更新

graphql-mqtt-subscriptions: v1.2.0 现在提供通配符解析支持

Documented a practical usage of MQTT and GraphQL

注意事项

NPM Registry 上的graphql-mqtt-subscriptions v1.1.0 中的+# 等通配符不可用。但是,存储库已经有了实现。存储库的所有者需要更新注册表。见Open Issue for graphql-mqtt-subscriptions

我目前正在使用 MQTT 订阅的完整主题,以便从传感器获取数据,例如test/1/env 而不是 test/+/env

开发更新

之前我以原始字符串格式(纯文本)从传感器发送数据,因此我更新了固件以 JSON 字符串发送数据,如下所示:

  "data": "temp=23,humid=56 1500394302"

解决方案

    正如@Dom 和@DanielRearden 在cmets 中提到的,如果我使用大括号,我最初忘记添加return。例如:

        Subscription: 
        siteAdded: 
            subscribe: (_, args) => 
                console.log(args.topic); // to check if this gets called or not.
                return pubsub.asyncIterator([args.topic]);
    
            
        
    
    

    或者我只是通过编写解析器删除了括号和return,如下所示:

        Subscription: 
           siteAdded: 
               subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
           
        
    

    正如查询中提到的,这仍然返回给我null

    我能够按照 Apollo 的 Payload Transformation 文档从订阅中获取数据,在我的解析器中,我执行了以下操作:

       Subscription: 
        siteAdded: 
            resolve: (payload) => 
                return 
                    data: payload.data,
                ;
            ,
            subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
        
    
    

    必须为 Schema 相应地解析有效负载。

结果

现在订阅如下所示:

    subscription 
        siteAdded(topic: "test/1/env") 
            data
        
    

提供以下结果:


  "data": 
    "siteAdded": 
      "data": "temp=27.13,humid=43.33 1565345004"
    
  

【讨论】:

以上是关于Apollo GraphQL:MQTT 订阅代理以仅提供已发布的数据的主要内容,如果未能解决你的问题,请参考以下文章

29. Apache apollo

使用 Express-GraphQL 和 React-Apollo 订阅 GraphQL

Apollo 服务器 + Lambda + 订阅

Apollo graphQL 订阅使用哪个包

Apollo GraphQL 订阅

Angular Apollo GraphQL watchQuery 与订阅