RPC(二)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC(二)相关的知识,希望对你有一定的参考价值。

参考技术A 上篇文章简单的介绍了RPC的原理和传输协议采用HTTP协议与TCP协议的区别。

其中说到HTTP 协议有连接的建立和断开的开销。那TCP就不用建立和断开了吗?

HTTP有长连接为什么还要连接池?

HTTP1.1队首阻塞怎么解决?

回归RPC,既然Spring Cloud feign采用了HTTP1.1进行了传输,那它是怎么缓解对头阻塞的?
Feign的HTTP客户端
支持三种框架:HttpURLConnection、HttpClient、OkHttp;默认是HttpURLConnection
HttpURLConnection是没有连接池的,默认开启长连接keep-alive,不过这个长连接需要URL一致才能服用,而且只能支持GET协议的请求。连接复用率略低。解决方案:只要不多个请求用同一连接不就没有对头阻塞了。
长连接与短连接——JDK的HttpClient、ApacheHttpClient及OkHttpClient类比——Feign产品优化

HTTP1.1为什么都等连接空闲才复用,不是有管道技术吗?

手撸rpc框架,并基于spring进行二次注解开发

一、rpc是什么

手撸rpc框架,并基于spring进行二次注解开发
RPC是远程过程调用(Remote Procedure Call)的缩写形式。 客户端通过网络传输,远程调用服务端的函数,服务端处理客户端的调用请求,然后将结果通过网络传输返回客户端。

二、自定义rpc框架的使用展示

(一)、客户端代码

只需将远程的接口(来自rpc_api包,只有接口)IHelloService添加为属性并使用@Autowired即可像使用普通本地对象一样的调用远程函数

@Componentpublic class ServiceTest { @Autowired IHelloService helloService;
public String hello(String content) { System.out.println("===========client:开始调用远程方法==============="); String res = helloService.sayHello(content); System.out.println("===========client:开始调用远程结束==============="); return res; }}


(二)、服务端代码

只需使用@RpcServiceImpl(value = IHelloService.class, version = "v1.0")注解,继承rpc_api包中的接口进行实现,并启动容器即可

@RpcServiceImpl(value = IHelloService.class, version = "v1.0")public class V1HelloServiceImpl implements IHelloService {
@Override public String sayHello(String content) { System.out.println("===============server:开始执行远程方法sayHello【v1.0】==============="); System.out.println(content); System.out.println("===============server:远程方法sayHello【v1.0】执行完毕==============="); return "恭喜你,rpc通信完成了!这是我返回给你的结果"; }}

该框架包括简单示例都已上传至github,链接自取:自定义rpc框架

三、自定义rpc框架

(一)、代码架构

手撸rpc框架,并基于spring进行二次注解开发

  • rpc_api远程服务端提供的方法接口。

  • rpc_client :自定义rpc框架客户端部分,实现了rpc客户端部分逻辑。

  • rpc_server:自定义rpc框架服务端部分,实现了rpc服务端部分逻辑。

  • clienct_app:客户端应用,依赖rpc_api,和rpc_client。可以使用rpc_api下接口的方法(像上面示例一样,直接使用spring的@Autowired注解即可),实质是远程调用rpc_server实现的方法。

  • rpc_server:服务端应用,依赖rpc_apirpc_server实现rpc_api包的接口供客户端远程调用(像上面示例一样,只要使用@RpcServiceImpl注解即可)。

(二)、 rpc_api代码

很简单,定义服务端提供的api即可,使用自定义注解@RpcServiceAPI

package com.zte;
import com.zte.annotation.RpcServiceAPI;
@RpcServiceAPIpublic interface IHelloService { String sayHello(String content);}

(三)、rpc_client代码手撸rpc框架,并基于spring进行二次注解开发

  • annotation:api包注册API注解

  • client:管理socket,并处理远程调用的逻辑

  • spring_rpc:和mybatis_spring一样,将我们自定义的rpc框架整合到spirng中,使其使用时直接使用@AutoWired即可

  • RpcRequest:通信时序列化传输的对象

1. client

(1)、RpcHandler

