JVM上的Reactive套件Vert.x启动过程源码解析

Posted 拍码场

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JVM上的Reactive套件Vert.x启动过程源码解析相关的知识,希望对你有一定的参考价值。

1. Vert.x简介

Vert.x的官网是vertx.io,作者用一句话介绍了它,JVM上的Reative开发套件。Vert.x基于Netty进行扩展,基于事件模型,轻量级、高性能,其参照物是node.js。

Vert.x还是一个分布式平台,既可以实现进程内的事件机制,也可以实现分布式的事件机制。

Vert.x是面向io的,默认封装了丰富的网络协议,目前已支持TCP、UDP、FileSystem、DNS、EventBus、Sockjs等协议,基本满足实现绝大多数系统架构的需求。


2. Vert.x的主要组件



2.1 Verticle

Verticle是Vert.x里最主要的组件。官方文档里把Vert.x定义为一个异步平台,类比于在JVM上的操作系统,Verticle就是运行在Vert.x平台里的进程,我们也可以把Verticle想成Actor Model中的Actor。我们平时使用Vert.x时,只需要关注编写Verticle的逻辑。

Vert.x里,有两类(三种)Verticle

  • 标准的Verticle:标准的Verticle是使用EventLoop执行的,执行的是一些不会阻塞的代码和任务。

  • Work Verticle:会阻塞线程的任务使用Work Verticle来执行,Work Verticle不使用EventLoop而是使用自定义的线程池。Work Verticle又分为单线程Work Verticle和多线程Work Verticle。

2.2 Context

当Vert.x调用Verticle执行业务逻辑是,会把Verticle与Context相关联。通常来说这个 Context 会是一个 EventLoop Context,它绑定到了一个特定的EventLoop线程上。所以在该Context上执行的操作总是在同一个EventLoop线程中。Context概念类似于Netty的ChannelContext,但他们之间有一些本质的区别,Vert.x的Context是与线程(Netty的EventLoop)绑定,ChannelContext是和Channel绑定。


2.3 Handler

Vert.x是基于事件的响应式框架,Vert.x中的Handler是个核心接口,大部分事件处理逻辑,都在Handler里进行实现。


2.4 Eventbus

EventBus是Vert.x的神经系统,Vert.x可以通过他进行内部或外部进行通信。而且EventBus可形成跨越多个服务器节点和多个浏览器的点对点的分布式消息系统。EventBus支持发布/订阅、点对点、请求/响应的消息通信方式。这里Eventbus也对应Actor Model中的信箱。


Vertx的运行模型如图一所示:

图一 Vert.x运行模型


由于Vert.x基于Netty进行扩展,因此,Vert.x的线程模型,基本继承了Netty的线程模型设计:EventLoop分发事件,由Context封装事件,每个Verticle抽象了一个业务流程,Handler触发具体的业务逻辑。


3. Vert.x的启动过程分析

下面,我们以一个最简单的代理应用为大家演示Vert.x怎样使用,并作为源码分析的入口,这个应用的逻辑是将/hello代理到www.qq.com。

public class HelloWorldServerVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
request();
}

public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(HelloWorldServerVerticle.class.getName());
}

public void request() {
vertx.createHttpServer(new HttpServerOptions()).requestHandler(httpServerRequest -> {
if (httpServerRequest.uri().equals("/hello") && httpServerRequest.method().equals(HttpMethod.GET)) {
HttpServerResponse httpServerResponse = httpServerRequest.response().setChunked(true);
vertx.createHttpClient(new HttpClientOptions()
.setKeepAlive(false))
.request(httpServerRequest.method(), 80, "www.qq.com", "/")
.handler(response -> {
System.out.println("Received response with status code " + response.statusCode());
//copy header
response.headers().forEach(entry -> {
httpServerResponse.putHeader(entry.getKey(), entry.getValue());
});
//copy body writeStream
Pump.pump(response, httpServerResponse).start();
}).end();
} else {
httpServerRequest.response().setStatusCode(404).end("not found");
}
}).listen(8089, result -> {
if (result.succeeded()) {
System.out.println("listen 8089 success !!");
}
});
}
}


代码中,在main方法里创建了一个Vertx对象,相当于创建了一个Vertx容器,将一个Verticle对象发布到Vertx容器里,就完成了应用的启动过程。Vertx启动的具体过程是什么样?下面就和大家一起深入源码一探究竟。



3.1 Vertx容器的实例化过程

第一步,Vertx首先获取VertxFactory,获取方式有spi或编程的方式,默认是VertxFactoryImpl直接调用VertxImpl()创建一个Vertx的实例。

