应用监控CAT之cat-home源码阅读

Posted yougewe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了应用监控CAT之cat-home源码阅读相关的知识,希望对你有一定的参考价值。

  上两章点对点地讲解了从 cat-client 到 cat-consumer 的请求处理流程,但是怎样才能让监控数据被人看到呢?这就需要一个用于展示的后台了,也就是本章要讲的 cat-home 模块!下面带你一起走读 cat-home。

  作为观察监控的平台,为所需要的人提供着可视化的稳健服务!

  作为web展现层,在java中,自然是以servlet为接收方法了。

  以tomcat作为web容器,进行运行cat-home服务。

servlet 以处理 uri 为基础,因此,让我们先看一下都有些什么样的路由。也就是说总体服务能力就是这些。

@OutboundActionMeta(name = "home")
@OutboundActionMeta(name = "app")
@OutboundActionMeta(name = "cdn")
@OutboundActionMeta(name = "top")
@OutboundActionMeta(name = "web")
@OutboundActionMeta(name = "home")
@OutboundActionMeta(name = "alert")
@OutboundActionMeta(name = "cache")
@OutboundActionMeta(name = CrossAnalyzer.ID)
@OutboundActionMeta(name = "e")
@OutboundActionMeta(name = "model")
@OutboundActionMeta(name = StateAnalyzer.ID)
@OutboundActionMeta(name = MatrixAnalyzer.ID)
@OutboundActionMeta(name = MetricAnalyzer.ID)
@OutboundActionMeta(name = "system")
@OutboundActionMeta(name = "m")
@OutboundActionMeta(name = "monitor")
@OutboundActionMeta(name = "network")
@OutboundActionMeta(name = "p")
@OutboundActionMeta(name = "storage")
@OutboundActionMeta(name = "activity")
@OutboundActionMeta(name = "database")
@OutboundActionMeta(name = "overload")
@OutboundActionMeta(name = "dashboard")
@OutboundActionMeta(name = "h")
@OutboundActionMeta(name = "alteration")
@OutboundActionMeta(name = DependencyAnalyzer.ID)
@OutboundActionMeta(name = "statistics")
@OutboundActionMeta(name = "t")
@OutboundActionMeta(name = "login")
@OutboundActionMeta(name = "config")
@OutboundActionMeta(name = "plugin")
@OutboundActionMeta(name = "router")

目录结构为 xxx/Handler.java,也算是比较难以理解的结构了。不过学习还是可以的!!

技术分享图片

 

既然是web服务,第一个自然要看一下 web.xml 了:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    version="2.5">
    <!-- cat-filter: per the original note, initializes parameters that CAT needs -->
    <filter>
        <filter-name>cat-filter</filter-name>
        <filter-class>com.dianping.cat.servlet.CatFilter</filter-class>
    </filter>
    <!-- domain-filter: per the original note, sets the cookie information used by responses -->
    <filter>
        <filter-name>domain-filter</filter-name>
        <filter-class>com.dianping.cat.report.view.DomainFilter</filter-class>
    </filter>
    <!-- cat-servlet is loaded first (load-on-startup = 1) -->
    <servlet>
        <servlet-name>cat-servlet</servlet-name>
        <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <!-- mvc-servlet is loaded second (load-on-startup = 2) -->
    <servlet>
        <servlet-name>mvc-servlet</servlet-name>
        <servlet-class>org.unidal.web.MVC</servlet-class>
        <init-param>
            <param-name>cat-client-xml</param-name>
            <param-value>client.xml</param-value>
        </init-param>
        <init-param>
            <param-name>init-modules</param-name>
            <param-value>false</param-value>
        </init-param>
        <load-on-startup>2</load-on-startup>
    </servlet>
    <!-- Filter mappings and the dispatcher type (REQUEST or FORWARD) each one applies to -->
    <filter-mapping>
        <filter-name>cat-filter</filter-name>
        <url-pattern>/r/*</url-pattern>
        <dispatcher>REQUEST</dispatcher>
    </filter-mapping>
    <filter-mapping>
        <filter-name>domain-filter</filter-name>
        <url-pattern>/r/*</url-pattern>
        <dispatcher>REQUEST</dispatcher>
    </filter-mapping>
    <filter-mapping>
        <filter-name>cat-filter</filter-name>
        <url-pattern>/s/*</url-pattern>
        <dispatcher>REQUEST</dispatcher>
    </filter-mapping>
    <filter-mapping>
        <filter-name>cat-filter</filter-name>
        <url-pattern>/jsp/*</url-pattern>
        <dispatcher>FORWARD</dispatcher>
    </filter-mapping>
    <!-- Servlet mappings: requests under /r/* and /s/* are routed to mvc-servlet -->
    <servlet-mapping>
        <servlet-name>mvc-servlet</servlet-name>
        <url-pattern>/r/*</url-pattern>
    </servlet-mapping>
    <servlet-mapping>
        <servlet-name>mvc-servlet</servlet-name>
        <url-pattern>/s/*</url-pattern>
    </servlet-mapping>
    <jsp-config>
        <taglib>
            <taglib-uri>/WEB-INF/app.tld</taglib-uri>
            <taglib-location>/WEB-INF/app.tld</taglib-location>
        </taglib>
    </jsp-config>
</web-app>

// 其中 Cat-Servlet 主要用于初始化cat相关的程序,不作具体的请求接收功能

// 主要为调用如下 initComponents() 方法
    /**
     * Initializes the CAT modules when this servlet starts.
     *
     * <p>Resolves the client and server configuration files from the servlet
     * configuration, publishes them as attributes on a fresh module context and
     * then runs the {@link ModuleInitializer} looked up from that context.
     *
     * @param servletConfig servlet configuration used to locate the config files
     * @throws ServletException wrapping any exception raised during initialization
     */
    @Override
    protected void initComponents(ServletConfig servletConfig) throws ServletException {
        try {
            ModuleContext moduleContext = new DefaultModuleContext(getContainer());
            ModuleInitializer moduleInitializer = moduleContext.lookup(ModuleInitializer.class);
            File clientConfig = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
            File serverConfig = getConfigFile(servletConfig, "cat-server-xml", "server.xml");

            // Expose both configuration files to the modules through the shared context.
            moduleContext.setAttribute("cat-client-config-file", clientConfig);
            moduleContext.setAttribute("cat-server-config-file", serverConfig);
            moduleInitializer.execute(moduleContext);
        } catch (Exception failure) {
            // Remember the failure on the servlet, echo it to stderr, then rethrow it wrapped.
            m_exception = failure;
            System.err.println(failure);
            throw new ServletException(failure);
        }
    }
    // DefaultModuleInitializer.execute() 运行,
