微服务架构下消息服务多通道设计思路
Posted 全象云低代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务架构下消息服务多通道设计思路相关的知识,希望对你有一定的参考价值。
在微服务架构软件中,消息是极其重要的一部分,一般会独立成一个服务,称为 message 服务。其主要作用就是发送消息,并且支持向多种第三方发送,如站内信、邮件、短信等。
发送消息难点
- 发送消息接口响应时间太长。当 mesage 服务接收到发送消息请求后,可以做到实时向第三方进行发送,但第三方的消息返回时间不可控制。即发送消息的接口响应时间受到了第三方的限制。
- 只开发一个接口就能同时兼容不同的第三方发送行为。比如通过一个接口即可处理发邮件、发短信的行为。
解决思路
一般通过第三方发送消息的整体流程是:首先 client 提出发消息的请求,message 服务收到请求后对其进行解析,并判断需将此次消息传输到哪个第三方。然后调用对应的第三方发送接口,由第三方发出消息内容,并等待第三方返回结果后响应客户端。此时接口的响应时间是受到第三方限制的,在这样的情况下,接口很容易超时。
如下图所示:
要解决这个问题,我们需要解耦 message 服务与第三方服务,引入消息中间件的发布订阅模式,即 pub/sub 方式。
引入后的流程是:message 接收到发送的请求,跟前面的做法一样,也需要解析发送到哪些第三方。但与前面不同的是,我们不立即调用第三方,而是根据解析到的第三方,发布事件到不同的 topic。比如消息需要发送 email 和 sms,就发往 email 和 sms 的 topic,message 服务再订阅对应的 topic。当对应的 topic 有新的事件时,即调用对应的第三方发送消息的接口。
如下图所示:
一般来说,对于同一个事件,一个程序仅能作为一个角色,即事件的发布者或者事件订阅者。而按照上面的业务分析,message 既是事件的发布者,又是事件的订阅者,从服务的架构上来看,这是耦合的。所以,需要再拆分出两个服务,专门来做邮件(emial)的发送和短信(sms)的发送,作为消息的订阅者。其主要的任务,是监听对应的 topic,响应 topic 事件,触发对应的业务行为。
当有新的第三方发送消息请求接入系统,我们只需要增加一个服务,专门处理新的第三方业务逻辑,而不需要修改之前的业务代码逻辑。
引入 Dapr
关于消息中间件的选择,RabbitMQ、Kafka、Redis 等等都是可以的。不过以上模式也有不足的地方,在发布和订阅的服务中,不仅需要引入消息中间件的 sdk,处理跟中间件相关的逻辑,还需要保证消息的可靠性传递。但是,业务并不关心消息中间件相关的内容。
基于此,我们可以引入 Dapr。Dapr 对类似像 Kafka、Redis 以及各大云厂商的中间件进行了整合,简化了在不同消息中间件间进行切换所需的操作。我们只需要在配置文件中,重新配置消息中间件的相关信息,就能实现灵活的切换到不同的中间件,无需修改代码。除此之外,在服务中引入 Dapr 发布/订阅还具有以下优势:
- 统一的消息格式:Cloud Events
- 支持消息过期时间(per message TTL)
- at-least-once 提供至少一次的保证
如果消息组件原生支持消息有效期,runtime 会直接转发 TTL 相关操作,过期的行为则由组件直接控制。对于那些不支持消息有效期的组件,Dapr 会在 runtime 中补齐相关的过期功能。(CloudEvent 里有 expiration)
总结
对于 message 服务来说,如果是实时发送,接口因第三方接口的限制,比较容易超时。一般都采用消息中间件 pub/sub 的方式,解耦 mesage 服务和第三方服务。同时,中间件消息的可靠性也非常重要,Dapr 能把消息至少一次的发送到消息队列中,保证了消息的可靠性。所以,如果不想通过在业务代码引入消息中间的 sdk 来处理消息的可靠性,那么可以把维护消息中间件的工作,交给 Dapr 去做,此时业务代码也可以更注重业务本身。
引用
Dapr:https://docs.dapr.io/zh-hans/developing-applications/building-blocks/pubsub/
公众号:全象云低代码
GitHub:https://github.com/quanxiang-cloud/quanxiang
微服务架构下的自动化测试全链路设计思路分析
given(marketingService.mixMarketingActivity(anyObject())).willReturn(stubResponse);
RuleCalculateResponse response = this.ruleCalculatorBiz.ruleCalculate(request);
public interface CCMarketingCentralFacade {
CallResponse callMarketingCentral(CallRequest request);
}
public interface ClassMarketingCentralFacade {
CallResponse callMarketingCentral(CallRequest request);
}
public class CCMarketingCentralFacadeMocker implements CCMarketingCentralFacade {
@Override
public CallResponse callMarketingCentral(CallRequest request) {
CallResponse response = ...
MarketingResultDto marketingResultDto = ...
marketingResultDto.setTotalDiscount(new BigDecimal("90.19"));
marketingResultDto.setUseTotalDiscount(true);
response.getData().setMarketingResult(marketingResultDto);
return response;
}
}
public class ClassMarketingCentralFacadeMocker implements ClassMarketingCentralFacade {
@Override
public CallResponse callMarketingCentral(CallRequest request) {
CallResponse response = ...
MarketingResultDto marketingResultDto = ...
marketingResultDto.setUseCoupon(true);
marketingResultDto.setTotalDiscount(null);
marketingResultDto.setUseTotalDiscount(false);
List<MarketingProductDiscountDto> discountDtos = ...
request.getMarketingProductTagsParameter().getMarketingTags().forEach(item -> {
MarketingProductDiscountDto discountDto = ...
discountDto.setProductId(item.getProductID());
...
discountDtos.add(discountDto);
});
...
return response;
}
}
public abstract class BaseRequest implements Serializable {
public MockParameter mockParameter;
}
public class MockParameter {
/**
* mock cc 营销调用接口
*/
public Boolean mockCCMarketingInterface;
/**
* mock class 营销调用接口
*/
public Boolean mockClassMarketingInterface;
/**
* 是否自动化测试 mock
*/
public Boolean useAutoTestMock;
/**
* 测试mock参数
*/
public String testMockParam;
}
@Value("${marketing.cloud.business.access.url.mock}")
private String mockUrl;
/**
* 自动化测试 mocker bean
*/
@Bean("CCMarketingCentralFacadeTestMock")
public CCMarketingCentralFacade CCMarketingCentralFacadeTestMock() {
RestClientProxyFactoryBean<CCMarketingCentralFacade> restClientProxyFactoryBean ...
restClientProxyFactoryBean.setBaseUri(this.mockUrl);
...
}
/**
* 自动化测试 mocker bean
*/
@Bean("ClassMarketingCentralFacadeTestMock")
public ClassMarketingCentralFacade ClassMarketingCentralFacadeTestMock() {
RestClientProxyFactoryBean<ClassMarketingCentralFacade> restClientProxyFactoryBean ...
restClientProxyFactoryBean.setBaseUri(this.mockUrl);
...
}
public abstract class BaseRequest implements Serializable {
public MockParameter mockParameter;
}
@Component
@Path("v1/calculator/")
public class RuleCalculatorFacadeImpl extends BaseFacade implements RuleCalculatorFacade {
@MockFacade(Setting = MockFacade.SETTING_REQUEST_MOCK_PARAMETER)
public RuleCalculateResponse ruleCalculate(RuleCalculateRequest request) {
...
}
}
@Component
public class MarketingServiceImpl extends MarketingBaseService implements MarketingService {
@MockFacade(Setting = MockFacade.SETTING_FACADE_MOCK_BEAN)
public MarketingResult onlyExtendMarketingActivity(Marketing..Parameter tagsParameter) {
...
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MockFacade {
String SETTING_REQUEST_MOCK_PARAMETER = "setting_request_mock_parameter";
String SETTING_FACADE_MOCK_BEAN = "setting_facade_mock_bean";
String Setting();
}
@Aspect
@Component
@Slf4j
public class MockMarketingFacadeInterceptor {
@Before("@annotation(mockFacade)")
public void beforeMethod(JoinPoint joinPoint, MockFacade mockFacade) {
String settingName = mockFacade.Setting();
if (MockFacade.SETTING_REQUEST_MOCK_PARAMETER.equals(settingName)) {
Object[] args = joinPoint.getArgs();
if (args == null) return;
List<Object> argList = Arrays.asList(args);
argList.forEach(item -> {
if (item instanceof BaseRequest) {
BaseRequest request = (BaseRequest) item;
if (request.getMockParameter() != null) {
MarketingBaseService.mockParameterThreadLocal.set(request.getMockParameter());
log.info("----setting mock parameter:{}", JSON.toJSONString(request.getMockParameter()));
}
}
});
} else if (MockFacade.SETTING_FACADE_MOCK_BEAN.equals(settingName)) {
MarketingBaseService marketingBaseService = (MarketingBaseService) joinPoint.getThis();
marketingBaseService.mockBean();
log.info("----setting mock bean.");
}
}
@After("@annotation(mockFacade)")
public void afterMethod(JoinPoint joinpoint, MockFacade mockFacade) {
if (MockFacade.SETTING_FACADE_MOCK_BEAN.equals(mockFacade.Setting())) {
MarketingBaseService marketingBaseService = (MarketingBaseService) joinpoint.getThis();
marketingBaseService.mockRemove();
log.info("----remove mock bean.");
}
if (MockFacade.SETTING_REQUEST_MOCK_PARAMETER.equals(mockFacade.Setting())) {
MarketingBaseService.mockParameterThreadLocal.remove();
log.info("----remove ThreadLocal. ThreadLocal get {}", MarketingBaseService.mockParameterThreadLocal.get());
}
}
}
public abstract class MarketingBaseService extends BaseService {
protected ClassMarketingCentralFacade classMarketingCentralFacade;
protected CCMarketingCentralFacade ccMarketingCentralFacade;
public static ThreadLocal<MockParameter> mockParameterThreadLocal = new ThreadLocal<>();
public void mockBean() {
MockParameter mockParameter = mockParameterThreadLocal.get();
if (mockParameter != null && mockParameter.mockClassMarketingInterface) {
if (mockParameter.useAutoTestingMock) {
this.setClassMarketingCentralFacade(SpringContextHolder.getBean("ClassMarketingCentralFacadeTestMock", ClassMarketingCentralFacade.class));
} else {
this.setClassMarketingCentralFacade(SpringContextHolder.getBean("ClassMarketingCentralFacadeMocker", ClassMarketingCentralFacadeMocker.class));
}
} else {
this.setClassMarketingCentralFacade(SpringContextHolder.getBean("ClassMarketingCentralFacade", ClassMarketingCentralFacade.class));
}
if (mockParameter != null && mockParameter.mockCCMarketingInterface) {
if (mockParameter.useAutoTestingMock) {
this.setCcMarketingCentralFacade(SpringContextHolder.getBean("CCMarketingCentralFacadeTestMock", CCMarketingCentralFacade.class));
} else {
this.setCcMarketingCentralFacade(SpringContextHolder.getBean("CCMarketingCentralFacadeMocker", CCMarketingCentralFacadeMocker.class));
}
} else {
this.setCcMarketingCentralFacade(SpringContextHolder.getBean("CCMarketingCentralFacade", CCMarketingCentralFacade.class));
}
}
public void mockRemove() {
mockParameterThreadLocal.remove();
}
}
@Component
public class MockHttpHeadSetting implements ClientRequestFilter {
@Override
public void filter(ClientRequestContext requestContext) throws IOException {
MultivaluedMap<String, Object> header = requestContext.getHeaders();
MockParameter mockParameter = MarketingBaseService.mockParameterThreadLocal.get();
if (mockParameter != null && StringUtils.isNotBlank(mockParameter.getTestingMockParam())) {
header.add("Mock-parameter", mockParameter.getTestingMockParam());
}
}
}
com.hujiang.marketingcloud.ruleengine.service.MockHttpHeadSetting
以上是关于微服务架构下消息服务多通道设计思路的主要内容,如果未能解决你的问题,请参考以下文章