Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”
Posted
技术标签:
【中文标题】Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”【英文标题】:Kafka sink Error "This connector requires that records from Kafka contain the keys for the Cassandra table" 【发布时间】:2019-03-30 02:12:39 【问题描述】:我正在尝试使用 kafka 将从 Sap 读取的所有表同步到 cassandra 这是我的 cassandra 配置
"name": "cassandra",
"config":
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "5",
"topics" :"sap_table1,sap_table2",
"cassandra.keyspace": "sap",
"cassandra.compression":"SNAPPY",
"cassandra.consistency.level":"LOCAL_QUORUM",
"cassandra.write.mode":"Update",
"transforms":"prune",
"transforms.prune.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.prune.whitelist":"CreatedAt,Id,Text,Source,Truncated",
"transforms.ValueToKey.fields":"ROWTIME"
我收到了这个错误
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584) org.apache.kafka.connect.errors.DataException: Record with a null key was encountered. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
从 kafka sap 连接器生成的所有表都没有密钥我不知道这是不是问题
如果我在做任何事情,请告诉我
谢谢
【问题讨论】:
【参考方案1】:"ROWTIME"
仅作为 KSQL 概念存在。它实际上不是您的值中的字段,因此键被设置为 null。
另外,ValueToKey
未列在 transforms
列表中,因此它甚至没有被应用。您还必须添加"transforms.ValueToKey.type"
。
您必须使用不同的转换方法将记录时间戳设置为 ConnectRecord 消息键
【讨论】:
【参考方案2】:该错误意味着您的数据未序列化,因此它不是 json 格式或字典格式 'key':'value'。 如果您直接从代理读取您的数据作为故障排除方法,您会发现您的数据只有值而没有任何键:
使用此命令从代理读取您的数据:
/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic your_topic_name--from-beginning
所以解决此问题的最佳方法是添加 序列化程序到您的发布者配置文件中。 尝试将此文件作为源连接器或发布者
name=src-view
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
topic.prefix=test-
connection.url=jdbc:postgresql://127.0.0.1:5434/test?user=testuser&password=testpass
mode=incrementing
incrementing.column.name=id
table.types=table
table.whitelist=table_name
validate.non.null=false
batch.max.rows=10000
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
以下是反序列化数据的消费者(sink.conf):
name=cas-dest
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=your_topic_name
cassandra.contact.points=127.0.0.1
cassandra.port=9042
cassandra.keyspace=your_keyspace_name
cassandra.write.mode=Update
cassandra.keyspace.create.enabled=true
cassandra.table.manage.enabled=true
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
transforms=createKey
transforms.createKey.fields=id,timestamp
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
根据您的数据更改 createKey.fields 并小心,因为它将是您的分区键,因此请在选择您的键之前阅读 cassandra 中的数据建模,它应该存在于您的数据键中。
【讨论】:
以上是关于Kafka 接收器错误“此连接器要求来自 Kafka 的记录包含 Cassandra 表的密钥”的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 代理正常关闭,错误的元数据被传递到 Kafka 连接客户端
Confluent Cloud Kafka - 审计日志集群:接收器连接器
kafka-connect 到同步数据库的硬删除事件不起作用或出现错误