FlinkFlink 批处理模式 消费指定的 offset 结束

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 批处理模式 消费指定的 offset 结束相关的知识,希望对你有一定的参考价值。

1.概述

转载:Flink 1.14.0 全新的 Kafka Connector

Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka Topic 读取数据和向 Kafka Topic 写入数据,并保证恰好一次次语义。

Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化。最近的 Kafka 客户端向后兼容 broker 版本 0.10.0 或更高版本。关于 Kafka 兼容性的详细信息,请参考 Kafka 官方 文档。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.0</version>
</dependency>

Flink 的流连接器目前不是二进制发行版的一部分。在此处 查看如何与它们链接以进行集群执行。

2. Kafka Source

本部分介绍基于新 数据源 API 的 Kafka Source。

Kafka Source 提供了一个 builder 类来构建 KafkaSource 的实例。下面的代码片段展示了如何构建一个 KafkaSource 来消费来自主题 “input-topic” 最早偏移量的消息,消费者组是“my-group”,并且仅将消息的值反序列化为字符串。

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

构建 KafkaSource 「需要」以下属性:

Bootstrap servers,通过 setBootstrapServers(String)来配置
Topics / partitions to subscribe,请参阅以下 主题-分区订阅 以了解更多详细信息。
Deserializer to parse Kafka messages,更多详细信息请参见以下 DeserializerTopic-partition Subscription#

Kafka 源码提供了 3 种 topic-partition 订阅方式:

主题列表,订阅主题列表中所有分区的消息。例如:

KafkaSource.builder().setTopics("topic-a", "topic-b")

主题模式,从名称与提供的正则表达式匹配的所有主题订阅消息。例如:

KafkaSource.builder().setTopicPattern("topic.*")

分区集,订阅提供的分区集中的分区。例如:

final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
        new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
        new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet)

3.Deserializer

解析 Kafka 消息需要一个反序列化器。Deserializer(反序列化模式)可以通过 配置setDeserializer(KafkaRecordDeserializationSchema),其中KafkaRecordDeserializationSchema定义了如何反序列化一个 Kafka ConsumerRecord。

如果只需要 Kafka ConsumerRecord的值,可以使用 setValueOnlyDeserializer(DeserializationSchema)在 builder 中使用,其中DeserializationSchema定义了如何反序列化 Kafka 消息值的二进制文件。

你还可以使用 Kafka Deserializer 来反序列化 Kafka 消息值. 例如使用 StringDeserializer 将 Kafka 消息值反序列化为字符串:

import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class));

4.Starting Offset

Kafka Source 能够通过指定 OffsetsInitializer来消费从不同偏移量开始的消息。内置的初始值设定项包括:

KafkaSource.builder()
    // Start from committed offset of the consuming group, without reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equals a timestamp
    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
    // Start from earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from latest offset
    .setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始值设定项无法满足你的要求,您还可以实现自定义偏移量初始值设定项。

如果未指定 offsets 初始值设定项,「则」默认使用 「OffsetsInitializer.earliest()」。

5.Boundedness

Kafka Source 旨在支持流式和批量运行模式。默认情况下,KafkaSource 设置为以流式方式运行,因此永远不会停止,直到 Flink 作业失败或被取消。您可以使用setBounded(OffsetsInitializer)指定停止偏移量并设置以批处理模式运行的源。当所有分区都达到它们的停止偏移量时,Source 将退出。

您还可以将 KafkaSource 设置为在流模式下运行,但仍然使用setUnbounded(OffsetsInitializer). 当所有分区达到其指定的停止偏移量时,Source 将退出。

