SOFARPC

Posted 奋进的IT创业者

How SOFARPC Publishes a Service

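SOFARPC is the RPC framework in Ant's SOFAStack. Every middleware that gains traction is worth studying for its design ideas, which broadens our own knowledge. Basic usage is not covered here; readers who are new to it can refer to https://www.sofastack.tech/projects/sofa-rpc/overview/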

This analysis uses ZooKeeper as the registry.

Exporting the service

Once the interface is configured, the implementation looks like this:

@SofaService(interfaceType = HelloSofaV2.class,
       bindings = {@SofaServiceBinding(bindingType = "bolt"),@SofaServiceBinding(bindingType = "rest")})
@Service
public class HelloSofav2Impl implements HelloSofaV2 {
   private Logger logger = LoggerFactory.getLogger(HelloSofav2Impl.class);
   @Override
   public String sayHello(String sofa) {
       logger.info("hello sofa...");
       return "hellosofa";
  }
}

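The interface implementation is marked with the @SofaService annotation. ServiceBeanFactoryPostProcessor implements Spring's BeanFactoryPostProcessor extension point and defines beans programmatically. The call chain is: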

ServiceBeanFactoryPostProcessor#postProcessBeanFactory->transformSofaBeanDefinition->generateSofaServiceDefinitionOnClass->generateSofaServiceDefinition

private void generateSofaServiceDefinition(String beanId, SofaService sofaServiceAnnotation,
                                           Class<?> beanClass, BeanDefinition beanDefinition,
                                           ConfigurableListableBeanFactory beanFactory) {
    if (sofaServiceAnnotation == null) {
        return;
    }
    AnnotationWrapperBuilder<SofaService> wrapperBuilder = AnnotationWrapperBuilder.wrap(
        sofaServiceAnnotation).withBinder(binder);
    // Resolve placeholders through a proxy
    sofaServiceAnnotation = wrapperBuilder.build();

    Class<?> interfaceType = sofaServiceAnnotation.interfaceType();
    if (interfaceType.equals(void.class)) {
        Class<?> interfaces[] = beanClass.getInterfaces();

        if (beanClass.isInterface() || interfaces == null || interfaces.length == 0) {
            interfaceType = beanClass;
        } else if (interfaces.length == 1) {
            interfaceType = interfaces[0];
        } else {
            throw new FatalBeanException("Bean " + beanId + " has more than one interface.");
        }
    }

    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition();
    String serviceId = SofaBeanNameGenerator.generateSofaServiceBeanName(interfaceType,
        sofaServiceAnnotation.uniqueId());

    if (!beanFactory.containsBeanDefinition(serviceId)) {
        builder.getRawBeanDefinition().setScope(beanDefinition.getScope());
        builder.setLazyInit(beanDefinition.isLazyInit());
        builder.getRawBeanDefinition().setBeanClass(ServiceFactoryBean.class);
        builder.addPropertyValue(AbstractContractDefinitionParser.INTERFACE_CLASS_PROPERTY,
            interfaceType);
        builder.addPropertyValue(AbstractContractDefinitionParser.UNIQUE_ID_PROPERTY,
            sofaServiceAnnotation.uniqueId());
        builder.addPropertyValue(AbstractContractDefinitionParser.BINDINGS,
            getSofaServiceBinding(sofaServiceAnnotation, sofaServiceAnnotation.bindings()));
        builder.addPropertyReference(ServiceDefinitionParser.REF, beanId);
        builder.addPropertyValue(ServiceDefinitionParser.BEAN_ID, beanId);
        builder.addPropertyValue(AbstractContractDefinitionParser.DEFINITION_BUILDING_API_TYPE,
            true);
        builder.addDependsOn(beanId);
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(serviceId,
            builder.getBeanDefinition());
    } else {
        SofaLogger.error("SofaService was already registered: {}", serviceId);
    }
}

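BeanDefinitionBuilder is used to create the bean definition for ServiceFactoryBean. As the SOFARPC introduction explains, the @SofaService annotation supports placeholders. Placeholder resolution is implemented by the PlaceHolderAnnotationInvocationHandler class, and the commented line above is where it happens. Attention now shifts to ServiceFactoryBean: it extends AbstractContractFactoryBean, which implements InitializingBean. This is the template method pattern.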
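The proxy-based placeholder resolution described above can be sketched with a JDK dynamic proxy. This is a minimal illustration, not the code of PlaceHolderAnnotationInvocationHandler: the annotation DemoService, the property key demo.uniqueId, and the map-based property source are all hypothetical stand-ins.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;

final class PlaceholderAnnotationSketch {

    // Hypothetical annotation standing in for @SofaService.
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoService {
        String uniqueId() default "";
    }

    @DemoService(uniqueId = "${demo.uniqueId}")
    static class DemoBean {
    }

    /** Wraps an annotation in a proxy that resolves ${key} in String attributes. */
    @SuppressWarnings("unchecked")
    static <A extends Annotation> A wrap(A delegate, Map<String, String> props) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Read the raw attribute from the real annotation first.
            Object value = method.invoke(delegate, args);
            if (value instanceof String) {
                return resolve((String) value, props);
            }
            return value;
        };
        return (A) Proxy.newProxyInstance(delegate.annotationType().getClassLoader(),
            new Class<?>[] { delegate.annotationType() }, handler);
    }

    /** Resolves a value of the exact form ${key}; anything else is returned unchanged. */
    static String resolve(String text, Map<String, String> props) {
        if (text.startsWith("${") && text.endsWith("}")) {
            String key = text.substring(2, text.length() - 1);
            return props.getOrDefault(key, text);
        }
        return text;
    }

    static String demo() {
        DemoService raw = DemoBean.class.getAnnotation(DemoService.class);
        DemoService wrapped = wrap(raw, Collections.singletonMap("demo.uniqueId", "v2"));
        return wrapped.uniqueId();
    }
}
```

Callers keep using the annotation type as usual; only the values they read back differ, which is why the caller can simply reassign sofaServiceAnnotation to the wrapped instance.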

ServiceFactoryBean#doAfterPropertiesSet

@Override
   protected void doAfterPropertiesSet() {
       if (!apiType && hasSofaServiceAnnotation()) {
           throw new ServiceRuntimeException(
               "Bean " + beanId + " of type " + ref.getClass()
                       + " has already annotated by @SofaService,"
                       + " can not be registered using xml. Please check it.");
      }

       Implementation implementation = new DefaultImplementation();
       implementation.setTarget(ref);
       service = buildService();

       // default add jvm binding and service jvm binding should set serialize as true
       if (bindings.size() == 0) {
           JvmBindingParam jvmBindingParam = new JvmBindingParam().setSerialize(true);
           bindings.add(new JvmBinding().setJvmBindingParam(jvmBindingParam));
      }

       for (Binding binding : bindings) {
           service.addBinding(binding);
      }

       ComponentInfo componentInfo = new ServiceComponent(implementation, service,
           bindingAdapterFactory, sofaRuntimeContext);
       sofaRuntimeContext.getComponentManager().register(componentInfo);
  }

The key call is sofaRuntimeContext.getComponentManager().register(componentInfo);, which leads to ComponentManagerImpl#doRegister:

private ComponentInfo doRegister(ComponentInfo ci) {
       ComponentName name = ci.getName();
       if (isRegistered(name)) {
           SofaLogger.error("Component was already registered: {}", name);
           if (ci.canBeDuplicate()) {
               return getComponentInfo(name);
          }
           throw new ServiceRuntimeException("Component can not be registered duplicated: " + name);
      }

       try {
           ci.register();
      } catch (Throwable t) {
           SofaLogger.error("Failed to register component: {}", ci.getName(), t);
           return null;
      }

       SofaLogger.info("Registering component: {}", ci.getName());

       try {
           ComponentInfo old = registry.putIfAbsent(ci.getName(), ci);
           if (old != null) {
               SofaLogger.error("Component was already registered: {}", name);
               if (ci.canBeDuplicate()) {
                   return old;
              }
               throw new ServiceRuntimeException("Component can not be registered duplicated: "
                                                 + name);

          }
           if (ci.resolve()) {
               typeRegistry(ci);
               // Export the service
               ci.activate();
          }
      } catch (Throwable t) {
           ci.exception(new Exception(t));
           SofaLogger.error("Failed to create the component {}", ci.getName(), t);
      }

       return ci;
  }
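
The duplicate handling in doRegister hinges on the atomic putIfAbsent call. The sketch below isolates that idea; Component is a hypothetical, trimmed-down stand-in for ComponentInfo, and setting the activated flag stands in for resolve() plus activate().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ComponentRegistrySketch {

    /** Hypothetical, trimmed-down stand-in for ComponentInfo. */
    static final class Component {
        final String name;
        final boolean canBeDuplicate;
        boolean activated;

        Component(String name, boolean canBeDuplicate) {
            this.name = name;
            this.canBeDuplicate = canBeDuplicate;
        }
    }

    private final ConcurrentMap<String, Component> registry = new ConcurrentHashMap<>();

    /** Returns the component that ends up registered under the candidate's name. */
    Component register(Component candidate) {
        // putIfAbsent is atomic: of several racing threads, only one sees null here.
        Component old = registry.putIfAbsent(candidate.name, candidate);
        if (old != null) {
            if (candidate.canBeDuplicate) {
                return old; // tolerate the duplicate and hand back the first registration
            }
            throw new IllegalStateException(
                "Component can not be registered duplicated: " + candidate.name);
        }
        candidate.activated = true; // stands in for resolve() + activate()
        return candidate;
    }

    static boolean demo() {
        ComponentRegistrySketch sketch = new ComponentRegistrySketch();
        Component first = sketch.register(new Component("service:HelloSofaV2", true));
        Component second = sketch.register(new Component("service:HelloSofaV2", true));
        return first == second && first.activated;
    }
}
```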

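ci.activate() relies on com.alipay.sofa.runtime.spi.binding.Binding to export the service. Binding is an extension point in SOFARPC. BindingAdapterFactory wraps each binding in a BindingAdapter<?>, and the service is exported through the adapter's method

Object outBinding(Object contract, T binding, Object target,
                 SofaRuntimeContext sofaRuntimeContext);

Take RpcBindingAdapter as an example: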

@Override
   public Object outBinding(Object contract, RpcBinding binding, Object target,
                            SofaRuntimeContext sofaRuntimeContext) {

       ApplicationContext applicationContext = sofaRuntimeContext.getSofaRuntimeManager()
          .getRootApplicationContext();
       ProviderConfigContainer providerConfigContainer = applicationContext
          .getBean(ProviderConfigContainer.class);
       ProcessorContainer processorContainer = applicationContext
          .getBean(ProcessorContainer.class);

       String uniqueName = providerConfigContainer.createUniqueName((Contract) contract, binding);
       ProviderConfig providerConfig = providerConfigContainer.getProviderConfig(uniqueName);
       processorContainer.processorProvider(providerConfig);

       if (providerConfig == null) {
           throw new ServiceRuntimeException(LogCodes.getLog(
               LogCodes.INFO_SERVICE_METADATA_IS_NULL, uniqueName));
      }

       try {
           // Export the service
           providerConfig.export();
      } catch (Exception e) {
           throw new ServiceRuntimeException(LogCodes.getLog(LogCodes.ERROR_PROXY_PUBLISH_FAIL), e);
      }

       if (providerConfigContainer.isAllowPublish()) {
           providerConfig.setRegister(true);
           List<RegistryConfig> registrys = providerConfig.getRegistry();
           for (RegistryConfig registryConfig : registrys) {
               Registry registry = RegistryFactory.getRegistry(registryConfig);
               registry.init();
               registry.start();
               registry.register(providerConfig);
          }
      }
       return Boolean.TRUE;
  }

providerConfig#export delegates to DefaultProviderBootstrap#export:

@Override
   public void export() {
       if (providerConfig.getDelay() > 0) { // delayed export, in milliseconds
           Thread thread = factory.newThread(new Runnable() {
               @Override
               public void run() {
                   try {
                       Thread.sleep(providerConfig.getDelay());
                  } catch (Throwable ignore) { // NOPMD
                  }
                   doExport();
              }
          });
           thread.start();
      } else {
           doExport();
      }
  }
private void doExport() {
       if (exported) {
           return;
      }

       // Check parameters
       checkParameters();

       String appName = providerConfig.getAppName();

       //key is the protocol of server,for concurrent safe
       Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
       // Register the processor with the server
       List<ServerConfig> serverConfigs = providerConfig.getServer();
       for (ServerConfig serverConfig : serverConfigs) {
           String protocol = serverConfig.getProtocol();

           String key = providerConfig.buildKey() + ":" + protocol;

           if (LOGGER.isInfoEnabled(appName)) {
               LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
          }

           // Mind the case of the same interface and uniqueId on different servers
           AtomicInteger cnt = EXPORTED_KEYS.get(key); // counter
           if (cnt == null) { // not exported yet
               cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
          }
           int c = cnt.incrementAndGet();
           hasExportedInCurrent.put(serverConfig.getProtocol(), true);
           int maxProxyCount = providerConfig.getRepeatedExportLimit();
           if (maxProxyCount > 0) {
               if (c > maxProxyCount) {
                   decrementCounter(hasExportedInCurrent);
                   // Limit exceeded: throw immediately
                   throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_PROVIDER_CONFIG, key,
                       maxProxyCount));
              } else if (c > 1) {
                   if (LOGGER.isInfoEnabled(appName)) {
                       LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.WARN_DUPLICATE_PROVIDER_CONFIG, key, c));
                  }
              }
          }

      }

       try {
           // Build the request invoker
           providerProxyInvoker = new ProviderProxyInvoker(providerConfig);

           preProcessProviderTarget(providerConfig, (ProviderProxyInvoker) providerProxyInvoker);
           // Initialize the registry
           if (providerConfig.isRegister()) {
               List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
               if (CommonUtils.isNotEmpty(registryConfigs)) {
                   for (RegistryConfig registryConfig : registryConfigs) {
                       RegistryFactory.getRegistry(registryConfig); // initialize the Registry ahead of time
                  }
              }
          }
           // Register the processor with the server
           for (ServerConfig serverConfig : serverConfigs) {
               try {
                   Server server = serverConfig.buildIfAbsent();
                   // Register the request invoker
                   server.registerProcessor(providerConfig, providerProxyInvoker);
                   if (serverConfig.isAutoStart()) {
                       // Start the server
                       server.start();
                  }

              } catch (SofaRpcRuntimeException e) {
                   throw e;
              } catch (Exception e) {
                   LOGGER.errorWithApp(appName,
                       LogCodes.getLog(LogCodes.ERROR_REGISTER_PROCESSOR_TO_SERVER, serverConfig.getId()), e);
              }
          }

           // Register with the registry
           providerConfig.setConfigListener(new ProviderAttributeListener());
           register();
      } catch (Exception e) {
           decrementCounter(hasExportedInCurrent);
           if (e instanceof SofaRpcRuntimeException) {
               throw e;
          }
           throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_PROVIDER_PROXY), e);
      }

       // Cache some data
       RpcRuntimeContext.cacheProviderConfig(this);
       exported = true;
  }
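
The repeated-export check in doExport boils down to a per-key counter with a rollback when the limit is exceeded. Here is a small sketch of that bookkeeping. It uses ConcurrentHashMap.computeIfAbsent in place of CommonUtils.putToConcurrentMap, and the method name countExport is made up for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ExportLimitSketch {

    // One counter per "interface:uniqueId:protocol" style key.
    private static final ConcurrentMap<String, AtomicInteger> EXPORTED_KEYS =
        new ConcurrentHashMap<>();

    /**
     * Counts one more export of the key and returns the new count.
     * A limit <= 0 means "unlimited", mirroring the maxProxyCount > 0 check.
     */
    static int countExport(String key, int limit) {
        AtomicInteger counter = EXPORTED_KEYS.computeIfAbsent(key, k -> new AtomicInteger(0));
        int count = counter.incrementAndGet();
        if (limit > 0 && count > limit) {
            counter.decrementAndGet(); // roll back, as decrementCounter(...) does
            throw new IllegalStateException(
                "Duplicate provider config: " + key + ", limit " + limit);
        }
        return count;
    }
}
```

Rolling the counter back before throwing matters: without it, one rejected export would permanently consume a slot and later legitimate exports of the same key would fail.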

Finally, the server is started through BoltServer. At this point the service has been exported.

Registering the service

This part goes through SOFABoot. spring.factories lists a SofaRpcAutoConfiguration, which configures a SofaBootRpcStartListener. That listener listens for SofaBootRpcStartEvent, which is published by ApplicationContextRefreshedListener; ApplicationContextRefreshedListener in turn listens for ContextRefreshedEvent. In other words, once Spring Boot has finished starting, a SofaBootRpcStartEvent is published and picked up by SofaBootRpcStartListener:

@Override
public void onApplicationEvent(SofaBootRpcStartEvent event) {
    //choose disable metrics lookout
    disableLookout();

    //extra info
    processExtra(event);

    //start fault tolerance
    faultToleranceConfigurator.startFaultTolerance();

    Collection<ProviderConfig> allProviderConfig = providerConfigContainer
        .getAllProviderConfig();
    if (!CollectionUtils.isEmpty(allProviderConfig)) {
        //start server
        serverConfigContainer.startServers();
    }

    //set allow all publish
    providerConfigContainer.setAllowPublish(true);

    //register registry
    providerConfigContainer.publishAllProviderConfig();

    //export dubbo
    providerConfigContainer.exportAllDubboProvideConfig();
}

As the comments show, providerConfigContainer.publishAllProviderConfig(); registers the services with the registry through Registry:

public void register(ProviderConfig config) {
    String appName = config.getAppName();
    if (!registryConfig.isRegister()) {
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
        }
        return;
    }

    // Publish
    if (config.isRegister()) {
        registerProviderUrls(config);
    }

    if (config.isSubscribe()) {
        // Subscribe to the config node
        if (!INTERFACE_CONFIG_CACHE.containsKey(buildConfigPath(rootPath, config))) {
            // Subscribe to interface-level config
            subscribeConfig(config, config.getConfigListener());
        }
    }
}

registerProviderUrls(config); contains the concrete publishing steps. With that, service registration is complete.

SPI

Next, the focus is on how SOFARPC uses SPI.

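SPI is a mechanism provided by the JDK and is used by many frameworks; SOFARPC is no exception. Much of SOFARPC's design appears to be modeled on Dubbo: a plugin-based microkernel. Because the JDK SPI is fairly limited, many frameworks build their own extensions on top of it.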

ExtensionLoader is SOFARPC's SPI utility class. An ExtensionLoader<T> is obtained through ExtensionLoaderFactory:

ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
    ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
    if (loader == null) {
        synchronized (ExtensionLoaderFactory.class) {
            loader = LOADER_MAP.get(clazz);
            if (loader == null) {
                loader = new ExtensionLoader<T>(clazz, listener);
                LOADER_MAP.put(clazz, loader);
            }
        }
    }
    return loader;
}

Each class is bound to exactly one ExtensionLoader, which is cached. The ExtensionLoader is constructed as follows:
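
The one-loader-per-class cache can also be written with ConcurrentHashMap.computeIfAbsent instead of explicit double-checked locking. The following is a sketch of that alternative, not SOFARPC's code; Loader is a hypothetical stand-in for ExtensionLoader<T>.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class LoaderCacheSketch {

    /** Hypothetical stand-in for ExtensionLoader<T>. */
    static final class Loader<T> {
        final Class<T> type;

        Loader(Class<T> type) {
            this.type = type;
        }
    }

    private static final ConcurrentMap<Class<?>, Loader<?>> LOADER_MAP =
        new ConcurrentHashMap<>();

    /** Returns the cached loader for the type, creating it at most once. */
    @SuppressWarnings("unchecked")
    static <T> Loader<T> getLoader(Class<T> type) {
        // The cast is safe because the value stored under a key always has that key as its type.
        return (Loader<T>) LOADER_MAP.computeIfAbsent(type, k -> new Loader<>(type));
    }
}
```

One trade-off to keep in mind: the mapping function passed to computeIfAbsent must not modify the same map. If constructing a loader could itself request another loader, the explicit synchronized block used by ExtensionLoaderFactory is the safer choice.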

protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
    if (RpcRunningState.isShuttingDown()) {
        this.interfaceClass = null;
        this.interfaceName = null;
        this.listeners = null;
        this.factory = null;
        this.extensible = null;
        this.all = null;
        return;
    }
    // The class is null, or is neither an interface nor an abstract class
    if (interfaceClass == null ||
        !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
        throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
    }
    this.interfaceClass = interfaceClass;
    this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
    this.listeners = new ArrayList<>();
    if (listener != null) {
        listeners.add(listener);
    }
    Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
    if (extensible == null) {
        throw new IllegalArgumentException(
            "Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
    } else {
        this.extensible = extensible;
    }

    this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
    this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
    if (autoLoad) {
        List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
        for (String path : paths) {
            loadFromFile(path);
        }
    }
}
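
Extension implementations are annotated with @Extension:

public @interface Extension {
    /**
     * Extension name.
     *
     * @return the extension name
     */
    String value();

    /**
     * Extension code. Not needed by default; required only when the interface is coded.
     *
     * @return the extension code
     * @see Extensible#coded()
     */
    byte code() default -1;

    /**
     * Priority order. Not needed by default.
     *
     * @return the order
     */
    int order() default 0;

    /**
     * Whether to override same-named extensions with a lower {@link #order()}.
     *
     * @return whether to override same-named extensions with a lower order
     * @since 5.2.0
     */
    boolean override() default false;

    /**
     * Extensions to reject; can exclude other extensions with a lower {@link #order()}.
     *
     * @return the extensions to reject
     * @since 5.2.0
     */
    String[] rejection() default {};
}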

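Extensions are loaded according to attributes such as priority (order) and whether a same-named extension overrides an existing one.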
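
To make the order and override attributes concrete, here is a simplified sketch of how a loader might decide which of two same-named extensions wins. It follows the javadoc of override() above; the exact rules in ExtensionLoader are not shown in this article, so the duplicate-alias handling is an assumption. Ext and the implementation names in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class ExtensionOverrideSketch {

    /** Hypothetical record of one loaded extension. */
    static final class Ext {
        final String alias;
        final String impl;
        final int order;
        final boolean override;

        Ext(String alias, String impl, int order, boolean override) {
            this.alias = alias;
            this.impl = impl;
            this.order = order;
            this.override = override;
        }
    }

    private final Map<String, Ext> all = new HashMap<>();

    void load(Ext candidate) {
        Ext old = all.get(candidate.alias);
        if (old == null) {
            all.put(candidate.alias, candidate);
        } else if (candidate.override) {
            // An overriding extension replaces the old one unless its order is lower.
            if (candidate.order >= old.order) {
                all.put(candidate.alias, candidate);
            }
        } else if (old.override && old.order >= candidate.order) {
            // An overriding extension was loaded first: keep it, ignore the candidate.
        } else {
            // Assumption: two same-named extensions without override are a configuration error.
            throw new IllegalStateException("Duplicate extension alias: " + candidate.alias);
        }
    }

    /** Returns the winning implementation name for the alias, or null if none is loaded. */
    String implOf(String alias) {
        Ext ext = all.get(alias);
        return ext == null ? null : ext.impl;
    }
}
```

For example, loading ("random", "BuiltInRandom", order 0, override false) and then ("random", "CustomRandom", order 10, override true) leaves CustomRandom registered under the alias random.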

SOFARPC looks for extension files under META-INF/services/sofa-rpc/ and META-INF/services/.

Extension points in SOFARPC

Load balancing

  1. consistentHash

  2. localPref

  3. random

  4. roundRobin

  5. weightRoundRobin

  6. weightConsistentHash

  7. auto

The default load-balancing algorithm is random.
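
As a rough idea of what a random strategy does, the sketch below picks a provider at random, weighted by a per-provider weight. Weighted random is a common way to implement this kind of balancer; whether SOFARPC's random extension computes weights exactly this way is not shown in this article. Provider and select are hypothetical names, and the list is assumed to be non-empty.

```java
import java.util.List;
import java.util.Random;

final class RandomLoadBalanceSketch {

    /** Hypothetical provider entry: an address plus a non-negative weight. */
    static final class Provider {
        final String address;
        final int weight;

        Provider(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    /** Picks one provider; the chance of each is proportional to its weight. */
    static Provider select(List<Provider> providers, Random random) {
        int total = 0;
        for (Provider provider : providers) {
            total += provider.weight;
        }
        if (total <= 0) {
            // No usable weights: fall back to a uniform pick.
            return providers.get(random.nextInt(providers.size()));
        }
        int offset = random.nextInt(total); // 0 <= offset < total
        for (Provider provider : providers) {
            offset -= provider.weight;
            if (offset < 0) {
                return provider;
            }
        }
        // Not reached when all weights are non-negative.
        return providers.get(providers.size() - 1);
    }
}
```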

Cluster fault tolerance

  1. failfast

  2. failover

The default fault-tolerance strategy is failover.
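
The difference between the two strategies can be shown in a few lines. This is a conceptual sketch only: providers are plain strings, the remote call is a Function, and the retries parameter is an assumption made for the illustration rather than a SOFARPC API.

```java
import java.util.List;
import java.util.function.Function;

final class ClusterSketch {

    /** failfast: a single attempt; any failure is propagated immediately. */
    static <R> R failfast(List<String> providers, Function<String, R> call) {
        return call.apply(providers.get(0));
    }

    /** failover: on failure, try the next provider, with at most `retries` extra attempts. */
    static <R> R failover(List<String> providers, int retries, Function<String, R> call) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries && attempt < providers.size(); attempt++) {
            try {
                return call.apply(providers.get(attempt));
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw last != null ? last : new IllegalStateException("no provider available");
    }
}
```

failover trades latency for availability, so it suits idempotent calls; failfast is the safer choice when repeating a request could cause duplicate side effects.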

SOFARPC global configuration

SOFARPC's default configuration lives in rpc-config-default.json.

To override the defaults, put the overriding values in sofa-rpc/rpc-config.json or META-INF/sofa-rpc/rpc-config.json. The rpc.config.order key defines the priority among these files.
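
The layering can be pictured as a map merge. The sketch below assumes that files are applied in ascending rpc.config.order, so that a higher order wins; that ordering direction, the class name, and the demo key used in the usage note are assumptions, not taken from the SOFARPC source.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConfigMergeSketch {

    static final String ORDER_KEY = "rpc.config.order";

    /** Applies override files on top of the defaults, lowest order first. */
    static Map<String, Object> merge(Map<String, Object> defaults,
                                     List<Map<String, Object>> overrides) {
        List<Map<String, Object>> sorted = new ArrayList<>(overrides);
        sorted.sort(Comparator.comparingInt(ConfigMergeSketch::orderOf));
        Map<String, Object> merged = new HashMap<>(defaults);
        for (Map<String, Object> file : sorted) {
            merged.putAll(file); // later (higher-order) files overwrite earlier ones
        }
        return merged;
    }

    /** Reads the order of one parsed config file; a missing order counts as 0. */
    static int orderOf(Map<String, Object> file) {
        Object order = file.get(ORDER_KEY);
        return order instanceof Number ? ((Number) order).intValue() : 0;
    }
}
```

With defaults containing demo.timeout = 3000, one file at order 1 setting it to 1000, and another at order 2 setting it to 2000, the merged value is 2000 regardless of the order in which the files were discovered.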

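This completes the basic flow and the key technical points of service publishing. The next installment analyzes the service reference flow.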