当客户端使用api包下的接口方法时,使用动态代理技术,实际上调用的是我们在InvocationHandler中定义的invoke方法,而该方法会通过socket通信,将调用信息序列化传输至服务端服务端接收到请求后,处理请求,调用其具体实现类,然后返回结果。动态代理是spring框架的一大基石,掌握理解对开发与面试非常重要,可以参考两万字吐血总结,代理模式及手写实现动态代理(aop原理,基于jdk动态代理)

/** * api接口的实际处理者 */public class RpcHandler implements InvocationHandler { private String host; private int port; private String version;
public RpcHandler(String host, int port, String version) { this.host = host; this.port = port; this.version = version; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setArgs(args); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setVersion(version); return RpcClient.send(rpcRequest, host, port); }}


(2)、RpcClient

该类建立socket通信发送远程调用信息至服务端,并接受远程返回结果

/** * 客户端发送请求 */class RpcClient {
static Object send(RpcRequest rpcRequest, String host, int port) { Object res = null; //try with ,实现了Closeable的都可以在执行玩方法体后自动关闭,会处理异常 try ( Socket socket = new Socket(host, port); final ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); final ObjectInputStream in = new ObjectInputStream(socket.getInputStream()) ) { out.writeObject(rpcRequest); out.flush();
res = in.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return res; }}

2.@RpcServiceAPI

标注api的接口,在构造SpringBean时,如果bean中有@RpcServiceAPI标注的接口的成员变量,会使用动态代理注入代理类,负责远程调用服务端方法。(如同mybatis中使用@AutoWired UserDao dao一样,虽然我们只定义了接口,但是依然可以使用它的方法,原因就是因为,spring将mybatis生成的动态代理类注入了。)

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)public @interface RpcServiceAPI {}

3.spring_rpc

将自定义的rpc框架整合到spring中。可以参考 面试官你好,我自己手写实现过Mybatis(超详细注释)

(1)、RpcClientProxyFactory

/** * 提供api接口的代理类,实现spring的FactoryBean接口 */ //该注解可以从配置文件读取value@PropertySource("classpath:rpc.properties")public class RpcClientProxyFactory<T> implements FactoryBean<T> { @Value("${rpc.host:127.0.0.1}") private String host; @Value("${rpc.port:6666}") private int port; @Value("${rpc.version:v1.0}") private String version; private Class<T> interfaceCls;
public RpcClientProxyFactory(Class<T> interfaceCls) { this.interfaceCls = interfaceCls; }
public Class<T> getInterfaceCls() { return interfaceCls; }

public void setInterfaceCls(Class<T> interfaceCls) { this.interfaceCls = interfaceCls; } @Override public T getObject() throws Exception { //将代理类对象注入容器 return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[]{interfaceCls}, new RpcHandler(host, port, version)); }
@Override public boolean isSingleton() { return true; }
@Override public Class<T> getObjectType() { return interfaceCls; }}

(2)、ServiceBeanDefinitionRegistry

