thrift生产环境服务端使用的正确姿势
Posted 小白懂编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了thrift生产环境服务端使用的正确姿势相关的知识,希望对你有一定的参考价值。
主要是解决一个端口暴露多个接口以及选择高性能的服务线程模型的问题。
Thrift是一个RPC框架,用于企业内部多个系统之间的高效通信。通常企业内部业务线增多后,需要将项目拆分成多个独立部署的模块,而模块之间难免需要交互。直接依赖其他业务模块的源码肯定不可取,这时就需要提供接口供其他模块调用。作为内部通信,通常不会选择HTTP,主要是出于性能考虑。
关于Thrift的基本使用请大家参考网上的资料。
这里做了一层简单的封装,主要是解决服务端自动注册service接口的问题和客户端自动创建service引用的问题。同时针对生产环境选择了最合适的服务模型及相关参数配置。
Thrift服务类:负责Thrift服务端必要且合适的配置,同时通过ThriftInteface注解,自动将spring容器中标注了该注解的服务注册到Thrift服务中。
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Thrift-based RPC server that exposes several service interfaces on one TCP port.
 *
 * <p>On initialisation every Spring bean annotated with {@link ThriftInteface} that implements a
 * generated Thrift {@code XXX.Iface} interface is registered with a
 * {@link TMultiplexedProcessor} under the name {@code XXX} (the fully qualified service class
 * name without the {@code $Iface} suffix).
 *
 * <p>Typical usage: configure through the setters, then call {@link #start()}.
 *
 * @see ThriftInteface
 * @author qiang.xie
 * @date 2016/10/13
 */
public class ThriftTcpServer {
    private static final Logger logger = LoggerFactory.getLogger(ThriftTcpServer.class);

    /** Matches the binary name of a generated Thrift interface, e.g. {@code com.HelloWordService$Iface}. */
    private static final Pattern IFACE_PATTERN = Pattern.compile("^(.+)\\$Iface$");

    /** Set once initialisation has been claimed; reset again if initialisation fails. */
    private final AtomicBoolean init = new AtomicBoolean(false);
    private TServer server;
    private ApplicationContext applicationContext;
    private int port = 8899;
    private int selectorThreads = getProcessNum();
    private int workerThreads = getProcessNum() * 10;
    private int acceptQueueSizePerThread = 5;
    /**
     * Wire protocol, one of {@code binary}, {@code compact}, {@code json}.
     */
    private String protocol = "binary";

    /**
     * Builds the underlying {@link TThreadedSelectorServer} and registers all annotated services.
     * Does nothing if initialisation has already been claimed by an earlier call.
     *
     * <p>If anything fails, the listening socket is closed and the init flag is cleared so that a
     * later call can retry instead of leaving the instance with a {@code null} server.
     *
     * @throws RuntimeException wrapping whatever exception caused initialisation to fail
     */
    protected void init() {
        // compareAndSet alone is sufficient; a separate get() beforehand adds nothing.
        if (!init.compareAndSet(false, true)) {
            return;
        }
        TNonblockingServerTransport serverTransport = null;
        try {
            // Non-blocking server socket used by the selector threads.
            serverTransport = new TNonblockingServerSocket(port);
            // Processor that dispatches to several services by name.
            TMultiplexedProcessor processor = new TMultiplexedProcessor();
            registerService(processor);

            // Library defaults are 2 selector threads and 5 worker threads; override them here.
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            // Non-blocking servers require framed transport.
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(chooseProtocol());
            tArgs.processor(processor);
            // Selector threads only poll read/write events of accepted connections; raise this
            // only under very high concurrency, otherwise it wastes resources.
            tArgs.selectorThreads(selectorThreads);
            // Size of the worker pool that executes the service calls; tune from real statistics.
            tArgs.workerThreads(workerThreads);
            // Per-selector queue of accepted connections. Callers expect fast answers and will not
            // wait long on a busy server, so a small queue is enough.
            tArgs.acceptQueueSizePerThread(acceptQueueSizePerThread);

            server = new TThreadedSelectorServer(tArgs);
            Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
            logger.info("thrift server init success");
        } catch (Exception e) {
            // Roll back so the port is released and a later init()/start() can try again.
            if (serverTransport != null) {
                serverTransport.close();
            }
            server = null;
            init.set(false);
            logger.error("thriftTcpServer init error", e);
            throw new RuntimeException(e);
        }
    }

    /** Returns the number of processors reported by the JVM. */
    private int getProcessNum() {
        return Runtime.getRuntime().availableProcessors();
    }

    /**
     * Maps the configured protocol name to a protocol factory.
     * Any value other than {@code json} or {@code compact} falls back to binary.
     */
    private TProtocolFactory chooseProtocol() {
        if ("json".equals(protocol)) {
            return new TJSONProtocol.Factory();
        }
        if ("compact".equals(protocol)) {
            return new TCompactProtocol.Factory();
        }
        if (!"binary".equals(protocol)) {
            // Make the silent fallback visible; a typo here would otherwise break clients quietly.
            logger.warn("unknown thrift protocol [{}], falling back to binary", protocol);
        }
        return new TBinaryProtocol.Factory();
    }

    /**
     * Registers every bean annotated with {@link ThriftInteface} on the given processor.
     *
     * <p>For each directly implemented interface named {@code X$Iface}, the class
     * {@code X$Processor} is loaded reflectively, constructed with the bean, and registered under
     * the service name {@code X}.
     *
     * @param processor the multiplexed processor to register the services on
     * @throws IllegalStateException if no application context has been set
     * @throws Exception if a processor class cannot be loaded or instantiated
     */
    protected void registerService(TMultiplexedProcessor processor) throws Exception {
        if (applicationContext == null) {
            throw new IllegalStateException(
                    "applicationContext must be set before the thrift server is initialised");
        }
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ThriftInteface.class);
        logger.info("find ThriftService: {}", beans.keySet());
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object bean = entry.getValue();
            // Look through a possible AOP proxy to the real implementation class.
            Class<?> beanClass = AopUtils.getTargetClass(bean);
            boolean registered = false;
            for (Class<?> interfaceClazz : beanClass.getInterfaces()) {
                Matcher matcher = IFACE_PATTERN.matcher(interfaceClazz.getName());
                if (!matcher.find()) {
                    continue;
                }
                String serviceName = matcher.group(1); // e.g. com.HelloWordService
                TProcessor serviceProcessor = (TProcessor) Class.forName(serviceName + "$Processor")
                        .getDeclaredConstructor(interfaceClazz)
                        .newInstance(bean);
                processor.registerProcessor(serviceName, serviceProcessor);
                registered = true;
                logger.info("register thrift service:{}", serviceName);
            }
            if (!registered) {
                logger.warn("bean [{}] is annotated with ThriftInteface but implements no *$Iface interface",
                        entry.getKey());
            }
        }
    }

    /**
     * Initialises the server if necessary and starts serving.
     *
     * <p>{@code serve()} on the selector server blocks the calling thread until the server is
     * stopped, so the start-up message is printed before serving rather than after it.
     *
     * @throws IllegalStateException if the server has not been initialised
     */
    public void start() {
        init();
        TServer current = server;
        if (current == null) {
            throw new IllegalStateException("thrift server is not initialised");
        }
        System.out.println("soa tcp server start at port["+port+"]...");
        current.serve();
    }

    /** Stops the server if it has been created; otherwise does nothing. */
    public void stop() {
        if (server != null) {
            server.stop();
        }
    }

    /** Sets the TCP port to listen on; must be called before {@link #start()}. */
    public void setPort(int port) {
        this.port = port;
    }

    /** Sets the number of selector threads. */
    public void setSelectorThreads(int selectorThreads) {
        this.selectorThreads = selectorThreads;
    }

    /** Sets the size of the worker thread pool. */
    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }

    /** Sets the accept queue size used per selector thread. */
    public void setAcceptQueueSizePerThread(int acceptQueueSizePerThread) {
        this.acceptQueueSizePerThread = acceptQueueSizePerThread;
    }

    /** Sets the protocol name: {@code binary}, {@code compact} or {@code json}. */
    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    /** Sets the Spring context that is searched for {@link ThriftInteface} beans. */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