/**
 * Initializes modules in dependency order.
 *
 * <p>The dependency graph of the requested modules is flattened depth-first
 * (dependencies before dependents), calling {@code setup} on every
 * {@link AbstractModule} the first time it is seen. Each module that is not yet
 * marked as initialized is then initialized in that order.
 */
public class DefaultModuleInitializer implements ModuleInitializer {
   @Inject
   private ModuleManager m_manager;

   @InjectAttribute
   private boolean m_verbose;

   /** Running sequence number passed to {@code executeModule} and used in log lines. */
   private int m_index = 1;

   /**
    * Initializes the top level modules reported by the module manager.
    *
    * @param ctx module context handed to every module
    */
   @Override
   public void execute(ModuleContext ctx) {
      execute(ctx, m_manager.getTopLevelModules());
   }

   /**
    * Initializes the given modules together with their transitive dependencies.
    *
    * @param ctx     module context handed to every module
    * @param modules modules to initialize
    * @throws RuntimeException wrapping any exception raised while expanding or initializing
    */
   @Override
   public void execute(ModuleContext ctx, Module... modules) {
      Set<Module> ordered = new LinkedHashSet<Module>();

      info(ctx, "Initializing top level modules:");

      for (Module topLevel : modules) {
         info(ctx, "   " + topLevel.getClass().getName());
      }

      try {
         // Flatten the dependency graph; setup() is invoked during this pass.
         expandAll(ctx, modules, ordered);

         for (Module candidate : ordered) {
            if (candidate.isInitialized()) {
               continue;
            }

            // Run the module's own initialization.
            executeModule(ctx, candidate, m_index++);
         }
      } catch (Exception e) {
         throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
      }
   }

   /**
    * Marks the module as initialized, initializes it and logs the elapsed time.
    */
   private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {
      long startedAt = System.currentTimeMillis();

      // set flag to avoid re-entrance
      module.setInitialized(true);

      String moduleName = module.getClass().getName();

      info(ctx, index + " ------ " + moduleName);

      // execute itself after its dependencies
      module.initialize(ctx);

      long finishedAt = System.currentTimeMillis();

      info(ctx, index + " ------ " + moduleName + " DONE in " + (finishedAt - startedAt) + " ms.");
   }

   /**
    * Adds the modules to {@code all}, dependencies first, calling {@code setup}
    * on each {@link AbstractModule} when it is added.
    */
   private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
      if (modules == null) {
         return;
      }

      for (Module current : modules) {
         expandAll(ctx, current.getDependencies(ctx), all);

         if (all.contains(current)) {
            continue;
         }

         if (current instanceof AbstractModule) {
            ((AbstractModule) current).setup(ctx);
         }

         all.add(current);
      }
   }

}

 

