ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Posted 青冬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer相关的知识,希望对你有一定的参考价值。

ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

背景

在Flink使用Kafka进行Source 或者Sink的时候需要进行KeyValue的序列化和反序列化操作。现在应该没有人用Kafka 0.XX版本的了吧,估计都是1.X 或者2.X版本。
在这两个版本中,可以直接使用更新的Flink-Kafka-connection 包,具体使用可以参照官网。

报错类型

  • 消费和生产弄反。
    在消费的时候使用反序列化器 Source,在生产的时候使用序列化器 Sink。但其实在1.X以后,Flink就可以不用特意区分两者的区别,可以使用以下代码:
		// Source端
        val source: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](consumerTopic, new SimpleStringSchema(), propertiesSource)
        flinkKafkaConsumer
          .setStartFromGroupOffsets()
          .setCommitOffsetsOnCheckpoints(false)

		// Sink端
		val sink = new FlinkKafkaProducer[String](brokerTopic, new SimpleStringSchema(), propertiesSink)
        

可以看到,上下都是使用了Schema进行简单的拼接,所以无需担心在使用的时候序列化和反序列化指定错误。

  • 依赖包重复
    在使用Flink连接Kafka的时候,需要两个依赖,一个是Flink端使用的flink-connector-kafka依赖包,另一个是kafka-clients包,两个包一起使用才可以连接到Kafka中。
    如果在使用的过程中少了kafka-clients会导致找不到对应依赖。
    但在我们这个报错过程中,大概率是打包时候将依赖包打入,并且在提交指定Flink依赖jar的时候也有对应的jar包。
    如pom文件:
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink-cersion}</version>
                <scope>complie</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
                <scope>complie</scope>
            </dependency>

Flink指定的jars目录也有对应的jar:
在这里插入图片描述
那么这个时候我们对应的pom文件中的依赖需要改成 provide状态,让打出的jar并不携带这些依赖包:

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink-cersion}</version>
                <scope>provide</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
                <scope>provide</scope>
            </dependency>

以上是关于ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer的主要内容,如果未能解决你的问题,请参考以下文章

zabbix is not running value is no

There is no known eureka server; cluster server list is empty

There is No Alternative~最小生成树变形

Flutter环境配置 ! NO_PROXY is not set

需要 Python 3.7 中的 Mysql 查询以从具有列 (table_no, is_new) 的表中选择记录

Authentication token is no longer valid