Datastream Connectors--kafka----flink-1.12
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Datastream Connectors--kafka----flink-1.12相关的知识,希望对你有一定的参考价值。
TOC
一、前言
二、Kafka Producer
2.1、构造函数
Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
构造器接收下列参数:
- 事件被写入的默认输出 topic
- 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka client 的 Properties。下列 property 是必须的:
- “bootstrap.servers” (逗号分隔 Kafka broker 列表) - 容错语义
/**
* 生产者
* @return
*/
public static FlinkKafkaProducer createSink()
// kafka 配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "chb1:9092");
props.setProperty("group.id", "test");
return new FlinkKafkaProducer<>(
"topic",
new SimpleStringSchema(),
props);
// 实际调用构造函数
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize)
this(
topicId,
null,
null,
new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema), // 将无键的seaializationSchema转为KafkaSerializationSchema
producerConfig,
semantic,
kafkaProducersPoolSize);
// 内部实现
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
* accepts a @link KafkaSerializationSchema and possibly a custom @link
* FlinkKafkaPartitioner.
*
* <p>If a partitioner is not provided, written records will be partitioned by the attached key
* of each record (as determined by @link KeyedSerializationSchema#serializeKey(Object)). If
* written records do not have a key (i.e., @link
* KeyedSerializationSchema#serializeKey(Object) returns @code null), they will be
* distributed to Kafka partitions in a round-robin fashion.
*
* @param defaultTopic The default topic to write data to
* @param keyedSchema A serializable serialization schema for turning user objects into a
* kafka-consumable byte[] supporting key/value messages
* @param customPartitioner A serializable partitioner for assigning messages to Kafka
* partitions. If a partitioner is not provided, records will be partitioned by the key of
* each record (determined by @link KeyedSerializationSchema#serializeKey(Object)). If the
* keys are @code null, then records will be distributed to Kafka partitions in a
* round-robin fashion.
* @param kafkaSchema A serializable serialization schema for turning user objects into a
* kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
* the only required argument.
* @param semantic Defines semantic that will be used by this producer (see @link
* FlinkKafkaProducer.Semantic).
* @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see @link
* FlinkKafkaProducer.Semantic#EXACTLY_ONCE).
*/
private FlinkKafkaProducer(
String defaultTopic,
KeyedSerializationSchema<IN> keyedSchema, // 键值schema @deprecated Use @link KafkaSerializationSchema.
FlinkKafkaPartitioner<IN> customPartitioner,// 自定义分区器,未提供就轮询发到kafka分区,否则使用 KeyedSerializationSchema#serializeKey(Object)
KafkaSerializationSchema<IN> kafkaSchema, // 对SerializationSchema和FlinkKafkaPartitioner的一个封装
Properties producerConfig, // kafka 配置
FlinkKafkaProducer.Semantic semantic, // 容错语义,默认是AT_LEAST_ONCE
int kafkaProducersPoolSize)
super(
new FlinkKafkaProducer.TransactionStateSerializer(),
new FlinkKafkaProducer.ContextStateSerializer());
//.......
2.2、SerializationSchema
Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。
KafkaSerializationSchema 允许用户指定这样的 schema。它会为每个记录调用 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)
方法,产生一个写入到 Kafka 的 ProducerRecord。
用户可以对如何将数据写到 Kafka 进行细粒度的控制。你可以通过 producer record:
- 设置 header 值
- 为每个 record 定义 key
- 指定数据的自定义分区
2.3、FlinkKafkaPartitioner
自定义的partitioner 只能在 KeyedSerializationSchema,SerializationSchema中使用。
KakfaSchema将 partitioner 和serializationSchema封装
new KafkaSerializationSchemaWrapper<>(
topicId, customPartitioner, false, serializationSchema),
没有指定,默认使用 FlinkFixedPartitioner
2.4、Flink Kafka Producer 的容错
实现 TwoPhaseCommitSinkFunction
, 详细请看 Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction
启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。
除了启用 Flink 的 checkpointing,你也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:
- Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
- Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)
- Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的
isolation.level(read_committed 或 read_uncommitted - 后者是默认值)
。
注意事项
Semantic.EXACTLY_ONCE
模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)。考虑到这一点,请根据预期的宕机时间来合理地配置事务超时时间。
默认情况下,Kafka broker 将 transaction.max.timeout.ms
设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms
属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms
的值。
在 KafkaConsumer 的 read_committed
模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:
- 用户启动了 transaction1 并使用它写了一些记录
- 用户启动了 transaction2 并使用它编写了一些其他记录
- 用户提交了 transaction2
即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:
- 首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。
- 其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。
注意:Semantic.EXACTLY_ONCE
模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。
注意:Semantic.EXACTLY_ONCE
会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR
的值的话,是不安全的。
三、Kakfa Consumer
以上是关于Datastream Connectors--kafka----flink-1.12的主要内容,如果未能解决你的问题,请参考以下文章
4.Flink入门案例前置说明准备环境代码实现-DataSet-了解DataStream--匿名内部类--处理批DataStream-匿名内部类-处理流LambdaOn-Yarn-掌握