Kafka JDBC Sink Connector 在雪花中找不到表
Posted
技术标签:
【中文标题】Kafka JDBC Sink Connector 在雪花中找不到表【英文标题】:Kafka JDBC Sink Connector can't find tables in Snowflake 【发布时间】:2021-12-21 17:39:30 【问题描述】:我正在尝试使用 JDBC Sink 连接器将数据写入 Snowflake,但我不断收到以下错误,提示无法在数据库中找到该表(我已禁用自动创建,因为 Snowflake 不支持它) .
[2021-11-08 18:52:19,300] INFO [test-connector-jdbc|task-0] Checking Generic dialect for existence of TABLE "test_database"."test_schema"."test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:575)
[2021-11-08 18:52:19,455] INFO [test-connector-jdbc|task-0] Using Generic dialect TABLE "test_database"."test_schema"."test_table" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:583)
[2021-11-08 18:52:19,572] ERROR [test-connector-jdbc|task-0] Error encountered in task test-connector-jdbc-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "test_database"."test_schema"."test_table" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:118)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:133)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我可以看到从 Snowflake 中的 JDBC 驱动程序运行以下查询(并直接在 Snowflake 中运行它,并在 JDBC 连接器中配置相同的用户,确实返回数据。
show /* JDBC:DatabaseMetaData.getTables() */ tables like 'TEST_TABLE' in account
最后我的配置如下:
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:snowflake://######.eu-west-1.snowflakecomputing.com/?db=test_database&schema=test_schema&role=role",
"connection.user": "test_user",
"connection.password": "test_password",
"topics.regex": "test_table",
"table.name.format": "test_database.test_schema.test_table_$topic",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq",
"errors.deadletterqueue.topic.replication.factor":"1",
"errors.log.enable":"true",
"quote.sql.identifiers":"never"
知道我在这里可能缺少什么吗?
【问题讨论】:
你有没有想过使用雪花的Kafka connector? 是的,这是我正在研究的选项之一。它将需要对数据进行更多的后处理,因为它只将数据加载到 VARIANT 列中——我们仍然需要提取值、转换数据类型并自己构建合并语句(我希望我能得到“免费”使用 JDBC 连接器。 【参考方案1】:原来这是由使用 getTables()
的 Kafka Connect JDBC 连接器引起的,它区分大小写,如此答案中所强调的:https://***.com/a/62232384/657468
"quote.sql.identifiers":"never"
参数仅适用于连接器尝试插入时,而不是在数据库中查找现有表时。
【讨论】:
【参考方案2】:我能够通过自动创建和使用 JDBC 接收器的 SMT 来实现这一点。
我下载了 Snowflake JDBC 驱动程序并将 jar 添加到 Kafka 实例中的连接器类路径中。我在我的 Kafka 实例上遇到了一个问题,抱怨 Snowflake 中没有合适的 JDBC 驱动程序。
这是我的配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
transforms.unwrap.delete.handling.mode=rewrite
transforms.InsertField.timestamp.field=rowtime
tasks.max=1
timezone=utc
transforms=unwrap,InsertField
value.converter.enhanced.avro.schema.support=true
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
auto.evolve=true
transforms.unwrap.drop.tombstones=true
value.converter.schema.enable=true
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
value.converter=io.confluent.connect.avro.AvroConverter
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
dialect.name=OracleDatabaseDialect
topics=<test_table>
key.converter.enhanced.avro.schema.support=true
value.converter.schema.registry.url=http://localhost:8081
auto.create=true
connection.url=jdbc:snowflake://<snowflake_account>.snowflakecomputing.com/?user=<user>&password=<secret_password>&warehouse=demo_wh&db=demo_db&schema=public&role=sysadmin
pk.mode=record_key
key.converter.schema.registry.url=http://localhost:8081
pk.fields=SquareId
【讨论】:
以上是关于Kafka JDBC Sink Connector 在雪花中找不到表的主要内容,如果未能解决你的问题,请参考以下文章
Kafka JDBC Sink Connector 在雪花中找不到表
kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)
Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?
无法将 Kafka 与 InfluxDB Sink Connector 连接