Kafka connect to Bigquery connector without schema registry giving error

【Posted】:2022-01-07 06:16:01 【Question】:

I am running into an error when writing to BigQuery without providing a schema.

Kafka Connect configuration

tasks.max: 1
topics: sample_topic
project: gcp-project-name
defaultDataset: dataset-name
keyfile: key_file_path
group.id: bq-connector
confluent.license: licence_file_path
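
For context, a complete connector definition built from the snippet above might look roughly like the sketch below. The connector name is taken from the log further down; connector.class and the converter lines are assumptions (a schema-registry-free, schemaless JSON setup) and are not part of the original post.

# Hypothetical complete sink config in properties form; connector.class, name
# and the converters are assumed, not taken from the original snippet.
name: kafka-to-bigquery-sink-connector
connector.class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max: 1
topics: sample_topic
project: gcp-project-name
defaultDataset: dataset-name
keyfile: key_file_path
# Schemaless JSON: no schema registry, and the payload itself carries no schema.
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false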

Error

[2021-12-01 11:10:16,554] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to unionize schemas of records for the table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=dataset_name, tableId=table_name}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-9-thread-1" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to unionize schemas of records for the table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=dataset_name, tableId=table_name}}
Caused by: Could not convert to BigQuery schema with a batch of tombstone records.
    at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:283)
    at com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:226)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptTableCreate(AdaptiveBigQueryWriter.java:168)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:115)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Could not convert to BigQuery schema with a batch of tombstone records.
    at com.wepay.kafka.connect.bigquery.SchemaManager.getAndValidateProposedSchema(SchemaManager.java:301)
    at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:280)
    ... 8 more
[2021-12-01 11:10:16,606] ERROR WorkerSinkTask{id=kafka-to-bigquery-sink-connector-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:385)
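
The "Caused by" line is the key detail: the failing batch contained tombstone records (messages with a null value, as produced by deletes on a compacted topic), and the connector cannot derive a BigQuery schema from a null value. If dropping those deletes is acceptable for this pipeline, one possible workaround (not from the original post) is to filter tombstones out before the sink sees them, using the built-in Filter transform with the RecordIsTombstone predicate available in Kafka Connect 2.6 and later; the alias names below are arbitrary.

# Sketch: drop null-value (tombstone) records before the BigQuery writer sees them.
transforms: dropTombstones
transforms.dropTombstones.type: org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate: isTombstone
predicates: isTombstone
predicates.isTombstone.type: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

Whether deletes should be dropped or instead propagated to BigQuery in some other form is a design decision this error alone does not answer.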

Edit 1:

I added the configuration below, but the error is the same.

allowBigQueryRequiredFieldRelaxation: true
allBQFieldsNullable: true

【Comments】:

"See logs for more detail..." Can you share more of the logs? Otherwise, as the error says, null values (tombstone records) cannot be written by the connector.
I have added the detailed logs.
【Answer 1】:

Setting strict bounds on the configuration worked only up to a point.

After going through the documentation and other discussions, I found a few more properties that need to be set correctly, as shown below.

sanitizeTopics: false
autoCreateTables: false
autoUpdateSchemas: false
schemaRetriever: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
bufferSize: 100
maxWriteSize: 100
tableWriteWait: 1000
timestamp: UTC
bigQueryPartitionDecorator: false
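
As a usage note, on a distributed worker these settings can be applied to the running connector through the Connect REST API. The host/port below is the worker default and the connector name comes from the log above; both are assumptions about this particular deployment.

# Replace the connector's config in place (PUT takes the full config map).
curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/kafka-to-bigquery-sink-connector/config \
  -d '{
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "sample_topic",
    "project": "gcp-project-name",
    "defaultDataset": "dataset-name",
    "keyfile": "key_file_path",
    "sanitizeTopics": "false",
    "autoCreateTables": "false",
    "autoUpdateSchemas": "false",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "bufferSize": "100",
    "maxWriteSize": "100",
    "tableWriteWait": "1000",
    "bigQueryPartitionDecorator": "false"
  }'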

