如何使用 Spring 和 Websocket 构建推送通知服务

Posted

技术标签:

【中文标题】如何使用 Spring 和 Websocket 构建推送通知服务【英文标题】:How to build a push-notifications service with Spring and Websocket 【发布时间】:2021-12-20 07:43:20 【问题描述】:

我们在开始时有什么

假设有一个简单的 Spring Boot 应用程序,它为某些前端提供 API。技术栈非常常规:KotlinGradleSpring WebMVCPostgreSQLKeycloak。 前端通过 HTTP 与应用程序同步交互。客户端使用 JWT 令牌进行身份验证。

业务任务

有一个可以在系统某处引发的事件列表。应通知用户。

用户可以订阅一个或多个事件通知。订阅只是一对保留在专用 Postgres 表中的user_id + event_type_id

当事件 X 被引发时,我们应该找到所有订阅它的用户并通过Websocket向他们发送一些数据

【问题讨论】:

更健壮的解决方案是将 AMQP 与 spring 一起使用,因为 web-sockets 可能会丢失消息 【参考方案1】:

配置

让我们先配置 Spring。 Spring 使用 STOMP over Websocket 协议。

build.gradle.kts添加依赖

implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("org.springframework:spring-messaging")

添加 Websocket 配置

@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
class WebsocketConfig(
    private val websocketAuthInterceptor: WebsocketAuthInterceptor
) : WebSocketMessageBrokerConfigurer 

    override fun configureMessageBroker(config: MessageBrokerRegistry) 
        config.enableSimpleBroker("/queue/")
        config.setUserDestinationPrefix("/user")
    

    override fun registerStompEndpoints(registry: StompEndpointRegistry) 
        registry.addEndpoint("/ws").setAllowedOrigins("*")
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS()
    

    override fun configureClientInboundChannel(registration: ChannelRegistration) 
        registration.interceptors(websocketAuthInterceptor) // we'll talk later about this
    

registerStompEndpoints是关于如何与我们的websocket建立连接的。该配置允许前端通过SockJSWebsocket 库与我们交互。它们是什么以及有什么区别不是今天的话题

configureMessageBroker 是关于建立连接后我们将如何与前端交互。

configureClientInboundChannel 是关于消息拦截的。以后再说吧。

在 Spring Security 配置中将 /ws/** 路径添加到忽略的模式。

连接建立

在前端它应该看起来像这样 (javascript)

const socket = new WebSocket('ws://<api.host>/ws')
//const socket = new SockJS('http://<api.host>/ws') // alternative way
const headers = Authorization: 'JWT token here';
 
const stompClient = Stomp.over(socket, headers);
stompClient.connect(
    headers,
    function (frame) 
        stompClient.subscribe('/user/queue/notification', 
        function (message)  ...message processing... );
    );

关键时刻是传递一个授权标头。它不是 HTTP 标头。并且初始 HTTP 握手本身不会被授权。这就是将/ws/** 添加到忽略模式的原因。 我们需要标头和令牌,因为我们希望仅允许授权用户进行 websocket 连接,并且我们还想知道究竟连接了哪个用户。

身份验证

现在我们正在添加身份验证机制

@Component
class WebsocketAuthInterceptor(
    private val authService: AuthService, //your implementation
    private val sessionStore: WebsocketUserSessionStore
) : ChannelInterceptor 

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? 
        val accessor: StompHeaderAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor::class.java)
        val sessionId: String = accessor.messageHeaders["simpSessionId"].toString()

        if (StompCommand.CONNECT == accessor.command) 
            val jwtToken: String = accessor.getFirstNativeHeader("Authorization")
            val token: AccessToken = authService.verifyToken(jwtToken)
            val userId: Long = token.otherClaims["user_id"].toString().toLong()
            sessionStore.add(sessionId, userId)
         else if (StompCommand.DISCONNECT == accessor.command) 
            sessionStore.remove(sessionId)
        
        return message
    

关键是将随机生成的 websocket 会话 ID 与我们 Spring Security 存储中的用户 ID 链接起来,并在会话生命周期内保持该对。应该从消息头中解析 JWT 令牌。

然后通过给定的令牌应该获得一个用户ID。该部分的实现取决于您确切拥有的 Spring Security 配置。在 Keyclock 的情况下,有一个有用的静态方法 org.keycloak.adapters.rotation.AdapterTokenVerifier::verifyToken

WebsocketUserSessionStore 只是将session_iduser_id 链接起来的映射,它可能类似于以下代码。当然记得并发访问

@Component
class WebsocketUserSessionStore 
    private val lock = ReentrantLock()
    private val store = HashMap<String, Long>()

    fun add(sessionId: String, userId: Long) = lock.withLock 
        store.compute(sessionId)  _, _ -> userId 
    

    fun remove(sessionId: String) = lock.withLock 
        store.remove(sessionId)
    

    fun remove(userId: Long) = lock.withLock 
        store.values.remove(userId)
    

实际通知

所以事件 A 是在业务逻辑中的某个地方引发的。让我们实现一个 websocket 发布者。

@Component
class WebsocketPublisher(
    private val messagingTemplate: SimpMessagingTemplate,
    private val objectMapper: ObjectMapper,
    private val sessionStore: WebsocketUserSessionStore,
    private val userNotificationRepository: UserNotificationRepository
) 
    suspend fun publish(eventType: EventType, eventData: Any) 
        val userIds = userNotificationRepository.getUserSubscribedTo(eventType)
        val sessionIds = sessionStore.getSessionIds(userIds)

        sessionIds.forEach 
            messagingTemplate.convertAndSendToUser(
                it,
                "/queue/notification",
                objectMapper.writeValueAsString(eventData),
                it.toMessageHeaders()
            )
        
    

    private fun String.toMessageHeaders(): MessageHeaders 
        val headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE)
        headerAccessor.sessionId = this
        headerAccessor.setLeaveMutable(true)
        return headerAccessor.messageHeaders
    

EventType 是系统具有的事件类型的枚举。

UserNotificationRepository 只是数据持久层的一部分(Hibernate|JOOQ 存储库、MyBatis 映射器或 smth)。函数getUserSubscribedTo 应该执行类似select user_id from user_subscription where event_type_id = X 的操作。

其余代码非常简单。通过提供 userIds,可以获得活动的 websocket 会话。然后每个会话都应该调用convertAndSendToUser 函数。

【讨论】:

以上是关于如何使用 Spring 和 Websocket 构建推送通知服务的主要内容,如果未能解决你的问题,请参考以下文章

如何使用类 + spring 4.0.0 配置 websocket 句柄

如何使用和自定义 MessageConversion(Spring websocket 客户端)

如何在Spring中配置Websocket

java - 如何使用simplebroker或rabbitMQ和java spring在websocket中获取所有连接的用户

如何使用 Spring WebSocket 向 STOMP 客户端发送错误消息?

如何使用 spring+stomp+sockjs 获得 websocket 响应