A look at Flink's SourceFunction
Posted by 码匠的流水账
Preface
This article takes a look at Flink's SourceFunction.
Example
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());
dataStreamSource.map(new UpperCaseMapFunc()).print();
env.execute("sourceFunctionDemo");
Here a custom SourceFunction (RandomWordSource) is added through the addSource method.
SourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
// ------------------------------------------------------------------------
// source context
// ------------------------------------------------------------------------
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
//......
}
}
SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods, as well as the nested SourceContext interface.
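The contract is easiest to see without the Flink runtime around it. The sketch below is a Flink-free model that assumes hypothetical MiniSourceFunction and MiniSourceContext interfaces keeping only run, cancel and collect; it is not Flink's API. As with RandomWordSource later in the article, run loops on a volatile flag, and cancel, called from another thread, clears it so that run can return.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Flink-free stand-ins for SourceFunction and SourceContext,
// reduced to the methods discussed here (these are NOT the real Flink types).
interface MiniSourceContext<T> {
    void collect(T element);
}

interface MiniSourceFunction<T> {
    void run(MiniSourceContext<T> ctx) throws Exception; // emits until finished or cancelled
    void cancel();                                       // invoked from a different thread
}

// Same shape as RandomWordSource: a volatile flag written by cancel() and read by run().
class CounterSource implements MiniSourceFunction<Integer> {
    private volatile boolean isRunning = true;

    @Override
    public void run(MiniSourceContext<Integer> ctx) throws Exception {
        int next = 0;
        while (isRunning) {
            ctx.collect(next++);
            Thread.sleep(1); // pacing only, like the 300 ms sleep in RandomWordSource
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

class SourceHarness {
    // Runs the source on its own thread, cancels it after `millis`, returns what it emitted.
    static List<Integer> runFor(long millis) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        CounterSource source = new CounterSource();
        Thread worker = new Thread(() -> {
            try {
                source.run(out::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        worker.start();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        source.cancel(); // flips the volatile flag; run() leaves its loop
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(out);
    }
}
```

In Flink the runtime plays the role of SourceHarness: run executes on the task thread, while cancel arrives from a different thread when the job is cancelled.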
SourceContext
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {
/**
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);
/**
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system's current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
/**
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t' <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime}, Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);
/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();
/**
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock
*/
Object getCheckpointLock();
/**
* This method is called by the system to shut down the context.
*/
void close();
}
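SourceContext mainly defines the interface through which a source emits data. First there is collect: if the data carries no time of its own, then with TimeCharacteristic.EventTime a TimestampAssigner can assign timestamps before any time-dependent operation; with TimeCharacteristic.IngestionTime nothing needs to be specified, because the system generates the timestamp automatically. Besides collect, collectWithTimestamp emits an element together with a given timestamp (meant for TimeCharacteristic.EventTime). It also defines emitWatermark, which is used to cope with out-of-order data: a watermark with value t declares that no more elements with a timestamp at or below t are expected, and any that still arrive are considered late. Watermarks only take effect with TimeCharacteristic.EventTime; with TimeCharacteristic.ProcessingTime they are ignored, and with TimeCharacteristic.IngestionTime they are replaced by the automatically generated ingestion time watermarks.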
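To make the timestamp rules above concrete, here is a small Flink-free model of which timestamp an element ends up with. TimeMode and TimestampRules are hypothetical names, a null result stands for "no timestamp", and the behavior is taken from the SourceContext Javadoc quoted above rather than from Flink's implementation.

```java
// Hypothetical model of the SourceContext Javadoc rules; not Flink code.
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

final class TimestampRules {
    private TimestampRules() {}

    // collect(element): ProcessingTime and EventTime leave the element without a timestamp
    // (EventTime needs a TimestampAssigner later); IngestionTime uses the current time.
    static Long forCollect(TimeMode mode, long nowMillis) {
        return mode == TimeMode.INGESTION_TIME ? Long.valueOf(nowMillis) : null;
    }

    // collectWithTimestamp(element, timestamp): the given timestamp is ignored,
    // overwritten, or used, depending on the time characteristic.
    static Long forCollectWithTimestamp(TimeMode mode, long timestamp, long nowMillis) {
        switch (mode) {
            case PROCESSING_TIME:
                return null;        // ignored: processing time never uses element timestamps
            case INGESTION_TIME:
                return nowMillis;   // overwritten with the system's current time
            default:
                return timestamp;   // EVENT_TIME: used as given
        }
    }
}
```

Passing the current time in as a parameter, instead of calling System.currentTimeMillis() inside, keeps the rules deterministic and easy to check.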
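It also defines markAsTemporarilyIdle, which tells the system that the current source will stop emitting records and watermarks for an indefinite time, so downstream tasks can advance their watermarks without waiting for this source. This only matters with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. As soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called again, the system considers the source active again.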
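Why idleness matters downstream can be sketched with a simplified tracker that forwards the minimum watermark over its input channels and skips idle ones. MinWatermarkTracker is a hypothetical class, not Flink's StatusWatermarkValve, and it ignores finer points of Flink's handling, for example what happens when a resumed channel's watermark is behind the one already forwarded.

```java
import java.util.Arrays;

// Hypothetical simplification: the combined watermark is the minimum over active channels.
class MinWatermarkTracker {
    private final long[] watermarks;
    private final boolean[] idle;

    MinWatermarkTracker(int numChannels) {
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE); // nothing received yet
        idle = new boolean[numChannels];
    }

    void onWatermark(int channel, long ts) {
        idle[channel] = false; // emitting again means the channel is active
        watermarks[channel] = Math.max(watermarks[channel], ts);
    }

    void markIdle(int channel) {
        idle[channel] = true;
    }

    // Minimum over non-idle channels; Long.MIN_VALUE if every channel is idle.
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Without markIdle(1), combined() would stay at Long.MIN_VALUE for as long as channel 1 sends nothing, which is the stall that markAsTemporarilyIdle is meant to avoid.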
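getCheckpointLock returns the checkpoint lock, which the source holds when handling checkpoint-related logic, so that emitting records and updating its state happen atomically with respect to checkpoints.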
The close method is called by the system to shut down the context and release its resources.
Task.run (upstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
Task's run method calls invokable.invoke(); here the invokable is a StreamTask.
StreamTask.invoke
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
//......
@Override
public final void invoke() throws Exception {
boolean disposed = false;
try {
//......
// let the task do its work
isRunning = true;
run();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
LOG.debug("Finished task {}", getName());
//......
}
finally {
// clean up everything we initialized
isRunning = false;
//......
}
}
}
StreamTask's invoke method calls the subclass's run method; here the subclass is SourceStreamTask.
SourceStreamTask.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Override
protected void run() throws Exception {
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
SourceStreamTask's run method mainly calls the headOperator's run method; here the headOperator is a StreamSource.
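The upstream chain so far follows the template-method pattern: Task.run calls invoke, StreamTask.invoke is final and delegates to the subclass's run, and SourceStreamTask.run hands off to the head operator. The sketch below mirrors only that shape; MiniTask, MiniStreamTask and MiniSourceStreamTask are hypothetical, and IllegalStateException stands in for Flink's CancelTaskException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of Task.run -> StreamTask.invoke -> SourceStreamTask.run -> operator.
abstract class MiniStreamTask {
    final List<String> trace = new ArrayList<>();
    volatile boolean canceled;

    // Like StreamTask.invoke(): final, with the real work left to the subclass's run().
    final void invoke() {
        trace.add("invoke");
        run();
        // Leaving run() cleanly after a cancel must not look like a clean shutdown.
        if (canceled) {
            throw new IllegalStateException("task was canceled");
        }
    }

    protected abstract void run();
}

class MiniSourceStreamTask extends MiniStreamTask {
    private final Runnable headOperator;

    MiniSourceStreamTask(Runnable headOperator) {
        this.headOperator = headOperator;
    }

    @Override
    protected void run() {
        trace.add("SourceStreamTask.run");
        headOperator.run(); // StreamSource.run would call userFunction.run(ctx) here
    }
}

class MiniTask implements Runnable {
    private final MiniStreamTask invokable;

    MiniTask(MiniStreamTask invokable) {
        this.invokable = invokable;
    }

    @Override
    public void run() {
        invokable.invoke(); // Task.run ends by calling invokable.invoke()
    }
}
```

Making invoke final keeps the lifecycle handling in one place, while each task type only decides what run does.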
StreamSource.run
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
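StreamSource's run method first builds a SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls the run method of userFunction, which here is RandomWordSource, the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0. If userFunction.run returns without the source having been canceled or stopped (a finite source), a final Watermark.MAX_WATERMARK is emitted; the context is closed in every case.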
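The try/finally structure of StreamSource.run can be modeled as follows, assuming a hypothetical MiniStreamSource in which the user function is a callback that receives a collect function. It shows the two exits: a finite source that returns on its own gets a final max watermark (Long.MAX_VALUE here, standing in for Watermark.MAX_WATERMARK), a cancelled one does not, and the context is closed either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of StreamSource.run's control flow; elements and watermarks
// are recorded as strings purely for illustration.
class MiniStreamSource {
    static final long MAX_WATERMARK = Long.MAX_VALUE; // stands in for Watermark.MAX_WATERMARK

    final List<String> emitted = new ArrayList<>();
    boolean closed;
    private volatile boolean canceledOrStopped;

    void cancel() {
        canceledOrStopped = true;
    }

    // userFunction receives a "collect" callback and returns when it is done.
    void run(Consumer<Consumer<String>> userFunction) {
        try {
            userFunction.accept(element -> emitted.add("record:" + element));
            // Finite source finished on its own: signal the end of event time.
            if (!canceledOrStopped) {
                emitted.add("watermark:" + MAX_WATERMARK);
            }
        } finally {
            closed = true; // stands in for ctx.close(), which runs on every path
        }
    }
}
```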
RandomWordSource.run
public class RandomWordSource implements SourceFunction<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);
private volatile boolean isRunning = true;
private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
Thread.sleep(300);
int rnd = (int) (Math.random() * 10 % words.length);
LOGGER.info("emit word: {}", words[rnd]);
ctx.collect(words[rnd]);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
RandomWordSource's run method keeps looping, emitting a random word every 300 ms, until cancel() sets isRunning to false.
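One detail of RandomWordSource is worth noticing: its index expression. Math.random() * 10 lies in [0, 10), and with 7 words "% 7" folds [7, 10) back onto indices 0 to 2, so those three words come out roughly twice as often as the others. The sketch below measures this with a seeded java.util.Random (its nextDouble() standing in for Math.random()) and compares it with Random.nextInt(words.length). IndexSkew is a hypothetical helper, and for a demo source the skew is harmless.

```java
import java.util.Random;

// Compares (int) (x * 10 % length), the shape used in RandomWordSource,
// with Random.nextInt(length), which is uniform over 0..length-1.
class IndexSkew {
    static int[] histogram(boolean useOriginalExpression, int length, int samples, long seed) {
        Random rnd = new Random(seed); // seeded so the result is reproducible
        int[] counts = new int[length];
        for (int i = 0; i < samples; i++) {
            int idx = useOriginalExpression
                    ? (int) (rnd.nextDouble() * 10 % length) // same shape as the article's code
                    : rnd.nextInt(length);                   // uniform choice
            counts[idx]++;
        }
        return counts;
    }
}
```

If a uniform choice matters, ThreadLocalRandom.current().nextInt(words.length) is a common drop-in.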
StreamSource.LatencyMarksEmitter
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
private static class LatencyMarksEmitter<OUT> {
private final ScheduledFuture<?> latencyMarkTimer;
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
public void close() {
latencyMarkTimer.cancel(true);
}
}
The LatencyMarksEmitter is created in StreamSource's run method before userFunction.run is called, and only if latencyTrackingInterval > 0. To determine latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() is checked first: if the ExecutionConfig has a value configured, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L (the latter applies in this example, so the interval is 2000 ms).
The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task with an initial delay of 0 and a period of latencyTrackingInterval.
The work of the timer task is done in the ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here the processingTimeService is a SystemProcessingTimeService and the output is an AbstractStreamOperator.CountingOutput.
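The fallback order for the interval fits in a few lines. This is a hypothetical helper, not Flink code: a null argument models "not configured", and the 2000 ms default is the Flink 1.6.2 MetricOptions.LATENCY_INTERVAL default described above.

```java
// Hypothetical sketch of how latencyTrackingInterval is resolved in StreamSource.run.
final class LatencyIntervalResolver {
    static final long DEFAULT_LATENCY_INTERVAL_MS = 2000L; // default cited for Flink 1.6.2

    private LatencyIntervalResolver() {}

    // executionConfigValue == null models isLatencyTrackingConfigured() returning false;
    // clusterConfigValue == null models the key being absent from the TaskManager configuration.
    static long resolve(Long executionConfigValue, Long clusterConfigValue) {
        if (executionConfigValue != null) {
            return executionConfigValue; // a job-level setting wins, even 0
        }
        return clusterConfigValue != null ? clusterConfigValue : DEFAULT_LATENCY_INTERVAL_MS;
    }

    // A LatencyMarksEmitter is only created for a positive interval.
    static boolean createsEmitter(long interval) {
        return interval > 0;
    }
}
```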
SystemProcessingTimeService.scheduleAtFixedRate
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
long nextTimestamp = getCurrentProcessingTime() + initialDelay;
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return timerService.scheduleAtFixedRate(
new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
initialDelay,
period,
TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}
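SystemProcessingTimeService's scheduleAtFixedRate method actually delegates to timerService.scheduleAtFixedRate. The timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask. If scheduling is rejected with a RejectedExecutionException, a quiesced service returns a NeverCompleteFuture, a shut-down service throws an IllegalStateException, and in any other case the exception is rethrown.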
RepeatedTriggerTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
/**
* Internal task which is repeatedly called by the processing time service.
*/
private static final class RepeatedTriggerTask implements Runnable {
private final AtomicInteger serviceStatus;
private final Object lock;
private final ProcessingTimeCallback target;
private final long period;
private final AsyncExceptionHandler exceptionHandler;
private long nextTimestamp;
private RepeatedTriggerTask(
final AtomicInteger serviceStatus,
final AsyncExceptionHandler exceptionHandler,
final Object lock,
final ProcessingTimeCallback target,
final long nextTimestamp,
final long period) {
this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
this.lock = Preconditions.checkNotNull(lock);
this.target = Preconditions.checkNotNull(target);
this.period = period;
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
this.nextTimestamp = nextTimestamp;
}
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(nextTimestamp);
}
nextTimestamp += period;
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
}
}
}
}
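RepeatedTriggerTask runs while holding the lock it was given (the checkpoint lock) and calls the ProcessingTimeCallback's onProcessingTime only while serviceStatus is STATUS_ALIVE. nextTimestamp starts at getCurrentProcessingTime() + initialDelay and is then increased by period on every run, so the callback receives evenly spaced logical timestamps rather than the actual time of each firing.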
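The timing behavior can be reproduced with plain java.util.concurrent: a ScheduledThreadPoolExecutor with one core thread (as in SystemProcessingTimeService) runs a task that, under a shared lock, hands a logical timestamp to a callback and then advances it by exactly one period. RepeatingTimestampTask and FixedRateDemo are hypothetical names; the status check and exception handler of the real RepeatedTriggerTask are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch modeled on RepeatedTriggerTask.
class RepeatingTimestampTask implements Runnable {
    private final Object lock;
    private final LongConsumer target;
    private final long period;
    private long nextTimestamp;

    RepeatingTimestampTask(Object lock, LongConsumer target, long firstTimestamp, long period) {
        this.lock = lock;
        this.target = target;
        this.nextTimestamp = firstTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        synchronized (lock) {            // callbacks run under the (checkpoint) lock
            target.accept(nextTimestamp);
            nextTimestamp += period;     // logical time, independent of actual firing time
        }
    }
}

class FixedRateDemo {
    // Returns the timestamps passed to the callback during the first `firings` runs.
    static List<Long> firstTimestamps(long start, long periodMillis, int firings) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        List<Long> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(firings);
        Object checkpointLock = new Object();
        ScheduledFuture<?> future = timer.scheduleAtFixedRate(
                new RepeatingTimestampTask(checkpointLock, ts -> {
                    if (done.getCount() > 0) { // ignore firings after we have enough
                        seen.add(ts);
                        done.countDown();
                    }
                }, start, periodMillis),
                0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(true);
            timer.shutdownNow();
        }
        return new ArrayList<>(seen);
    }
}
```

With start 1000 and a 10 ms period, the first five callbacks receive 1000, 1010, 1020, 1030 and 1040, even if individual firings are delayed.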
AbstractStreamOperator.CountingOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
this.output = output;
this.numRecordsOut = counter;
}
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
output.emitLatencyMarker(latencyMarker);
}
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
numRecordsOut.inc();
output.collect(outputTag, record);
}
@Override
public void close() {
output.close();
}
}
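CountingOutput increments numRecordsOut only in collect; emitWatermark and emitLatencyMarker are forwarded unchanged. The output it wraps here is actually a RecordWriterOutput.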
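CountingOutput is a plain decorator. The hypothetical MiniOutput, RecordingOutput and CountingMiniOutput below reduce it to string records and long markers to show its one behavioral difference: only collect increments the counter, while watermarks and latency markers pass straight through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of the Output / CountingOutput pair.
interface MiniOutput {
    void collect(String record);
    void emitWatermark(long watermark);
    void emitLatencyMarker(long markedTime);
}

// Inner output that just records what reached it.
class RecordingOutput implements MiniOutput {
    final List<String> log = new ArrayList<>();

    @Override public void collect(String record) { log.add("record " + record); }
    @Override public void emitWatermark(long watermark) { log.add("watermark " + watermark); }
    @Override public void emitLatencyMarker(long markedTime) { log.add("latency " + markedTime); }
}

class CountingMiniOutput implements MiniOutput {
    private final MiniOutput output;
    long numRecordsOut; // stands in for the numRecordsOut Counter metric

    CountingMiniOutput(MiniOutput output) {
        this.output = output;
    }

    @Override
    public void collect(String record) {
        numRecordsOut++;        // only user records are counted
        output.collect(record);
    }

    @Override
    public void emitWatermark(long watermark) {
        output.emitWatermark(watermark);       // forwarded, not counted
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        output.emitLatencyMarker(markedTime);  // forwarded, not counted
    }
}
```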
RecordWriterOutput.emitLatencyMarker
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
* Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
private SerializationDelegate<StreamElement> serializationDelegate;
//......
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
recordWriter.randomEmit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
This emitLatencyMarker mainly calls StreamRecordWriter's randomEmit (actually implemented by its parent class RecordWriter) to emit the LatencyMarker.
RecordWriter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
/**
* This is used to send LatencyMarks to a random target channel.
*/
public void randomEmit(T record) throws IOException, InterruptedException {
sendToTarget(record, rng.nextInt(numChannels));
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
RecordWriter's randomEmit simply picks a random targetChannel and sends the record to it via sendToTarget.
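Channel selection in randomEmit amounts to rng.nextInt(numChannels). The hypothetical RandomChannelSelector below uses a seeded java.util.Random to show that each latency marker lands on exactly one channel and that, over many markers, the channels are hit roughly evenly.

```java
import java.util.Random;

// Hypothetical sketch of the channel choice made by randomEmit.
class RandomChannelSelector {
    private final Random rng;
    private final int numChannels;

    RandomChannelSelector(int numChannels, long seed) {
        this.numChannels = numChannels;
        this.rng = new Random(seed); // seeded for reproducibility
    }

    int nextChannel() {
        return rng.nextInt(numChannels); // uniform in [0, numChannels)
    }

    // Counts how many of `markers` land on each channel.
    int[] distribute(int markers) {
        int[] perChannel = new int[numChannels];
        for (int i = 0; i < markers; i++) {
            perChannel[nextChannel()]++;
        }
        return perChannel;
    }
}
```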
Task.run (downstream)
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
The downstream Task's run method likewise calls invokable.invoke(); here the invokable is a OneInputStreamTask.
OneInputStreamTask
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
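Task's run method calls StreamTask's invoke method, which in turn calls OneInputStreamTask's run method. That method keeps calling inputProcessor.processInput() in a loop for as long as the task is running and processInput returns true; the inputProcessor here is a StreamInputProcessor.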
StreamInputProcessor
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn = new SimpleCounter();
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
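processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to get the next record, and only handles it once result.isFullRecord() is true.
It then dispatches on the type of the StreamElement, handling four kinds separately: watermarks, stream statuses, latency markers, and ordinary records. For the first three the loop continues after handling (latency markers are processed while holding the lock); a record is processed and processInput returns true.
For an ordinary record it calls streamOperator.processElement(record); here the streamOperator is a StreamMap.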
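The dispatch, together with the while loop in OneInputStreamTask.run, can be modeled without the network stack. In the hypothetical sketch below, input elements come from an Iterator instead of deserialized buffers and handling is recorded as strings whose names only echo the calls in processInput. Non-record elements are handled and the loop continues, while a record is processed under the lock and processInput returns true.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical element types standing in for Flink's StreamElement subclasses.
abstract class Element {}

class WatermarkElement extends Element {
    final long ts;
    WatermarkElement(long ts) { this.ts = ts; }
}

class StreamStatusElement extends Element {
    final boolean idle;
    StreamStatusElement(boolean idle) { this.idle = idle; }
}

class LatencyMarkerElement extends Element {
    final long markedTime;
    LatencyMarkerElement(long markedTime) { this.markedTime = markedTime; }
}

class RecordElement extends Element {
    final String value;
    RecordElement(String value) { this.value = value; }
}

class MiniInputProcessor {
    private final Iterator<Element> input;
    private final Object lock = new Object(); // stands in for the checkpoint lock
    final List<String> handled = new ArrayList<>();
    long numRecordsIn;

    MiniInputProcessor(Iterator<Element> input) {
        this.input = input;
    }

    // Returns true after one record was processed, false once the input is exhausted.
    boolean processInput() {
        while (input.hasNext()) {
            Element e = input.next();
            if (e instanceof WatermarkElement) {
                handled.add("valve.inputWatermark(" + ((WatermarkElement) e).ts + ")");
            } else if (e instanceof StreamStatusElement) {
                handled.add("valve.inputStreamStatus(" + ((StreamStatusElement) e).idle + ")");
            } else if (e instanceof LatencyMarkerElement) {
                synchronized (lock) {
                    handled.add("processLatencyMarker(" + ((LatencyMarkerElement) e).markedTime + ")");
                }
            } else {
                synchronized (lock) {
                    numRecordsIn++;
                    handled.add("processElement(" + ((RecordElement) e).value + ")");
                }
                return true; // one record per call, like StreamInputProcessor
            }
        }
        return false;
    }
}
```

Draining it with while (processor.processInput()) {} mirrors OneInputStreamTask.run, whose loop body is empty because all the work happens inside processInput.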
StreamMap.processElement
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
* A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
*/
@Internal
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
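This calls userFunction.map(element.getValue()) to perform the map operation, where userFunction is UpperCaseMapFunc; the result is written back into the same StreamRecord via element.replace(...) and passed to output.collect.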
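StreamMap does not allocate a new record: element.replace(...) swaps the value inside the existing StreamRecord and returns it retyped, so metadata such as the timestamp travels along. The sketch below models that with a hypothetical MiniStreamRecord and uses String.toUpperCase as a stand-in for UpperCaseMapFunc, whose source the article does not show.

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical stand-in for StreamRecord: a mutable value plus a timestamp.
class MiniStreamRecord<T> {
    private Object value;
    private final long timestamp;

    MiniStreamRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @SuppressWarnings("unchecked")
    T getValue() {
        return (T) value;
    }

    long getTimestamp() {
        return timestamp;
    }

    // Like StreamRecord.replace: swap the value in place and return this record retyped.
    @SuppressWarnings("unchecked")
    <X> MiniStreamRecord<X> replace(X newValue) {
        this.value = newValue;
        return (MiniStreamRecord<X>) this;
    }
}

class MiniStreamMap {
    // Hypothetical stand-in for UpperCaseMapFunc.
    static final Function<String, String> UPPER_CASE = s -> s.toUpperCase(Locale.ROOT);

    // Same shape as StreamMap.processElement, minus the output.collect call.
    static <IN, OUT> MiniStreamRecord<OUT> processElement(Function<IN, OUT> mapper,
                                                          MiniStreamRecord<IN> element) {
        return element.replace(mapper.apply(element.getValue()));
    }
}
```

Reusing the record object avoids one allocation per element, at the cost of the unchecked cast inside replace.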
Summary
SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods as well as the SourceContext interface; SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting Watermarks.
On the emitting side, the call chain is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so it creates a LatencyMarksEmitter, which registers a timer task that periodically calls the ProcessingTimeCallback's onProcessingTime method, which in turn calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)). The downstream therefore receives both the user data sent by userFunction.run and the LatencyMarkers sent by the timer task.
On the downstream side, the call chain is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. StreamInputProcessor.processInput handles each type of element differently; for user data it calls streamOperator.processElement, i.e. StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map).
doc
SourceFunction