/**
     * By default the KafkaSource is set to run in @link Boundedness#CONTINUOUS_UNBOUNDED manner
     * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in
     * @link Boundedness#BOUNDED manner and stops at some point, one can set an @link
     * OffsetsInitializer to specify the stopping offsets for each partition. When all the
     * partitions have reached their stopping offsets, the KafkaSource will then exit.
     *
     * <p>This method is different from @link #setUnbounded(OffsetsInitializer) that after setting
     * the stopping offsets with this method, @link KafkaSource#getBoundedness() will return
     * @link Boundedness#BOUNDED instead of @link Boundedness#CONTINUOUS_UNBOUNDED.
     *
     * <p>The following @link OffsetsInitializer are commonly used and provided out of the box.
     * Users can also implement their own @link OffsetsInitializer for custom behaviors.
     *
     * <ul>
     *   <li>@link OffsetsInitializer#latest() - stop at the latest offsets of the partitions when
     *       the KafkaSource starts to run.
     *   <li>@link OffsetsInitializer#committedOffsets() - stops at the committed offsets of the
     *       consumer group.
     *   <li>@link OffsetsInitializer#offsets(Map) - stops at the specified offsets for each
     *       partition.
     *   <li>@link OffsetsInitializer#timestamp(long) - stops at the specified timestamp for each
     *       partition. The guarantee of setting the stopping timestamp is that no Kafka records
     *       whose @link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp() is greater
     *       than the given stopping timestamp will be consumed. However, it is possible that some
     *       records whose timestamp is smaller than the specified stopping timestamp are not
     *       consumed.
     * </ul>
     *
     * 默认情况下,KafkaSource被设置为以@link Boundedness#CONTINUOUS_UNBOUNDED的方式运行,
     * 因此在Flink作业失败或取消之前不会停止。为了让KafkaSource以@link Boundedness# bound的方式
     * 运行并在某一点停止,可以设置一个@link OffsetsInitializer来指定每个分区的停止偏移量。
     * 当所有分区都达到停止偏移量时,KafkaSource就会退出。
     *
     * 这个方法不同于@link # setunbound (OffsetsInitializer),在用这个方法设置了停止偏移量之后,
     * @link KafkaSource#getBoundedness()将返回@link Boundedness# bound而不是
     * @link Boundedness#CONTINUOUS_UNBOUNDED。
     *
     * 下面的@link OffsetsInitializer是常用的,并且是开箱即用的。用户也可以为自定义行为实现
     * 他们自己的@link OffsetsInitializer。
     *
     *  1. @link OffsetsInitializer#latest() -当KafkaSource开始运行时,停止在分区的最近偏移量。
     *
     *  2. @link OffsetsInitializer# commitdoffsets() -在消费者组的已提交的偏移量处停止。
     *
     *  3. @link OffsetsInitializer#offset(Map) -在每个分区的指定偏移量处停止。
     *
     *  4. @link OffsetsInitializer#timestamp(long) -在每个分区的指定时间戳处停止。设置停止
     *     时间戳的保证是:如果Kafka记录的@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()
     *     大于给定的停止时间戳,则不会被消耗。但是,一些时间戳小于指定的停止时间戳的记录可能不会被使用。
     *
     *  Kafka Source 旨在支持流式和批量运行模式。默认情况下,KafkaSource 设置为以流式方式运行,因此永远不会停止,
     *  直到 Flink 作业失败或被取消。您可以使用setBounded(OffsetsInitializer)指定停止偏移量并设置
     *  以批处理模式运行的源。当所有分区都达到它们的停止偏移量时,Source 将退出。
     *
     * 您还可以将 KafkaSource 设置为在流模式下运行,但仍然使用setUnbounded(OffsetsInitializer).
     * 当所有分区达到其指定的停止偏移量时,Source 将退出。
     *
     * @param stoppingOffsetsInitializer the @link OffsetsInitializer to specify the stopping
     *     offsets.
     * @return this KafkaSourceBuilder.
     * @see #setUnbounded(OffsetsInitializer)
     */
    public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) 
        this.boundedness = Boundedness.BOUNDED;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        return this;
    

6. Additional Properties

除了上述属性外,您还可以使用setProperties(Properties)和为 KafkaSource 和 KafkaConsumer 设置任意属性setProperty(String, String)。KafkaSource 有以下配置选项:

client.id.prefix 定义用于 Kafka 消费者的客户端 ID 的前缀
partition.discovery.interval.ms定义 Kafka 源发现新分区的时间间隔 im 毫秒。有关更多详细信息,请参阅下面的 动态分区发现。
register.consumer.metrics 指定是否在 Flink 指标组中注册 KafkaConsumer 的指标
commit.offsets.on.checkpoint 指定是否在检查点向 Kafka broker 提交消费偏移量

