Datastream Connectors--kafka----flink-1.12
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Datastream Connectors--kafka----flink-1.12相关的知识,希望对你有一定的参考价值。
目录
一、前言
二、Kafka Producer
2.1、构造函数
Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
构造器接收下列参数:
- 事件被写入的默认输出 topic
- 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka client 的 Properties。下列 property 是必须的:
- “bootstrap.servers” (逗号分隔 Kafka broker 列表) - 容错语义
/**
* 生产者
* @return
*/
public static FlinkKafkaProducer createSink()
// kafka 配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "chb1:9092");
props.setProperty("group.id", "test");
return new FlinkKafkaProducer<>(
"topic",
new SimpleStringSchema(),
props);
// 实际调用构造函数
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize)
this(
topicId,
null,
null,
new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema), // 将无键的seaializationSchema转为KafkaSerializationSchema
producerConfig,
semantic,
kafkaProducersPoolSize);
// 内部实现
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
* accepts a @link KafkaSerializationSchema and possibly a custom @link
* FlinkKafkaPartitioner.
*
* <p>If a partitioner is not provided, written records will be partitioned by the attached key
* of each record (as determined by @link KeyedSerializationSchema#serializeKey(Object)). If
* written records do not have a key (i.e., @link
* KeyedSerializationSchema#serializeKey(Object) returns @code null), they will be
* distributed to Kafka partitions in a round-robin fashion.
*
* @param defaultTopic The default topic to write data to
* @param keyedSchema A serializable serialization schema for turning user objects into a
* kafka-consumable byte[] supporting key/value messages
* @param customPartitioner A serializable partitioner for assigning messages to Kafka
* partitions. If a partitioner is not provided, records will be partitioned by the key of
* each record (determined by @link KeyedSerializationSchema#serializeKey(Object)). If the
* keys are @code null, then records will be distributed to Kafka partitions in a
* round-robin fashion.
* @param kafkaSchema A serializable serialization schema for turning user objects into a
* kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see @link
* FlinkKafkaProducer.Semantic).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see @link
* FlinkKafkaProducer.Semantic#EXACTLY_ONCE).
*/
private FlinkKafkaProducer(
String defaultTopic,
KeyedSerializationSchema<IN> keyedSchema, // 键值schema @deprecated Use @link KafkaSerializationSchema.
FlinkKafkaPartitioner<IN> customPartitioner,// 自定义分区器,未提供就轮询发到kafka分区,否则使用 KeyedSerializationSchema#serializeKey(Object)
KafkaSerializationSchema<IN> kafkaSchema, // 对SerializationSchema和FlinkKafkaPartitioner的一个封装
Properties producerConfig, // kafka 配置
FlinkKafkaProducer.Semantic semantic, // 容错语义,默认是AT_LEAST_ONCE
int kafkaProducersPoolSize)
super(
new FlinkKafkaProducer.TransactionStateSerializer(),
new FlinkKafkaProducer.ContextStateSerializer());
//.......
2.2、SerializationSchema
Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。
KafkaSerializationSchema 允许用户指定这样的 schema。它会为每个记录调用 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)
方法,产生一个写入到 Kafka 的 ProducerRecord。
用户可以对如何将数据写到 Kafka 进行细粒度的控制。你可以通过 producer record:
- 设置 header 值
- 为每个 record 定义 key
- 指定数据的自定义分区
2.3、FlinkKafkaPartitioner
自定义的partitioner 只能在 KeyedSerializationSchema,SerializationSchema中使用。
KakfaSchema将 partitioner 和serializationSchema封装
new KafkaSerializationSchemaWrapper<>(
topicId, customPartitioner, false, serializationSchema),
没有指定,默认使用 FlinkFixedPartitioner
2.4、Flink Kafka Producer 的容错
实现 TwoPhaseCommitSinkFunction
, 详细请看 Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction
启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。
除了启用 Flink 的 checkpointing,你也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:
- Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
- Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)
- Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的
isolation.level(read_committed 或 read_uncommitted - 后者是默认值)
。
注意事项
Semantic.EXACTLY_ONCE
模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)。考虑到这一点,请根据预期的宕机时间来合理地配置事务超时时间。
默认情况下,Kafka broker 将 transaction.max.timeout.ms
设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms
属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms
的值。
在 KafkaConsumer 的 read_committed
模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:
- 用户启动了 transaction1 并使用它写了一些记录
- 用户启动了 transaction2 并使用它编写了一些其他记录
- 用户提交了 transaction2
即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:
- 首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。
- 其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。
注意:Semantic.EXACTLY_ONCE
模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。
注意:Semantic.EXACTLY_ONCE
会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR
的值的话,是不安全的。
三、Kakfa Consumer
3.1、构造函数
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。
构造函数接受以下参数:
Topic 名称或者名称列表
用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema
Kafka 消费者的属性。需要以下属性:
- “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
- “group.id” 消费组 ID
Properties props = new Properties();
props.setProperty("bootstrap.servers", "chb1:9092");
props.setProperty("group.id", "test");
// 第一种
FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
3.2、DeserializationSchema
Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。
为了方便使用,Flink 提供了以下几种 schemas:
-
TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema)
基于 Flink 的 TypeInformation 创建 schema。 如果该数据的读和写都发生在 Flink 中,那么这将是非常有用的。此 schema 是其他通用序列化方法的高性能 Flink 替代方案。 -
JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)
将序列化的 JSON 转化为 ObjectNode 对象,可以使用objectNode.get("field").as(Int/String/...)()
来访问某个字段。 KeyValue objectNode 包含一个含所有字段的 key 和 values 字段,以及一个可选的”metadata”
字段,可以访问到消息的offset、partition、topic
等信息。 -
AvroDeserializationSchema
使用静态提供的 schema 读取 Avro 格式的序列化数据。 它能够从 Avro 生成的类(AvroDeserializationSchema.forSpecific(...))
中推断出 schema,或者可以与GenericRecords
一起使用手动提供的 schema(用AvroDeserializationSchema.forGeneric(...))
。此反序列化 schema 要求序列化记录不能包含嵌入式架构!- 此模式还有一个版本,可以在
Confluent Schema Registry
中查找编写器的 schema(用于编写记录的 schema)。 - 使用这些反序列化 schema 记录将读取从 schema 注册表检索到的 schema 转换为静态提供的 schema(或者通过
ConfluentRegistryAvroDeserializationSchema.forGeneric(...)
或 `ConfluentRegistryAvroDeserializationSchema.forSpecific(…))``。
- 此模式还有一个版本,可以在
要使用此反序列化 schema 必须添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.12.7</version>
</dependency>
or
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.12.7</version>
</dependency>
当遇到因一些原因而无法反序列化的损坏消息时,反序列化 schema 会返回 null,以允许 Flink Kafka 消费者悄悄地跳过损坏的消息。请注意,由于 consumer 的容错能力(请参阅下面的部分以获取更多详细信息),在损坏的消息上失败作业将使 consumer 尝试再次反序列化消息。因此,如果反序列化仍然失败,则 consumer 将在该损坏的消息上进入不间断重启和失败的循环。
3.3、配置 Kafka Consumer 开始消费的位置
Flink Kafka Consumer 允许通过配置来确定 Kafka 分区的起始位置。
//从最早或者最新的记录开始消费,在这些模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。
myConsumer.setStartFromEarliest(); // 尽可能从最早的记录开始
myConsumer.setStartFromLatest(); // 从最新的记录开始
// 从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。
// 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。
// 在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
// 从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。
// 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。
myConsumer.setStartFromGroupOffsets(); // 默认的方法
你也可以为每个分区指定 consumer 应该开始消费的具体 offset:
// 指定从 myTopic 主题的 0 、1 和 2 分区的指定偏移量开始消费。offset 值是 consumer 应该为每个分区读取的下一条消息。
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
// 请注意:如果 consumer 需要读取在提供的 offset 映射中没有指定 offset 的分区,那么它将回退到该特定分区的默认组偏移行为(即 setStartFromGroupOffsets())。
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,这些起始位置配置方法不会影响消费的起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定(有关 checkpointing 的信息,请参阅下一节,以便为 consumer 启用容错功能)。
3.4、Kafka Consumer 和容错
伴随着启用 Flink 的 checkpointing 后,Flink Kafka Consumer 将使用 topic 中的记录,并以一致的方式定期检查其所有 Kafka offset 和其他算子的状态。如果 Job 失败,Flink 会将流式程序恢复到最新 checkpoint 的状态,并从存储在 checkpoint 中的 offset 开始重新消费 Kafka 中的消息。
因此,设置 checkpoint 的间隔定义了程序在发生故障时最多需要返回多少。
为了使 Kafka Consumer 支持容错,需要在 执行环境 中启用拓扑的 checkpointing。
如果未启用 checkpoint,那么 Kafka consumer 将定期向 Zookeeper 提交 offset。
3.5、Kafka Consumer Topic 和分区发现
3.5.1、分区发现
Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(即,当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。
默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。
3.5.2、Topic 发现
在更高的级别上,Flink Kafka Consumer 还能够使用正则表达式基于 Topic 名称的模式匹配来发现 Topic。请看下面的例子:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"), // 正则匹配topic
new SimpleStringSchema(),
properties);
要允许 consumer 在作业开始运行后发现动态创建的主题,那么请为 flink.partition-discovery.interval-millis
设置非负值。这允许 consumer 发现名称与指定模式匹配的新主题的分区。
3.6、Kafka Consumer 提交 Offset 的行为配置
Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。
配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。
-
禁用 Checkpointing: Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 Properties 中
enable.auto.commit
或者auto.commit.interval.ms
的 值设置适当值。 -
启用 Checkpointing:当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的
setCommitOffsetsOnCheckpoints(boolean)
方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。- 注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。
3.7、Kafka Consumer 和 时间戳抽取以及 watermark 发送
在许多场景中,记录的时间戳是(显式或隐式)嵌入到记录本身中。此外,用户可能希望定期或以不规则的方式 Watermark,例如基于 Kafka 流中包含当前事件时间的 watermark 的特殊记录。对于这些情况,Flink Kafka Consumer 允许指定 AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks。
你可以按照此处的说明指定自定义时间戳抽取器或者 Watermark 发送器,或者使用 内置的。你也可以通过以下方式将其传递给你的
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
请注意: 由于 watermark assigner 依赖于从 Kafka 读取的消息来上涨其 watermark (通常就是这种情况),那么所有主题和分区都需要有连续的消息流。否则,整个应用程序的 watermark 将无法上涨,所有基于时间的算子(例如时间窗口或带有计时器的函数)也无法运行。单个的 Kafka 分区也会导致这种反应。考虑设置适当的 idelness timeouts 来缓解这个问题
以上是关于Datastream Connectors--kafka----flink-1.12的主要内容,如果未能解决你的问题,请参考以下文章
4.Flink入门案例前置说明准备环境代码实现-DataSet-了解DataStream--匿名内部类--处理批DataStream-匿名内部类-处理流LambdaOn-Yarn-掌握