/** * 用于Spring动态注入rpc接口,类似于spring-mybatis整合,当被@RpcProxyFactory注解的interface使用@Autowired时会注入其代理类 */@Componentpublic class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor { private static Set<Class<?>> classCache = new HashSet<>(); private final static String RESOURCE_PATTERN = "/**/*.class"; private static final String BASE_PACKAGE = "com.zte";
@Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { getAPIClz(); classCache.forEach( beanClazz -> { //重新定义被RpcServiceAPI注解标注类的bean生成方法,这里将代理对象注入容器 BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz); GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
//在这里,我们可以给该对象的属性注入对应的实例。 //比如mybatis,就在这里注入了dataSource和sqlSessionFactory, // 注意,如果采用definition.getPropertyValues()方式的话, // 类似definition.getPropertyValues().add("interfaceType", beanClazz); // 则要求在FactoryBean(本应用中即ServiceFactory)提供setter方法,否则会注入失败 // 如果采用definition.getConstructorArgumentValues(), // 则FactoryBean中需要提供包含该属性的构造方法,否则会注入失败
//简单来说,就是将beanClazz注入到RpcClientProxyFactory的构造方法中 definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz); // 其返回的是该工厂Bean的getObject方法所返回的对象。 definition.setBeanClass(RpcClientProxyFactory.class);
//这里采用的是byType方式注入,类似的还有byName等 definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); registry.registerBeanDefinition(beanClazz.getSimpleName(), definition); } ); }
/** * */ public void getAPIClz() { /** * 扫描使用注解RpcServiceAPI的类 */ ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); try { String pattern = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + ClassUtils.convertClassNameToResourcePath(BASE_PACKAGE) + RESOURCE_PATTERN; Resource[] resources = resourcePatternResolver.getResources(pattern); MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver); for (Resource resource : resources) { if (resource.isReadable()) { MetadataReader reader = readerFactory.getMetadataReader(resource); //扫描到的class String className = reader.getClassMetadata().getClassName(); Class<?> clazz = Class.forName(className); //判断是否有指定注解 RpcServiceAPI annotation = clazz.getAnnotation(RpcServiceAPI.class); if (annotation != null) { //这个类使用了自定义注解 classCache.add(clazz); } } } } catch (IOException | ClassNotFoundException e) { System.out.println(e.getMessage()); } }
@Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}}

4.RpcRequest

网络通信的消息体,包括了所有的调用信息

public class RpcRequest implements Serializable { private static final long serialVersionUID = -5854150492574586489L; private String className; private Object[] args; private String methodName; private String version;
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
public String getClassName() { return className; }
public void setClassName(String className) { this.className = className; }
public Object[] getArgs() { return args; }
public void setArgs(Object[] args) { this.args = args; }
public String getMethodName() { return methodName; }
public void setMethodName(String methodName) { this.methodName = methodName; }}

(四)、rpc_server代码

手撸rpc框架,并基于spring进行二次注解开发

  • RpcServiceImpl注解:服务端应用使用该注解api接口包的实现类

  • server:处理客户端应用序列化发送而来的调用请求,使用反射,调用对应的实现类方法,并返回调用结果

  • RpcRequest:通信的消息,包括了全类名、方法名,参数列表

1. RpcServiceImpl

服务端应用使用该注解标注api的实现类

/** * 标注服务实现类,并将实现类添加到spring容器 */@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface RpcServiceImpl { Class<?> value();
String version();}

2.server

(1)、ServerProcessorHandler

处理客户端的请求,解析客户端传递的消息,调用实现类方法,返回执行结果

/** * 处理客户端请求 */public class ServerProcessorHandler implements Runnable {
private Socket socket; private Map<String, Object> handlerMap;

public ServerProcessorHandler(Socket socket, Map<String, Object> handlerMap) { this.socket = socket; this.handlerMap = handlerMap; }
@Override public void run() { try ( final ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); final ObjectInputStream in = new ObjectInputStream(socket.getInputStream()) ) { final RpcRequest request = (RpcRequest) in.readObject(); out.writeObject(invoke(request)); out.flush(); } catch (Exception e) { e.printStackTrace(); } }
private Object invoke(RpcRequest request) throws Exception { Object res; String serviceName = request.getClassName(); String version = request.getVersion(); //增加版本号的判断 if (!StringUtils.isEmpty(version)) { serviceName += "-" + version; } //得到实现类bean对象 Object service = handlerMap.get(serviceName); if (service == null) { throw new RuntimeException("service not found:" + serviceName); } //拿到客户端请求的参数 Object[] args = request.getArgs(); Method method = null; Class clazz = Class.forName(request.getClassName()); //无参和有参方法 if (args != null) { //获得每个参数的类型 Class<?>[] types = new Class[args.length]; for (int i = 0; i < args.length; i++) { types[i] = args[i].getClass(); } method = clazz.getMethod(request.getMethodName(), types); res = method.invoke(service, request.getArgs()); } else { method = clazz.getMethod(request.getMethodName()); res = method.invoke(service); } return res; }}

(2)、RpcServer

开启SocketServer,接受客户端请求,并将接口对应的实现类对象注入到容器(使用spring提供的切入点),对spring生命周期不熟悉的可以查看图解Spring中bean的生命周期

/** * 开启SocketServer,接受客户端请求,并将接口对应的实现类对象注入到容器,使用spring提供的钩子函数,在spring创建bean时会调用afterPropertiesSet和setApplicationContext方法 */@PropertySource("classpath:rpc.properties")@Componentpublic class RpcServer implements ApplicationContextAware, InitializingBean {
//推荐自定参数创建 ExecutorService executorService = Executors.newCachedThreadPool(); //k为全类名+版本号,v是对应的实现类。添加版本号可以适配不同客户端,做灰度发布 private Map<String, Object> handlerMap = new HashMap();
@Value("${rpc.host:127.0.0.1}") private String host; @Value("${rpc.port:6666}") private int port;
public RpcServer() { }
public RpcServer(ExecutorService executorService, Map<String, Object> handlerMap) { this.executorService = executorService; this.handlerMap = handlerMap; }
public RpcServer(int port) { this.port = port; }
@Override public void afterPropertiesSet() throws Exception { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("server端启动了"); while (true) {//不断接受请求 Socket socket = serverSocket.accept();//BIO //每一个socket 交给一个processorHandler来处理 executorService.execute(new ServerProcessorHandler(socket, handlerMap)); }
} catch (IOException e) { e.printStackTrace(); } finally { if (serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcServiceImpl.class); if (!serviceBeanMap.isEmpty()) { for (Object servcieBean : serviceBeanMap.values()) { //拿到注解 RpcServiceImpl rpcService = servcieBean.getClass().getAnnotation((RpcServiceImpl.class)); String serviceName = rpcService.value().getName();//拿到接口类定义 String version = rpcService.version(); //拿到版本号 if (!StringUtils.isEmpty(version)) { serviceName += "-" + version; } handlerMap.put(serviceName, servcieBean); } } }}


(五)、使用demo

1、客户端

(1)、客户端启动类

@Configuration@ComponentScan(basePackages = "com.zte")public class ClientApp { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ClientApp.class); context.start(); final ServiceTest helloService = context.getBean(ServiceTest.class); System.out.println(helloService.hello("rpc我也会啦!!")); }}

(2)、使用远程接口

@Componentpublic class ServiceTest { //客户端只需使用@AutoWried注解即可,就可以像使用本地对象一样调用远程的api @Autowired IHelloService helloService;
public String hello(String content) { System.out.println("===========client:开始调用远程方法==============="); String res = helloService.sayHello(content); System.out.println("===========client:开始调用远程结束==============="); return res; }}

2、服务端

(1)、 提供api实现类

使用时,只需实现api包中的接口,并使用RpcServiceImpl注解即可

@RpcServiceImpl(value = IHelloService.class, version = "v1.0")public class V1HelloServiceImpl implements IHelloService {
@Override public String sayHello(String content) { System.out.println("===============server:开始执行远程方法sayHello【v1.0】==============="); System.out.println(content); System.out.println("===============server:远程方法sayHello【v1.0】执行完毕==============="); return "恭喜你,rpc通信完成了!这是我返回给你的结果"; }}

(2)、 服务端启动类

@Configuration@ComponentScan(basePackages = "com.zte")public class ServerApp { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ServerApp.class); context.start(); }
@Bean public RpcServer getRpcServer() { return new RpcServer(); }}

3、配置文件

rpc.host=172.0.0.1rpc.port=6666# 服务端没有该属性rpc.version=v1.0

四、运行结果

(一)、客户端

(二)、服务端

觉得本文对你有帮助?请分享给更多人

关注「互联网技术资源社区」加星标,提升全栈技能


以上是关于RPC(二)的主要内容,如果未能解决你的问题,请参考以下文章

手撸rpc框架,并基于spring进行二次注解开发

UE4网络之(二) 远程调用函数(RPC)

HDFS(二) 底层通信原理——RPC 及 动态代理

RPC

手写基于 http 的RPC框架

RPC ---- RPC入门了解 & 最简单的RPC的实现