如果某些消息错过输入,是不是可以将(接收器连接器)数据重新摄取到数据库中

Posted

技术标签:

【中文标题】如果某些消息错过输入,是不是可以将(接收器连接器)数据重新摄取到数据库中【英文标题】:Is it possible to re-ingest (sink connector) data into the db if some messages got missed entering如果某些消息错过输入,是否可以将(接收器连接器)数据重新摄取到数据库中 【发布时间】:2021-12-14 19:33:20 【问题描述】:

目前,我设置了 2 个单独的连接器,运行 JDBC Sink 连接器,以摄取从生产者生成的主题以读入数据库。有时,我会在日志中看到错误,导致生成的消息无法存储到数据库中。 我经常看到的错误是

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id:11
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'topic-io..models.avro.Topic' not found; error code404

这是真的,因为 TopicRecordName 不应该指向这个主题,而是我指向的另一个主题,它应该指向 models.avro.Topic

我想知道这种情况是否经常发生,有没有办法在生成消息后将这些生成的记录/消息重新摄取到数据库中。例如,如果在 12am-1am 期间产生了消息,并且日志中出现了某种错误并且在该时间段内未能使用这些消息,则配置或偏移可以通过将其重新摄取到数据库来恢复它。该错误是由于架构注册表链接未能读取/链接到正确的架构链接。它失败了,因为它读取了不正确的工作文件,因为我的一个工作文件有一个value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy,而另一个连接器没有读取该主题名。

目前,我将consumer.auto.offset.reset=earliest 设置为开始阅读消息。 有没有办法将这些数据恢复到文件中,并且我可以恢复这些数据,因为我正在部署到生产环境中,并且必须始终将数据消耗到数据库中而没有任何错误。

【问题讨论】:

【参考方案1】:

您可以使用死信队列配置将错误记录发送到需要监控的新主题,而不是弄乱消费者组偏移量,这最终会导致正确处理的数据再次被消费和复制并在主题保留完全丢弃事件之前消费

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

我的一个工作文件有一个[不同的配置]

这就是配置管理软件很重要的原因。不要在没有更新所有服务器的进程的情况下修改分布式系统中的一台服务器。如果您不在 Kubernetes 中运行连接器,Ansible/Terraform 是最常见的

【讨论】:

是否使用死信队列恢复数据,您是什么意思将错误记录发送到新主题。我只想将那些丢失的消息恢复到备份中,然后像快照一样重新摄取它。我应该将其设置为最新而不是最早 不,它不会恢复任何东西,它将所有导致错误的记录移动到一个全新的主题,这样整个原始消费者组就不会停止和崩溃。因此,您需要多个连接器来读取这两个主题。偏移重置值与那个无关 我不太关注你。当您说它将所有错误移动到一个新主题和多个主题以从两个主题中读取时?这是否意味着我们将 topic1 移动到 topic2 作为新主题,并且 topic2 也将基于 topic1 生成新记录。而topic1和topic2会有相同的记录吗?能不能简单点 我只是想看看是否有可能重新摄取丢失的记录,因为这是在生产中。我想知道有没有办法像快照一样,我自己/手动通过文件将它重新放回数据库 不,死信队列将包含从第一个消费者连接器失败的所有记录。您不应该使用相同的连接器来读取两个主题,因为它们可能会再次失败,并且您最终会出现一些无限的消费者循环......就像我已经说过的那样,是的,可以做你想做的事.不是快照,也不是文件,而是第二个主题……您阅读链接的博客了吗?

以上是关于如果某些消息错过输入,是不是可以将(接收器连接器)数据重新摄取到数据库中的主要内容,如果未能解决你的问题,请参考以下文章

iOS:是不是可以控制或选择从网络接收消息的方式?

结构化流 - Foreach接收器

重启时 Kafka 和 Flink 重复消息

我需要一个提示 - 是不是可以将消息推送到仅位于某些特定区域的 iphone(基于地理的通知)?

sms broadcast receiver 是啥意思

RabbitMQ消息应答重新入队