在logstash中以事务方式发送事件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在logstash中以事务方式发送事件相关的知识,希望对你有一定的参考价值。

我试图使用logstash从TCP套接字接收事件,并将它们输出到Kafka主题。我当前的配置能够完美地完成,但我希望能够以事务方式向Kafka执行事件。我的意思是,在收到提交消息之前,系统不应该将事件发送到kafka:

START TXN 123         --No message sent to Kafka
123 - Event1 Message  --No message sent to Kafka
123 - Event2 Message  --No message sent to Kafka
123 - Event3 Message  --No message sent to Kafka
COMMIT TXN 123           --Event1, Event2, Event3 messages sent to Kafka

是否有可能仅使用logstash实现此目的,还是应该在源代码和logstash之间引入任何其他事务协调器?这是我当前的配置:

input {
  tcp {
    port => 9000
  }
}

output {
  kafka { 
    bootstrap_servers => "localhost:9092"
    topic_id =>  "alpayk"
  }
}

为了这个目的,我尝试使用logstash的聚合过滤器,但我不能最终得到一些有效的东西。

非常感谢你提前

答案

我终于决定将Apache Flume用于此目的。我修改了它的netcat源代码,以便uncomited消息驻留在flume的堆中,一旦收到事务的提交消息,所有消息都被发送到kafka sink。

我将消息存储位置从水槽堆更改为外部缓存,这样我就可以在事务异常终止或回滚时使存储的消息到期。

下面是我对该事务逻辑的一段代码:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
    String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
    String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
    ArrayList<Event> events = cachedEvents.get(txnId);

    if (message.equals("COMMIT")) {

        System.out.println("@@@@@ COMMIT RECEIVED");

        if (events != null) {
            for (Event eventItem : events) {
                ChannelException ex = null;
                try {
                    source.getChannelProcessor().processEvent(eventItem);
                } catch (ChannelException chEx) {
                    ex = chEx;
                }

                if (ex == null) {
                    counterGroup.incrementAndGet("events.processed");
                } else {
                    counterGroup.incrementAndGet("events.failed");
                    logger.warn("Error processing event. Exception follows.", ex);
                }
            }

            cachedEvents.remove(txnId);
        }
    } else {
        System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
        if (events == null) {
            events = new ArrayList<Event>();
        }
        events.add(EventBuilder.withBody(message.getBytes()));
        cachedEvents.put(txnId, events);
    }
}

我将此代码添加到了Flume的netcat源代码的processEvents方法中。我不想使用Ruby代码,这就是我决定改用Flume的原因。但是,在logstash中也可以执行相同的操作。

谢谢

以上是关于在logstash中以事务方式发送事件的主要内容,如果未能解决你的问题,请参考以下文章

在片段中以编程方式在视图中包含布局

如何在android中以编程方式在片段之间导航?

提高android应用程序中以编程方式捕获的图像的质量[重复]

架构模式: 事务日志跟踪

在 Slick 3 中以事务方式使用

使用Logstash filter grok过滤日志文件