ThriftInteface:
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a class as a Thrift service implementation. The annotated class is expected to
 * implement a generated Thrift interface ({@code XXX.Iface}).
 *
 * <p>The annotation is retained at runtime and is itself meta-annotated with
 * {@code @Component}.
 *
 * @author qiang.xie
 * @date 2016/10/13
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ThriftInteface {
}
启动Thrift服务器:
/**
 * Command-line entry point that loads the Spring context and starts a {@code ThriftTcpServer}.
 *
 * <p>Usage: {@code SoaTcpServerMain [port]}. The optional first argument is used as the listen
 * port only if it is an integer greater than 1000; otherwise the default port is used.
 */
public class SoaTcpServerMain {
    /** Port used when no usable port argument is supplied. */
    private static final int DEFAULT_PORT = 8200;

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the listen port
     * @throws Exception if the Spring context or the server fails to start
     */
    public static void main(String[] args) throws Exception {
        int port = resolvePort(args);
        ApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("classpath:application-soa.xml");
        ThriftTcpServer server = new ThriftTcpServer();
        server.setApplicationContext(applicationContext);
        server.setPort(port);
        server.start();
    }

    /** Returns the port from {@code args[0]} if it is an integer above 1000, else the default. */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            // Parse once and test the parsed value instead of parsing the argument twice.
            int requested = Integer.parseInt(args[0]);
            if (requested > 1000) {
                return requested;
            }
            System.err.println("port [" + requested + "] is not above 1000, using default " + DEFAULT_PORT);
        } catch (NumberFormatException e) {
            // Keep the original best-effort fallback, but tell the operator about the bad input.
            System.err.println("invalid port argument [" + args[0] + "], using default " + DEFAULT_PORT);
        }
        return DEFAULT_PORT;
    }
}
客户端工厂类:解决调用每个服务接口时都需要重复创建Thrift相关的socket、protocol等对象的问题。
package com.thrift;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.springframework.beans.factory.FactoryBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Spring {@link FactoryBean} that produces a dynamic proxy for a remote Thrift service.
 *
 * <p>The proxy implements the interfaces of the generated {@code XXX$Client} class. Each call
 * on it opens a framed socket to {@code host:port}, wraps a binary protocol in a
 * {@link TMultiplexedProtocol} keyed by the service class name, invokes the method on a fresh
 * client instance and closes the connection again (short-lived connections).
 *
 * @author qiang.xie
 * @date 2016/10/14
 */