// 主要接收页面请求的mvc-servlet, 

    // 初始化mvc
   /**
    * Starts the MVC servlet.
    *
    * <p>Initializes CAT and the modules, looks up the {@link RequestLifecycle}
    * registered under the hint "mvc", binds it to the servlet context and
    * publishes this servlet in the servlet context under {@code ID}.
    *
    * @param config servlet configuration
    * @throws Exception if any initialization step fails
    */
   @Override
   protected void initComponents(ServletConfig config) throws Exception {
      String contextPath = config.getServletContext().getContextPath();
      String path = contextPath;

      // An absent or empty context path is reported as the root path.
      if (contextPath == null || contextPath.length() == 0) {
         path = "/";
      }

      getLogger().info("MVC is starting at " + path);

      // Bring up CAT first, then the modules.
      initializeCat(config);
      initializeModules(config);

      // Request lifecycle handler that will receive every request.
      m_handler = lookup(RequestLifecycle.class, "mvc");
      m_handler.setServletContext(config.getServletContext());

      config.getServletContext().setAttribute(ID, this);
      getLogger().info("MVC started at " + path);
   }
    // MVC, 接收请求,交由handler处理
   /**
    * Entry point for every request: applies UTF-8 defaults and delegates to the
    * request lifecycle handler.
    *
    * <p>Any throwable raised by the handler is logged, and a 500 error is sent
    * unless the response has already been committed.
    */
   @Override
   protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
      // Only default the request encoding when none is set.
      if (request.getCharacterEncoding() == null) {
         request.setCharacterEncoding("UTF-8");
      }

      response.setContentType("text/html;charset=UTF-8");

      try {
         // Hand the request over to the lifecycle handler.
         m_handler.handle(request, response);
      } catch (Throwable cause) {
         String message = "Error occured when handling uri: " + request.getRequestURI();

         getLogger().error(message, cause);

         if (response.isCommitted()) {
            return;
         }

         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, message);
      }
   }
   // DefaultRequestLifecycle.handle(), 接管请求第一棒
   /**
    * Builds the request context, processes the request and always resets the
    * context through the builder afterwards.
    *
    * @throws IOException propagated from request handling
    */
   public void handle(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
      RequestContext requestContext = m_builder.build(request);

      try {
         handleRequest(request, response, requestContext);
      } finally {
         // Reset the context whether or not handling succeeded.
         m_builder.reset(requestContext);
      }
   }
   // DefaultRequestContextBuilder.build(), 构建请求数据, 分解 module,action , 找到对应的处理handler
   /**
    * Builds the {@link RequestContext} for a request by resolving its module,
    * action and inbound action model.
    *
    * @param request incoming HTTP request
    * @return the populated context, or {@code null} when no action resolver or
    *         no inbound action is found for the request
    */
   @Override
   public RequestContext build(HttpServletRequest request) {
      ParameterProvider parameters = buildParameterProvider(request);
      String moduleName = parameters.getModuleName();
      ActionResolver resolver = (ActionResolver) m_modelManager.getActionResolver(moduleName);

      if (resolver == null) {
         return null;
      }

      UrlMapping mapping = resolver.parseUrl(parameters);
      String actionName = mapping.getAction();
      // Resolve the inbound action for this module/action pair.
      InboundActionModel inbound = m_modelManager.getInboundAction(moduleName, actionName);

      if (inbound == null) {
         return null;
      }

      RequestContext result = new RequestContext();
      // Resolve the module model.
      ModuleModel moduleModel = m_modelManager.getModule(moduleName, actionName);

      mapping.setModule(moduleModel.getModuleName());

      // Copy everything resolved so far into the context.
      result.setActionResolver(resolver);
      result.setParameterProvider(parameters);
      result.setUrlMapping(mapping);
      result.setModule(moduleModel);
      result.setInboundAction(inbound);
      result.setTransition(moduleModel.findTransition(inbound.getTransitionName()));
      result.setError(moduleModel.findError(inbound.getErrorActionName()));

      return result;
   }
   // 处理请求,先处理入站,再处理出站
   /**
    * Runs the full request pipeline: payload preparation, pre-actions, inbound
    * action, transition and outbound action.
    *
    * <p>A {@code null} request context results in the page-not-found response.
    * Any throwable raised inside the pipeline is passed to {@code handleException}.
    */
   private void handleRequest(final HttpServletRequest request, final HttpServletResponse response,
         RequestContext requestContext) throws IOException {
      if (requestContext == null) {
         showPageNotFound(request, response);
         return;
      }

      ModuleModel module = requestContext.getModule();
      InboundActionModel inbound = requestContext.getInboundAction();
      ActionContext<?> actionCtx = createActionContext(request, response, requestContext, inbound);
      Transaction t = Cat.getManager().getPeekTransaction();

      // in case of no CatFilter is configured
      if (t == null) {
         t = NullMessage.TRANSACTION;
      }

      request.setAttribute(CatConstants.CAT_PAGE_URI,
            actionCtx.getRequestContext().getActionUri(inbound.getActionName()));

      try {
         InboundActionHandler inboundHandler = m_actionHandlerManager.getInboundActionHandler(module, inbound);

         // Prepare the payload before the pre-actions run.
         inboundHandler.preparePayload(actionCtx);

         boolean proceed = handlePreActions(request, response, module, requestContext, inbound, actionCtx);

         if (!proceed) {
            return;
         }

         // Inbound phase.
         handleInboundAction(module, actionCtx);

         t.addData("module", module.getModuleName());
         t.addData("in", actionCtx.getInboundAction());

         if (actionCtx.isProcessStopped()) {
            t.addData("processStopped=true");
            return;
         }

         handleTransition(module, actionCtx);

         t.addData("out", actionCtx.getOutboundAction());

         // Outbound phase.
         handleOutboundAction(module, actionCtx);
      } catch (Throwable cause) {
         handleException(request, cause, actionCtx);
      }
   }
   // 正式处理入站请求
   /**
    * Looks up the inbound handler for the context's inbound action and lets it
    * handle the action context.
    *
    * @throws ActionException propagated from the inbound handler
    */
   private void handleInboundAction(ModuleModel module, ActionContext<?> actionContext) throws ActionException {
      InboundActionModel inbound = actionContext.getRequestContext().getInboundAction();

      m_actionHandlerManager.getInboundActionHandler(module, inbound).handle(actionContext);
   }
   // DefaultActionHandlerManager.getInboundActionHandler(), 获取入站处理handler, 以 r:home 样例为格式获取
   /**
    * Returns the inbound handler cached under {@code "<module name>:<action name>"},
    * creating, initializing and caching it on first use.
    *
    * <p>The cache is read once without the lock and, on a miss, read again while
    * holding the lock on the cache before a new handler is created.
    */
   public InboundActionHandler getInboundActionHandler(ModuleModel module, InboundActionModel inboundAction) {
      String cacheKey = module.getModuleName() + ":" + inboundAction.getActionName();
      InboundActionHandler cached = m_inboundActionHandlers.get(cacheKey);

      if (cached != null) {
         return cached;
      }

      synchronized (m_inboundActionHandlers) {
         InboundActionHandler handler = m_inboundActionHandlers.get(cacheKey);

         if (handler == null) {
            handler = lookup(InboundActionHandler.class);
            handler.initialize(inboundAction);
            m_inboundActionHandlers.put(cacheKey, handler);
         }

         return handler;
      }
   }
   // DefaultInboundActionHandler.handle(), 类似于 around 式的切面
   /**
    * Executes the inbound action inside an "MVC"/"InboundPhase" transaction.
    *
    * <p>Order of work: pre-validators, payload preparation (only when the context
    * has no payload yet), validators, the action method itself, post-validators.
    *
    * @param ctx action context of the current request
    * @throws ActionException wrapping any exception raised along the way
    */
   public void handle(ActionContext ctx) throws ActionException {
      Transaction phase = m_cat.newTransaction("MVC", "InboundPhase");

      try {
         for (Validator<ActionContext<?>> pre : m_preValidators) {
            pre.validate(ctx);
         }

         if (ctx.getPayload() == null) {
            preparePayload(ctx);
         }

         for (Validator<ActionContext<?>> check : m_validators) {
            check.validate(ctx);
         }

         // Invoke the action method on the module instance
         // (per the original note, a static ReflectUtils.invokeMethod() call).
         invokeMethod(m_inboundAction.getActionMethod(), m_inboundAction.getModuleInstance(), ctx);

         for (Validator<ActionContext<?>> post : m_postValidators) {
            post.validate(ctx);
         }

         phase.setStatus(Transaction.SUCCESS);
      } catch (Exception cause) {
         String actionName = m_inboundAction.getActionName();

         m_cat.logError(cause);
         phase.setStatus(cause);
         throw new ActionException("Error occured during handling inbound action(" + actionName + ")!", cause);
      } finally {
         phase.complete();
      }
   }

