Kafka Connect to BigQuery connector without schema registry giving error
Posted: 2022-01-07 06:16:01
Problem description: I am running into an error when saving to BigQuery without providing a schema.
Kafka Connect configuration (converter settings are sketched right after this block):
tasks.max: 1
topics: sample_topic
project: gcp-project-name
defaultDataset: dataset-name
keyfile: key_file_path
group.id: bq-connector
confluent.license: licence_file_path
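Since no Schema Registry is involved, the BigQuery sink can only derive a table schema if each record carries its own schema. A minimal sketch of converter settings that make that possible, assuming the stock JsonConverter that ships with Kafka Connect (these lines are not part of the original question):

# Sketch only: run without a Schema Registry by embedding schemas in the messages.
# With schemas.enable=true, every record must contain "schema" and "payload" fields,
# which the connector can then map to a BigQuery table schema.
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true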
Error (a note on the tombstone records follows the stack trace):
[2021-12-01 11:10:16,554] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to unionize schemas of records for the table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=dataset_name, tableId=table_name}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:70)
Exception in thread "pool-9-thread-1" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to unionize schemas of records for the table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=dataset_name, tableId=table_name}}
Caused by: Could not convert to BigQuery schema with a batch of tombstone records.
at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:283)
at com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:226)
at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.attemptTableCreate(AdaptiveBigQueryWriter.java:168)
at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:115)
at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Could not convert to BigQuery schema with a batch of tombstone records.
at com.wepay.kafka.connect.bigquery.SchemaManager.getAndValidateProposedSchema(SchemaManager.java:301)
at com.wepay.kafka.connect.bigquery.SchemaManager.getTableInfo(SchemaManager.java:280)
... 8 more
[2021-12-01 11:10:16,606] ERROR WorkerSinkTask{id=kafka-to-bigquery-sink-connector-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:385)
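The "batch of tombstone records" in the trace means the connector received messages with a null value (Kafka tombstones), from which it cannot build a BigQuery schema. One option, not taken from the original post, is to drop tombstones before they reach the sink using the Filter SMT and the RecordIsTombstone predicate that ship with Kafka Connect (2.6+); a hedged sketch:

# Sketch only: drop null-value (tombstone) records before the sink sees them.
transforms: dropTombstones
transforms.dropTombstones.type: org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate: isTombstone
predicates: isTombstone
predicates.isTombstone.type: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

Note that dropping tombstones means delete events from the topic never reach BigQuery, which may or may not be acceptable for the use case.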
Edit 1:
I added the configuration below, but got the same error (an example of the record format the converter expects follows this block):
allowBigQueryRequiredFieldRelaxation: true
allBQFieldsNullable: true
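As far as I can tell, these two properties only affect how the connector creates or relaxes columns (making them NULLABLE); they do not give it a schema to infer from. For reference, with the JsonConverter setup sketched under the question, a message on sample_topic would need to embed its schema roughly like this (the field names here are made up for illustration):

{
  "schema": {
    "type": "struct",
    "name": "sample_record",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int64", "optional": false},
      {"field": "name", "type": "string", "optional": true}
    ]
  },
  "payload": {"id": 1, "name": "example"}
}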
Comments:
See logs for more detail
... Can you provide more of the logs? Otherwise, as the error says, null values (tombstone records) cannot be written by the connector.
Added the detailed logs.
Answer 1:
Setting strict bounds on the configuration only gets you so far. After reading the documentation and other discussions, it turned out that a few more properties need to be set correctly, as follows (a consolidated sketch follows the list).
sanitizeTopics: false
autoCreateTables: false
autoUpdateSchemas: false
schemaRetriever: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
bufferSize: 100
maxWriteSize: 100
tableWriteWait: 1000
timestamp: UTC
bigQueryPartitionDecorator: false
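For completeness, a hedged sketch of how the question's settings and these properties might fit together in one connector config (placeholders such as key_file_path are carried over from the question, the connector name is taken from the log, and group.id / confluent.license / timestamp are left out for brevity; this is a sketch, not a verified working config):

name: kafka-to-bigquery-sink-connector
connector.class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max: 1
topics: sample_topic
project: gcp-project-name
defaultDataset: dataset-name
keyfile: key_file_path
# Converters: embed schemas in the messages since there is no Schema Registry.
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
# Schema handling, as suggested in the answer.
schemaRetriever: com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
sanitizeTopics: false
autoCreateTables: false
autoUpdateSchemas: false
allowBigQueryRequiredFieldRelaxation: true
allBQFieldsNullable: true
# Batching and writes, as suggested in the answer.
bufferSize: 100
maxWriteSize: 100
tableWriteWait: 1000
bigQueryPartitionDecorator: false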