如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化程序

Posted

技术标签:

【中文标题】如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化程序【英文标题】:How to implement FlinkKafkaProducer serializer for Kafka 2.2 【发布时间】:2020-02-26 21:13:28 【问题描述】:

我一直致力于更新从 Kafka 读取然后写入 Kafka 的 Flink 处理器(Flink 版本 1.9)。我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行 2.2 版的新 Kafka 集群。因此,我着手更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer(如 Flink 文档所建议的那样)。但是,我遇到了 Kafka 制作人的一些问题。我无法使用已弃用的构造函数来序列化数据(不足为奇),并且我无法在网上找到任何关于如何实现序列化器的实现或示例(所有示例都使用较旧的 Kafka 连接器)

目前的实现(针对Kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

当尝试实现以下 FlinkKafkaProducer 时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

我收到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

我一直无法弄清楚为什么。 FlinkKafkaProducer 的构造函数也已弃用,当我尝试实现未弃用的构造函数时,我不知道如何序列化数据。 以下是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() 
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) 
                        return null;
                    
                ,
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

但我不明白如何实现 KafkaSerializationSchema,而且我在网上或 Flink 文档中都找不到这方面的示例。

有没有人有实现这个的经验或关于为什么 FlinkProducer 在步骤中得到 NullPointerException 的任何提示?

【问题讨论】:

【参考方案1】:

如果你只是向 Kafka 发送字符串:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>

    private String topic;   

    public ProducerStringSerializationSchema(String topic) 
        super();
        this.topic = topic;
    

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) 
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    


用于发送 Java 对象:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) 
            super();
            this.topic = topic;
        

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) 
            byte[] b = null;
            if (mapper == null) 
                mapper = new ObjectMapper();
            
             try 
                b= mapper.writeValueAsBytes(obj);
             catch (JsonProcessingException e) 
                // TODO 
            
            return new ProducerRecord<byte[], byte[]>(topic, b);
        

    

在您的代码中

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

【讨论】:

谢谢!现在我可以运行处理器而不会崩溃。然而,这引入了另一个问题(不确定它是否相关),但现在我得到 org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms. 在启动 Flink 处理器时(和它不断重试)。消费效果很好,所以我想这与生产者有关。 显然 FlinkKafkaProducer.Semantic.EXACTLY_ONCE 是罪魁祸首,更改为 AT_LEAST_ONCE 消除了超时。我需要深入研究我的配置以了解原因。感谢您快速而完美的回答! 您需要将 transaction.timeout.ms 属性设置为 1 小时——请参阅我对这个问题的回答。 为什么不将timestamp 传递到ProducerRecord 中? @cricket_007 没有传递时间戳,因为我的记录没有任何与之关联的时间戳。在这种情况下,null 将被传递,生产者默认使用System.currentTimeMillis() 分配时间戳【参考方案2】:

关于 FlinkKafkaProducer.Semantic.EXACTLY_ONCE 的超时处理,你应该阅读https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-011-and-newer,尤其是这部分:

Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复之后提交在采取检查点之前启动的事务的能力。如果 Flink 应用程序崩溃和完成重启之间的时间大于 Kafka 的事务超时时间,则会出现数据丢失(Kafka 会自动中止超过超时时间的事务)。考虑到这一点,请根据您的预期停机时间适当配置您的事务超时。

Kafka 代理默认将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为生产者设置大于其值的事务超时。 FlinkKafkaProducer011 默认将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应增加 transaction.max.timeout.ms。

【讨论】:

以上是关于如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化程序的主要内容,如果未能解决你的问题,请参考以下文章

如何为 Kafka 生产者配置日志记录?

如何为 DeadLetter Kafka 创建测试

@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者

如何为事件驱动的微服务实现“去抖动”?

如何为Kafka集群选择合适的Topics/Partitions数量

如何为Kafka集群选择合适的Partitions数量