// 以上,就是整个的入站调用过程

 

// 接下来,就是出站的调用过程了

    // t.addData("out", actionContext.getOutboundAction()); handleOutboundAction(module, actionContext); 进入出站处理
   /**
    * Dispatches the outbound action named in the action context to its handler.
    *
    * @throws ActionException when the module declares no outbound action with
    *                         that name, or propagated from the handler
    */
   private void handleOutboundAction(ModuleModel module, ActionContext<?> actionContext) throws ActionException {
      String outboundName = actionContext.getOutboundAction();
      OutboundActionModel outbound = module.getOutbounds().get(outboundName);

      if (outbound == null) {
         throw new ActionException("No method annotated by @" + OutboundActionMeta.class.getSimpleName() + "("
               + outboundName + ") found in " + module.getModuleClass());
      }

      OutboundActionHandler handler = m_actionHandlerManager.getOutboundActionHandler(module, outbound);

      handler.handle(actionContext);
   }
   // 获取出站路由信息
   /**
    * Returns the outbound handler cached under {@code "<module name>:<action name>"},
    * creating, initializing and caching it on first use.
    *
    * <p>The cache is read once without the lock and, on a miss, read again while
    * holding the lock on the cache before a new handler is created.
    */
   public OutboundActionHandler getOutboundActionHandler(ModuleModel module, OutboundActionModel outboundAction) {
      String cacheKey = module.getModuleName() + ":" + outboundAction.getActionName();
      OutboundActionHandler cached = m_outboundActionHandlers.get(cacheKey);

      if (cached != null) {
         return cached;
      }

      synchronized (m_outboundActionHandlers) {
         OutboundActionHandler handler = m_outboundActionHandlers.get(cacheKey);

         if (handler == null) {
            handler = lookup(OutboundActionHandler.class);
            handler.initialize(outboundAction);
            m_outboundActionHandlers.put(cacheKey, handler);
         }

         return handler;
      }
   }
   // DefaultOutboundActionHandler.handle(), 记录操作,调用action
   /**
    * Executes the outbound action method inside an "MVC"/"OutboundPhase" transaction.
    *
    * @param context action context of the current request
    * @throws ActionException wrapping any {@link RuntimeException} raised by the action method
    */
   public void handle(ActionContext<?> context) throws ActionException {
      Transaction phase = m_cat.newTransaction("MVC", "OutboundPhase");

      try {
         invokeMethod(m_outboundAction.getMethod(), m_outboundAction.getModuleInstance(), context);
         phase.setStatus(Transaction.SUCCESS);
      } catch (RuntimeException cause) {
         String actionName = m_outboundAction.getActionName();

         // Record the failure on CAT and on the transaction before rethrowing.
         m_cat.logError(cause);
         phase.setStatus(cause);
         throw new ActionException("Error occured during handling outbound action(" + actionName + ")", cause);
      } finally {
         phase.complete();
      }
   }

