根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)

Posted 望夜空流星划过

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)相关的知识,希望对你有一定的参考价值。

根据不通消息类型,进行不同的处理。
---------------------------重构前----------------------------------------------

@Override
public void onMessage(MapRecord<String, String, String> message) {
     log.info("MessageId: {}", message.getId());
     log.info("Stream: {}", message.getStream());
     log.info("Body: {}", message.getValue());
     
     Map<String, String> msgValueMap = message.getValue();
     String type = msgValueMap.get("type"); // 消息类型
     if ("消息类型1".equals(type)) {
          // 消息类型1处理  例:userService.save(...);
     } else if ("消息类型2".equals(type)) {
          // 消息类型2处理  例:roleService.delete(...);
     } else if ("消息类型3".equals(type)) {
          // 消息类型3处理  例:taskService.save(...);
     }
}

---------------------------重构后----------------------------------------------

@Autowired
private MessageHandlerContext messageHandlerContext;
    
@Override
public void onMessage(MapRecord<String, String, String> message) {
     log.info("MessageId: {}", message.getId());
     log.info("Stream: {}", message.getStream());
     log.info("Body: {}", message.getValue());
     
     Map<String, String> msgValueMap = message.getValue();
     MessageInfo<String, String> messageIfo = new MassageInfo();
     messageInfo.setType(msgValueMap.get("type"));
     messageInfo.setData(msgValueMap.get("data"));
     messageHandlerContext.handler(messageIfo);
}

// [方式二]
@Async // @EnableAsync
@MessageListener("TestMessage")
public void testMessageHandler(MessageInfo messageInfo) {
    System.out.println("TestMessage消息处理" + messageInfo);
}

[方式一]重构-1:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface Message {
    /** 消息类型 */
    String value();
}

public interface MessageHandler {

    default String getType() {
        return this.getClass().getAnnotation(Message.class).value();
    }

    @PostConstruct
    default void register() {
        MessageHandlerContext.register(this);
    }

    void handler(MessageInfo messageInfo);
}

@Slf4j
@Component
public class MessageHandlerContext {
    private static final Map<String, MessageHandler> MESSAGE_HANDLER_MAP = new HashMap<>();

    public static void register(MessageHandler messageHandler) {
        MESSAGE_HANDLER_MAP.put(messageHandler.getType(), messageHandler);
        log.info("注册消息处理:{}", messageHandler.getType());
    }

    public void handler(MessageInfo messageInfo) {
        Optional.ofNullable(MESSAGE_HANDLER_MAP.get(messageInfo.getType()))
                .orElseThrow(() -> new IllegalArgumentException("未识别的消息类型:" + messageInfo.getType()))
                .handler(messageInfo);
    }

}

@Data
public class MessageInfo<T> {
    /** 消息类型 */
    private String type;
    /** 消息内容 */
    private T data;
}

[方式一]重构-2:

@Message("ExampleMessage")
@Slf4j
@Component
public class ExampleMessageHandler implements MessageHandler {

    @Override
    public void handler(MessageInfo messageInfo) {
        log.debug("ExampleMessageHandler->handler:{}", messageInfo);
    }

}

@Message("TestMessage")
@Component
public class TestMessageHandler implements MessageHandler {

    @Override
    public void handler(MessageInfo messageInfo) {
        log.debug("TestMessageHandler->handler:{}", messageInfo);
    }

}

[方式二]重构-1

public interface MessageHandler {

    String getType();

    void handler(MessageInfo messageInfo);
}

@Component
public class MessageHandlerContext {
    private static final Map<String, MessageHandler> MESSAGE_HANDLER_MAP = new HashMap<>();

    public static void register(MessageHandler messageHandler) {
        MESSAGE_HANDLER_MAP.put(messageHandler.getType(), messageHandler);
        System.out.println("注册消息处理("+messageHandler+"):" + messageHandler.getType());
    }

    public void handler(MessageInfo messageInfo) {
        Optional.ofNullable(MESSAGE_HANDLER_MAP.get(messageInfo.getType()))
                .orElseThrow(() -> new IllegalArgumentException("未识别的消息类型:" + messageInfo.getType()))
                .handler(messageInfo);
    }

}