public class ThriftTcpClientFactoryBean implements FactoryBean<Object> {
    /** Fully qualified name of the remote service class, without the {@code .Iface} or {@code .Client} part. */
    private String className;
    private String host;
    private int port;
    /** The generated {@code className$Client} class, resolved in {@link #setClassName(String)}. */
    private Class<?> clientClass;

    /**
     * Creates the service proxy.
     *
     * @return a JDK dynamic proxy implementing the client class's interfaces
     * @throws IllegalStateException if {@code className} has not been set
     */
    @Override
    public Object getObject() throws Exception {
        if (clientClass == null) {
            throw new IllegalStateException("className must be set before the client proxy is created");
        }
        return Proxy.newProxyInstance(clientClass.getClassLoader(), clientClass.getInterfaces(), new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // toString/hashCode/equals must not trigger a network round trip.
                if (method.getDeclaringClass() == Object.class) {
                    return invokeObjectMethod(proxy, method, args);
                }
                return invokeRemote(method, args);
            }
        });
    }

    /** Performs one remote call over a connection that is opened and closed for this call only. */
    private Object invokeRemote(Method method, Object[] args) throws Throwable {
        TTransport transport = new TFramedTransport(new TSocket(host, port));
        try {
            transport.open();
            TProtocol multiplexed = new TMultiplexedProtocol(new TBinaryProtocol(transport), className);
            Object client = clientClass.getDeclaredConstructor(TProtocol.class).newInstance(multiplexed);
            return method.invoke(client, args);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Rethrow the real exception so callers see the exception type declared by the
            // service interface rather than a reflection wrapper.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        } finally {
            // Short-lived connection: always release it, also when the call failed. Switching to
            // long-lived connections would need TCP load balancing, pooling, etc.
            transport.close();
        }
    }

    /** Answers the {@link Object} methods that the JDK proxy routes through the handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return "ThriftTcpClient[" + className + "@" + host + ":" + port + "]";
            default:
                throw new IllegalStateException("unexpected Object method: " + method);
        }
    }

    /**
     * Returns the generated client class, or {@code null} if {@code className} is not set yet.
     * NOTE(review): the proxy implements the client's interfaces but is not an instance of the
     * client class itself; confirm whether the {@code Iface} type should be reported here.
     */
    @Override
    public Class<?> getObjectType() {
        return clientClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    /**
     * Sets the remote service class name and resolves its generated {@code $Client} class.
     *
     * @param className fully qualified service class name, e.g. {@code com.HelloWordService}
     * @throws ClassNotFoundException if {@code className$Client} cannot be loaded
     */
    public void setClassName(String className) throws ClassNotFoundException {
        this.className = className;
        this.clientClass = Class.forName(className + "$Client");
    }

    /** Sets the server host name. */
    public void setHost(String host) {
        this.host = host;
    }

    /** Sets the server port. */
    public void setPort(int port) {
        this.port = port;
    }
}
假如服务端暴露了一个Thrift接口,在客户端这样使用即可:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Client-side Spring configuration.
  Declares one bean, "helloWordService", created by com.thrift.ThriftTcpClientFactoryBean and
  configured with the remote service class name plus the server host and port.
  Only the default "beans" namespace is used by the elements in this snippet.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<!-- className is the service class without the .Iface/.Client suffix; port must match the server. -->
<bean id="helloWordService" class="com.thrift.ThriftTcpClientFactoryBean">
<property name="className" value="com.HelloWordService"/>
<property name="host" value="localhost"/>
<property name="port" value="8200"/>
</bean>
</beans>
测试类:
/**
 * Spring-driven JUnit test that calls the remote {@code hello} method through the injected
 * service reference and prints the result.
 * The context is loaded from {@code classpath:application-soa.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:application-soa.xml")
public class TestLivePredicService {
@Autowired // autowiring is all that is needed to obtain a reference to the server-side interface
protected com.HelloWordService.Iface helloWordService;
/** Calls {@code hello("thrift")} and prints whatever it returns; makes no assertions. */
@Test
public void test() throws TException {
System.out.println(helloWordService.hello("thrift"));
}
}
以上是关于thrift生产环境服务端使用的正确姿势的主要内容,如果未能解决你的问题,请参考以下文章