kafka connect - 审计 - 在任务完成时触发事件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka connect - 审计 - 在任务完成时触发事件相关的知识,希望对你有一定的参考价值。

我们正在使用kafka构建一个异常管理工具。将有源连接器 - 它将从物理文件中提取记录。另一方面,将有sink连接(mongodb-sinkconnect),它将从主题中提取记录并将其推送到mongoDb。一切正常。

我们需要在不同的主题中捕获事件(用于审计目的)。诸如的事件,

  1. 源任务(文件轮询任务)启动事件示例,如果收到文件A.
  2. 源任务(文件轮询任务)结束事件示例,如果文件A已完全处理
  3. Sink Task(将记录推送到mongodb任务)启动事件示例,文件A的记录由mongodb-connect开始处理
  4. Sink Task(将记录推送到mongodb任务)结束事件示例,文件A的记录完全推送到MongoDB

我在这里有几个问题:1。我们能够通过在SourceTask中实例化KafkaProducer将事件发送到不同的主题,一旦文件被完全处理,我们发送一个事件

public class FileSourceTask extends SourceTask {
    private Producer<Key, Event> auditProducer;

    public void start(Map<String, String> props) {
       auditProducer = new KafkaProducer<Key, Event>(auditProps);
    }

    public List<SourceRecord> poll() {
        List<SourceRecord> results = this.filePoller.poll();
        if(results.isEmpty() && eventNotSentForCurrentFile) {
          Event event = new Event();
          auditProducer.send(
          new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));

        }
       // futher processing  
     }

上述方法是否正确?

  1. 上面的解决方案工作正常 - 因为它运行一个任务(maxTasks = 1),但对于我们的用例,在sink任务(mongoDB连接)中实现这一点非常困难。由于此主题已分区,因此将创建许多任务。我们无法跟踪接收器任务的开始事件和结束事件

请建议一种方法来解决这个问题。

非常感谢。

答案

我想,你可以围绕Kafka-connect ReST API构建一些东西

https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-status

但是,通过这种方式,您需要将观察者保持在连接器状态,并且一旦完成连接器的所有任务,您就可以采取措施。

以上是关于kafka connect - 审计 - 在任务完成时触发事件的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:没有为连接器创建任务

Kafka Connect 接收器任务中多久触发一次 put()?

Kafka Connect 堆空间不足

一起聊聊Kafka Connect

Kafka Connect警报选项?

今日直播一起聊聊Kafka Connect