@Data
public class MessageInfo<T> {
    /** 消息类型 */
    private String type;
    /** 消息内容 */
    private T data;
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface MessageListener {
    /** 消息类型 */
    String value();
}

/**
 * Registers {@link MessageListener} methods as individual {@link ApplicationListener} instances.
 * 参考:org.springframework.context.event.EventListenerMethodProcessor
 */
@Component
public class MessageListenerProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
    @Nullable
    private ConfigurableApplicationContext applicationContext;

    @Nullable
    private ConfigurableListableBeanFactory beanFactory;

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        System.out.println("setApplicationContext");
        Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
                "ApplicationContext does not implement ConfigurableApplicationContext");
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        System.out.println("postProcessBeanFactory");
        this.beanFactory = beanFactory;
    }

    @Override
    public void afterSingletonsInstantiated() {
        System.out.println("afterSingletonsInstantiated");
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
                Class<?> type = null;
                try {
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let\'s ignore it.
                    ex.printStackTrace();
                }
                if (type != null) {
                    if (ScopedObject.class.isAssignableFrom(type)) {
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let\'s ignore it.
                            ex.printStackTrace();
                        }
                    }
                    try {
                        processBean(beanName, type);
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @MessageEventListener " +
                                "annotation on bean with name \'" + beanName + "\'", ex);
                    }
                }
            }
        }
    }
    private void processBean(final String beanName, final Class<?> targetType) {
        if (!this.nonAnnotatedClasses.contains(targetType) &&
                AnnotationUtils.isCandidateClass(targetType, MessageListener.class) &&
                !isSpringContainerClass(targetType)) {

            Map<Method, MessageListener> annotatedMethods = null;
            try {
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<MessageListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, MessageListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let\'s ignore it.
                ex.printStackTrace();
            }

            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
            }
            else {
                // Non-empty set of methods
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                for (Method method : annotatedMethods.keySet()) {
                    System.out.println(method);
                    MessageHandler messageHandler = new MessageHandler() {
                        @Override
                        public String getType() {return method.getAnnotation(MessageListener.class).value();}

                        @Override
                        public void handler(MessageInfo messageInfo) {
                            try {
                                System.out.println(context.getBean(beanName));
                                method.invoke(context.getBean(beanName), messageInfo);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    MessageHandlerContext.register(messageHandler);
                }
            }
        }
    }

    private static boolean isSpringContainerClass(Class<?> clazz) {
        return (clazz.getName().startsWith("org.springframework.") &&
                !AnnotatedElementUtils.isAnnotated(ClassUtils.getUserClass(clazz), Component.class));
    }
}

[方式二]重构-2

@Component
public class MethodMessageHandler {

    @MessageListener("TestMessage")
    public void testMessageHandler(MessageInfo messageInfo) {
        System.out.println("testMessage消息处理" + messageInfo);
    }

    @MessageListener("ExampleMessage")
    public void methodMessage2Handler(MessageInfo messageInfo) {
        System.out.println("exampleMessage消息处理" + messageInfo);
    }
}

测试:

@SpringBootTest
class DemoApplicationTests {
    @Autowired
    private MessageHandlerContext messageHandlerContext;

    @Test
    void contextLoads() {
    }

    @Test
    public void testMessage() {
        MessageInfo<String> testMessage = new MessageInfo<>();
        testMessage.setType("TestMessage");
        testMessage.setData("test");
        messageHandlerContext.handler(testMessage);

        MessageInfo<String> exampleMessage = new MessageInfo<>();
        exampleMessage.setType("ExampleMessage");
        exampleMessage.setData("example");
        messageHandlerContext.handler(exampleMessage);
    }

}

// 输出:
// ......
// 注册消息处理:ExampleMessage
// 注册消息处理:TestMessage
// ......
// TestMessageHandler->handler:MessageInfo{type=\'TestMessage\', data=test}
// ExampleMessageHandler->handler:MessageInfo{type=\'ExampleMessage\', data=example}

以上是关于根据消息类型对消息进行不同的处理(消除if else,通过方法注解方式实现不同消息的处理-支持异步)的主要内容,如果未能解决你的问题,请参考以下文章

java里对多态的了解

Java中多态简介

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

获取Java接口的所有实现类

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