Flink之基于Vertx的Mysql异步IO
Posted xingoo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink之基于Vertx的Mysql异步IO相关的知识,希望对你有一定的参考价值。
导读
在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路可能是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库查询时间很长,那有可能会阻塞流计算的整体流程。因此可以考虑异步的方式请求数据库,当数据返回时,该事件再继续执行下面的操作。这样提升了流计算的并发度,但是也增加了数据库的访问以及网络带宽的压力。
1 Flink中的异步IO
在Flink中提供了一种异步IO的模式,不需要使用map函数阻塞式的加载数据,而是使用异步方法同时处理大量请求。不过这就需要数据库支持异步请求,如果不支持异步请求也可以手动维护线程池调用,只不过效率上没有原生的异步client更高效。比如mysql可以通过Vertx支持异步查询,HBase2.x也支持异步查询。
一般要实现Flink的异步查询需要自定义几个方法:
class MyAsyncReq extends RichAsyncFunction<IN,OUT>{
@Override
public void open(..) throws Exception {}
@Override
public void close() throws Exception {}
@Override
public void asyncInvoke(..) throws Exception {}
}
其中open中需要定义连接或者连接池,close中进行释放,asyncInvoke执行异步查询。
AsyncDataStream.unorderedWait(
stream,
new MyAsyncReq(),
1000,
TimeUnit.MILLISECONDS,
100);
使用的使用执行下面的方法即可,
stream为主要的事件流,
myasyncreq是异步IO类,
1000为异步请求的超时时间,
100是同时进行异步请求的最大数量
另外,由于是异步请求,所以可能请求结束后顺序与原来的顺序就不一致了。使用unordered时会以异步请求结束的时间为准,ordered会以事件时间为准。
2 基于Vertx实现的Mysql异步IO
如果外部数据源是Mysql,一般的jdbc连接都是同步机制的,看浪尖大大的文章,推荐了一个异步JDBC组件——Vertx,下面就以Vertx为例作为异步IO的Client。
maven引入除flink之外其他的jar:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.8.3</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.8.3</version>
</dependency>
先在open中创建SQLClient,它内部维护了自己的异步请求服务;然后再close中关闭client;在asyncInvoke中调用获取connection,执行查询,并释放连接。
public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {
private SQLClient client;
@Override
public void open(Configuration parameters) throws Exception {
Vertx vertx = Vertx.vertx(new VertxOptions()
.setWorkerPoolSize(10)
.setEventLoopPoolSize(10));
JsonObject config = new JsonObject()
.put("url", "jdbc:mysql://xx:3306/base")
.put("driver_class", "com.mysql.cj.jdbc.Driver")
.put("max_pool_size", 10)
.put("user", "x")
.put("password", "x");
client = JDBCClient.createShared(vertx, config);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {
client.getConnection(conn -> {
if (conn.failed()) {
return;
}
final SQLConnection connection = conn.result();
connection.query("select id, name from t where id = " + input.getId(), res2 -> {
ResultSet rs = new ResultSet();
if (res2.succeeded()) {
rs = res2.result();
}
List<Store> stores = new ArrayList<>();
for (JsonObject json : rs.getRows()) {
Store s = new Store();
s.setId(json.getInteger("id"));
s.setName(json.getString("name"));
stores.add(s);
}
connection.close();
resultFuture.complete(stores);
});
});
}
}
注意,一定要在query的返回调用方法中手动释放connection,不然马上就会报连接池耗尽的异常。使用时就没什么区别了:
AsyncDataStream
.unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10)
.print();
3 参考
1 vertx:
https://vertx.io/docs/vertx-jdbc-client/java/
2 设计思想参考:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
xingoo
专注大数据与机器学习
以上是关于Flink之基于Vertx的Mysql异步IO的主要内容,如果未能解决你的问题,请参考以下文章
java vertx写出较为简单的便于阅读的顺序串行异步代码
FlinkFlink 源码之AsyncFunction异步 IO 源码