Flink之基于Vertx的Mysql异步IO

Posted xingoo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink之基于Vertx的Mysql异步IO相关的知识,希望对你有一定的参考价值。

导读


在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路可能是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库查询时间很长,那有可能会阻塞流计算的整体流程。因此可以考虑异步的方式请求数据库,当数据返回时,该事件再继续执行下面的操作。这样提升了流计算的并发度,但是也增加了数据库的访问以及网络带宽的压力。



1 Flink中的异步IO


在Flink中提供了一种异步IO的模式,不需要使用map函数阻塞式的加载数据,而是使用异步方法同时处理大量请求。不过这就需要数据库支持异步请求,如果不支持异步请求也可以手动维护线程池调用,只不过效率上没有原生的异步client更高效。比如mysql可以通过Vertx支持异步查询,HBase2.x也支持异步查询。


一般要实现Flink的异步查询需要自定义几个方法:

class MyAsyncReq extends RichAsyncFunction<IN,OUT>{ @Override public void open(..) throws Exception {} @Override public void close() throws Exception {} @Override public void asyncInvoke(..) throws Exception {}}

其中open中需要定义连接或者连接池,close中进行释放,asyncInvoke执行异步查询。

AsyncDataStream.unorderedWait(stream, new MyAsyncReq(), 1000, TimeUnit.MILLISECONDS, 100);

使用的使用执行下面的方法即可,

stream为主要的事件流,

myasyncreq是异步IO类,

1000为异步请求的超时时间,

100是同时进行异步请求的最大数量

另外,由于是异步请求,所以可能请求结束后顺序与原来的顺序就不一致了。使用unordered时会以异步请求结束的时间为准,ordered会以事件时间为准。


2 基于Vertx实现的Mysql异步IO


如果外部数据源是Mysql,一般的jdbc连接都是同步机制的,看浪尖大大的文章,推荐了一个异步JDBC组件——Vertx,下面就以Vertx为例作为异步IO的Client。


maven引入除flink之外其他的jar:

<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.13</version></dependency><dependency> <groupId>io.vertx</groupId> <artifactId>vertx-jdbc-client</artifactId> <version>3.8.3</version></dependency><dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>3.8.3</version></dependency>

先在open中创建SQLClient,它内部维护了自己的异步请求服务;然后再close中关闭client;在asyncInvoke中调用获取connection,执行查询,并释放连接。

public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {
private SQLClient client;
@Override public void open(Configuration parameters) throws Exception { Vertx vertx = Vertx.vertx(new VertxOptions() .setWorkerPoolSize(10) .setEventLoopPoolSize(10));
JsonObject config = new JsonObject() .put("url", "jdbc:mysql://xx:3306/base") .put("driver_class", "com.mysql.cj.jdbc.Driver") .put("max_pool_size", 10) .put("user", "x") .put("password", "x");
client = JDBCClient.createShared(vertx, config); }
@Override public void close() throws Exception { client.close(); }
@Override public void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception { client.getConnection(conn -> { if (conn.failed()) { return; }
final SQLConnection connection = conn.result(); connection.query("select id, name from t where id = " + input.getId(), res2 -> { ResultSet rs = new ResultSet(); if (res2.succeeded()) { rs = res2.result(); }
List<Store> stores = new ArrayList<>(); for (JsonObject json : rs.getRows()) { Store s = new Store(); s.setId(json.getInteger("id")); s.setName(json.getString("name")); stores.add(s); } connection.close(); resultFuture.complete(stores); }); }); }}


注意,一定要在query的返回调用方法中手动释放connection,不然马上就会报连接池耗尽的异常。使用时就没什么区别了:

AsyncDataStream.unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10).print();


3 参考


1 vertx:

https://vertx.io/docs/vertx-jdbc-client/java/


2 设计思想参考:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673



xingoo

专注大数据与机器学习




以上是关于Flink之基于Vertx的Mysql异步IO的主要内容,如果未能解决你的问题,请参考以下文章

java vertx写出较为简单的便于阅读的顺序串行异步代码

FlinkFlink 源码之AsyncFunction异步 IO 源码

Vertx,融合JavaRubyPython等语言的高性能架构

Vert.x简介原理与HelloWorld

Flink通过异步IO实现redis维表join

Vertx 初体验