Spring WebFlux (Mono/Flux) with AOP: triggering a REST call at interception and working with Mono/Flux
【Posted】: 2020-01-22 00:29:25
【Question】:
I wrote an @Aspect to intercept reactive methods that return Mono/Flux values. Using @AfterReturning advice, I am trying to trigger an APNS notification by calling a web service.
Unfortunately, the processNotification service returns an onComplete signal immediately, without executing the call chain. Below is my sample program.
@Aspect
@Component
@Slf4j
public class NotifyAspect {

    private final NotificationServiceHelper notificationServiceHelper;

    @Autowired
    public NotifyAspect(NotificationServiceHelper notificationServiceHelper) {
        this.notificationServiceHelper = notificationServiceHelper;
    }

    @AfterReturning(pointcut = "@annotation(com.cupid9.api.common.annotations.Notify)", returning = "returnValue")
    public void generateNotification(JoinPoint joinPoint, Object returnValue) throws Throwable {
        log.info("AfterReturning Advice - Intercepting Method : {}", joinPoint.getSignature().getName());

        // Get intercepted method details.
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        // Get the notification details.
        Notify myNotify = method.getAnnotation(Notify.class);
        if (Mono.class.isAssignableFrom(returnValue.getClass())) {
            Mono<Object> result = (Mono<Object>) returnValue;
            result.doOnSubscribe(o -> {
                log.debug("On Subscription...");
                notificationServiceHelper.processNotification(myNotify.notificationType())
                        .doOnError(throwable -> {
                            log.error("Exception in notification processor", throwable);
                        });
            });
        }
    }
}
@Slf4j
@Service
public class NotificationServiceHelper {

    private ReactiveUserProfileRepository userProfileRepository;

    @Value("${services.notification.url}")
    private String notificationServiceUrl;

    private RestWebClient restWebClient;

    @Autowired
    public NotificationServiceHelper(RestWebClient restWebClient,
                                     ReactiveUserProfileRepository reactiveUserProfileRepository) {
        this.restWebClient = restWebClient;
        this.userProfileRepository = reactiveUserProfileRepository;
    }

    public Flux<Notification> processNotification(NotificationSchema.NotificationType notificationType) {
        /* Get user profile details */
        return SessionHelper.getProfileId()
                .switchIfEmpty(Mono.error(new BadRequest("Invalid Account 1!")))
                .flatMap(profileId ->
                        Mono.zip(userProfileRepository.findByIdAndStatus(profileId, Status.Active), SessionHelper.getJwtToken()))
                .switchIfEmpty(Mono.error(new BadRequest("Invalid Account 2!")))
                .flatMapMany(tuple2 -> {
                    // Get user details and make sure there are some valid devices associated.
                    var userProfileSchema = tuple2.getT1();
                    log.info("Processing Notifications for User Profile : {}", userProfileSchema.getId());
                    if (Objects.isNull(userProfileSchema.getDevices()) || (userProfileSchema.getDevices().size() < 1)) {
                        return Flux.error(new InternalServerError("No Devices associate with this user. Can not send notifications."));
                    }

                    // Build the notification messages from the notification type.
                    var notificationsMap = new LinkedHashSet<Notification>();
                    userProfileSchema.getDevices().forEach(device -> {
                        var notificationPayload = Notification.builder()
                                .notificationType(notificationType)
                                .receiverDevice(device)
                                .receiverProfileRef(userProfileSchema.getId())
                                .build();
                        notificationsMap.add(notificationPayload);
                    });

                    // Get session token for authorization.
                    var jwtToken = tuple2.getT2();

                    // Build the URI needed to make the REST call.
                    var uri = UriComponentsBuilder.fromUriString(notificationServiceUrl).build().toUri();
                    log.info("URI built String : {}", uri.toString());

                    // Build the headers needed to make the REST call.
                    var headers = new HttpHeaders();
                    headers.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.AUTHORIZATION, jwtToken);

                    var publishers = new ArrayList<Mono<ClientResponse>>();
                    notificationsMap.forEach(notification -> {
                        publishers.add(restWebClient.post(uri, headers, notification));
                    });

                    return Flux.merge(publishers).flatMap(clientResponse -> {
                        var httpStatus = clientResponse.statusCode();
                        log.info("NotificationService HTTP status code : {}", httpStatus.value());
                        if (httpStatus.is2xxSuccessful()) {
                            log.info("Successfully received response from Notification Service...");
                            return clientResponse.bodyToMono(Notification.class);
                        } else {
                            // return Flux.empty();
                            return clientResponse.bodyToMono(Error.class)
                                    .flatMap(error -> {
                                        log.error("Error calling Notification Service : {}", httpStatus.getReasonPhrase());
                                        if (httpStatus.value() == 400) {
                                            return Mono.error(new BadRequest(error.getMessage()));
                                        }
                                        return Mono.error(new InternalServerError(String.format("Error calling Notification Service : %s", error.getMessage())));
                                    });
                        }
                    });
                }).doOnError(throwable -> {
                    throw new InternalServerError(throwable.getMessage(), throwable);
                });
    }
}
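How can we trigger this call asynchronously, without making the intercepted method wait for it? Right now processNotification always returns an onComplete signal without executing; the chain does not run as expected.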
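Editorial note, not part of the original post: a likely reason the chain never runs is that Reactor operators such as doOnSubscribe return a new publisher instead of modifying the one they are called on, and a publisher does nothing until something subscribes to it. In the advice above, the Mono returned by doOnSubscribe is discarded, and the Flux returned by processNotification is only assembled inside the hook and never subscribed. The sketch below reproduces that behaviour with plain JDK types so that it runs without dependencies. Lazy, LazyPipelineDemo and the event strings are hypothetical stand-ins, not Reactor or project classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class LazyPipelineDemo {

    /** Hypothetical stand-in for a cold Mono: nothing runs until subscribe(). */
    static final class Lazy<T> {
        private final Supplier<T> source;

        Lazy(Supplier<T> source) {
            this.source = source;
        }

        /** Like an operator: returns a NEW pipeline and leaves this one untouched. */
        Lazy<T> doOnSubscribe(Runnable hook) {
            return new Lazy<>(() -> {
                hook.run();
                return source.get();
            });
        }

        /** Like flatMap: the inner pipeline becomes part of the returned one. */
        <R> Lazy<R> flatMap(Function<T, Lazy<R>> next) {
            return new Lazy<>(() -> next.apply(source.get()).source.get());
        }

        /** Only this call actually executes the chain. */
        void subscribe(Consumer<T> onNext) {
            onNext.accept(source.get());
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Lazy<String> apiResult = new Lazy<>(() -> { events.add("api call"); return "ok"; });
        Lazy<String> notification = new Lazy<>(() -> { events.add("notification sent"); return "sent"; });

        // Pattern from the question: the decorated pipeline is discarded, and the
        // notification pipeline is only assembled inside the hook, never subscribed.
        apiResult.doOnSubscribe(() -> notification.doOnSubscribe(() -> events.add("never logged")));
        apiResult.subscribe(v -> events.add("caller got " + v));

        // One alternative: hand back a pipeline that contains the notification step.
        Lazy<String> chained = apiResult.flatMap(v -> notification.flatMap(n -> new Lazy<String>(() -> v)));
        chained.subscribe(v -> events.add("caller got " + v + " after notify"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With Reactor, the same idea would presumably mean one of two things. The first is to return a Mono that includes the notification step, which needs @Around advice because @AfterReturning advice cannot substitute the returned reference. The second is to call subscribe() on the notification Flux explicitly for fire-and-forget behaviour. If SessionHelper reads from the subscriber context (an assumption about code not shown here), a detached subscribe() may not see that context.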
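【Comments】:
Your aspect tries to intercept methods annotated with @Notify, but your service is not annotated accordingly. Why do you expect it to be intercepted by the aspect?
Could you elaborate on what "service is not annotated accordingly" means? I am trying to send an APNs notification when an API call completes successfully. I use the return value of the API service method annotated with @Notify to trigger the APNs service logic, and that is the part that does not work. It would help if you could share a simple example that takes a return value of type Mono or Flux and triggers another autowired service that is implemented reactively.
I call the NotificationServiceHelper class from the NotifyAspect class, which is annotated with @Aspect; its @AfterReturning advice runs when the aspect intercepts the annotated pointcut.
I mean that your processNotification method has no @Notify annotation, so it will never be intercepted by the aspect.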
【Answer 1】:
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {
    public String title() default "";
}

// Advice method; it belongs inside a class annotated with @Aspect.
@SuppressWarnings("unchecked")
@Around("@annotation(operlog)")
public Mono<Result> doAround(ProceedingJoinPoint joinPoint, Log operlog) {
    Mono<Result> mono;
    try {
        // Invoke the intercepted method to obtain the Mono it returns.
        mono = (Mono<Result>) joinPoint.proceed();
    } catch (Throwable throwable) {
        throw new RuntimeException(throwable);
    }
    // Return the decorated Mono so the extra step runs when the caller subscribes.
    return mono.doOnNext(result -> {
        // doSomething(result);
    });
}
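Editorial note: the answer gives no explanation, so here is one reading of it. Around advice receives the publisher from proceed() and returns a decorated publisher, so the extra step becomes part of what the caller eventually subscribes to. The sketch below imitates that shape with plain JDK types so that it runs without Spring or Reactor. java.lang.reflect.Proxy stands in for Spring AOP, Supplier<String> stands in for Mono<Result>, and OrderApi, withNotification and the event strings are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class AroundAdviceSketch {

    /** Hypothetical service; Supplier<String> stands in for Mono<Result>. */
    interface OrderApi {
        Supplier<String> placeOrder(String item);
    }

    /** Wraps the target so that every call passes through "around" logic. */
    @SuppressWarnings("unchecked")
    static OrderApi withNotification(OrderApi target, List<String> events) {
        InvocationHandler around = (proxy, method, args) -> {
            // Counterpart of joinPoint.proceed(): obtain the lazy result.
            Supplier<String> original = (Supplier<String>) method.invoke(target, args);
            // Return a decorated result instead of the original one.
            return (Supplier<String>) () -> {
                String value = original.get();
                events.add("notify after " + method.getName() + ": " + value);
                return value;
            };
        };
        return (OrderApi) Proxy.newProxyInstance(
                OrderApi.class.getClassLoader(), new Class<?>[] {OrderApi.class}, around);
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        OrderApi api = withNotification(item -> () -> {
            events.add("order placed: " + item);
            return "order-1";
        }, events);

        Supplier<String> pending = api.placeOrder("book"); // assembled only, nothing has run
        events.add("assembled");
        events.add("caller got " + pending.get());          // running the chain triggers both steps
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the Reactor version from the answer, doOnNext plays the role of the decorating lambda. If the extra step is itself a publisher, as processNotification is in the question, an operator such as flatMap would be needed to subscribe to it as part of the chain, since doOnNext only runs a synchronous callback.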
【Comments】:
While this code snippet may be the solution, including an explanation really helps to improve the quality of your post. Remember that you are answering the question for future readers, and those people may not know the reasons behind your code suggestion.