Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

Posted: 2020-10-09 11:12:47

So, I am trying to enable EXACTLY_ONCE semantics in my Flink Kafka streaming job, together with checkpointing.

However, I could not get it to work, so I tried downloading the test example code from GitHub: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Running it as-is works fine. However, when I enable checkpointing I get the error. Alternatively, if I change EXACTLY_ONCE to AT_LEAST_ONCE semantics and enable checkpointing, it works fine; but as soon as I change it back to EXACTLY_ONCE, I get this error again.
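For clarity, here is a minimal sketch of the combination I am toggling (the CheckpointingMode argument is Flink's default and is only written out to make the setup explicit):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every second; CheckpointingMode.EXACTLY_ONCE is the default
    // mode for checkpoints and is shown explicitly here.
    env.enableCheckpointing(1_000, CheckpointingMode.EXACTLY_ONCE);

    // The sink semantic is chosen separately on the producer:
    // FlinkKafkaProducer.Semantic.AT_LEAST_ONCE works for me,
    // FlinkKafkaProducer.Semantic.EXACTLY_ONCE triggers the exception below.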

The exception I get:

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
    ... 12 more

I made minor changes to the code to get it working in my environment. I am running it inside the Flink operations playground in Docker (this one: https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html), latest version 1.10, and the Kafka version provided inside it is 2.2.1.

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing must be enabled, otherwise the transactional sink never commits.
        env.enableCheckpointing(1_000);

        String inputTopic = "my-input";
        String outputTopic = "my-output";
        String kafkaHost = "kafka:9092";

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");

        DataStream<KafkaEvent> input = env
                .addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
                        .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                .keyBy("word")
                .map(new RollingAdditionMapper());

        input.addSink(
                new FlinkKafkaProducer<>(
                        outputTopic,
                        new KafkaEventSerializationSchema(outputTopic),
                        kafkaProps,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("Modern Kafka Example");
    }

The other classes from the example can be found here: https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base

I did try changing the serialization to use KafkaSerializationSchema rather than the SerializationSchema the example code uses. However, the code below did not help either; same error.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 1L;

    private String topic;

    public KafkaEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
        // Records are written with a null key; only the value carries the event.
        return new ProducerRecord<>(topic, element.toString().getBytes());
    }
}

All help is appreciated. I have not been able to find any working code online for an EXACTLY_ONCE guarantee between Flink and Kafka, only articles that talk about it without actual, substantial working code. That is all I am trying to achieve here.

Comments:

- Here (***.com/a/58644689/2096986) they implement their own KafkaSerializationSchema in an ObjSerializationSchema class. I imagine it could help with your problem, since it is a serialization error.
- KafkaEventSerializationSchema is the one I use in the example. I can't link it right now, GitHub seems to be down. In any case it also implements KafkaSerializationSchema, as you suggest. I don't see what the problem is: if serialization were the issue, why does it work when I don't use EXACTLY_ONCE and fail when I do?
- So KafkaEventSerializationSchema is a class you created that implements KafkaSerializationSchema. The javadoc carries this warning: "Please also implement KafkaContextAware if your serialization schema needs information about the available partitions and the number of parallel subtasks along with the subtask ID on which the Kafka Producer is running." (ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/…) Maybe the error would be easier to spot if you posted the class.
- I will update the post. I can't post the code here.
- Based on this answer (***.com/a/58648019/2096986), I guess it is related to a timeout when you use EXACTLY_ONCE.

Answer 1:

I ran into the same problem, and explicitly setting a timeout for the producer helped:

    properties.setProperty("transaction.timeout.ms", "900000");
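For context, my understanding from the Flink Kafka connector docs: in EXACTLY_ONCE mode, FlinkKafkaProducer requests a transaction.timeout.ms of one hour by default, while Kafka brokers cap producer transaction timeouts via transaction.max.timeout.ms, which defaults to 15 minutes (900000 ms). So the producer has to request a value at or below the broker cap. A minimal sketch of applying this to the properties from the question:

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    // Stay at or below the broker's transaction.max.timeout.ms (default 15 min).
    // ProducerConfig.TRANSACTION_TIMEOUT_CONFIG is the "transaction.timeout.ms" key.
    kafkaProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000");

    input.addSink(
            new FlinkKafkaProducer<>(
                    outputTopic,
                    new KafkaEventSerializationSchema(outputTopic),
                    kafkaProps,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));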

