如何使用 Flume 在源上执行预处理并将真实文件名保留在 hdfs 接收器中

Posted

技术标签:

【中文标题】如何使用 Flume 在源上执行预处理并将真实文件名保留在 hdfs 接收器中【英文标题】:How to use Flume executing pre-process on source and keeping real filename in hdfs sink 【发布时间】:2014-12-23 13:47:59 【问题描述】:

我是使用 Apache Flume 的新手,我很难理解它是如何工作的。为了解释我的问题,所以我解释了我的需要和我做了什么。

我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。

我确定“假脱机目录”源和 HDFS 接收器是我需要的。那就是给我这个flume.conf文件

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.writeFormat=Text    

#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

结果是输入文件在我的本地文件系统上被重命名为“.complete”,并且数据被上传到 HDFS 上,我猜它是唯一的,由 Flume 生成。

这几乎是我所需要的。

但在上传之前,我想做一些特定于文件的操作(删除标题,转义逗号..)。我不知道该怎么做,我考虑使用拦截器。但是,当数据在水槽中时,它会在事件中转换并流式传输。在他的点上,对文件一无所知。

否则,文件名中会写入原始时间事件,所以我希望这个时间与我的事件相关联,而不是当前日期。

我还想将原始文件名保留在 hdfs 中(其中有一些有用的信息)。

有人有什么建议可以帮助我吗?

【问题讨论】:

【参考方案1】:

如果您指定,原始文件名可以保留为标题

agent.sources.seqGenSrc.fileHeader=true 

然后可以在您的接收器中检索。

如果您想操作文件中的数据,请使用拦截器。您应该知道,事件基本上是 spool 目录中文件中的一行。

最后但同样重要的是,您需要使用 fileHeader 属性将事件通过管道传回正确的文件。这可以通过指定接收器中的路径来实现,如下所示:

agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data/%file

您可以使用前缀和后缀来进一步配置文件名:

hdfs.filePrefix FlumeData   Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix –   Suffix to append to file (eg .avro - NOTE: period is not automatically added)

【讨论】:

感谢您的回答,我对这些设置有疑问。是否可以在 HDFS 上获取与原始本地文件相同的文件? 定义相同。您可以确定的是,只要您使用 fileHeaders,给定时间范围内的事件将被假脱机到同一个文件。 Flume 会将事件路由到正确的文件。不能保证水槽会以相同的顺序刷新事件,你也不能依赖时间。如果假脱机机制比文件超时慢,您还会在 HDFS 的新文件中找到事件。 感谢您回答我的问题。 根据我在路径中使用 %file 的测试,将在 hdfs 中生成绝对文件夹路径,如果最后一个文件夹以原始文件名命名,并且在其中您将有一个文件(基于后缀和前缀)与数据。例如文件 /home/cto/document/myfile.txt 将在 hdfs 的以下路径中结束 /home/cto/document/myfile.txt/flume-xxx.suffix 您可以在任何地方使用 %file 变量,包括 hdfs.prefix 属性。如果您将该变量从路径移动到前缀,您将获得所需的行为。

以上是关于如何使用 Flume 在源上执行预处理并将真实文件名保留在 hdfs 接收器中的主要内容,如果未能解决你的问题,请参考以下文章

ADF 映射数据流,是不是可以在源上执行 SQL?

使用flume将csv文件传输到hdfs,并将它们转换为avro

如何将自己的插件发布到npm上 / 发布到公司npm源上

如何将自己的插件发布到npm上 / 发布到公司npm源上

如何将自己的插件发布到npm上 / 发布到公司npm源上

sh 在源路径中查找文件并将其复制到目标。该命令将覆盖目标文件夹中的文件而不询问。