FlinkFlink 源码之AsyncFunction异步 IO 源码

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码之AsyncFunction异步 IO 源码相关的知识,希望对你有一定的参考价值。

1.概述

转载:Flink 源码之AsyncFunction

2. 简介

Flink的特点是高吞吐低延迟。但是Flink中的某环节的数据处理逻辑需要和外部系统交互,调用耗时不可控会显著降低集群性能,这时候怎么办?

为了解决这个问题,Flink引入了AsyncFunction系列接口。使用这些异步接口调用外部服务的时候,不用再同步等待结果返回,只需要将数据存入队列,外部服务接口返回时会更新队列数据状态。在调用外部服务后直接返回处理下一个异步调用,不需要同步等待结果。下游拉取数据的时候直接从队列获取即可。

3.使用方法

在讲解AsyncFunction使用方法之前,我们先“伪造”一个耗时的外部系统调用。调用pullData会立即返回一个CompletableFuture。耗时5秒后生成的数据通过CompletableFuture返回。

public class AsyncIODemo implements Serializable 

    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> pullData(final String source) 

        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        executorService.submit(() -> 
            try 
                Thread.sleep(5000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            completableFuture.complete("Output value: " + source);
        );

        return completableFuture;
    

接下来编写Flink作业:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")

val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String] 
    override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = 
        // 调用前面的外部系统调用,拉取数据
        val future = new AsyncIODemo().pullData(input)
        // 这个方法是非阻塞的,一旦数据获取成功,会立即调用resultFuture.complete方法
        future.whenCompleteAsync(new BiConsumer[String, Throwable] 
            override def accept(t: String, u: Throwable): Unit = 
                resultFuture.complete(Array(t))
            
        )
    
, 10, TimeUnit.SECONDS)
// 上面设置最长异步调用超时时间为10秒

asyncStream.print()
env.execute()

执行Flink作业。我们发现虽然外部系统调用了4次,然而并没有等待20秒后才输出全部4个结果,实际上只等待了5秒左右。AsyncFunction的功能得到了验证。

注意:尽管AsyncFunction字面上为异步调用,实际上asynInvoke方法仍然是同步的。绝不能在该方法中阻塞等待调用结果,这样失去了它原本的作用。应该在此处编写异步回调方法,通过异步方式通知Flink数据已获取完毕。

4.AsyncFunction

从这里开始进入源码分析环节。AsyncFunction接口源码如下:

@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable 

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     *     trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * @link AsyncFunction#asyncInvoke timeout occurred. By default, the result future is
     * exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception 
        resultFuture.completeExceptionally(
                new TimeoutException("Async function call has timed out."));
    

AsyncFunction接口有两个方法:

  • asyncInvoke:异步操作每一个数据流输入元素。方法的第一个参数input为数据流中的元素,第二个参数resultFuture用于收集异步处理的结果或者是错误信息。不要在此方法内同步等待数据处理逻辑,这样会阻塞线程,降低作业吞吐量。
  • timeout:定义数据超时处理逻辑。方法的参数和asyncInvoke相同。AsyncFunction已经提供了默认实现。如果需要自定义超时逻辑,可以覆盖这个方法。

5. ResultFuture

ResultFuture在异步操作的时候用于收集结果或错误。

@PublicEvolving
public interface ResultFuture<OUT> 
    /**
     * Completes the result future with a collection of result objects.
     *
     * <p>Note that it should be called for exactly one time in the user code. Calling this function
     * for multiple times will cause data lose.
     *
     * <p>Put all results in a @link Collection and then emit output.
     *
     * @param result A list of results.
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the result future exceptionally with an exception.
     *
     * @param error A Throwable object.
     */
    void completeExceptionally(Throwable error);

它包含两个方法:

  • complete:如果异步逻辑顺利返回,调用complete方法转入结果数据的集合对象,将数据传递给下游。
  • completeExceptionally:如果异步逻辑需要错误,需要调用这个方法将错误传入。

6.AsyncDataStream

该类是创建异步算子的工具类。它有2种方法:

  • unorderedWait:不保证输出元素的顺序和读入元素顺序相同。
  • orderedWait:保证输出元素的顺序和读入元素顺序相同。

这两种方法每个还对应两个重载方法,但是参数含义是相同的。参数为:

DataStream<IN> in:需要添加异步处理逻辑的数据流。AsyncDataStream实际上是个工具类,并不是一种流的类型。
AsyncFunction<IN, OUT> func:用户定义的异步执行逻辑。
long timeout:异步任务超时时间。
TimeUnit timeUnit:超时时间单位。
int capacity:异步任务初始队列长度。只有部分重载方法有这个参数。默认值为100

下面是orderedWait其中一个重载方法的代码。

public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    TimeUnit timeUnit,
    int capacity) 
    return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);

它调用了addOperator方法,为DataStream添加一个OneInputTransformation,其中包含了AsyncWaitOperator。

其他几个unorderedWait或orderedWait重载方法调用的都是addOperator,不再赘述。

接下来轮到了addOperator方法:

private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
    DataStream<IN> in,
    AsyncFunction<IN, OUT> func,
    long timeout,
    int bufSize,
    OutputMode mode) 

    TypeInformation<OUT> outTypeInfo =
        TypeExtractor.getUnaryOperatorReturnType(
        func,
        AsyncFunction.class,
        0,
        1,
        new int[] 1, 0,
        in.getType(),
        Utils.getCallLocationName(),
        true);

    // create transform
    AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
        new AsyncWaitOperatorFactory<>(
        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

    return in.transform("async wait operator", outTypeInfo, operatorFactory);

这个方法创建了一个AsyncWaitOperatorFactory,将其包装入transformation。factory在生成ExecutionGraph的时候将创建出AsyncWaitOperator。下一节我们一起分析下异步操作的核心AsyncWaitOperator。

7.AsyncWaitOperator

我们从AsyncWaitOperator的构造方法开始。构造方法参数中最重要的是outputMode,它决定了异步处理任务队列的类型,从而决定用户数据异步处理后是否严格按照输入顺序输出。

public AsyncWaitOperator(
    @Nonnull AsyncFunction<IN, OUT> asyncFunction,
    long timeout,
    int capacity,
    @Nonnull AsyncDataStream.OutputMode outputMode,
    @Nonnull ProcessingTimeService processingTimeService,
    @Nonnull MailboxExecutor mailboxExecutor) 
    super(asyncFunction);

    // 设置可以和下游算子组成OperatorChain
    setChainingStrategy(ChainingStrategy.ALWAYS);

    Preconditions.checkArgument(
        capacity > 0, "The number of concurrent async operation should be greater than 0.");
    // 默认队列长度
    this.capacity = capacity;

    // 枚举值,决定用户数据异步处理后是否严格按照输入顺序输出
    this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

    // 异步处理超时时间
    this.timeout = timeout;

    // 时间服务,用于设置定时器,检测超时等
    this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

    // 用户作业执行线程池
    this.mailboxExecutor = mailboxExecutor;

在operator创建出来后紧接着会执行setup方法,进行初始化操作。

@Override
public void setup(
    StreamTask<?, ?> containingTask,
    StreamConfig config,
    Output<StreamRecord<OUT>> output) 
    // 调用父类初始化逻辑
    super.setup(containingTask, config, output);

    // 创建元素序列化器
    this.inStreamElementSerializer =
        new StreamElementSerializer<>(
        getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

    switch (outputMode) 
        case ORDERED:
            // 如果需要保持输出数据有序
            // 创建的队列为OrderedStreamElementQueue
            queue = new OrderedStreamElementQueue<>(capacity);
            break;
        case UNORDERED:
            // 如果不需要保持输出有序
            // 创建的队列为UnorderedStreamElementQueue
            queue = new UnorderedStreamElementQueue<>(capacity);
            break;
        default:
            throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
    

    this.timestampedCollector = new TimestampedCollector<>(output);

setup方法根据outputMode是否保证输出元素顺序,来决定创建的StreamElementQueue。

接下来是处理元素的processElement方法。上游每个元素到来的时候,都会调用这个方法。

@Override
public void processElement(StreamRecord<IN> element) throws Exception 
    // add element first to the queue
    // 将元素放入队列中
    // 返回队列的entry
    // 队列中的entry类型实现了ResultFuture接口,后面介绍
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    // 创建ResultHandler,包装了超时定时器,输入数据和resultFuture
    // 用来操作resultFuture和超时定时器
    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    // 如果配置了超时时间
    if (timeout > 0L) 
        // 计算超时时刻
        final long timeoutTimestamp =
            timeout + getProcessingTimeService().getCurrentProcessingTime();

        // 注册一个定时器,在超时的时刻调用AsyncFunction的timeout方法
        final ScheduledFuture<?> timeoutTimer =
            getProcessingTimeService()
            .registerTimer(
            timeoutTimestamp,
            timestamp ->
            userFunction.timeout(
                element.getValue(), resultHandler));

        // 设置定时器给resultHandler
        resultHandler.setTimeoutTimer(timeoutTimer);
    

    // 调用AsyncFunction的asyncInvoke方法
    userFunction.asyncInvoke(element.getValue(), resultHandler);

继续查看addToWorkQueue方法,将元素放入任务队列中。

private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
    throws InterruptedException 

    Optional<ResultFuture<OUT>> queueEntry;
    
    // 如果元素添加队列失败,说明队列已满
    // 需要当前线程让出执行机会给mailboxExecutor,即执行用户自定义处理逻辑
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) 
        mailboxExecutor.yield();
    

    // 添加队列成功,返回ResultFuture
    return queueEntry.get();

workQueue我们在后面讨论。接下来分析ResultHandler。

8.ResultHandler

ResultHandler是ResultFuture的实现类,为AsyncFunction中两个方法的参数,让用户使用。分别处理异步处理完成(complete)和异步处理异常(completeExceptionally)两种情况。

ResultHandler持有4个成员变量:

timeoutTimer:定时器,在数据计算完毕(调用了complete方法的时候),需要将timer清除,所以需要持有定时器。
inputRecord:数据流中的原始数据。
resultFuture:实际为元素队列中的entry。这个后面介绍。
completed:用来表示异步计算是否完成。

用户的自定义异步处理逻辑在AsyncFunction中,异步处理完成的时候需要调用ResultHandler的complete方法。这个方法将completed变量标记为true。然后调用processInMainbox方法。

@Override
public void complete(Collection<OUT> results) 
    Preconditions.checkNotNull(
        results, "Results must not be null, use empty collection to emit nothing");

    // already completed (exceptionally or with previous complete call from ill-written
    // AsyncFunction), so
    // ignore additional result
    if (!completed.compareAndSet(false, true)) 
        return;
    

    processInMailbox(results);

processInMainbox方法在MailboxExecutor线程池执行resultFuture的complete方法,通知持有这些元素的队列,该元素已经处理完毕。然后清除掉超时时间timer。最后调用outputCompletedElement,输出已完成的元素到下游。对应的代码如下所示:

private void processInMailbox(Collection<OUT> results) 
    // move further processing into the mailbox thread
    mailboxExecutor.execute(
        () -> processResults(results),
        "Result in AsyncWaitOperator of input %s",
        results);


private void processResults(Collection<OUT> results) 
    // Cancel the timer once we've completed the stream record buffer entry. This will
    // remove the registered
    // timer task
    if (timeoutTimer != null) 
        // canceling in mailbox thread avoids
        // https://issues.apache.org/jira/browse/FLINK-13635
        timeoutTimer.cancel(true);
    

    // update the queue entry with the result
    resultFuture.complete(results);
    // now output all elements from the queue that have been completed (in the correct
    // order)
    outputCompletedElement();


private void outputCompletedElement() 
    if (queue.hasCompletedElements()) 
        // emit only one element to not block the mailbox thread unnecessarily
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) 
            mailboxExecutor.execute(
                this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
        
    

9.StreamElementQueue

这一节我们分析异步处理的核心:StreamElementQueue。所有需要异步处理的数据都会在此队列中排队。

此队列需要支持是否保持输出元素顺序这两种情形,因此它具有两个实现类:

  • OrderedStreamElementQueue:元素输出的顺序严格和输入的顺序一致。
  • UnorderedStreamElementQueue:不保证元素输出的顺序和输入的一致。
    该接口有如下方法:
@Internal
public interface StreamElementQueue<OUT> 

    // 尝试将元素放入队列,如果队列已满,返回Optional.EMPTY
    // 返回一个ResultFuture对象
    Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);

    // 弹出队列头部一个已经完成异步处理的元素给outputCollector
    void emitCompletedElement(TimestampedCollector<OUT> output);

    // 检查队列头部元素是否已完成异步处理
    boolean hasCompletedElements();

    // 其余方法省略
    // ...