// 以上,出入站的流程就讲完了

// 接下来,看一下几个具体处理请求的实例

// home/Handler (由 /cat 或 /cat/r 转入), 首页展现逻辑最简单,默认无数据操作
    // 入站不处理
    /**
     * Inbound phase of the "home" action; intentionally performs no work.
     */
    @Override
    @PayloadMeta(Payload.class)
    @InboundActionMeta(name = "home")
    public void handleInbound(Context ctx) throws ServletException, IOException {
        // nothing to do in the inbound phase
    }

    // 出站显示首页页面
    /**
     * Outbound phase of the "home" action: performs the requested action, fills
     * the model from the payload and renders it through the JSP viewer.
     */
    @Override
    @OutboundActionMeta(name = "home")
    public void handleOutbound(Context ctx) throws ServletException, IOException {
        Model model = new Model(ctx);
        Payload payload = ctx.getPayload();

        switch (payload.getAction()) {
        case THREAD_DUMP:
            showThreadDump(model, payload);
            break;
        case CHECKPOINT:
            m_receiver.destory();
            m_realtimeConsumer.doCheckpoint();
            break;
        case VIEW:
        default:
            // plain page view: nothing to do before rendering
            break;
        }

        // Populate the model from the payload, then render.
        model.setAction(payload.getAction());
        model.setPage(ReportPage.HOME);
        model.setDomain(payload.getDomain());
        model.setDate(payload.getDate());
        m_jspViewer.view(ctx, model);
    }
    
