FlinkFlink 源码之AsyncFunction异步 IO 源码
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码之AsyncFunction异步 IO 源码相关的知识,希望对你有一定的参考价值。
1.概述
2. 简介
Flink的特点是高吞吐低延迟。但是Flink中的某环节的数据处理逻辑需要和外部系统交互,调用耗时不可控会显著降低集群性能,这时候怎么办?
为了解决这个问题,Flink引入了AsyncFunction系列接口。使用这些异步接口调用外部服务的时候,不用再同步等待结果返回,只需要将数据存入队列,外部服务接口返回时会更新队列数据状态。在调用外部服务后直接返回处理下一个异步调用,不需要同步等待结果。下游拉取数据的时候直接从队列获取即可。
3.使用方法
在讲解AsyncFunction使用方法之前,我们先“伪造”一个耗时的外部系统调用。调用pullData会立即返回一个CompletableFuture。耗时5秒后生成的数据通过CompletableFuture返回。
public class AsyncIODemo implements Serializable
private final ExecutorService executorService = Executors.newFixedThreadPool(4);
public CompletableFuture<String> pullData(final String source)
CompletableFuture<String> completableFuture = new CompletableFuture<>();
executorService.submit(() ->
try
Thread.sleep(5000);
catch (InterruptedException e)
e.printStackTrace();
completableFuture.complete("Output value: " + source);
);
return completableFuture;
接下来编写Flink作业:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.fromElements("Alpha", "Beta", "Gamma", "Delta")
val asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction[String, String]
override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
// 调用前面的外部系统调用,拉取数据
val future = new AsyncIODemo().pullData(input)
// 这个方法是非阻塞的,一旦数据获取成功,会立即调用resultFuture.complete方法
future.whenCompleteAsync(new BiConsumer[String, Throwable]
override def accept(t: String, u: Throwable): Unit =
resultFuture.complete(Array(t))
)
, 10, TimeUnit.SECONDS)
// 上面设置最长异步调用超时时间为10秒
asyncStream.print()
env.execute()
执行Flink作业。我们发现虽然外部系统调用了4次,然而并没有等待20秒后才输出全部4个结果,实际上只等待了5秒左右。AsyncFunction的功能得到了验证。
注意:尽管AsyncFunction字面上为异步调用,实际上asynInvoke方法仍然是同步的。绝不能在该方法中阻塞等待调用结果,这样失去了它原本的作用。应该在此处编写异步回调方法,通过异步方式通知Flink数据已获取完毕。
4.AsyncFunction
从这里开始进入源码分析环节。AsyncFunction接口源码如下:
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable
/**
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
/**
* @link AsyncFunction#asyncInvoke timeout occurred. By default, the result future is
* exceptionally completed with a timeout exception.
*
* @param input element coming from an upstream task
* @param resultFuture to be completed with the result data
*/
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception
resultFuture.completeExceptionally(
new TimeoutException("Async function call has timed out."));
AsyncFunction接口有两个方法:
- asyncInvoke:异步操作每一个数据流输入元素。方法的第一个参数input为数据流中的元素,第二个参数resultFuture用于收集异步处理的结果或者是错误信息。不要在此方法内同步等待数据处理逻辑,这样会阻塞线程,降低作业吞吐量。
- timeout:定义数据超时处理逻辑。方法的参数和asyncInvoke相同。AsyncFunction已经提供了默认实现。如果需要自定义超时逻辑,可以覆盖这个方法。
5. ResultFuture
ResultFuture在异步操作的时候用于收集结果或错误。
@PublicEvolving
public interface ResultFuture<OUT>
/**
* Completes the result future with a collection of result objects.
*
* <p>Note that it should be called for exactly one time in the user code. Calling this function
* for multiple times will cause data lose.
*
* <p>Put all results in a @link Collection and then emit output.
*
* @param result A list of results.
*/
void complete(Collection<OUT> result);
/**
* Completes the result future exceptionally with an exception.
*
* @param error A Throwable object.
*/
void completeExceptionally(Throwable error);
它包含两个方法:
- complete:如果异步逻辑顺利返回,调用complete方法转入结果数据的集合对象,将数据传递给下游。
- completeExceptionally:如果异步逻辑需要错误,需要调用这个方法将错误传入。
6.AsyncDataStream
该类是创建异步算子的工具类。它有2种方法:
- unorderedWait:不保证输出元素的顺序和读入元素顺序相同。
- orderedWait:保证输出元素的顺序和读入元素顺序相同。
这两种方法每个还对应两个重载方法,但是参数含义是相同的。参数为:
DataStream<IN> in:需要添加异步处理逻辑的数据流。AsyncDataStream实际上是个工具类,并不是一种流的类型。
AsyncFunction<IN, OUT> func:用户定义的异步执行逻辑。
long timeout:异步任务超时时间。
TimeUnit timeUnit:超时时间单位。
int capacity:异步任务初始队列长度。只有部分重载方法有这个参数。默认值为100。
下面是orderedWait其中一个重载方法的代码。
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
TimeUnit timeUnit,
int capacity)
return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
它调用了addOperator方法,为DataStream添加一个OneInputTransformation,其中包含了AsyncWaitOperator。
其他几个unorderedWait或orderedWait重载方法调用的都是addOperator,不再赘述。
接下来轮到了addOperator方法:
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
int bufSize,
OutputMode mode)
TypeInformation<OUT> outTypeInfo =
TypeExtractor.getUnaryOperatorReturnType(
func,
AsyncFunction.class,
0,
1,
new int[] 1, 0,
in.getType(),
Utils.getCallLocationName(),
true);
// create transform
AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);
return in.transform("async wait operator", outTypeInfo, operatorFactory);
这个方法创建了一个AsyncWaitOperatorFactory,将其包装入transformation。factory在生成ExecutionGraph的时候将创建出AsyncWaitOperator。下一节我们一起分析下异步操作的核心AsyncWaitOperator。
7.AsyncWaitOperator
我们从AsyncWaitOperator的构造方法开始。构造方法参数中最重要的是outputMode,它决定了异步处理任务队列的类型,从而决定用户数据异步处理后是否严格按照输入顺序输出。
public AsyncWaitOperator(
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull ProcessingTimeService processingTimeService,
@Nonnull MailboxExecutor mailboxExecutor)
super(asyncFunction);
// 设置可以和下游算子组成OperatorChain
setChainingStrategy(ChainingStrategy.ALWAYS);
Preconditions.checkArgument(
capacity > 0, "The number of concurrent async operation should be greater than 0.");
// 默认队列长度
this.capacity = capacity;
// 枚举值,决定用户数据异步处理后是否严格按照输入顺序输出
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
// 异步处理超时时间
this.timeout = timeout;
// 时间服务,用于设置定时器,检测超时等
this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
// 用户作业执行线程池
this.mailboxExecutor = mailboxExecutor;
在operator创建出来后紧接着会执行setup方法,进行初始化操作。
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output)
// 调用父类初始化逻辑
super.setup(containingTask, config, output);
// 创建元素序列化器
this.inStreamElementSerializer =
new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
switch (outputMode)
case ORDERED:
// 如果需要保持输出数据有序
// 创建的队列为OrderedStreamElementQueue
queue = new OrderedStreamElementQueue<>(capacity);
break;
case UNORDERED:
// 如果不需要保持输出有序
// 创建的队列为UnorderedStreamElementQueue
queue = new UnorderedStreamElementQueue<>(capacity);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
this.timestampedCollector = new TimestampedCollector<>(output);
setup方法根据outputMode是否保证输出元素顺序,来决定创建的StreamElementQueue。
接下来是处理元素的processElement方法。上游每个元素到来的时候,都会调用这个方法。
@Override
public void processElement(StreamRecord<IN> element) throws Exception
// add element first to the queue
// 将元素放入队列中
// 返回队列的entry
// 队列中的entry类型实现了ResultFuture接口,后面介绍
final ResultFuture<OUT> entry = addToWorkQueue(element);
// 创建ResultHandler,包装了超时定时器,输入数据和resultFuture
// 用来操作resultFuture和超时定时器
final ResultHandler resultHandler = new ResultHandler(element, entry);
// register a timeout for the entry if timeout is configured
// 如果配置了超时时间
if (timeout > 0L)
// 计算超时时刻
final long timeoutTimestamp =
timeout + getProcessingTimeService().getCurrentProcessingTime();
// 注册一个定时器,在超时的时刻调用AsyncFunction的timeout方法
final ScheduledFuture<?> timeoutTimer =
getProcessingTimeService()
.registerTimer(
timeoutTimestamp,
timestamp ->
userFunction.timeout(
element.getValue(), resultHandler));
// 设置定时器给resultHandler
resultHandler.setTimeoutTimer(timeoutTimer);
// 调用AsyncFunction的asyncInvoke方法
userFunction.asyncInvoke(element.getValue(), resultHandler);
继续查看addToWorkQueue方法,将元素放入任务队列中。
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
throws InterruptedException
Optional<ResultFuture<OUT>> queueEntry;
// 如果元素添加队列失败,说明队列已满
// 需要当前线程让出执行机会给mailboxExecutor,即执行用户自定义处理逻辑
while (!(queueEntry = queue.tryPut(streamElement)).isPresent())
mailboxExecutor.yield();
// 添加队列成功,返回ResultFuture
return queueEntry.get();
workQueue我们在后面讨论。接下来分析ResultHandler。
8.ResultHandler
ResultHandler是ResultFuture的实现类,为AsyncFunction中两个方法的参数,让用户使用。分别处理异步处理完成(complete)和异步处理异常(completeExceptionally)两种情况。
ResultHandler持有4个成员变量:
timeoutTimer:定时器,在数据计算完毕(调用了complete方法的时候),需要将timer清除,所以需要持有定时器。
inputRecord:数据流中的原始数据。
resultFuture:实际为元素队列中的entry。这个后面介绍。
completed:用来表示异步计算是否完成。
用户的自定义异步处理逻辑在AsyncFunction中,异步处理完成的时候需要调用ResultHandler的complete方法。这个方法将completed变量标记为true。然后调用processInMainbox方法。
@Override
public void complete(Collection<OUT> results)
Preconditions.checkNotNull(
results, "Results must not be null, use empty collection to emit nothing");
// already completed (exceptionally or with previous complete call from ill-written
// AsyncFunction), so
// ignore additional result
if (!completed.compareAndSet(false, true))
return;
processInMailbox(results);
processInMainbox方法在MailboxExecutor线程池执行resultFuture的complete方法,通知持有这些元素的队列,该元素已经处理完毕。然后清除掉超时时间timer。最后调用outputCompletedElement,输出已完成的元素到下游。对应的代码如下所示:
private void processInMailbox(Collection<OUT> results)
// move further processing into the mailbox thread
mailboxExecutor.execute(
() -> processResults(results),
"Result in AsyncWaitOperator of input %s",
results);
private void processResults(Collection<OUT> results)
// Cancel the timer once we've completed the stream record buffer entry. This will
// remove the registered
// timer task
if (timeoutTimer != null)
// canceling in mailbox thread avoids
// https://issues.apache.org/jira/browse/FLINK-13635
timeoutTimer.cancel(true);
// update the queue entry with the result
resultFuture.complete(results);
// now output all elements from the queue that have been completed (in the correct
// order)
outputCompletedElement();
private void outputCompletedElement()
if (queue.hasCompletedElements())
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements())
mailboxExecutor.execute(
this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
9.StreamElementQueue
这一节我们分析异步处理的核心:StreamElementQueue。所有需要异步处理的数据都会在此队列中排队。
此队列需要支持是否保持输出元素顺序这两种情形,因此它具有两个实现类:
- OrderedStreamElementQueue:元素输出的顺序严格和输入的顺序一致。
- UnorderedStreamElementQueue:不保证元素输出的顺序和输入的一致。
该接口有如下方法:
@Internal
public interface StreamElementQueue<OUT>
// 尝试将元素放入队列,如果队列已满,返回Optional.EMPTY
// 返回一个ResultFuture对象
Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement);
// 弹出队列头部一个已经完成异步处理的元素给outputCollector
void emitCompletedElement(TimestampedCollector<OUT> output);
// 检查队列头部元素是否已完成异步处理
boolean hasCompletedElements();
// 其余方法省略
// ...
下面分别介绍这两种子类Queue。
9.1 OrderedStreamElementQueue
这个队列保证了输出元素顺序和输入元素顺序严格一致。它使用一个Queue<StreamElementQueueEntry>类型队列保存输入数据。Queue使用的是ArrayDeque类型。
添加元素的tryPut方法如下。如果添加成功(未超出队列容量限制),返回ResultFuture,否则返回Optional.EMPTY。
@Override
public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement)
if (queue.size() < capacity)
// 只有队列有剩余空间的情况下才加入队列
// 根据element的类型(数据还是watermark),构造对应的队列entry
StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);
// 将entry加入队列
queue.add(queueEntry);
LOG.debug(
"Put element into ordered stream element queue. New filling degree "
+ "(/).",
queue.size(),
capacity);
return Optional.of(queueEntry);
else
LOG.debug(
"Failed to put element into ordered stream element queue because it "
+ "was full (/).",
queue.size(),
capacity);
// 如果超出队列容量,返回EMPTY
return Optional.empty();
createEntry方法根据element的类型,创建不同的队列entry(StreamElementQueueEntry)。如果元素是数据类型,创建StreamRecordQueueEntry,如果元素是watermark,则创建WatermarkQueueEntry。
private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement)
if (streamElement.isRecord())
return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
if (streamElement.isWatermark())
return new WatermarkQueueEntry<>((Watermark) streamElement);
throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
从队列中取出元素的方法为emitCompletedElement。OrderedStreamElementQueue从队列的头部获取一个元素,发送给outputCollector。hasCompletedElements方法也是检测队列头部的元素是否已经完成异步处理。所以说OrderedStreamElementQueue能够保证输出数据和输入数据的顺序严格一致。但是带来的问题是处理延迟会受到异步处理时间的影响。
@Override
FlinkFlink 源码之ExecutionGraph
FlinkFlink 源码之Buffer Debloating