# 22.Flink-高级特性-新特性-异步IO原理
Posted to.to
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了# 22.Flink-高级特性-新特性-异步IO原理相关的知识,希望对你有一定的参考价值。
22.Flink-高级特性-新特性-异步IO-了解
22.1.原理
22.1.1.异步IO操作的需求
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html
Async I/O是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入,主要目的是解决与外部系统交互时网络延迟成为系统瓶颈的问题。
流计算系统中经常需要与外部系统进行交互。通常的做法是向数据库发送用户a的查询请求,然后等待结果返回;在结果返回之前,程序无法发送用户b的查询请求。这是一种同步访问方式(原文此处的示意图已缺失),大部分时间都耗费在等待结果返回上。与之相对,异步访问方式可以在等待用户a的结果返回的同时,继续发送用户b、c……的查询请求,让单个并行实例并发处理多个请求,从而显著提高吞吐量。
22.2.API
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>>
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception
client = new DatabaseClient(host, post, credentials);
@Override
public void close() throws Exception
client.close();
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>()
@Override
public String get()
try
return result.get();
catch (InterruptedException | ExecutionException e)
// Normally handled explicitly.
return null;
).thenAccept( (String dbResult) ->
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
);
// create the original stream
// ("..." is a placeholder for any DataStream<String> source; this fragment is not compilable as-is)
DataStream<String> stream = ...;
// apply the async I/O transformation
// unorderedWait: results need not come out in input order.
// The trailing arguments are a per-request timeout of 1000 ms and 100
// (NOTE(review): per the Flink AsyncDataStream API this is the capacity, i.e. the maximum
// number of in-flight requests; confirm).
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
注意:如果要使用异步IO,对应的Client有一定要求:
1.该Client要支持发送异步请求,如Vert.x提供的异步客户端。
2.如果Client不支持异步请求,可以使用线程池来模拟异步请求。
-- Category dimension table; the async I/O demo looks up `name` by `id`.
DROP TABLE IF EXISTS `t_category`;
-- utf8mb4 instead of utf8: MySQL's `utf8` is the 3-byte utf8mb3 alias (deprecated in 8.0)
-- and cannot store 4-byte characters. The int display width `(11)` is deprecated and has no effect.
CREATE TABLE `t_category` (
  `id`   int          NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of t_category
-- ----------------------------
-- ids are numeric literals because `id` is an int column.
INSERT INTO `t_category` (`id`, `name`) VALUES
  (1, '手机'),
  (2, '电脑'),
  (3, '服装'),
  (4, '化妆品'),
  (5, '食品');
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 使用异步io的先决条件
* 1.数据库(或key/value存储)提供支持异步请求的client。
* 2.没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。
*/
public class ASyncIODemo
public static void main(String[] args) throws Exception
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//数据源中只有id
//DataStreamSource[1,2,3,4,5]
DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>()
private Boolean flag = true;
@Override
public void run(SourceContext<CategoryInfo> ctx) throws Exception
Integer[] ids = 1, 2, 3, 4, 5;
for (Integer id : ids)
ctx.collect(new CategoryInfo(id, null));
@Override
public void cancel()
this.flag = false;
);
//3.Transformation
//方式一:Java-vertx中提供的异步client实现异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);
//方式二:mysql中同步client+线程池模拟异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);
//4.Sink
result1.print("方式一:Java-vertx中提供的异步client实现异步IO \\n");
result2.print("方式二:MySQL中同步client+线程池模拟异步IO \\n");
//5.execute
env.execute();
@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo
private Integer id;
private String name;
//MySQL本身的客户端-需要把它变成支持异步的客户端:使用vertx或线程池
class MysqlSyncClient
private static transient Connection connection;
private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
private static final String USER = "root";
private static final String PASSWORD = "root";
static
init();
private static void init()
try
Class.forName(JDBC_DRIVER);
catch (ClassNotFoundException e)
System.out.println("Driver not found!" + e.getMessage());
try
connection = DriverManager.getConnection(URL, USER, PASSWORD);
catch (SQLException e)
System.out.println("init connection failed!" + e.getMessage());
public void close()
try
if (connection != null)
connection.close();
catch (SQLException e)
System.out.println("close connection failed!" + e.getMessage());
public CategoryInfo query(CategoryInfo category)
try
String sql = "select id,name from t_category where id = "+ category.getId();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql);
if (rs != null && rs.next())
category.setName(rs.getString("name"));
catch (SQLException e)
System.out.println("query failed!" + e.getMessage());
return category;
/**
* 方式一:Java-vertx中提供的异步client实现异步IO
*/
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo>
private transient SQLClient mySQLClient;
@Override
public void open(Configuration parameters) throws Exception
JsonObject mySQLClientConfig = new JsonObject();
mySQLClientConfig
.put("driver_class", "com.mysql.jdbc.Driver")
.put("url", "jdbc:mysql://localhost:3306/bigdata")
.put("user", "root")
.put("password", "root")
.put("max_pool_size", 20);
VertxOptions options = new VertxOptions();
options.setEventLoopPoolSize(10);
options.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(options);
//根据上面的配置参数获取异步请求客户端
mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
//使用异步客户端发送异步请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception
mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>()
@Override
public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult)
if (sqlConnectionAsyncResult.failed())
return;
SQLConnection connection = sqlConnectionAsyncResult.result();
connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>()
@Override
public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult)
if (resultSetAsyncResult.succeeded())
List<JsonObject> rows = resultSetAsyncResult.result().getRows();
for (JsonObject jsonObject : rows)
CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
resultFuture.complete(Collections.singletonList(categoryInfo));
);
);
@Override
public void close() throws Exception
mySQLClient.close();
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
/**
* 方式二:同步调用+线程池模拟异步IO
*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo>
private transient MysqlSyncClient client;
private ExecutorService executorService;//线程池
@Override
public void open(Configuration parameters) throws Exception
super.open(parameters);
client = new MysqlSyncClient();
executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//异步发送请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws JavaScript高级之ECMASript 78 9 10 新特性