public class VertxFactoryImpl implements VertxFactory {

@Override
public Vertx vertx() {
return new VertxImpl();
}
}


第二步,根据VertxOptions配置Vertx,比如EventLoopPoolSize,workPoolSize,eventBusOptions(Eventbus是vertx进程间通信的一个组件)。

VertxImpl(VertxOptions options){
this(options,null);
}


第三步,启动一个checker去检测线程阻塞情况(本质是个守护线程的timer),检测的方式很简单粗暴,内部维护一个VerxThred线程的集合,当前时间比对线程开始的执行时间,超时了就日志警告。


BlockedThreadChecker(long interval, long warningExceptionTime) {
timer = new Timer("vertx-blocked-thread-checker", true);
timer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (BlockedThreadChecker.this) {
long now = System.nanoTime();
for (VertxThread thread : threads.keySet()) {
long execStart = thread.startTime();
long dur = now - execStart;
final long timeLimit = thread.getMaxExecTime();
if (execStart != 0 && dur > timeLimit) {
final String message = "Thread " + thread + " has been blocked for " + (dur / 1000000) + " ms, time limit is " + (timeLimit / 1000000);
if (dur <= warningExceptionTime) {
log.warn(message);
} else {
VertxException stackTrace = new VertxException("Thread blocked");
stackTrace.setStackTrace(thread.getStackTrace());
log.warn(message, stackTrace);
}
}
}
}
}
}, interval, interval);
}



第四步,设置eventLoopGroup,这里Vertx是拿来主义,直接用Netty的NioEventLoopGroup,并设置NioEventLoopGroup处理io任务的比率,线程数默认为两倍cpu核数。

public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory  threadFactory, int ioRatio) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
eventLoopGroup.setIoRatio(ioRatio);
return eventLoopGroup;
}


第五步,设置acceptorEventLoopGroup,默认是一个线程,按照源码说法在大量的请求下,如果事件循环的接受线程和上面的循环器线程在一个线程池里容易延迟。

// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
// under a lot of load
acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100);


第六步,设置VertxMetrices,这里也是以SPI的方式加载,用来记录Vertx内部执行的监控信息。


private VertxMetrics initialiseMetrics(VertxOptions options) {
if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) {
VertxMetricsFactory factory = options.getMetricsOptions().getFactory();
if (factory == null) {
factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class);
if (factory == null) {
log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath");
}
}
if (factory != null) {
VertxMetrics metrics = factory.metrics(this, options);
Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null");
return metrics;
}
}
return null;
}



8.默认初始化20线程的workerExec,20线程的internalBlockingExec,DeploymentManager(【重要的类】管理所有的VertxFactories,用来部署Verticles)

/**
* The default number of event loop threads to be used = 2 * number of cores on the machine
*/
public static final int DEFAULT_EVENT_LOOP_POOL_SIZE = 2 * CpuCoreSensor.availableProcessors();
/**
* The default number of threads in the worker pool = 20
*/
public static final int DEFAULT_WORKER_POOL_SIZE = 20;
/**
* The default number of threads in the internal blocking pool (used by some internal operations) = 20
*/
public static final int DEFAULT_INTERNAL_BLOCKING_POOL_SIZE = 20;
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = metrics != null ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);



9.创建并开始EventBus,默认是 new EventBusImpl(),用来和其他vertx实例或verticle之间通信,看传递消息源码可以大概知道EventBus通信是通过异步rpc的方式,保持一贯厌恶阻塞的原则。底层是通过他的service proxy组件提供的代理类实现,发送消息异步方法提供一个回调函数 Handler<AsyncResult>,在调用结果发送过来的时候会自动调用绑定的回调函数进行相关的处理。


    synchronized (VertxImpl .this)
{
haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(),
options.getHAGroup(), haEnabled);
createAndStartEventBus(options, resultHandler);
}


//eventBus 发送消息,先判断是否是发送本地消息,然后执行deliverToHandler方法
protected <T> void sendOrPub(SendContextImpl<T> sendContext) {
MessageImpl message = sendContext.message;
if (metrics != null) {
metrics.messageSent(message.address(), !message.isSend(), true, false);
}
deliverMessageLocally(sendContext);
}


protected <T> boolean deliverMessageLocally(MessageImpl msg) {
//..省略些源码
for (HandlerHolder holder : handlers.list) {
deliverToHandler(msg, holder);
}
}


private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
// Each handler gets a fresh copy
@SuppressWarnings("unchecked")
Message<T> copied = msg.copyBeforeReceive();
if (metrics != null) {
metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
}


holder.getContext().runOnContext((v) -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but
// before it was received
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
}
} finally {
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
}
}
});
}



