Datastream Connectors--kafka----flink-1.12

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Datastream Connectors--kafka----flink-1.12相关的知识,希望对你有一定的参考价值。

TOC

一、前言

二、Kafka Producer

2.1、构造函数

Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

构造器接收下列参数:

  • 事件被写入的默认输出 topic
  • 序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
  • Kafka client 的 Properties。下列 property 是必须的:
    - “bootstrap.servers” (逗号分隔 Kafka broker 列表)
  • 容错语义
    /**
     * 生产者
     * @return
     */
    public static FlinkKafkaProducer createSink() 
        // kafka 配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "chb1:9092");
        props.setProperty("group.id", "test");

        return new FlinkKafkaProducer<>(
                "topic",
                new SimpleStringSchema(),
                props);
    

	// 实际调用构造函数

    public FlinkKafkaProducer(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
            FlinkKafkaProducer.Semantic semantic,
            int kafkaProducersPoolSize) 
        this(
                topicId,
                null,
                null,
                new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema), // 将无键的seaializationSchema转为KafkaSerializationSchema
                producerConfig,
                semantic,
                kafkaProducersPoolSize);
    

// 内部实现 
    /**
     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to the topic. It
     * accepts a @link KafkaSerializationSchema and possibly a custom @link
     * FlinkKafkaPartitioner.
     *
     * <p>If a partitioner is not provided, written records will be partitioned by the attached key
     * of each record (as determined by @link KeyedSerializationSchema#serializeKey(Object)). If
     * written records do not have a key (i.e., @link
     * KeyedSerializationSchema#serializeKey(Object) returns @code null), they will be
     * distributed to Kafka partitions in a round-robin fashion.
     *
     * @param defaultTopic The default topic to write data to
     * @param keyedSchema A serializable serialization schema for turning user objects into a
     *     kafka-consumable byte[] supporting key/value messages
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka
     *     partitions. If a partitioner is not provided, records will be partitioned by the key of
     *     each record (determined by @link KeyedSerializationSchema#serializeKey(Object)). If the
     *     keys are @code null, then records will be distributed to Kafka partitions in a
     *     round-robin fashion.
     * @param kafkaSchema A serializable serialization schema for turning user objects into a
     *     kafka-consumable byte[] supporting key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is
     *     the only required argument.
     * @param semantic Defines semantic that will be used by this producer (see @link
     *     FlinkKafkaProducer.Semantic).
     * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see @link
     *     FlinkKafkaProducer.Semantic#EXACTLY_ONCE).
     */
    private FlinkKafkaProducer(
            String defaultTopic,
            KeyedSerializationSchema<IN> keyedSchema, //  键值schema  @deprecated Use @link KafkaSerializationSchema.
            FlinkKafkaPartitioner<IN> customPartitioner,// 自定义分区器,未提供就轮询发到kafka分区,否则使用 KeyedSerializationSchema#serializeKey(Object)
            KafkaSerializationSchema<IN> kafkaSchema, // 对SerializationSchema和FlinkKafkaPartitioner的一个封装
            Properties producerConfig, // kafka 配置
            FlinkKafkaProducer.Semantic semantic, // 容错语义,默认是AT_LEAST_ONCE
            int kafkaProducersPoolSize) 
        super(
                new FlinkKafkaProducer.TransactionStateSerializer(),
                new FlinkKafkaProducer.ContextStateSerializer());
       //.......
   

2.2、SerializationSchema

Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。

KafkaSerializationSchema 允许用户指定这样的 schema。它会为每个记录调用 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) 方法,产生一个写入到 Kafka 的 ProducerRecord。

用户可以对如何将数据写到 Kafka 进行细粒度的控制。你可以通过 producer record:

  • 设置 header 值
  • 为每个 record 定义 key
  • 指定数据的自定义分区

2.3、FlinkKafkaPartitioner

自定义的partitioner 只能在 KeyedSerializationSchema,SerializationSchema中使用。

KakfaSchema将 partitioner 和serializationSchema封装

	new KafkaSerializationSchemaWrapper<>(
                        topicId, customPartitioner, false, serializationSchema),

没有指定,默认使用 FlinkFixedPartitioner

2.4、Flink Kafka Producer 的容错

实现 TwoPhaseCommitSinkFunction详细请看 Flink保证exactly-once机制介绍:checkpoint及TwoPhaseCommitSinkFunction

启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。

除了启用 Flink 的 checkpointing,你也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:

  • Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
  • Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)
  • Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.level(read_committed 或 read_uncommitted - 后者是默认值)

注意事项
Semantic.EXACTLY_ONCE 模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)。考虑到这一点,请根据预期的宕机时间来合理地配置事务超时时间。

默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。

在 KafkaConsumer 的 read_committed 模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:

  • 用户启动了 transaction1 并使用它写了一些记录
  • 用户启动了 transaction2 并使用它编写了一些其他记录
  • 用户提交了 transaction2

即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:

  • 首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。
  • 其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。

注意Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。

注意Semantic.EXACTLY_ONCE 会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的话,是不安全的。

三、Kakfa Consumer

以上是关于Datastream Connectors--kafka----flink-1.12的主要内容,如果未能解决你的问题,请参考以下文章

DataStream API

4.Flink入门案例前置说明准备环境代码实现-DataSet-了解DataStream--匿名内部类--处理批DataStream-匿名内部类-处理流LambdaOn-Yarn-掌握

Apache Flink -Streaming(DataStream API)

Datastream 开发打包问题

使用 Flink 获取 DataStream 的文件名

Flink DataStream Split 实现分流