# 22. Flink - Advanced Features - New Features - Async I/O Principles

Posted on 涂作权's blog


22. Flink - Advanced Features - New Features - Async I/O - Overview

22.1. Principles

22.1.1. The need for asynchronous I/O operations

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

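Async I/O is a highly requested feature that Alibaba contributed to the community; it was introduced in Flink 1.2. Its main purpose is to solve the problem of network latency becoming the system bottleneck when a job interacts with external systems.

Stream processing systems frequently need to interact with external systems. The usual approach is, for example, to send a query for user a to a database and then wait for the result; until it returns, the program cannot send the query for user b. This is synchronous access, and the operator spends most of its time waiting. With asynchronous access, the operator sends the request for user b without waiting for the response for user a, so the waiting times overlap instead of adding up.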
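
To make the difference concrete, here is a minimal sketch in plain Java with no Flink involved. The names `slowLookup`, `lookupSync` and `lookupAsync` are hypothetical, and the 100 ms sleep merely stands in for the network round trip to an external system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncVsAsyncLookup {

    // Hypothetical stand-in for a remote call: every lookup costs about 100 ms of waiting.
    static String slowLookup(String user) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "profile-of-" + user;
    }

    // Synchronous access: the next request is sent only after the previous one has returned.
    static List<String> lookupSync(List<String> users) {
        List<String> results = new ArrayList<>();
        for (String user : users) {
            results.add(slowLookup(user));
        }
        return results;
    }

    // Asynchronous access: all requests are issued first, then the results are collected.
    static List<String> lookupAsync(List<String> users, ExecutorService pool) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String user : users) {
            inFlight.add(CompletableFuture.supplyAsync(() -> slowLookup(user), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : inFlight) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("a", "b", "c", "d", "e");
        // One thread per request, so that all lookups can wait at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(users.size());
        try {
            long t0 = System.nanoTime();
            lookupSync(users);
            long t1 = System.nanoTime();
            lookupAsync(users, pool);
            long t2 = System.nanoTime();
            System.out.println("sync  ms: " + (t1 - t0) / 1_000_000);
            System.out.println("async ms: " + (t2 - t1) / 1_000_000);
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch the synchronous variant pays the latency once per user, while the asynchronous variant pays it roughly once in total, provided the pool is large enough to keep every request in flight.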

22.2.API

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
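
In the call above, the arguments after the function are the timeout (1000 ms) and the capacity, that is, the maximum number of requests in flight (100). `AsyncDataStream` also offers `orderedWait` with the same parameters, which emits results in the order of the input records. The plain-Java sketch below only illustrates that difference in emission order; it is not how Flink implements either mode, and `request`, `emitOrdered` and `emitUnordered` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedVsUnordered {

    // Hypothetical request: answers with the key after the given delay in milliseconds.
    static String request(String key, long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key;
    }

    // Starts one request per key so that all of them are in flight at the same time.
    static List<CompletableFuture<String>> start(String[] keys, long[] delaysMs, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            final String key = keys[i];
            final long delay = delaysMs[i];
            futures.add(CompletableFuture.supplyAsync(() -> request(key, delay), pool));
        }
        return futures;
    }

    // Ordered mode: results are emitted in input order, so a slow request holds back later ones.
    static List<String> emitOrdered(String[] keys, long[] delaysMs, ExecutorService pool) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : start(keys, delaysMs, pool)) {
            out.add(future.join());
        }
        return out;
    }

    // Unordered mode: each result is emitted as soon as its own request completes.
    static List<String> emitUnordered(String[] keys, long[] delaysMs, ExecutorService pool) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> emitted = new ArrayList<>();
        for (CompletableFuture<String> future : start(keys, delaysMs, pool)) {
            emitted.add(future.thenAccept(out::add));
        }
        CompletableFuture.allOf(emitted.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        long[] delaysMs = {450L, 50L, 250L}; // "a" is the slowest request, "b" the fastest
        ExecutorService pool = Executors.newFixedThreadPool(keys.length);
        try {
            System.out.println("ordered:   " + emitOrdered(keys, delaysMs, pool));
            System.out.println("unordered: " + emitUnordered(keys, delaysMs, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off this is meant to show: the ordered variant keeps the input order at the price of waiting for the slowest outstanding request, while the unordered variant hands each result on as early as possible.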

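Note: using async I/O places some requirements on the client:
1. The client must support sending asynchronous requests, e.g. Vert.x.
2. If the client does not support them, a thread pool can be used to simulate asynchronous requests.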
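
The second option can be sketched in plain Java as follows. This is only an illustration under assumptions: `BlockingClient` and `ThreadPoolAsyncClient` are hypothetical names, and the lambda in `main` stands in for a real blocking database call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadPoolAsyncClient implements AutoCloseable {

    // Hypothetical blocking client: the calling thread waits until the value is available.
    interface BlockingClient {
        String query(int id);
    }

    private final BlockingClient delegate;
    private final ExecutorService pool;

    ThreadPoolAsyncClient(BlockingClient delegate, int threads) {
        this.delegate = delegate;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Returns immediately; the blocking call runs on a pool thread instead of the caller's thread.
    CompletableFuture<String> queryAsync(int id) {
        return CompletableFuture.supplyAsync(() -> delegate.query(id), pool);
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        // The lambda is a placeholder for a synchronous lookup such as a JDBC query.
        try (ThreadPoolAsyncClient client = new ThreadPoolAsyncClient(id -> "category-" + id, 4)) {
            CompletableFuture<Void> done = client.queryAsync(3)
                    .thenAccept(name -> System.out.println("received " + name));
            done.join(); // only so that the demo does not exit before the callback has run
        }
    }
}
```

Inside an `asyncInvoke` method, the `thenAccept` callback would call `resultFuture.complete(...)` instead of printing, in the same way as the callback in the `AsyncDatabaseRequest` example above. The pool size bounds how many blocking calls can run at once, so it is usually chosen together with the capacity passed to `unorderedWait`.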

DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');
INSERT INTO `t_category` VALUES ('2', '电脑');
INSERT INTO `t_category` VALUES ('3', '服装');
INSERT INTO `t_category` VALUES ('4', '化妆品');
INSERT INTO `t_category` VALUES ('5', '食品');

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Prerequisites for using async I/O:
 * 1. The database (or key/value store) provides a client that supports asynchronous requests.
 * 2. Without an asynchronous client, a synchronous client can be run on a thread pool to act as one.
 */
public class ASyncIODemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        // The source only carries ids
        //DataStreamSource[1,2,3,4,5]
        DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
            private Boolean flag = true;
            @Override
            public void run(SourceContext<CategoryInfo> ctx) throws Exception {
                Integer[] ids = {1, 2, 3, 4, 5};
                for (Integer id : ids) {
                    ctx.collect(new CategoryInfo(id, null));
                }
            }
            @Override
            public void cancel() {
                this.flag = false;
            }
        });
        //3.Transformation

        // Approach 1: async I/O with the asynchronous client provided by Java Vert.x
        // unorderedWait: results are emitted without preserving the input order
        // (timeout of 1000 seconds, at most 10 requests in flight)
        SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);

        // Approach 2: synchronous MySQL client plus a thread pool to simulate async I/O
        // unorderedWait: results are emitted without preserving the input order
        SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);

        //4.Sink
        result1.print("Approach 1: async I/O with the Vert.x async client \n");
        result2.print("Approach 2: synchronous MySQL client + thread pool \n");

        //5.execute
        env.execute();
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo {
    private Integer id;
    private String name;
}

// MySQL's own (synchronous) client; it has to be made async-capable with Vert.x or a thread pool
class MysqlSyncClient {
    private static transient Connection connection;
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    static {
        init();
    }

    private static void init() {
        try {
            Class.forName(JDBC_DRIVER);
        } catch (ClassNotFoundException e) {
            System.out.println("Driver not found!" + e.getMessage());
        }
        try {
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (SQLException e) {
            System.out.println("init connection failed!" + e.getMessage());
        }
    }

    public void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            System.out.println("close connection failed!" + e.getMessage());
        }
    }

    public CategoryInfo query(CategoryInfo category) {
        String sql = "select id,name from t_category where id = " + category.getId();
        // try-with-resources closes the statement and the result set after each query
        try (Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(sql)) {
            if (rs.next()) {
                category.setName(rs.getString("name"));
            }
        } catch (SQLException e) {
            System.out.println("query failed!" + e.getMessage());
        }
        return category;
    }
}

/**
 * Approach 1: async I/O with the asynchronous client provided by Java Vert.x
 */
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient SQLClient mySQLClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        JsonObject mySQLClientConfig = new JsonObject();
        mySQLClientConfig
                .put("driver_class", "com.mysql.jdbc.Driver")
                .put("url", "jdbc:mysql://localhost:3306/bigdata")
                .put("user", "root")
                .put("password", "root")
                .put("max_pool_size", 20);

        VertxOptions options = new VertxOptions();
        options.setEventLoopPoolSize(10);
        options.setWorkerPoolSize(20);
        Vertx vertx = Vertx.vertx(options);
        // Obtain the asynchronous client from the configuration above
        mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
    }

    // Send the request through the asynchronous client
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {
            @Override
            public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {
                if (sqlConnectionAsyncResult.failed()) {
                    // The record stays pending and is handled by timeout() below
                    return;
                }
                SQLConnection connection = sqlConnectionAsyncResult.result();
                connection.query("select id,name from t_category where id = " + input.getId(), new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>() {
                    @Override
                    public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {
                        // Return the connection to the pool once the query has finished
                        connection.close();
                        if (resultSetAsyncResult.succeeded()) {
                            List<JsonObject> rows = resultSetAsyncResult.result().getRows();
                            // id is the primary key, so at most one row matches
                            for (JsonObject jsonObject : rows) {
                                CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
                                resultFuture.complete(Collections.singletonList(categoryInfo));
                            }
                        }
                    }
                });
            }
        });
    }

    @Override
    public void close() throws Exception {
        mySQLClient.close();
    }

    // Called when a request has not completed within the timeout: emit the record with a placeholder name
    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        System.out.println("async call time out!");
        input.setName("未知"); // "unknown"
        resultFuture.complete(Collections.singleton(input));
    }
}

/**
 * Approach 2: synchronous calls plus a thread pool to simulate async I/O
 */
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient MysqlSyncClient client;
    private ExecutorService executorService; // thread pool

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new MysqlSyncClient();
        executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Send the request asynchronously
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        // ... (listing truncated in the source)
