使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题

Posted

技术标签:

【中文标题】使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题【英文标题】:Issues using Kafka KSQL AVRO table as a source for a Kafka Connect JDBC Sink 【发布时间】:2019-05-12 20:30:18 【问题描述】:

我已经为此苦苦挣扎了大约一周,现在试图获得一个简单的(3 个字段)AVRO 格式的 KSQL 表作为 JDBC 连接器接收器 (mysql) 的源

我收到以下错误(在 INFO 行之后):

[2018-12-11 18:58:50,678] INFO Setting metadata for table "DSB_ERROR_TABLE_WINDOWED" to Tablename='"DSB_ERROR_TABLE_WINDOWED"', columns=[Column'MOD_CLASS', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR, Column'METHOD', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR, Column'COUNT', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT] (io.confluent.connect.jdbc.util.TableDefinitions)

[2018-12-11 18:58:50,679] ERROR WorkerSinkTaskid=dev-dsb-errors-mysql-sink-0 Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DSB_ERROR_TABLE_WINDOWED
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我可以看出接收器在提取架构时正在正确执行某些操作(请参阅上面的错误之前的内容),并且在数据库中使用正确的架构成功创建了表:

MariaDB [dsb_errors_ksql]> describe  DSB_ERROR_TABLE_WINDOWED;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| MOD_CLASS | varchar(256) | YES  |     | NULL    |       |
| METHOD    | varchar(256) | YES  |     | NULL    |       |
| COUNT     | bigint(20)   | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

这是 KTABLE 的定义:

ksql> describe extended DSB_ERROR_TABLE_windowed;

Name                 : DSB_ERROR_TABLE_WINDOWED
Type                 : TABLE
Key field            : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : DSB_ERROR_TABLE_WINDOWED (partitions: 4, replication: 1)

 Field     | Type
---------------------------------------
 ROWTIME   | BIGINT           (system)
 ROWKEY    | VARCHAR(STRING)  (system)
 MOD_CLASS | VARCHAR(STRING)
 METHOD    | VARCHAR(STRING)
 COUNT     | BIGINT
---------------------------------------

Queries that write into this TABLE
-----------------------------------
CTAS_DSB_ERROR_TABLE_WINDOWED_37 : create table DSB_ERROR_TABLE_windowed  with (value_format='avro') as  select mod_class, method, count(*) as count  from DSB_ERROR_STREAM window session ( 60 seconds) group by mod_class, method   having count(*) > 0;

在此表的架构注册表中自动生成了一个条目(但没有键条目):


    "subject": "DSB_ERROR_TABLE_WINDOWED-value",
    "version": 7,
    "id": 143,
    "schema": "\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[\"name\":\"MOD_CLASS\",\"type\":[\"null\",\"string\"],\"default\":null,\"name\":\"METHOD\",\"type\":[\"null\",\"string\"],\"default\":null,\"name\":\"COUNT\",\"type\":[\"null\",\"long\"],\"default\":null]"

这里是 Connect Worker 的定义:

 "name": "dev-dsb-errors-mysql-sink",
   "config":  
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "DSB_ERROR_TABLE_WINDOWED", 
        "connection.url": "jdbc:mysql://os-compute-d01.maeagle.corp:32692/dsb_errors_ksql?user=xxxxxx&password=xxxxxx",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://kafka-d01.maeagle.corp:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
          

我的理解(可能是错误的)是 KSQL 应该在模式注册表中创建适当的 AVRO 模式,而 Kafka Connect 应该能够正确读取这些模式。正如我上面提到的,在 Mysql 中正在生成适当的表时,有些东西正在工作,尽管我很惊讶没有创建关键字段......

大多数帖子和示例都使用 JSON 而不是 AVRO,因此它们并不是特别有用。

好像是在读取和插入主题记录的反序列化部分……

我现在不知所措,可以使用一些指导。

我也通过github开了一张类似的票:

https://github.com/confluentinc/ksql/issues/2250

问候,

--约翰

【问题讨论】:

Key format : STRING,而且 KSQL 还不支持 Avro 键。因此,注册表中没有主题(因为它不是 Avro)...我不确定错误消息,但问题似乎是 JDBC 接收器,而不是 KSQL 这个问题似乎与github.com/confluentinc/ksql/issues/2233有关。 > 一个窗口键由一个键和一个窗口开始时间组成。 TimeWindowedSerde 将这些序列化为一个字节缓冲区,其中前 x 个字节是由 key serde(在本例中为 StringSerde)序列化的 key,最后 8 个字节是窗口开始时间,如 long。这可以解释为什么会发生错误。如果我做同一张表,但没有窗口,我可以成功使用连接器导入 Mysql... 【参考方案1】:

正如上面 John 所说,主题记录中的 key 不是字符串,而是用单个 Java 序列化 64 位整数后缀的字符串,表示窗口开始时间。

Connect 不附带可以处理窗口密钥格式的 SMT。但是,可以写一个来去除整数并返回自然键。然后,您可以将其包含在类路径中并更新您的连接配置。

如果您需要数据库中的窗口开始时间,那么您可以更新您的 ksqlDB 查询以将窗口开始时间作为字段包含在值中。

【讨论】:

以上是关于使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题的主要内容,如果未能解决你的问题,请参考以下文章

从 Avro Debezium 数据创建基于 Avro 的 KSQL 流会生成奇怪的模式

在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。

Kafka kSQL sql查询

kafka sql入门

Kafka KSQL入门

如何从具有小数类型列的主题在 ksql 中创建流