10.初始化ShardDataImpl,这个是Vertx用来共享数据的管理类,内部本质是一个ConcurrentHashMap,不仅可以跨线程共享,还能跨进程在不同的Vertx集群之间共享数据。

    this.sharedData = new SharedDataImpl(this, clusterManager);
//SharedDataImpl内部变量
private final VertxInternal vertx;
private final ClusterManager clusterManager;
private final LocalAsyncLocks localAsyncLocks;
private final ConcurrentMap<String, LocalAsyncMapImpl<?, ?>> localAsyncMaps = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Counter> localCounters = new ConcurrentHashMap<>();
//本地实例共享数据
private final ConcurrentMap<String, LocalMap<?, ?>> localMaps = new ConcurrentHashMap<>();
//集群间共享数据
@Override
public <K, V> void getClusterWideMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) {
Objects.requireNonNull(name, "name");
Objects.requireNonNull(resultHandler, "resultHandler");
if (clusterManager == null) {
throw new IllegalStateException("Can't get cluster wide map if not clustered");
}
clusterManager.<K, V>getAsyncMap(name, ar -> {
if (ar.succeeded()) {
// Wrap it
resultHandler.handle(Future.succeededFuture(new WrappedAsyncMap<K, V>(ar.result())));
} else {
resultHandler.handle(Future.failedFuture(ar.cause()));
}
});
}




至此,Vertx初始化完成,最主要是初始化了各种线程池,设置了EventLoop,并初始化了DeploymentManager这个Verticle容器,为后续发布Verticle做好了准备。


3.2 vertx发布一个verticle

从deployVerticle(String name)入口进入,可以看到DeploymentManager这个类,承担了所有发布Verticle的工作。具体步骤如下:


1)获取发布的Context,UUID给它随机生成一个DeployId。如前面所述,Context是与EventLoop线程绑定的,默认情况下当前线程是没有Context的,默认情况下会生成一个EventLoopContext,参考vertx.getOrCreateContext()的代码。这个Context还承载了WorkPool,OrderTaskQueue,这里的workpool就是我们自定义用来执行阻塞任务的线程池,orderTaskQueue是方便我们设置任务的执行优先级。

public void deployVerticle(String identifier,
DeploymentOptions options,
Handler<AsyncResult<String>> completionHandler) {

if (options.isMultiThreaded() && !options.isWorker()) {

throw new IllegalArgumentException("If multi-threaded then must be worker too");
}

ContextImpl callingContext = vertx.getOrCreateContext();
ClassLoader cl = getClassLoader(options, callingContext);
doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
}

2)DeploymentManager做了些兼容相关的东西,根据我们传的verticle名字去判断VerticleFactory是那种类型的,加载Verticle需要执行的一些额外的操作,通过调用Handler实现。

     


if (iter.hasNext()) {

VerticleFactory verticleFactory = iter.next();
Future<String> fut = Future.future();
if (verticleFactory.requiresResolve()) {

try {

verticleFactory.resolve(identifier, options, cl, fut);
} catch (Exception e) {

try {

fut.fail(e);
} catch (Exception ignore) {

// Too late
}

}

} else {

fut.complete(identifier);
}

fut.setHandler(ar -> {
});

看到这里的代码其实就大概知道Vertx的Handler是怎么执行的。Vertx的Future在设置Handler的时会先判断这个Futrue有没有完成,完成了就执行Handler,没有完成就继续等待,在事件完成时再进行回调(Vert.x 中的 Future 即异步开发模式中的 Future/Promise 模式的实现)。


3)接下来按配置反射生成这个Verticle的实例,默认情况下用当前线程类加载器来加载类,而不会创建一个新的。如果我们需要对我们的程序进行隔离话可以自定义一个类加载器,类似于Tomcat加载不同的war包,做到程序隔离。

Future<T> setHandler(Handler<AsyncResult<T>> handler);
//反射生成实例
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);

继续跟进doDeploy方法,这里就是给verticle创建执行的context,然后启动verticle,runOnContext()。

JsonObject conf = options.getConfig() == null ? new JsonObject() :  options.getConfig().copy(); // Copy it
String poolName = options.getWorkerPoolName();
Deployment parent = parentContext.getDeployment();
DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
AtomicInteger deployCount = new AtomicInteger();
AtomicBoolean failureReported = new AtomicBoolean();
for (Verticle verticle: verticles) {

WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :

vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
//这里为每个Verticle又创建一个Context,绑定一个EventLoop,后面这个Verticle的生命周期就和这个Context绑定了
if (workerExec != null) {

context.addCloseHook(workerExec);
}

context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));//verticleHolder记录Verticle和Context绑定关系
context.runOnContext(v -> {//verticle在Context上运行
//略
});
}

