Kafka JDBC Sink Connector 在雪花中找不到表

Posted

技术标签:

【中文标题】Kafka JDBC Sink Connector 在雪花中找不到表【英文标题】:Kafka JDBC Sink Connector can't find tables in Snowflake 【发布时间】:2021-12-21 17:39:30 【问题描述】:

我正在尝试使用 JDBC Sink 连接器将数据写入 Snowflake,但我不断收到以下错误,提示无法在数据库中找到该表(我已禁用自动创建,因为 Snowflake 不支持它) .

[2021-11-08 18:52:19,300] INFO [test-connector-jdbc|task-0] Checking Generic dialect for existence of TABLE "test_database"."test_schema"."test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:575)
[2021-11-08 18:52:19,455] INFO [test-connector-jdbc|task-0] Using Generic dialect TABLE "test_database"."test_schema"."test_table" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:583)
[2021-11-08 18:52:19,572] ERROR [test-connector-jdbc|task-0] Error encountered in task test-connector-jdbc-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "test_database"."test_schema"."test_table" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:118)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:133)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

我可以看到从 Snowflake 中的 JDBC 驱动程序运行以下查询(并直接在 Snowflake 中运行它,并在 JDBC 连接器中配置相同的用户,确实返回数据。

show /* JDBC:DatabaseMetaData.getTables() */ tables like 'TEST_TABLE' in account

最后我的配置如下:


    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:snowflake://######.eu-west-1.snowflakecomputing.com/?db=test_database&schema=test_schema&role=role",
    "connection.user": "test_user",
    "connection.password": "test_password",
    "topics.regex": "test_table",
    "table.name.format": "test_database.test_schema.test_table_$topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq",
    "errors.deadletterqueue.topic.replication.factor":"1",
    "errors.log.enable":"true",
    "quote.sql.identifiers":"never"


知道我在这里可能缺少什么吗?

【问题讨论】:

你有没有想过使用雪花的Kafka connector? 是的,这是我正在研究的选项之一。它将需要对数据进行更多的后处理,因为它只将数据加载到 VARIANT 列中——我们仍然需要提取值、转换数据类型并自己构建合并语句(我希望我能得到“免费”使用 JDBC 连接器。 【参考方案1】:

原来这是由使用 getTables() 的 Kafka Connect JDBC 连接器引起的,它区分大小写,如此答案中所强调的:https://***.com/a/62232384/657468

"quote.sql.identifiers":"never" 参数仅适用于连接器尝试插入时,而不是在数据库中查找现有表时。

【讨论】:

【参考方案2】:

我能够通过自动创建和使用 JDBC 接收器的 SMT 来实现这一点。

我下载了 Snowflake JDBC 驱动程序并将 jar 添加到 Kafka 实例中的连接器类路径中。我在我的 Kafka 实例上遇到了一个问题,抱怨 Snowflake 中没有合适的 JDBC 驱动程序。

这是我的配置:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
transforms.unwrap.delete.handling.mode=rewrite
transforms.InsertField.timestamp.field=rowtime
tasks.max=1
timezone=utc
transforms=unwrap,InsertField
value.converter.enhanced.avro.schema.support=true
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
auto.evolve=true
transforms.unwrap.drop.tombstones=true
value.converter.schema.enable=true
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
value.converter=io.confluent.connect.avro.AvroConverter
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
dialect.name=OracleDatabaseDialect
topics=<test_table>
key.converter.enhanced.avro.schema.support=true
value.converter.schema.registry.url=http://localhost:8081
auto.create=true
connection.url=jdbc:snowflake://<snowflake_account>.snowflakecomputing.com/?user=<user>&password=<secret_password>&warehouse=demo_wh&db=demo_db&schema=public&role=sysadmin
pk.mode=record_key
key.converter.schema.registry.url=http://localhost:8081
pk.fields=SquareId

【讨论】:

以上是关于Kafka JDBC Sink Connector 在雪花中找不到表的主要内容,如果未能解决你的问题,请参考以下文章

Kafka JDBC Sink Connector 在雪花中找不到表

kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)

Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?

无法将 Kafka 与 InfluxDB Sink Connector 连接

Kafka HDFS Sink Connector Protobuf 未写入

Kafka Confluent HTTP Sink Connector 的开源替代方案 [关闭]