下面分别介绍这两种子类Queue。

9.1 OrderedStreamElementQueue

这个队列保证了输出元素顺序和输入元素顺序严格一致。它使用一个Queue<StreamElementQueueEntry>类型队列保存输入数据。Queue使用的是ArrayDeque类型。

添加元素的tryPut方法如下。如果添加成功(未超出队列容量限制),返回ResultFuture,否则返回Optional.EMPTY。

@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) 
    if (queue.size() < capacity) 
        // 只有队列有剩余空间的情况下才加入队列
        // 根据element的类型(数据还是watermark),构造对应的队列entry
        StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

        // 将entry加入队列
        queue.add(queueEntry);

        LOG.debug(
            "Put element into ordered stream element queue. New filling degree "
            + "(/).",
            queue.size(),
            capacity);

        return Optional.of(queueEntry);
     else 
        LOG.debug(
            "Failed to put element into ordered stream element queue because it "
            + "was full (/).",
            queue.size(),
            capacity);

        // 如果超出队列容量,返回EMPTY
        return Optional.empty();
    

createEntry方法根据element的类型,创建不同的队列entry(StreamElementQueueEntry)。如果元素是数据类型,创建StreamRecordQueueEntry,如果元素是watermark,则创建WatermarkQueueEntry。

private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) 
    if (streamElement.isRecord()) 
        return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
    
    if (streamElement.isWatermark()) 
        return new WatermarkQueueEntry<>((Watermark) streamElement);
    
    throw new UnsupportedOperationException("Cannot enqueue " + streamElement);

从队列中取出元素的方法为emitCompletedElement。OrderedStreamElementQueue从队列的头部获取一个元素,发送给outputCollector。hasCompletedElements方法也是检测队列头部的元素是否已经完成异步处理。所以说OrderedStreamElementQueue能够保证输出数据和输入数据的顺序严格一致。但是带来的问题是处理延迟会受到异步处理时间的影响。

@Override
FlinkFlink 源码之ExecutionGraph

FlinkFlink 源码之RPC调用

FlinkFlink 源码之Buffer Debloating

FlinkFlink 源码之AsyncFunction异步 IO 源码

FlinkFlink 源码之OperatorChain

FlinkFlink 源码之 安全认证