// transaction/Handler, transaction 页面呈现
    // 入站不处理
    /**
     * Inbound phase of the "t" (transaction) action; intentionally performs no work.
     */
    @Override
    @PayloadMeta(Payload.class)
    @InboundActionMeta(name = "t")
    public void handleInbound(Context ctx) throws ServletException, IOException {
        // the transaction page is display only, so there is no inbound work
    }

    // 出站进行数据复合
    /**
     * Outbound phase of the "t" (transaction) action.
     *
     * <p>Normalizes the payload, resolves the group (falling back to the domain's
     * default group when none is given), loads the report variant selected by the
     * payload's action, stores the results on the model and finally renders the
     * model as XML or through the JSP viewer depending on {@code payload.isXml()}.
     *
     * <p>NOTE(review): {@code report} is declared inside the first {@code case} and
     * reused by the later ones through the shared switch-block scope; keep that in
     * mind before reordering or extracting cases.
     */
    @Override
    @OutboundActionMeta(name = "t")
    public void handleOutbound(Context ctx) throws ServletException, IOException {
        Model model = new Model(ctx);
        Payload payload = ctx.getPayload();

        normalize(model, payload);
        String domain = payload.getDomain();
        Action action = payload.getAction();
        String ipAddress = payload.getIpAddress();
        String group = payload.getGroup();
        String type = payload.getType();
        String name = payload.getName();
        // NOTE(review): same source as ipAddress above (both read payload.getIpAddress()).
        String ip = payload.getIpAddress();

        // Fall back to the domain's default group and write it back to the payload.
        if (StringUtils.isEmpty(group)) {
            group = m_configManager.queryDefaultGroup(domain);
            payload.setGroup(group);
        }
        model.setGroupIps(m_configManager.queryIpByDomainAndGroup(domain, group));
        model.setGroups(m_configManager.queryDomainGroup(payload.getDomain()));
        switch (action) {
        case HOURLY_REPORT:
            // hourly report data
            TransactionReport report = getHourlyReport(payload);

            if (report != null) {
                report = m_mergeHelper.mergeAllMachines(report, ipAddress);

                model.setReport(report);
                buildTransactionMetaInfo(model, payload, report);
            }
            break;
        case HISTORY_REPORT:
            // report queried from the report service for the history date range
            report = m_reportService.queryReport(domain, payload.getHistoryStartDate(), payload.getHistoryEndDate());

            if (report != null) {
                model.setReport(report);
                buildTransactionMetaInfo(model, payload, report);
            }
            break;
        case HISTORY_GRAPH:
            // distribution info is only built when the requested ip is Constants.ALL
            if (Constants.ALL.equalsIgnoreCase(ipAddress)) {
                report = m_reportService.queryReport(domain, payload.getHistoryStartDate(), payload.getHistoryEndDate());

                buildDistributionInfo(model, type, name, report);
            }

            m_historyGraph.buildTrendGraph(model, payload);
            break;
        case GRAPHS:
            report = getHourlyGraphReport(model, payload);

            if (Constants.ALL.equalsIgnoreCase(ipAddress)) {
                buildDistributionInfo(model, type, name, report);
            }
            // an absent or empty name is treated as Constants.ALL
            if (name == null || name.length() == 0) {
                name = Constants.ALL;
            }

            report = m_mergeHelper.mergeAllNames(report, ip, name);

            model.setReport(report);
            buildTransactionNameGraph(model, report, type, name, ip);
            break;
        case HOURLY_GROUP_REPORT:
            // NOTE(review): the null check below runs after filterReportByGroup and
            // mergeAllMachines have already received the report - confirm both tolerate null.
            report = getHourlyReport(payload);
            report = filterReportByGroup(report, domain, group);
            report = m_mergeHelper.mergeAllMachines(report, ipAddress);

            if (report != null) {
                model.setReport(report);

                buildTransactionMetaInfo(model, payload, report);
            }
            break;
        case HISTORY_GROUP_REPORT:
            // NOTE(review): same late null check as in HOURLY_GROUP_REPORT.
            report = m_reportService.queryReport(domain, payload.getHistoryStartDate(), payload.getHistoryEndDate());
            report = filterReportByGroup(report, domain, group);
            report = m_mergeHelper.mergeAllMachines(report, ipAddress);

            if (report != null) {
                model.setReport(report);
                buildTransactionMetaInfo(model, payload, report);
            }
            break;
        case GROUP_GRAPHS:
            report = getHourlyGraphReport(model, payload);
            report = filterReportByGroup(report, domain, group);
            buildDistributionInfo(model, type, name, report);

            // an absent or empty name is treated as Constants.ALL
            if (name == null || name.length() == 0) {
                name = Constants.ALL;
            }
            report = m_mergeHelper.mergeAllNames(report, ip, name);

            model.setReport(report);
            buildTransactionNameGraph(model, report, type, name, ip);
            break;
        case HISTORY_GROUP_GRAPH:
            report = m_reportService.queryReport(domain, payload.getHistoryStartDate(), payload.getHistoryEndDate());
            report = filterReportByGroup(report, domain, group);

            buildDistributionInfo(model, type, name, report);
            List<String> ips = m_configManager.queryIpByDomainAndGroup(domain, group);

            m_historyGraph.buildGroupTrendGraph(model, payload, ips);
            break;
        }

        // Render as XML when requested, otherwise through the JSP viewer.
        if (payload.isXml()) {
            m_xmlViewer.view(ctx, model);
        } else {
            m_jspViewer.view(ctx, model);
        }
    }
    // 获取小时报告实例
    /**
     * Fetches the hourly transaction report for the payload's domain, date, type
     * and ip address from the model service.
     *
     * @param payload request payload
     * @return the model carried by the service response
     * @throws RuntimeException when the service is not eligible for the request
     */
    private TransactionReport getHourlyReport(Payload payload) {
        String domain = payload.getDomain();
        String ipAddress = payload.getIpAddress();
        ModelRequest request = new ModelRequest(domain, payload.getDate()).setProperty("type", payload.getType())
              .setProperty("ip", ipAddress);

        if (!m_service.isEligable(request)) {
            throw new RuntimeException("Internal error: no eligable transaction service registered for " + request + "!");
        }

        // invoke service
        ModelResponse<TransactionReport> response = m_service.invoke(request);

        return response.getModel();
    }
    // BaseCompositeModelService.invoke(), 并发调用各个符合条件的子服务并合并结果
    
    /**
     * Fans the request out to every eligible service on the shared thread pool,
     * waits up to ten seconds for the workers and merges whatever responses
     * arrived in time.
     *
     * <p>Changes compared with the previous version:
     * <ul>
     * <li>a {@code null} worker response is skipped before it is dereferenced
     * (this method itself may return {@code null}, so that is a legitimate value);</li>
     * <li>the thread's interrupt flag is restored when the wait is interrupted;</li>
     * <li>the responses are copied into a snapshot before they are counted and
     * merged, because workers that outlive the timeout may still append to the
     * shared list.</li>
     * </ul>
     *
     * @param request model request forwarded to every eligible service
     * @return the aggregated response, or {@code null} when the request carries a
     *         "requireAll" property and not every eligible service delivered a model
     */
    @Override
    public ModelResponse<T> invoke(final ModelRequest request) {
        int requireSize = 0;
        final List<ModelResponse<T>> responses = Collections.synchronizedList(new ArrayList<ModelResponse<T>>());
        // Each worker releases one permit when it finishes; the caller waits for all of them.
        final Semaphore semaphore = new Semaphore(0);
        final Transaction t = Cat.getProducer().newTransaction("ModelService", getClass().getSimpleName());

        t.setStatus(Message.SUCCESS);
        t.addData("request", request);
        t.addData("thread", Thread.currentThread());

        for (final ModelService<T> service : m_allServices) {
            if (!service.isEligable(request)) {
                continue;
            }

            // save current transaction so that child thread can access it
            if (service instanceof ModelServiceWithCalSupport) {
                ((ModelServiceWithCalSupport) service).setParentTransaction(t);
            }
            requireSize++;

            s_threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        ModelResponse<T> response = service.invoke(request);

                        if (response == null) {
                            // nothing delivered; the permit is still released in finally
                            return;
                        }
                        if (response.getException() != null) {
                            logError(response.getException());
                        }
                        if (response.getModel() != null) {
                            responses.add(response);
                        }
                    } catch (Exception e) {
                        logError(e);
                        t.setStatus(e);
                    } finally {
                        semaphore.release();
                    }
                }
            });
        }

        try {
            // 10 seconds timeout; a timeout is tolerated and the partial result is used
            semaphore.tryAcquire(requireSize, 10000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // keep the best-effort behavior but do not lose the interruption
            Thread.currentThread().interrupt();
            t.setStatus(e);
        } finally {
            t.complete();
        }

        // Freeze the responses: late workers may still be adding to the shared list.
        final List<ModelResponse<T>> collected;

        synchronized (responses) {
            collected = new ArrayList<ModelResponse<T>>(responses);
        }

        String requireAll = request.getProperty("requireAll");

        if (requireAll != null && collected.size() != requireSize) {
            String data = "require:" + requireSize + " actual:" + collected.size();
            Cat.logEvent("FetchReportError:" + this.getClass().getSimpleName(), request.getDomain(), Event.SUCCESS, data);

            return null;
        }
        ModelResponse<T> aggregated = new ModelResponse<T>();
        T report = merge(request, collected);

        aggregated.setModel(report);
        return aggregated;
    }
    
    // TransactionMergeHelper
    
