graphql-spring-boot-starter application with only WebSockets

Posted: 2021-12-13 15:32:58

Question:

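I am building a GraphQL application with spring-boot-starter-webflux 2.5.6 and com.graphql-java-kickstart:graphql-spring-boot-starter:12.0.0. So far the application works well, because com.graphql-java-kickstart is easy to get started with. Over HTTP requests I can call queries and run mutations, and I can even create subscriptions and receive their updates over WebSockets.

For my application, however, queries and mutations must also run over the WebSocket.

It seems that com.graphql-java-kickstart:graphql-spring-boot-starter only lets you configure the subscription endpoint as a WebSocket. Adding an extra WebSocket via 'extends Endpoint' and '@ServerEndpoint' had no effect.

I also tried adding my own HandlerMapping: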

@PostConstruct
public void init()
{
    // Copy the existing URL map so the handlers that are already registered stay in place.
    Map<String, Object> map = new HashMap<String, Object>(
            ((SimpleUrlHandlerMapping) webSocketHandlerMapping).getUrlMap());

    map.put("/mysocket", myWebSocketHandler);
    //map.put("/graphql", myWebSocketHandler);

    // Install the extended map and re-run handler registration.
    ((SimpleUrlHandlerMapping) webSocketHandlerMapping).setUrlMap(map);
    ((SimpleUrlHandlerMapping) webSocketHandlerMapping).initApplicationContext();
}

This seems to work for the /mysocket path, but how do I enable it for /graphql? There already appears to be a handler listening there:

WARN 12168 --- [ctor-http-nio-2] notprivacysafe.graphql.GraphQL : Query failed to parse : ''

And how do I connect the WebSocket to my GraphQLMutationResolvers?

Answer 1:

My starting point for solving this was to create a RestController and hand the ServerWebExchange to a WebSocketHandler through the WebSocketService, as follows:

@RestController
@RequestMapping("/")
public class WebSocketController
{

    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

    private final GraphQLObjectMapper objectMapper;

    private final GraphQLInvoker graphQLInvoker;

    private final GraphQLSpringInvocationInputFactory invocationInputFactory;

    private final WebSocketService service;

    @Autowired
    public WebSocketController(GraphQLObjectMapper objectMapper, GraphQLInvoker graphQLInvoker,
            GraphQLSpringInvocationInputFactory invocationInputFactory, WebSocketService service)
    {
        this.objectMapper = objectMapper;
        this.graphQLInvoker = graphQLInvoker;
        this.invocationInputFactory = invocationInputFactory;
        this.service = service;
    }

    // The path comes from the graphql.websocket.path property and falls back to "graphql-ws".
    @GetMapping("${graphql.websocket.path:graphql-ws}")
    public Mono<Void> getMono(ServerWebExchange exchange)
    {
        logger.debug("New connection via GET");
        // Let the WebSocketService perform the upgrade; each connection gets its own handler.
        return service.handleRequest(exchange,
                new GraphQLWebsocketMessageConsumer(exchange, objectMapper, graphQLInvoker, invocationInputFactory));
    }

    @PostMapping("${graphql.websocket.path:graphql-ws}")
    public Mono<Void> postMono(ServerWebExchange exchange)
    {
        ...
    }
}

In this prototype state the WebSocketHandler also implements Consumer, which is invoked to process every WebSocketMessage:

public class GraphQLWebsocketMessageConsumer implements Consumer<String>, WebSocketHandler
{

    private static final Logger logger = LoggerFactory.getLogger(GraphQLWebsocketMessageConsumer.class);

    private final ServerWebExchange swe;

    private final GraphQLObjectMapper objectMapper;

    private final GraphQLInvoker graphQLInvoker;

    private final GraphQLSpringInvocationInputFactory invocationInputFactory;

    // Outbound side of the socket: results are pushed into this sink and sent to the client.
    private final Sinks.Many<String> publisher;

    public GraphQLWebsocketMessageConsumer(ServerWebExchange swe, GraphQLObjectMapper objectMapper,
            GraphQLInvoker graphQLInvoker, GraphQLSpringInvocationInputFactory invocationInputFactory)
    {
        ...
        publisher = Sinks.many().multicast().directBestEffort();
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession)
    {
        // Inbound: pass every text frame to accept(). Outbound: forward whatever the sink emits.
        Mono<Void> input = webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(this).then();
        Mono<Void> sender = webSocketSession.send(publisher.asFlux().map(webSocketSession::textMessage));
        // Mono.zip completes as soon as either side completes, which cancels the other.
        return Mono.zip(input, sender).then();
    }

    @Override
    public void accept(String body)
    {
        try
        {
            String query = extractQuery(body);
            if(query == null)
            {
                // Nothing to execute for this message.
                return;
            }
            GraphQLRequest request = objectMapper.readGraphQLRequest(query);
            GraphQLSingleInvocationInput invocationInput = invocationInputFactory.create(request, swe);
            // Run the query or mutation through the regular GraphQL invoker, then emit the JSON result.
            Mono<ExecutionResult> executionResult = Mono.fromCompletionStage(graphQLInvoker.executeAsync(invocationInput));
            Mono<String> jsonResult = executionResult.map(objectMapper::serializeResultAsJson);
            jsonResult.subscribe(publisher::tryEmitNext);
        } catch (Exception e)
        {
            ...
        }
    }

    @SuppressWarnings("unchecked")
    private String extractQuery(final String query) throws Exception
    {
        Map<String, Object> map = (Map<String, Object>) objectMapper.getJacksonMapper().readValue(query, Map.class);
        ...
        return queryPart;
    }

    @Override
    public List<String> getSubProtocols()
    {
        logger.debug("getSubProtocols called");
        return Collections.singletonList("graphql-ws");
    }
}

This solution does not yet address security aspects such as authentication or session handling.
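The body of extractQuery is elided above, so the following is only a sketch of what such a step might look like, not the code actually used in this answer. It assumes clients speak the legacy graphql-ws subprotocol (the name returned by getSubProtocols), where an operation arrives in a "start" envelope with the request under "payload" and results are sent back in a "data" envelope carrying the same id. The class GraphQLWsEnvelope and its methods are hypothetical names, and the envelope is taken as an already decoded Map (for example from the Jackson mapper used above) so the example needs no dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for the legacy "graphql-ws" message envelopes (an assumption, not part of the answer). */
final class GraphQLWsEnvelope {

    private GraphQLWsEnvelope() {
    }

    /**
     * Returns the payload (query, variables, operationName) of a "start" envelope.
     * Any other message type, or a payload without a query string, yields an empty Optional.
     */
    @SuppressWarnings("unchecked")
    static Optional<Map<String, Object>> operationPayload(Map<String, Object> envelope) {
        if (!"start".equals(envelope.get("type"))) {
            return Optional.empty(); // e.g. connection_init or stop: nothing to execute
        }
        Object payload = envelope.get("payload");
        if (!(payload instanceof Map)) {
            return Optional.empty();
        }
        Map<String, Object> request = (Map<String, Object>) payload;
        if (!(request.get("query") instanceof String)) {
            return Optional.empty();
        }
        return Optional.of(request);
    }

    /** Wraps an execution result in a "data" envelope that echoes the id of the request. */
    static Map<String, Object> dataEnvelope(Object id, Object executionResult) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("type", "data");
        out.put("id", id);
        out.put("payload", executionResult);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> start = Map.of(
                "type", "start",
                "id", "1",
                "payload", Map.of("query", "mutation { ping }"));
        System.out.println(operationPayload(start));
        System.out.println(dataEnvelope(start.get("id"), Map.of("data", Map.of("ping", "pong"))));
    }
}
```

With a helper like this, accept could skip every message for which operationPayload is empty and serialize the envelope built by dataEnvelope instead of the bare result, so that a client can match each response to its request by id.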