4)执行runOnContext是异步的,这里就到了事件循环器的核心逻辑了。


@Override
public void runOnContext(Handler<Void> task) {

try {

executeAsync(task);
} catch (RejectedExecutionException ignore) {

// Pool is already shut down
}

}


public void executeAsync(Handler<Void> task) {

// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}


protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {

...略

//将context与当前线程绑定
setContext(current, ContextImpl.this);
if (cTask != null) {

cTask.run();
} else {

hTask.handle(null);
}

...

}

这里有个细节,wrapTask执行任务的线程绑定了当前的context。这样保证了在一个Context上执行任务的都是同一个EventLoop。

public void execute(Runnable task) {

if (task == null) {

throw new NullPointerException("task");
} else {

boolean inEventLoop = this.inEventLoop();
if (inEventLoop) {

this.addTask(task);
} else {

this.startThread();//当前线程要是不是这个eventLoop记录的线程就初始化线程
this.addTask(task);//任务加入队列,让该eventLoop的线程轮询处理
if (this.isShutdown() && this.removeTask(task)) {

reject();
}

}


if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {

this.wakeup(inEventLoop);
}


}

}

5)接下来看看startThread如何工作,先将当前线程设置为eventLoop的线程,然后开始调用NioEventLoop的run方法。

private void doStartThread() {

assert thread == null;
executor.execute(new Runnable() {

@Override
public void run() {

thread = Thread.currentThread();
if (interrupted) {

thread.interrupt();
}


boolean success = false;
updateLastExecutionTime();
try {

SingleThreadEventExecutor.this.run();//这里是调用了NioEventLoop的run方法
success = true;
} catch (Throwable t) {

logger.warn("Unexpected exception from an event executor: ", t);
} finally {

//略略略
});
}
}

6)NioEventLoop不停轮询Selector的事件(这里我们没有暂时没有起Bootstrap去监听socket,没有这些事件),执行taskQueue里的任务。

@Override
protected void run() {

for (; ; ) {

try {

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

case SelectStrategy.CONTINUE:

continue;
case SelectStrategy.SELECT:

select(wakenUp.getAndSet(false));
default:

}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {

try {

processSelectedKeys();
} finally {

// Ensure we always run tasks.
runAllTasks();
}

} else {

final long iostartTime = System.nanoTime();
try {

processSelectedKeys();
} finally {

// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

}

} catch (Throwable t) {

handleLoopException(t);
}

// Always handle shutdown even if the loop processing threw an exception.
try {

if (isShuttingDown()) {

closeAll();
if (confirmShutdown()) {

return;
}

}

} catch (Throwable t) {

handleLoopException(t);
}

}

}

7)从Netty回到我们的Vertx,runOnContext(),在这里可以看到我们熟悉的Verticle的init,start的方法了,到这里vertx启动verticle的流程就结束了。

context.runOnContext(v ->

{

try {

verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
verticle.start(startFuture);
startFuture.setHandler(ar -> {

if (ar.succeeded()) {

if (parent != null) {

if (parent.addChild(deployment)) {

deployment.child = true;
} else {

// Orphan
deployment.undeploy(null);
return;
}

}

VertxMetrics metrics = vertx.metricsSPI();
if (metrics != null) {

metrics.verticleDeployed(verticle);
}

deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {

reportSuccess(deploymentID, callingContext, completionHandler);
}

} else if (failureReported.compareAndSet(false, true)) {

deployment.rollback(callingContext, completionHandler, context, ar.cause());
}

});
} catch (Throwable t) {

if (failureReported.compareAndSet(false, true))

deployment.rollback(callingContext, completionHandler, context, t);
}

});
}

4. 总结

以上就是Vertx的启动发布流程,简单回顾下启动发布流程:

1. 生成Vertx对象,设置EventLoop的线程数,Enventbus的配置。

2. 生成Verticle实例,为Verticle生成Context,并把Vontext绑定EventLoop。

3.在Verticle的Context上执行Verticle的生命周期,init,start方法,从而启动Verticle执行。



更多福利请关注官方订阅号“拍码场

好内容不要独享!快告诉小伙伴们吧!


喜欢请点击↓↓↓


往期文章:












以上是关于JVM上的Reactive套件Vert.x启动过程源码解析的主要内容,如果未能解决你的问题,请参考以下文章

VERT.X介绍

Vert.x 线程模型揭秘

Reactive SQL 客户端 (Quarkus/Vert.X) 中的 Kotlin 协程事务

Spring Webflux vs Vert.x

Vert.x——基于JVM的高性能响应式开发框架

netty与vert.x的区别和联系