根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)
Posted 望夜空流星划过
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)相关的知识,希望对你有一定的参考价值。
根据不通消息类型,进行不同的处理。
---------------------------重构前----------------------------------------------
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
Map<String, String> msgValueMap = message.getValue();
String type = msgValueMap.get("type"); // 消息类型
if ("消息类型1".equals(type)) {
// 消息类型1处理 例:userService.save(...);
} else if ("消息类型2".equals(type)) {
// 消息类型2处理 例:roleService.delete(...);
} else if ("消息类型3".equals(type)) {
// 消息类型3处理 例:taskService.save(...);
}
}
---------------------------重构后----------------------------------------------
@Autowired
private MessageHandlerContext messageHandlerContext;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
Map<String, String> msgValueMap = message.getValue();
MessageInfo<String, String> messageIfo = new MassageInfo();
messageInfo.setType(msgValueMap.get("type"));
messageInfo.setData(msgValueMap.get("data"));
messageHandlerContext.handler(messageIfo);
}
// [方式二]
@Async // @EnableAsync
@MessageListener("TestMessage")
public void testMessageHandler(MessageInfo messageInfo) {
System.out.println("TestMessage消息处理" + messageInfo);
}
[方式一]重构-1:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface Message {
/** 消息类型 */
String value();
}
public interface MessageHandler {
default String getType() {
return this.getClass().getAnnotation(Message.class).value();
}
@PostConstruct
default void register() {
MessageHandlerContext.register(this);
}
void handler(MessageInfo messageInfo);
}
@Slf4j
@Component
public class MessageHandlerContext {
private static final Map<String, MessageHandler> MESSAGE_HANDLER_MAP = new HashMap<>();
public static void register(MessageHandler messageHandler) {
MESSAGE_HANDLER_MAP.put(messageHandler.getType(), messageHandler);
log.info("注册消息处理:{}", messageHandler.getType());
}
public void handler(MessageInfo messageInfo) {
Optional.ofNullable(MESSAGE_HANDLER_MAP.get(messageInfo.getType()))
.orElseThrow(() -> new IllegalArgumentException("未识别的消息类型:" + messageInfo.getType()))
.handler(messageInfo);
}
}
@Data
public class MessageInfo<T> {
/** 消息类型 */
private String type;
/** 消息内容 */
private T data;
}
[方式一]重构-2:
@Message("ExampleMessage")
@Slf4j
@Component
public class ExampleMessageHandler implements MessageHandler {
@Override
public void handler(MessageInfo messageInfo) {
log.debug("ExampleMessageHandler->handler:{}", messageInfo);
}
}
@Message("TestMessage")
@Component
public class TestMessageHandler implements MessageHandler {
@Override
public void handler(MessageInfo messageInfo) {
log.debug("TestMessageHandler->handler:{}", messageInfo);
}
}
[方式二]重构-1
public interface MessageHandler {
String getType();
void handler(MessageInfo messageInfo);
}
@Component
public class MessageHandlerContext {
private static final Map<String, MessageHandler> MESSAGE_HANDLER_MAP = new HashMap<>();
public static void register(MessageHandler messageHandler) {
MESSAGE_HANDLER_MAP.put(messageHandler.getType(), messageHandler);
System.out.println("注册消息处理("+messageHandler+"):" + messageHandler.getType());
}
public void handler(MessageInfo messageInfo) {
Optional.ofNullable(MESSAGE_HANDLER_MAP.get(messageInfo.getType()))
.orElseThrow(() -> new IllegalArgumentException("未识别的消息类型:" + messageInfo.getType()))
.handler(messageInfo);
}
}
@Data
public class MessageInfo<T> {
/** 消息类型 */
private String type;
/** 消息内容 */
private T data;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface MessageListener {
/** 消息类型 */
String value();
}
/**
* Registers {@link MessageListener} methods as individual {@link ApplicationListener} instances.
* 参考:org.springframework.context.event.EventListenerMethodProcessor
*/
@Component
public class MessageListenerProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
@Nullable
private ConfigurableApplicationContext applicationContext;
@Nullable
private ConfigurableListableBeanFactory beanFactory;
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
System.out.println("setApplicationContext");
Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
"ApplicationContext does not implement ConfigurableApplicationContext");
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
System.out.println("postProcessBeanFactory");
this.beanFactory = beanFactory;
}
@Override
public void afterSingletonsInstantiated() {
System.out.println("afterSingletonsInstantiated");
ConfigurableListableBeanFactory beanFactory = this.beanFactory;
Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
Class<?> type = null;
try {
type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
}
catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let\'s ignore it.
ex.printStackTrace();
}
if (type != null) {
if (ScopedObject.class.isAssignableFrom(type)) {
try {
Class<?> targetClass = AutoProxyUtils.determineTargetClass(
beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
if (targetClass != null) {
type = targetClass;
}
}
catch (Throwable ex) {
// An invalid scoped proxy arrangement - let\'s ignore it.
ex.printStackTrace();
}
}
try {
processBean(beanName, type);
}
catch (Throwable ex) {
throw new BeanInitializationException("Failed to process @MessageEventListener " +
"annotation on bean with name \'" + beanName + "\'", ex);
}
}
}
}
}
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, MessageListener.class) &&
!isSpringContainerClass(targetType)) {
Map<Method, MessageListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<MessageListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, MessageListener.class));
}
catch (Throwable ex) {
// An unresolvable type in a method signature, probably from a lazy bean - let\'s ignore it.
ex.printStackTrace();
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
for (Method method : annotatedMethods.keySet()) {
System.out.println(method);
MessageHandler messageHandler = new MessageHandler() {
@Override
public String getType() {return method.getAnnotation(MessageListener.class).value();}
@Override
public void handler(MessageInfo messageInfo) {
try {
System.out.println(context.getBean(beanName));
method.invoke(context.getBean(beanName), messageInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
};
MessageHandlerContext.register(messageHandler);
}
}
}
}
private static boolean isSpringContainerClass(Class<?> clazz) {
return (clazz.getName().startsWith("org.springframework.") &&
!AnnotatedElementUtils.isAnnotated(ClassUtils.getUserClass(clazz), Component.class));
}
}
[方式二]重构-2
@Component
public class MethodMessageHandler {
@MessageListener("TestMessage")
public void testMessageHandler(MessageInfo messageInfo) {
System.out.println("testMessage消息处理" + messageInfo);
}
@MessageListener("ExampleMessage")
public void methodMessage2Handler(MessageInfo messageInfo) {
System.out.println("exampleMessage消息处理" + messageInfo);
}
}
测试:
@SpringBootTest
class DemoApplicationTests {
@Autowired
private MessageHandlerContext messageHandlerContext;
@Test
void contextLoads() {
}
@Test
public void testMessage() {
MessageInfo<String> testMessage = new MessageInfo<>();
testMessage.setType("TestMessage");
testMessage.setData("test");
messageHandlerContext.handler(testMessage);
MessageInfo<String> exampleMessage = new MessageInfo<>();
exampleMessage.setType("ExampleMessage");
exampleMessage.setData("example");
messageHandlerContext.handler(exampleMessage);
}
}
// 输出:
// ......
// 注册消息处理:ExampleMessage
// 注册消息处理:TestMessage
// ......
// TestMessageHandler->handler:MessageInfo{type=\'TestMessage\', data=test}
// ExampleMessageHandler->handler:MessageInfo{type=\'ExampleMessage\', data=example}
以上是关于根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