/**
 * Helper that collapses a {@link TransactionReport} across machines and/or
 * across transaction names.
 *
 * <p>A {@code null} report is returned unchanged by every method. Callers in the
 * transaction handler null-check the result only after merging, so the merge
 * itself must not fail on a missing report.
 */
public class TransactionMergeHelper {

    /**
     * Merges all machines of the report into one when no specific ip is requested.
     *
     * @param report    report to merge, may be {@code null}
     * @param ipAddress requested ip; empty or {@code Constants.ALL} (case-insensitive)
     *                  triggers the merge
     * @return the merged report, or the input when no merge applies or it is {@code null}
     */
    public TransactionReport mergeAllMachines(TransactionReport report, String ipAddress) {
        if (report == null) {
            return null;
        }

        if (StringUtils.isEmpty(ipAddress) || Constants.ALL.equalsIgnoreCase(ipAddress)) {
            AllMachineMerger all = new AllMachineMerger();

            all.visitTransactionReport(report);
            report = all.getReport();
        }
        return report;
    }

    /**
     * Merges all names of the report into one when no specific name is requested.
     *
     * @param report  report to merge, may be {@code null}
     * @param allName requested name; empty or {@code Constants.ALL} (case-insensitive)
     *                triggers the merge
     * @return the merged report, or the input when no merge applies or it is {@code null}
     */
    public TransactionReport mergeAllNames(TransactionReport report, String allName) {
        if (report == null) {
            return null;
        }

        if (StringUtils.isEmpty(allName) || Constants.ALL.equalsIgnoreCase(allName)) {
            AllNameMerger all = new AllNameMerger();

            all.visitTransactionReport(report);
            report = all.getReport();
        }
        return report;
    }

