使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record

Posted

技术标签:

【中文标题】使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record【英文标题】:Error publishing record using KafkaConnect JDBC Source: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record 【发布时间】:2021-05-04 18:19:26 【问题描述】:

我想使用 Kafka Connect JDBC Source Connector(Postgres) 将事件发布到 Kafka

我有一个发件箱表,在使用 KafkaAvroSerializer 序列化它们之后,我将有效负载 id 和有效负载存储为字节。

被序列化的对象是一个 avro 生成的 SpecificRecord 类,例如 EmployeeCreatedEvent

postgres中发件箱表的数据类型:

payload bytea,
payload_id bytea

我为 Kafka Connect 转换器编写了一个自定义 SMT。 代码将数据、payload和payload_id反序列化为'''GenericData.Record'''

但我收到以下错误:

Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record

我的环境: 汇合 6.0.1

配置:

key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter

ConnectRecord 值有 2 个元素:subject_id 和主题,它们是 byte[]。我想使用 Key=payload_id value=payload 如果我这样做:

final byte[] subjectId = (byte[]) values.get("subject_id"); 
final byte[] retrievedPayload = (byte[]) values.get("subject"); 
I get the Exception: DataException: Invalid type for STRUCT: class [B

在创建新的 ConnectRecord 之前,我正在从模式注册表中获取模式并转换为 connectSchema。

record.newRecord("mytopic", record.kafkaPartition(), derivedKeySchema, values.get("subject_id"), derivedValueSchema, values.get("subject"), record.timestamp());

我一开始就从模式注册表中检索模式,并在创建新的连接记录时使用它。

全栈跟踪:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:311)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class [B
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:597)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:344)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:311)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

谁能提供解决方案? 是否可以将 SpecificRecord 对象转换为 JSON?,如果可以,我可以将它们作为 json 而不是字节存储在发件箱表中。

谢谢。

【问题讨论】:

当您基本上像使用常规 Kafka 主题一样使用数据库时,我不确定我是否理解数据库的用途...您需要展示您的代码,但您会得到一个Struct(payload, payload_id) 来自数据库,如果您使用的是 AvroConverter,那已经是 Avro 数据。如果你想要 json,请使用 json 转换器 DB 作为发件箱模式的一部分。在发布事件时使用 TopicRecordNameStrategy 我已经在使用 AvroConverter,例如 key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter ConnectRecord 值有 2 个元素:subject_id 和主题和它们是字节 []。我想使用 Key=payload_id value=payload 如果我这样做:''' final byte[] subjectId = (byte[]) values.get("subject_id"); final byte[] retrievedPayload = (byte[]) values.get("subject"); ''' 我得到异常:DataException:STRUCT 的类型无效:类 [B 请告知 1) 我不知道“发件箱模式”是什么意思,但我看到博客单独使用 Kafka 2) 代码在 cmets 中难以阅读。请edit 您的问题包含格式化代码,以及完整的堆栈跟踪作为minimal reproducible example。初始错误的主要问题是,在 SMT 中,您必须返回 ConnectRecord 以及来自 Connect API 的 Schema,而不是像 Avro 记录这样的特定序列化类型 @OneCricketeer,我已经更新了问题,请看一下。 @OneCricketeer,我真正感兴趣的是:如何在使用 KafkaAvroSerializer 序列化后使用 Kafka Connect 将字节 [] 格式的数据写入 Kafka?我想使用 TopicRecordNameStrategy 因为我想为同一个用例聚合事件 【参考方案1】:

postgres中发件箱表的数据类型:

我假设这些列仅用于记录值?如果不是,您需要一个 ID+bytea 作为记录键。也可以是记录时间戳,也可以加上主题名

老实说,我认为将 ID 提取到数据库列中没有任何价值,因为它已经是有效负载的一部分,如果需要,您可能可以使用数据库查询来提取它(我不确定 Postgres 中的字节函数,但是好像是可以做的)

在任何情况下,您都需要使用 ByteArrayConverter 来访问二进制数据,然后使用 Connect 转换来获取 JDBC 连接器期望的 Struct 值中的数据

我为 Kafka Connect 转换器编写了一个自定义 SMT。代码将数据、payload和payload_id反序列化为'''GenericData.Record'''

好的,这定义了连接记录的值,但 Connect 会将其解释为只是一个字节数组,除非您在转换中的某处也调用了 AvroData.toConnectSchema

或者,这里有一些使用原始字节执行此操作的伪代码

// class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> 
     
    @Override
    public R apply(R r) 
        final Object value = r.value();
        
        byte[] valueAsBytes = (byte[]) value;
        ByteBuffer b = ByteBuffer.wrap(valueAsBytes);
         b.get();
         int id = b.getInt();
         byte[] payload = b.slice().toArray();

         // TODO: define the payload to forward. 
         // note: these are Connect API imports, not Avro types 
         Schema valueSchema; // int id; bytes payload 
         Struct updatedValue; 

        return r.newRecord(topic, r.kafkaPartition(),
                        r.keySchema(), r.key(),
                        valueSchema, updatedValue,
                        r.timestamp());
    

如果您仍然收到类似“STRUCT 的类型无效:类 [B”的错误,那是因为 ByteArrayConverter 在转换后也以某种方式被应用了,您最好只使用 Kafka Streams 作业,使用 Bytes 消费serde,操作字节,生成实际的 Avro(或您首选的 JSON)有效负载,然后像往常一样使用 Connect,无需任何转换

【讨论】:

@OneCrickteer 谢谢。可以通过以下代码解析:''' GenericData.Record key = (GenericData.Record) outboxAvroSerDeService.deSerializeKey(keyB); GenericData.Record 值 = (GenericData.Record) outboxAvroSerDeService.deSerializeValue(valB); AvroData avroData = .....; org.apache.avro.Schema avroKSchema = avroData.fromConnectSchema(kSchema); org.apache.avro.Schema avroVSchema = avroData.fromConnectSchema(vSchema); Struct keyAsStruct = (Struct)avroData.toConnectData(avroKSchema, key).value(); Struct payloadAsStruct = (Struct)avroData.toConnectData(avroVSchema, value).value();''' 还有一个问题:如何确保将所有这些事件写入单个分区,以便对其进行排序? 分区是newRecord方法的第二个参数

以上是关于使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时

Kafka Connect - MongoDB 源连接器 - 管道不工作

KafkaConnect Offset存储设计

如何为在 kubernetes 集群上运行的 Kafka Connect 配置 MongoDB 官方源连接器

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Kafka JDBC Sink 句柄数组数据类型