如何将一个事件拆分为多个事件以将它们发送到多路复用扇出流

Posted

技术标签:

【中文标题】如何将一个事件拆分为多个事件以将它们发送到多路复用扇出流【英文标题】:How to split an event to multiple events to send them to multiplexed fan out flow 【发布时间】:2015-03-11 08:12:59 【问题描述】:

我们计划使用 kafka flume-ng 集成(Flafka),其中 flume 是 kafka 队列的消费者。 Flume 代理将接收列出命令及其输出的文件,如下所示:

root@host> [Command1]

[Output1]

root@host> [Command2]

[Output2]

该文件可能包含多个命令,一个命令的输出可能很大。我们需要拦截事件(也就是文件数据),并根据命令将事件拆分成多个事件。然后源将流分散到多个通道,将每个子事件发送到一个通道(使用多路复用),每个接收器将命令信息存储到相应的 Hive 表中。 是否可以使用扇出流将事件拆分为多个事件?或者如果我以其他方式问,我们可以在拦截器中将一个事件拆分为多个事件吗?

我已阅读有关正则表达式提取器拦截器和序列化程序的信息,但不确定它是否对这种情况有任何帮助。

【问题讨论】:

【参考方案1】:

如果我理解得很好,您需要将来自 Kafka 队列的原始事件拆分为几个,比如说,子事件。你想知道哪一块 Flume 可以做到这一点。

我认为拦截器不适合这个目的,因为拦截器被“放置”在源和通道之间,它们旨在添加、删除或修改有关 Flume 事件的标头,然后再将其放入通道;同样,他们可以放弃整个事件。但他们无法根据其他现有事件生成多个事件。

我认为您正在寻找类似于附加到源的处理程序,能够解释从 Kafka 获取的事件并在源输出中生成多个 Flume 事件。这个概念类似于您可以附加到HTTPSoure 的处理程序(更多详细信息here)。如果您的源代码可以实现这样的事情,那么您很可能必须开发自己的自定义处理程序,因为您需要的功能非常具体。

【讨论】:

【参考方案2】:

感谢frb的回复。

我想将传入事件拆分为一个水槽源到多个子事件并将它们发送到各自的通道。因此拓扑中的第一个水槽节点会将每个子事件(使用多路复用)路由到可以处理此类信息的特定跃点。

根据您的回复,我知道使用拦截器无法完成。您能否分享处理程序的任何示例或文档?

【讨论】:

我一直在查看Kafka source,但我没有看到与Httpsource 可用的处理程序功能类似的东西。 另外,我一直在分析Interceptor的Java接口,想看看能不能做点什么……intercept有两个方法:第一个有一个事件作为输入,单个事件作为输出;第二个接收事件列表并输出另一个事件列表,但文档明确指出输出列表的大小不得大于输入列表的大小。因此,似乎无法使用自定义拦截器。还在研究中…… 是的,我发现拦截器也是如此。一种可能的解决方案可能是:从 Kafka 读取数据的第一个水槽节点必须具有 Kakfa Source,因此它只会将事件传递给下一个跃点/代理。第二个节点的源可以使用 HTTPSource 类型,它将绑定到端口的本地主机或第一个水槽代理的 IP(如果它在不同的机器上)。这样,第一个节点的接收器会将事件发送到 HTTPSource,HTTPSource 将拆分事件,然后我们可以从那里扇出流。这似乎是一个可行的解决方案?我需要对此进行测试,但不确定这是否可行。 这对我来说似乎是合理的。请记住,您需要为第二个水槽代理的 HTTPSource 创建一个自定义处理程序。这是通过实现HttpSourceHandler 接口来完成的。【参考方案3】:

是的,flume 不能将事件拆分为多个。这是我对这种方法的替代解决方案,以 Kafka 源为例。

首先实现一个扩展Kafka源码的源码类,替换默认的ChannelProcessor对象。

public class XXXSplitSource extends KafkaSource 

    @Override
    public synchronized ChannelProcessor getChannelProcessor()
    
        return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
    

然后,在 ChannelProcessor 代理实现中,您可以使用自定义函数拆分事件。

public class XXXYourChannelProcessorProxy  extends ChannelProcessor 
    public ChannelProcessor  m_downstreamChannelProcessor = null;

    public XXXYourChannelProcessorProxy (ChannelSelector selector) 
        super(selector);
    

    public XXXYourChannelProcessorProxy (ChannelProcessor processor) 
        super(null);
        m_downstreamChannelProcessor = processor;
    

    @Override
    public void processEventBatch(List<Event> events) 
        List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
        m_downstreamChannelProcessor.processEventBatch(generatedEvents);    
    

【讨论】:

以上是关于如何将一个事件拆分为多个事件以将它们发送到多路复用扇出流的主要内容,如果未能解决你的问题,请参考以下文章

LinuxI/O多路复用

NIO和IO多路复用

redis事件

Redis 设计与实现

IO多路复用之epoll总结

IO多路复用之epoll总结