如何使用 Spring 和 Websocket 构建推送通知服务
Posted
技术标签:
【中文标题】如何使用 Spring 和 Websocket 构建推送通知服务【英文标题】:How to build a push-notifications service with Spring and Websocket 【发布时间】:2021-12-20 07:43:20 【问题描述】:我们在开始时有什么
假设有一个简单的 Spring Boot 应用程序,它为某些前端提供 API。技术栈非常常规:Kotlin
、Gradle
、Spring WebMVC
、PostgreSQL
、Keycloak
。
前端通过 HTTP 与应用程序同步交互。客户端使用 JWT 令牌进行身份验证。
业务任务
有一个可以在系统某处引发的事件列表。应通知用户。
用户可以订阅一个或多个事件通知。订阅只是一对保留在专用 Postgres 表中的user_id + event_type_id
。
当事件 X 被引发时,我们应该找到所有订阅它的用户并通过Websocket
向他们发送一些数据
【问题讨论】:
更健壮的解决方案是将 AMQP 与 spring 一起使用,因为 web-sockets 可能会丢失消息 【参考方案1】:配置
让我们先配置 Spring。 Spring 使用 STOMP over Websocket 协议。
给build.gradle.kts
添加依赖
implementation("org.springframework.boot:spring-boot-starter-websocket")
implementation("org.springframework:spring-messaging")
添加 Websocket 配置
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
class WebsocketConfig(
private val websocketAuthInterceptor: WebsocketAuthInterceptor
) : WebSocketMessageBrokerConfigurer
override fun configureMessageBroker(config: MessageBrokerRegistry)
config.enableSimpleBroker("/queue/")
config.setUserDestinationPrefix("/user")
override fun registerStompEndpoints(registry: StompEndpointRegistry)
registry.addEndpoint("/ws").setAllowedOrigins("*")
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS()
override fun configureClientInboundChannel(registration: ChannelRegistration)
registration.interceptors(websocketAuthInterceptor) // we'll talk later about this
registerStompEndpoints
是关于如何与我们的websocket建立连接的。该配置允许前端通过SockJS
的Websocket
库与我们交互。它们是什么以及有什么区别不是今天的话题
configureMessageBroker
是关于建立连接后我们将如何与前端交互。
configureClientInboundChannel
是关于消息拦截的。以后再说吧。
在 Spring Security 配置中将 /ws/**
路径添加到忽略的模式。
连接建立
在前端它应该看起来像这样 (javascript)
const socket = new WebSocket('ws://<api.host>/ws')
//const socket = new SockJS('http://<api.host>/ws') // alternative way
const headers = Authorization: 'JWT token here';
const stompClient = Stomp.over(socket, headers);
stompClient.connect(
headers,
function (frame)
stompClient.subscribe('/user/queue/notification',
function (message) ...message processing... );
);
关键时刻是传递一个授权标头。它不是 HTTP 标头。并且初始 HTTP 握手本身不会被授权。这就是将/ws/**
添加到忽略模式的原因。
我们需要标头和令牌,因为我们希望仅允许授权用户进行 websocket 连接,并且我们还想知道究竟连接了哪个用户。
身份验证
现在我们正在添加身份验证机制
@Component
class WebsocketAuthInterceptor(
private val authService: AuthService, //your implementation
private val sessionStore: WebsocketUserSessionStore
) : ChannelInterceptor
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>?
val accessor: StompHeaderAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor::class.java)
val sessionId: String = accessor.messageHeaders["simpSessionId"].toString()
if (StompCommand.CONNECT == accessor.command)
val jwtToken: String = accessor.getFirstNativeHeader("Authorization")
val token: AccessToken = authService.verifyToken(jwtToken)
val userId: Long = token.otherClaims["user_id"].toString().toLong()
sessionStore.add(sessionId, userId)
else if (StompCommand.DISCONNECT == accessor.command)
sessionStore.remove(sessionId)
return message
关键是将随机生成的 websocket 会话 ID 与我们 Spring Security 存储中的用户 ID 链接起来,并在会话生命周期内保持该对。应该从消息头中解析 JWT 令牌。
然后通过给定的令牌应该获得一个用户ID。该部分的实现取决于您确切拥有的 Spring Security 配置。在 Keyclock 的情况下,有一个有用的静态方法 org.keycloak.adapters.rotation.AdapterTokenVerifier::verifyToken
WebsocketUserSessionStore
只是将session_id
与user_id
链接起来的映射,它可能类似于以下代码。当然记得并发访问
@Component
class WebsocketUserSessionStore
private val lock = ReentrantLock()
private val store = HashMap<String, Long>()
fun add(sessionId: String, userId: Long) = lock.withLock
store.compute(sessionId) _, _ -> userId
fun remove(sessionId: String) = lock.withLock
store.remove(sessionId)
fun remove(userId: Long) = lock.withLock
store.values.remove(userId)
实际通知
所以事件 A 是在业务逻辑中的某个地方引发的。让我们实现一个 websocket 发布者。
@Component
class WebsocketPublisher(
private val messagingTemplate: SimpMessagingTemplate,
private val objectMapper: ObjectMapper,
private val sessionStore: WebsocketUserSessionStore,
private val userNotificationRepository: UserNotificationRepository
)
suspend fun publish(eventType: EventType, eventData: Any)
val userIds = userNotificationRepository.getUserSubscribedTo(eventType)
val sessionIds = sessionStore.getSessionIds(userIds)
sessionIds.forEach
messagingTemplate.convertAndSendToUser(
it,
"/queue/notification",
objectMapper.writeValueAsString(eventData),
it.toMessageHeaders()
)
private fun String.toMessageHeaders(): MessageHeaders
val headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE)
headerAccessor.sessionId = this
headerAccessor.setLeaveMutable(true)
return headerAccessor.messageHeaders
EventType
是系统具有的事件类型的枚举。
UserNotificationRepository
只是数据持久层的一部分(Hibernate|JOOQ 存储库、MyBatis 映射器或 smth)。函数getUserSubscribedTo
应该执行类似select user_id from user_subscription where event_type_id = X
的操作。
其余代码非常简单。通过提供 userIds,可以获得活动的 websocket 会话。然后每个会话都应该调用convertAndSendToUser
函数。
【讨论】:
以上是关于如何使用 Spring 和 Websocket 构建推送通知服务的主要内容,如果未能解决你的问题,请参考以下文章
如何使用类 + spring 4.0.0 配置 websocket 句柄
如何使用和自定义 MessageConversion(Spring websocket 客户端)
java - 如何使用simplebroker或rabbitMQ和java spring在websocket中获取所有连接的用户