KafkaConsumer 的配置可以参考 Apache Kafka文档 了解更多。

请注意,即使配置了以下键,构建器也会覆盖它:

key.deserializer 始终设置为 ByteArrayDeserializer
value.deserializer 始终设置为 ByteArrayDeserializer
auto.offset.reset.strategy被起始偏移量覆盖OffsetsInitializer#getAutoOffsetResetStrategy()
partition.discovery.interval.ms被调用时被覆盖为 -1setBounded(OffsetsInitializer)

下面的代码片段显示了配置 KafkaConsumer 以使用“PLAIN”作为 SASL 机制并提供 JAAS 配置:

KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"username\\" passw

7.Dynamic Partition Discovery

为了在不重启 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题-分区订阅模式下定期发现新分区。要启用分区发现,请为 property 设置一个非负值partition.discovery.interval.ms:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions per 10 seconds

默认情况下「禁用」分区发现。您需要明确设置分区发现间隔才能启用此功能。

Event Time and Watermarks

默认情况下,记录将使用嵌入在 Kafka 中的时间戳ConsumerRecord作为事件时间。您可以定义自己WatermarkStrategy的从记录本身提取事件时间,并在下游发出水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

本文档 描述了有关如何定义WatermarkStrategy.

Consumer Offset Committing

Kafka source 在 checkpoint 「完成」时提交当前消费的 offset ,以保证 Flink 的 checkpoint 状态和 Kafka brokers 上的 commit offset 的一致性。

如果未启用检查点,则 Kafka 源依赖于 Kafka 消费者内部的自动定期偏移提交逻辑,由Kafka 消费者的属性配置enable.auto.commit并在其属性中配置auto.commit.interval.ms。

需要注意的是 Kafka source 不依赖提交偏移量来实现容错。提交偏移量只是为了暴露消费者和消费组的进度以供监控。

Monitoring

Kafka Source 在各自的 范围内 公开以下指标。

ScopeMetricsUser VariablesDescriptionType
OperatorcurrentEmitEventTimeLagn/a从记录事件时间戳到源连接器发出记录的时间跨度¹: currentEmitEventTimeLag = EmitTime - EventTime.Gauge
OperatorwatermarkLagn/a水印滞后于墙时钟时间的时间跨度: watermarkLag = CurrentTime - WatermarkGauge
OperatorsourceIdleTimen/a源没有处理任何记录的时间跨度: sourceIdleTime = CurrentTime - LastRecordProcessTimeGauge
OperatorpendingRecordsn/a源尚未提取的记录数。例如 Kafka 分区中消费者偏移后的可用记录。Gauge
OperatorKafkaSourceReader.commitsSucceededn/a如果偏移提交被打开并且检查点被开启,那么成功的偏移提交到 Kafka 的总数。Counter
OperatorKafkaSourceReader.commitsFailedn/a如果打开偏移提交并启用检查点,则向 Kafka 提交偏移提交失败的总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方式,因此提交失败不会影响 Flink 的检查点分区偏移量的完整性。Counter
OperatorKafkaSourceReader.committedOffsetstopic, partition对于每个分区,最后一次成功提交到 Kafka 的偏移量。可以通过主题名称和分区 id 指定特定分区的指标。Gauge
OperatorKafkaSourceReader.currentOffsetstopic, partition每个分区的消费者当前读取偏移量。可以通过主题名称和分区 id 指定特定分区的指标。Gauge

以上是关于FlinkFlink 批处理模式 消费指定的 offset 结束的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 消费 kafka 实现 限流处理 RateLimiter

FlinkFlink使用默认的connector无法指定ConsumerRebalanceListener

flinkFlink 1.12.2 源码浅析 :Task数据输出

Flinkflink-1.12 通过 -t 指定模式后无法指定yarn参数

FlinkFlink源码分析——批处理模式JobGraph的创建

FlinkFlink on yarn 支持指定 application id 吗? yarn.application.id