使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record
Posted
技术标签:
【中文标题】使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record【英文标题】:Error publishing record using KafkaConnect JDBC Source: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record 【发布时间】:2021-05-04 18:19:26 【问题描述】:我想使用 Kafka Connect JDBC Source Connector(Postgres) 将事件发布到 Kafka
我有一个发件箱表,在使用 KafkaAvroSerializer
序列化它们之后,我将有效负载 id 和有效负载存储为字节。
被序列化的对象是一个 avro 生成的 SpecificRecord 类,例如 EmployeeCreatedEvent
postgres中发件箱表的数据类型:
payload bytea,
payload_id bytea
我为 Kafka Connect 转换器编写了一个自定义 SMT。 代码将数据、payload和payload_id反序列化为'''GenericData.Record'''
但我收到以下错误:
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record
我的环境: 汇合 6.0.1
配置:
key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter
ConnectRecord 值有 2 个元素:subject_id 和主题,它们是 byte[]。我想使用 Key=payload_id value=payload 如果我这样做:
final byte[] subjectId = (byte[]) values.get("subject_id");
final byte[] retrievedPayload = (byte[]) values.get("subject");
I get the Exception: DataException: Invalid type for STRUCT: class [B
在创建新的 ConnectRecord 之前,我正在从模式注册表中获取模式并转换为 connectSchema。
record.newRecord("mytopic", record.kafkaPartition(), derivedKeySchema, values.get("subject_id"), derivedValueSchema, values.get("subject"), record.timestamp());
我一开始就从模式注册表中检索模式,并在创建新的连接记录时使用它。
全栈跟踪:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class [B
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:597)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:344)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
谁能提供解决方案? 是否可以将 SpecificRecord 对象转换为 JSON?,如果可以,我可以将它们作为 json 而不是字节存储在发件箱表中。
谢谢。
【问题讨论】:
当您基本上像使用常规 Kafka 主题一样使用数据库时,我不确定我是否理解数据库的用途...您需要展示您的代码,但您会得到一个Struct(payload, payload_id)
来自数据库,如果您使用的是 AvroConverter,那已经是 Avro 数据。如果你想要 json,请使用 json 转换器
DB 作为发件箱模式的一部分。在发布事件时使用 TopicRecordNameStrategy 我已经在使用 AvroConverter,例如 key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter ConnectRecord 值有 2 个元素:subject_id 和主题和它们是字节 []。我想使用 Key=payload_id value=payload 如果我这样做:''' final byte[] subjectId = (byte[]) values.get("subject_id"); final byte[] retrievedPayload = (byte[]) values.get("subject"); ''' 我得到异常:DataException:STRUCT 的类型无效:类 [B 请告知
1) 我不知道“发件箱模式”是什么意思,但我看到博客单独使用 Kafka 2) 代码在 cmets 中难以阅读。请edit 您的问题包含格式化代码,以及完整的堆栈跟踪作为minimal reproducible example。初始错误的主要问题是,在 SMT 中,您必须返回 ConnectRecord 以及来自 Connect API 的 Schema,而不是像 Avro 记录这样的特定序列化类型
@OneCricketeer,我已经更新了问题,请看一下。
@OneCricketeer,我真正感兴趣的是:如何在使用 KafkaAvroSerializer 序列化后使用 Kafka Connect 将字节 [] 格式的数据写入 Kafka?我想使用 TopicRecordNameStrategy 因为我想为同一个用例聚合事件
【参考方案1】:
postgres中发件箱表的数据类型:
我假设这些列仅用于记录值?如果不是,您需要一个 ID+bytea 作为记录键。也可以是记录时间戳,也可以加上主题名
老实说,我认为将 ID 提取到数据库列中没有任何价值,因为它已经是有效负载的一部分,如果需要,您可能可以使用数据库查询来提取它(我不确定 Postgres 中的字节函数,但是好像是可以做的)
在任何情况下,您都需要使用 ByteArrayConverter 来访问二进制数据,然后使用 Connect 转换来获取 JDBC 连接器期望的 Struct 值中的数据
我为 Kafka Connect 转换器编写了一个自定义 SMT。代码将数据、payload和payload_id反序列化为'''GenericData.Record'''
好的,这定义了连接记录的值,但 Connect 会将其解释为只是一个字节数组,除非您在转换中的某处也调用了 AvroData.toConnectSchema
或者,这里有一些使用原始字节执行此操作的伪代码
// class MyTransform<R extends ConnectRecord<R>> implements Transformation<R>
@Override
public R apply(R r)
final Object value = r.value();
byte[] valueAsBytes = (byte[]) value;
ByteBuffer b = ByteBuffer.wrap(valueAsBytes);
b.get();
int id = b.getInt();
byte[] payload = b.slice().toArray();
// TODO: define the payload to forward.
// note: these are Connect API imports, not Avro types
Schema valueSchema; // int id; bytes payload
Struct updatedValue;
return r.newRecord(topic, r.kafkaPartition(),
r.keySchema(), r.key(),
valueSchema, updatedValue,
r.timestamp());
如果您仍然收到类似“STRUCT 的类型无效:类 [B”的错误,那是因为 ByteArrayConverter 在转换后也以某种方式被应用了,您最好只使用 Kafka Streams 作业,使用 Bytes 消费serde,操作字节,生成实际的 Avro(或您首选的 JSON)有效负载,然后像往常一样使用 Connect,无需任何转换
【讨论】:
@OneCrickteer 谢谢。可以通过以下代码解析:''' GenericData.Record key = (GenericData.Record) outboxAvroSerDeService.deSerializeKey(keyB); GenericData.Record 值 = (GenericData.Record) outboxAvroSerDeService.deSerializeValue(valB); AvroData avroData = .....; org.apache.avro.Schema avroKSchema = avroData.fromConnectSchema(kSchema); org.apache.avro.Schema avroVSchema = avroData.fromConnectSchema(vSchema); Struct keyAsStruct = (Struct)avroData.toConnectData(avroKSchema, key).value(); Struct payloadAsStruct = (Struct)avroData.toConnectData(avroVSchema, value).value();''' 还有一个问题:如何确保将所有这些事件写入单个分区,以便对其进行排序? 分区是newRecord
方法的第二个参数以上是关于使用 KafkaConnect JDBC 源发布记录时出错:STRUCT 的类型无效:类 org.apache.avro.generic.GenericData$Record的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时
Kafka Connect - MongoDB 源连接器 - 管道不工作
如何为在 kubernetes 集群上运行的 Kafka Connect 配置 MongoDB 官方源连接器