A Closer Look at Flink's SourceFunction

Posted by 码匠的流水账


This post takes a look at Flink's SourceFunction.

Example


       // set up the execution environment
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

       dataStreamSource.map(new UpperCaseMapFunc()).print();

       env.execute("sourceFunctionDemo");
  • Here the custom SourceFunction is added via the addSource method; a plausible sketch of the UpperCaseMapFunc used in the pipeline follows.
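
UpperCaseMapFunc itself is not shown in the original example, so the following is only a hypothetical reconstruction of what it might look like:

import org.apache.flink.api.common.functions.MapFunction;

// hypothetical reconstruction: upper-cases each incoming word
public class UpperCaseMapFunc implements MapFunction<String, String> {

   @Override
   public String map(String value) throws Exception {
       return value.toUpperCase();
   }
}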

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java


@Public
public interface SourceFunction<T> extends Function, Serializable {

   void run(SourceContext<T> ctx) throws Exception;

   void cancel();

   // ------------------------------------------------------------------------
   //  source context
   // ------------------------------------------------------------------------

   /**
    * Interface that source functions use to emit elements, and possibly watermarks.
    *
    * @param <T> The type of the elements produced by the source.
    */
   @Public // Interface might be extended in the future with additional methods.
   interface SourceContext<T> {

       //......
   }
}
  • SourceFunction is the basic interface for Flink stream data sources; it defines the run and cancel methods, and also declares the SourceContext interface inside it.

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java


   /**
    * Interface that source functions use to emit elements, and possibly watermarks.
    *
    * @param <T> The type of the elements produced by the source.
    */
   @Public // Interface might be extended in the future with additional methods.
   interface SourceContext<T> {

       /**
        * Emits one element from the source, without attaching a timestamp. In most cases,
        * this is the default way of emitting elements.
        *
        * <p>The timestamp that the element will get assigned depends on the time characteristic of
        * the streaming program:
        * <ul>
        *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
        *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
        *         current time as the timestamp.</li>
        *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
        *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
        *         operation (like time windows).</li>
        * </ul>
        *
        * @param element The element to emit
        */
       void collect(T element);

       /**
        * Emits one element from the source, and attaches the given timestamp. This method
        * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
        * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
        * on the stream.
        *
        * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
        * This allows programs to switch between the different time characteristics and behaviors
        * without changing the code of the source functions.
        * <ul>
        *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
        *         because processing time never works with element timestamps.</li>
        *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
        *         system's current time, to realize proper ingestion time semantics.</li>
        *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
        * </ul>
        *
        * @param element The element to emit
        * @param timestamp The timestamp in milliseconds since the Epoch
        */
       @PublicEvolving
       void collectWithTimestamp(T element, long timestamp);

       /**
        * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
        * elements with a timestamp {@code t' <= t} will occur any more. If further such
        * elements will be emitted, those elements are considered <i>late</i>.
        *
        * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
        * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
        * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
        * automatic ingestion time watermarks.
        *
        * @param mark The Watermark to emit
        */
       @PublicEvolving
       void emitWatermark(Watermark mark);

       /**
        * Marks the source to be temporarily idle. This tells the system that this source will
        * temporarily stop emitting records and watermarks for an indefinite amount of time. This
        * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
        * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
        * watermarks without the need to wait for watermarks from this source while it is idle.
        *
        * <p>Source functions should make a best effort to call this method as soon as they
        * acknowledge themselves to be idle. The system will consider the source to resume activity
        * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
        * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
        */
       @PublicEvolving
       void markAsTemporarilyIdle();

       /**
        * Returns the checkpoint lock. Please refer to the class-level comment in
        * {@link SourceFunction} for details about how to write a consistent checkpointed
        * source.
        *
        * @return The object to use as the lock
        */
       Object getCheckpointLock();

       /**
        * This method is called by the system to shut down the context.
        */
       void close();
   }
  • SourceContext mainly defines the interface through which a source emits data. The primary method is collect (if the elements carry no timestamp of their own, then under TimeCharacteristic.EventTime a TimestampAssigner can supply timestamps before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be specified, since the system generates the timestamp automatically). Besides collect there is collectWithTimestamp, which emits an element together with an explicit timestamp (for use with TimeCharacteristic.EventTime); see the sketch after this list.

  • It also defines emitWatermark for handling out-of-order data: a Watermark with value t declares that no more elements with timestamp t' <= t will arrive, bounding which data is still considered. This only takes effect with TimeCharacteristic.EventTime; with TimeCharacteristic.ProcessingTime watermarks are ignored, and with TimeCharacteristic.IngestionTime they are replaced by the automatically generated ingestion-time watermarks.

  • markAsTemporarilyIdle is also defined here, telling the system that this source will temporarily stop emitting data for some time; it is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. Once SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long), or SourceContext.emitWatermark(Watermark) is called, the system considers the source to have resumed producing data.

  • getCheckpointLock returns the checkpoint lock, which a source should hold while emitting elements and updating state so that its checkpoint-related logic stays consistent.

  • close is called by the system to shut down the context and release its resources.
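
To make these methods concrete, here is a sketch (not from this article) of an event-time source that emits under the checkpoint lock, assigns timestamps itself via collectWithTimestamp, and emits a watermark after each element; the class name, sleep interval, and the 1000 ms out-of-orderness bound are illustrative assumptions:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedCounterSource implements SourceFunction<Long> {

   private volatile boolean isRunning = true;

   @Override
   public void run(SourceContext<Long> ctx) throws Exception {
       long value = 0L;
       while (isRunning) {
           long eventTime = System.currentTimeMillis();
           // emit the element and advance state under the checkpoint lock,
           // as the SourceFunction class-level Javadoc recommends
           synchronized (ctx.getCheckpointLock()) {
               ctx.collectWithTimestamp(value++, eventTime);
               // declare that no elements with timestamp <= eventTime - 1000 will follow
               ctx.emitWatermark(new Watermark(eventTime - 1000));
           }
           Thread.sleep(100);
       }
   }

   @Override
   public void cancel() {
       isRunning = false;
   }
}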

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java


public class Task implements Runnable, TaskActions, CheckpointListener {
   //......

   /**
    * The core work method that bootstraps the task and executes its code.
    */
   @Override
   public void run() {
           //......
           // now load and instantiate the task's invokable code
           invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

           // ----------------------------------------------------------------
           //  actual task core work
           // ----------------------------------------------------------------

           // we must make strictly sure that the invokable is accessible to the cancel() call
           // by the time we switched to running.
           this.invokable = invokable;

           // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
           if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
               throw new CancelTaskException();
           }

           // notify everyone that we switched to running
           notifyObservers(ExecutionState.RUNNING, null);
           taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

           // make sure the user code classloader is accessible thread-locally
           executingThread.setContextClassLoader(userCodeClassLoader);

           // run the invokable
           invokable.invoke();

           //......
   }
}
  • Task's run method calls invokable.invoke(); here the invokable is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java


@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
       extends AbstractInvokable
       implements AsyncExceptionHandler {

       //......

   @Override
   public final void invoke() throws Exception {

       boolean disposed = false;
       try {
           //......

           // let the task do its work
           isRunning = true;
           run();

           // if this left the run() method cleanly despite the fact that this was canceled,
           // make sure the "clean shutdown" is not attempted
           if (canceled) {
               throw new CancelTaskException();
           }

           LOG.debug("Finished task {}", getName());

           //......
       }
       finally {
           // clean up everything we initialized
           isRunning = false;

           //......
       }
   }
}
  • StreamTask's invoke method calls the run method of its subclass; the subclass here is SourceStreamTask.

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java


   @Override
   protected void run() throws Exception {
       headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
   }
  • SourceStreamTask's run method mainly calls the run method of headOperator; here the headOperator is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java


   public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
       run(lockingObject, streamStatusMaintainer, output);
   }

   public void run(final Object lockingObject,
           final StreamStatusMaintainer streamStatusMaintainer,
           final Output<StreamRecord<OUT>> collector) throws Exception {

       final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

       final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
       final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
           ? getExecutionConfig().getLatencyTrackingInterval()
           : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

       LatencyMarksEmitter<OUT> latencyEmitter = null;
       if (latencyTrackingInterval > 0) {
           latencyEmitter = new LatencyMarksEmitter<>(
               getProcessingTimeService(),
               collector,
               latencyTrackingInterval,
               this.getOperatorID(),
               getRuntimeContext().getIndexOfThisSubtask());
       }

       final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

       this.ctx = StreamSourceContexts.getSourceContext(
           timeCharacteristic,
           getProcessingTimeService(),
           lockingObject,
           streamStatusMaintainer,
           collector,
           watermarkInterval,
           -1);

       try {
           userFunction.run(ctx);

           // if we get here, then the user function either exited after being done (finite source)
           // or the function was canceled or stopped. For the finite source case, we should emit
           // a final watermark that indicates that we reached the end of event-time
           if (!isCanceledOrStopped()) {
               ctx.emitWatermark(Watermark.MAX_WATERMARK);
           }
       } finally {
           // make sure that the context is closed in any case
           ctx.close();
           if (latencyEmitter != null) {
               latencyEmitter.close();
           }
       }
   }
  • StreamSource's run method first constructs a SourceFunction.SourceContext via StreamSourceContexts.getSourceContext and then calls the run method of userFunction, which here is RandomWordSource, i.e. the user-defined SourceFunction (note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is created if latencyTrackingInterval is greater than 0).

RandomWordSource.run


public class RandomWordSource implements SourceFunction<String> {

   private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

   private volatile boolean isRunning = true;

   private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
       while (isRunning) {
           Thread.sleep(300);
           int rnd = (int) (Math.random() * 10 % words.length);
           LOGGER.info("emit word: {}", words[rnd]);
           ctx.collect(words[rnd]);
       }
   }

   @Override
   public void cancel() {
       isRunning = false;
   }
}
  • RandomWordSource's run method keeps looping, emitting a random word roughly every 300 ms, until cancel() flips the volatile isRunning flag.

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java


   private static class LatencyMarksEmitter<OUT> {
       private final ScheduledFuture<?> latencyMarkTimer;

       public LatencyMarksEmitter(
               final ProcessingTimeService processingTimeService,
               final Output<StreamRecord<OUT>> output,
               long latencyTrackingInterval,
               final OperatorID operatorId,
               final int subtaskIndex) {

           latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
               new ProcessingTimeCallback() {
                   @Override
                   public void onProcessingTime(long timestamp) throws Exception {
                       try {
                           // ProcessingTimeService callbacks are executed under the checkpointing lock
                           output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                       } catch (Throwable t) {
                           // we catch the Throwables here so that we don't trigger the processing
                           // timer services async exception handler
                           LOG.warn("Error while emitting latency marker.", t);
                       }
                   }
               },
               0L,
               latencyTrackingInterval);
       }

       public void close() {
           latencyMarkTimer.cancel(true);
       }
   }
  • The LatencyMarksEmitter is created in StreamSource's run method, before userFunction's run method is invoked (and only if latencyTrackingInterval > 0). The latencyTrackingInterval is resolved by first calling getExecutionConfig().isLatencyTrackingConfigured() to check whether the ExecutionConfig has the value configured; if so, getExecutionConfig().getLatencyTrackingInterval() is used, otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L (the latter applies in this example, i.e. 2000).

  • LatencyMarksEmitter's constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose period is latencyTrackingInterval.

  • The timer task's work happens in ProcessingTimeCallback's onProcessingTime method, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker; here the processingTimeService is a SystemProcessingTimeService and the output is an AbstractStreamOperator.CountingOutput. A configuration sketch follows.
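
As a hedged sketch of the configuration path described above, the interval can also be set explicitly on the ExecutionConfig, which then takes precedence over the metrics.latency.interval default; the 1000 ms value below is an arbitrary assumption:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// with this set, isLatencyTrackingConfigured() returns true and this value wins
env.getConfig().setLatencyTrackingInterval(1000L); // emit a LatencyMarker every second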

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java


   @Override
   public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
       long nextTimestamp = getCurrentProcessingTime() + initialDelay;

       // we directly try to register the timer and only react to the status on exception
       // that way we save unnecessary volatile accesses for each timer
       try {
           return timerService.scheduleAtFixedRate(
               new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
               initialDelay,
               period,
               TimeUnit.MILLISECONDS);
       } catch (RejectedExecutionException e) {
           final int status = this.status.get();
           if (status == STATUS_QUIESCED) {
               return new NeverCompleteFuture(initialDelay);
           }
           else if (status == STATUS_SHUTDOWN) {
               throw new IllegalStateException("Timer service is shut down");
           }
           else {
               // something else happened, so propagate the exception
               throw e;
           }
       }
   }

   @Override
   public long getCurrentProcessingTime() {
       return System.currentTimeMillis();
   }
  • SystemProcessingTimeService's scheduleAtFixedRate actually delegates to the scheduleAtFixedRate of its timerService, which is a ScheduledThreadPoolExecutor with a corePoolSize of 1; the task it schedules is a RepeatedTriggerTask.
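
To illustrate the delegation, here is a standalone sketch using the same JDK primitive that SystemProcessingTimeService wraps; the period and task body are assumptions for demonstration only:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedRateDemo {

   public static void main(String[] args) throws Exception {
       // corePoolSize = 1, matching the timerService inside SystemProcessingTimeService
       ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
       timer.scheduleAtFixedRate(
           () -> System.out.println("tick at " + System.currentTimeMillis()),
           0L,     // initialDelay
           2000L,  // period, mirroring the 2000 ms latency interval default
           TimeUnit.MILLISECONDS);
       Thread.sleep(7000); // let a few ticks fire
       timer.shutdownNow();
   }
}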

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java


   /**
    * Internal task which is repeatedly called by the processing time service.
    */
   private static final class RepeatedTriggerTask implements Runnable {

       private final AtomicInteger serviceStatus;
       private final Object lock;
       private final ProcessingTimeCallback target;
       private final long period;
       private final AsyncExceptionHandler exceptionHandler;

       private long nextTimestamp;

       private RepeatedTriggerTask(
               final AtomicInteger serviceStatus,
               final AsyncExceptionHandler exceptionHandler,
               final Object lock,
               final ProcessingTimeCallback target,
               final long nextTimestamp,
               final long period) {

           this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
           this.lock = Preconditions.checkNotNull(lock);
           this.target = Preconditions.checkNotNull(target);
           this.period = period;
           this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

           this.nextTimestamp = nextTimestamp;
       }

       @Override
       public void run() {
           synchronized (lock) {
               try {
                   if (serviceStatus.get() == STATUS_ALIVE) {
                       target.onProcessingTime(nextTimestamp);
                   }

                   nextTimestamp += period;
               } catch (Throwable t) {
                   TimerException asyncException = new TimerException(t);
                   exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
               }
           }
       }
   }
  • RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime only while serviceStatus is STATUS_ALIVE; the initial nextTimestamp passed in is computed as getCurrentProcessingTime() + initialDelay, and it is then incremented by period after every run.
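
A quick worked illustration of how nextTimestamp advances, assuming initialDelay = 0 and period = 2000:

// assumed values: initialDelay = 0, period = 2000 ms
long period = 2000L;
long nextTimestamp = System.currentTimeMillis(); // getCurrentProcessingTime() + initialDelay
for (int i = 0; i < 3; i++) {
    System.out.println("onProcessingTime(" + nextTimestamp + ")");
    // the callback timestamp advances by exactly period per run,
    // independent of when the executor actually fired the task
    nextTimestamp += period;
}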

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java


   /**
    * Wrapping {@link Output} that updates metrics on the number of emitted elements.
    */
   public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
       private final Output<StreamRecord<OUT>> output;
       private final Counter numRecordsOut;

       public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
           this.output = output;
           this.numRecordsOut = counter;
       }

       @Override
       public void emitWatermark(Watermark mark) {
           output.emitWatermark(mark);
       }

       @Override
       public void emitLatencyMarker(LatencyMarker latencyMarker) {
           output.emitLatencyMarker(latencyMarker);
       }

       @Override
       public void collect(StreamRecord<OUT> record) {
           numRecordsOut.inc();
           output.collect(record);
       }

       @Override
       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
           numRecordsOut.inc();
           output.collect(outputTag, record);
       }

       @Override
       public void close() {
           output.close();
       }
   }
  • The output it actually wraps here is a RecordWriterOutput.

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java


/**
* Implementation of {@link Output} that sends data using a {@link RecordWriter}.
*/
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

   private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

   private SerializationDelegate<StreamElement> serializationDelegate;

   //......

   @Override
   public void emitLatencyMarker(LatencyMarker latencyMarker) {
       serializationDelegate.setInstance(latencyMarker);

       try {
           recordWriter.randomEmit(serializationDelegate);
       }
       catch (Exception e) {
           throw new RuntimeException(e.getMessage(), e);
       }
   }
}
  • emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit (which in fact sends via its parent class, RecordWriter) to emit the LatencyMarker.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java


   /**
    * This is used to send LatencyMarks to a random target channel.
    */
   public void randomEmit(T record) throws IOException, InterruptedException {
       sendToTarget(record, rng.nextInt(numChannels));
   }

   private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
       RecordSerializer<T> serializer = serializers[targetChannel];

       SerializationResult result = serializer.addRecord(record);

       while (result.isFullBuffer()) {
           if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
               // If this was a full record, we are done. Not breaking
               // out of the loop at this point will lead to another
               // buffer request before breaking out (that would not be
               // a problem per se, but it can lead to stalls in the
               // pipeline).
               if (result.isFullRecord()) {
                   break;
               }
           }
           BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

           result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
       }
       checkState(!serializer.hasSerializedData(), "All data should be written at once");

       if (flushAlways) {
           targetPartition.flush(targetChannel);
       }
   }
  • RecordWriter's randomEmit simply picks a target channel at random and sends the record to it.

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java


public class Task implements Runnable, TaskActions, CheckpointListener {
   //......

   /**
    * The core work method that bootstraps the task and executes its code.
    */
   @Override
   public void run() {
           //......
           // now load and instantiate the task's invokable code
           invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

           // ----------------------------------------------------------------
           //  actual task core work
           // ----------------------------------------------------------------

           // we must make strictly sure that the invokable is accessible to the cancel() call
           // by the time we switched to running.
           this.invokable = invokable;

           // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
           if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
               throw new CancelTaskException();
           }

           // notify everyone that we switched to running
           notifyObservers(ExecutionState.RUNNING, null);
           taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

           // make sure the user code classloader is accessible thread-locally
           executingThread.setContextClassLoader(userCodeClassLoader);

           // run the invokable
           invokable.invoke();

           //......
   }
}
  • The downstream Task's run method calls invokable.invoke(); here the invokable is a OneInputStreamTask.

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
   protected void run() throws Exception {
       // cache processor reference on the stack, to make the code more JIT friendly
       final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

       while (running && inputProcessor.processInput()) {
           // all the work happens in the "processInput" method
       }
   }
  • Task's run method calls StreamTask's invoke method, which in turn calls OneInputStreamTask's run method; that run method just keeps calling inputProcessor.processInput() in a loop, where the inputProcessor is a StreamInputProcessor.

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java


   public boolean processInput() throws Exception {
       if (isFinished) {
           return false;
       }
       if (numRecordsIn == null) {
           try {
               numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
           } catch (Exception e) {
               LOG.warn("An exception occurred during the metrics setup.", e);
               numRecordsIn = new SimpleCounter();
           }
       }

       while (true) {
           if (currentRecordDeserializer != null) {
               DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

               if (result.isBufferConsumed()) {
                   currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                   currentRecordDeserializer = null;
               }

               if (result.isFullRecord()) {
                   StreamElement recordOrMark = deserializationDelegate.getInstance();

                   if (recordOrMark.isWatermark()) {
                       // handle watermark
                       statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                       continue;
                   } else if (recordOrMark.isStreamStatus()) {
                       // handle stream status
                       statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                       continue;
                   } else if (recordOrMark.isLatencyMarker()) {
                       // handle latency marker
                       synchronized (lock) {
                           streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                       }
                       continue;
                   } else {
                       // now we can do the actual processing
                       StreamRecord<IN> record = recordOrMark.asRecord();
                       synchronized (lock) {
                           numRecordsIn.inc();
                           streamOperator.setKeyContextElement1(record);
                           streamOperator.processElement(record);
                       }
                       return true;
                   }
               }
           }

           final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
           if (bufferOrEvent != null) {
               if (bufferOrEvent.isBuffer()) {
                   currentChannel = bufferOrEvent.getChannelIndex();
                   currentRecordDeserializer = recordDeserializers[currentChannel];
                   currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
               }
               else {
                   // Event received
                   final AbstractEvent event = bufferOrEvent.getEvent();
                   if (event.getClass() != EndOfPartitionEvent.class) {
                       throw new IOException("Unexpected event: " + event);
                   }
               }
           }
           else {
               isFinished = true;
               if (!barrierHandler.isEmpty()) {
                   throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
               }
               return false;
           }
       }
   }
  • The processInput method first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and only processes it once result.isFullRecord() is true.

  • Processing branches on the type of the StreamElement: watermark, streamStatus, latencyMarker, or a regular data record.

  • For a regular record, streamOperator.processElement(record) is called; the streamOperator here is a StreamMap.

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java


/**
* A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
*/
@Internal
public class StreamMap<IN, OUT>
       extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
       implements OneInputStreamOperator<IN, OUT> {

   private static final long serialVersionUID = 1L;

   public StreamMap(MapFunction<IN, OUT> mapper) {
       super(mapper);
       chainingStrategy = ChainingStrategy.ALWAYS;
   }

   @Override
   public void processElement(StreamRecord<IN> element) throws Exception {
       output.collect(element.replace(userFunction.map(element.getValue())));
   }
}
  • Here userFunction.map(element.getValue()) performs the map operation; the userFunction is the UpperCaseMapFunc (sketched near the top of this post).

Summary

  • SourceFunction is the basic interface for Flink stream data sources; it defines the run and cancel methods and also declares the SourceContext interface. SourceContext mainly defines the collect and collectWithTimestamp methods for emitting data, and also provides emitWatermark for emitting Watermarks.

  • On the emitting side, the call chain is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so, it creates a LatencyMarksEmitter, which registers a timer task that periodically invokes ProcessingTimeCallback's onProcessingTime to trigger output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).

  • Downstream therefore receives both the user data emitted by userFunction.run and the LatencyMarkers sent by the timer task. The downstream call chain is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. As can be seen, StreamInputProcessor.processInput handles each element type differently; for user data it calls streamOperator.processElement, i.e. StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map).

doc

  • SourceFunction