    /**
     * Applies the machine merge first and the name merge on its result.
     *
     * @param report    report to merge, may be {@code null}
     * @param ipAddress requested ip, see {@link #mergeAllMachines(TransactionReport, String)}
     * @param allName   requested name, see {@link #mergeAllNames(TransactionReport, String)}
     * @return the report after both merges
     */
    public TransactionReport mergeAllNames(TransactionReport report, String ipAddress, String allName) {
        TransactionReport temp = mergeAllMachines(report, ipAddress);

        return mergeAllNames(temp, allName);
    }

}
    // AllMachineMerger.visitTransactionReport()
    /**
     * Starts a fresh merged report that copies the domain, time range, domain
     * names and ips of the visited report, then lets the parent visitor walk it.
     */
    @Override
    public void visitTransactionReport(TransactionReport transactionReport) {
        TransactionReport merged = new TransactionReport(transactionReport.getDomain());

        m_report = merged;
        merged.setStartTime(transactionReport.getStartTime());
        merged.setEndTime(transactionReport.getEndTime());
        merged.getDomainNames().addAll(transactionReport.getDomainNames());
        merged.getIps().addAll(transactionReport.getIps());

        // Continue with the parent traversal of the source report.
        super.visitTransactionReport(transactionReport);
    }
    // 调用父类的方法 BaseVisitor.visitTransactionReport() 进行循环调用集群机器小时数据
    
   /**
    * Visits every machine contained in the report.
    */
   @Override
   public void visitTransactionReport(TransactionReport transactionReport) {
      for (Machine each : transactionReport.getMachines().values()) {
         visitMachine(each);
      }
   }

    /**
     * Makes sure the merged report has its {@code Constants.ALL} machine, then
     * continues with the parent traversal of the visited machine.
     */
    @Override
    public void visitMachine(Machine machine) {
        // Create the aggregate machine entry on first use.
        m_report.findOrCreateMachine(Constants.ALL);

        super.visitMachine(machine);
    }

    /**
     * Merges the visited type into the type with the same id under the
     * {@code Constants.ALL} machine, then continues with the parent traversal.
     */
    @Override
    public void visitType(TransactionType type) {
        m_currentType = type.getId();

        Machine allMachines = m_report.findOrCreateMachine(Constants.ALL);
        TransactionType target = allMachines.findOrCreateType(m_currentType);

        m_merger.mergeType(target, type);
        super.visitType(type);
    }
    
    // TransactionReportMerger.mergeType(), 进行数据合并
    
    /**
     * Folds the statistics of {@code other} into {@code old}.
     *
     * <p>The 95/99 line values become count-weighted averages; counts, tps, sum
     * and sum2 are added; min and max are widened; fail percentage, average and
     * standard deviation are recomputed from the merged totals; message urls are
     * taken from {@code other} only when {@code old} has none.
     *
     * @param old   merge target, modified in place
     * @param other source of the values to merge in
     */
    @Override
    public void mergeType(TransactionType old, TransactionType other) {
        long mergedCount = old.getTotalCount() + other.getTotalCount();

        if (mergedCount > 0) {
            // Weight each line value by its own count before averaging.
            double weighted95 = old.getLine95Value() * old.getTotalCount() + other.getLine95Value()
                  * other.getTotalCount();
            double weighted99 = old.getLine99Value() * old.getTotalCount() + other.getLine99Value()
                  * other.getTotalCount();

            old.setLine95Value(weighted95 / mergedCount);
            old.setLine99Value(weighted99 / mergedCount);
        }

        // Additive counters.
        old.setTotalCount(mergedCount);
        old.setFailCount(old.getFailCount() + other.getFailCount());
        old.setTps(old.getTps() + other.getTps());

        // Widen the min/max range.
        if (old.getMin() > other.getMin()) {
            old.setMin(other.getMin());
        }

        if (old.getMax() < other.getMax()) {
            old.setMax(other.getMax());
        }

        old.setSum(old.getSum() + other.getSum());
        old.setSum2(old.getSum2() + other.getSum2());

        // Derived values depend on the merged totals set above.
        if (old.getTotalCount() > 0) {
            old.setFailPercent(old.getFailCount() * 100.0 / old.getTotalCount());
            old.setAvg(old.getSum() / old.getTotalCount());
            old.setStd(std(old.getTotalCount(), old.getAvg(), old.getSum2(), old.getMax()));
        }

        // Keep existing sample urls; only fill the gaps.
        if (old.getSuccessMessageUrl() == null) {
            old.setSuccessMessageUrl(other.getSuccessMessageUrl());
        }

        if (old.getFailMessageUrl() == null) {
            old.setFailMessageUrl(other.getFailMessageUrl());
        }
    }

    // visitType
   /**
    * Visits the children of a type: its names, then its range2 entries, then
    * its all-duration entries.
    */
   @Override
   public void visitType(TransactionType type) {
      for (TransactionName eachName : type.getNames().values()) {
         visitName(eachName);
      }

      for (Range2 eachRange : type.getRange2s().values()) {
         visitRange2(eachRange);
      }

      for (AllDuration eachDuration : type.getAllDurations().values()) {
         visitAllDuration(eachDuration);
      }
   }
   
   // 设置概要信息
    /**
     * Fills the summary part of the model.
     *
     * <p>Without a type in the payload the type-level display report is set.
     * With a type, the name-level display report is set and the name pie chart
     * is built from its results.
     */
    private void buildTransactionMetaInfo(Model model, Payload payload, TransactionReport report) {
        String type = payload.getType();
        String sortBy = payload.getSortBy();
        String queryName = payload.getQueryName();
        String ip = payload.getIpAddress();

        if (StringUtils.isEmpty(type)) {
            model.setDisplayTypeReport(new DisplayTypes().display(sortBy, ip, report));
            return;
        }

        DisplayNames names = new DisplayNames();

        model.setDisplayNameReport(names.display(sortBy, type, ip, report, queryName));
        // Build the pie chart from the displayed names.
        buildTransactionNamePieChart(names.getResults(), model);
    }

// 如上过程,简单或复杂的页面业务已经ok了

其他业务逻辑看代码自然一目了然了!

如有必要再添加几个有意义的 方法。

 

具体的业务展现逻辑一般都比较简单的,这里就不赘述了,主要在于熟悉业务,熟悉表结构。

流程图就暂不显示了,请自行脑补吧。

cat本身是好几年前的产物,技术自然算不上新,要去深究意义也不大,重在学习吧。

以上是关于应用监控CAT之cat-home源码阅读的主要内容,如果未能解决你的问题,请参考以下文章

CAT服务端初始化

java应用监控之Cat集成DubboMybatisLogbackspring boot

Cat安装与使用(美团开源监控)

Cat安装与使用(美团开源监控)

JDK源码阅读之 HashMap

熬夜之作:一文带你了解Cat分布式监控