如何配置 Flume 来监听 web api http 请愿

Posted

技术标签:

【中文标题】如何配置 Flume 来监听 web api http 请愿【英文标题】:How to configure Flume to listen a web api http petitions 【发布时间】:2018-03-14 19:05:42 【问题描述】:

我已经构建了一个 api web 应用程序,它发布在 IIS 服务器上,我正在尝试配置 Apache Flume 来监听该 web api 并将 http 请愿的响应保存在 HDFS 中,这是我需要的 post 方法听:

    [HttpPost]
    public IEnumerable<Data> obtenerValores(arguments arg)
    
        Random rdm = new Random();

        int ano = arg.ano;
        int rdmInt;
        decimal rdmDecimal;

        int anoActual = DateTime.Now.Year;
        int mesActual = DateTime.Now.Month;

        List<Data> ano_mes_sales = new List<Data>();

        while (ano <= anoActual)
        
            int mes = 1;
            while ((anoActual == ano && mes <= mesActual) || (ano < anoActual && mes <= 12))
            
                rdmInt = rdm.Next();
                rdmDecimal = (decimal)rdm.NextDouble();
                Data anoMesSales = new Data(ano, mes,(rdmInt * rdmDecimal));
                ano_mes_sales.Add(anoMesSales);

                mes++;
            
            ano++;
        
        return ano_mes_sales;
    

Flume 在 VMware 虚拟机 CentO 上运行,这是我尝试配置 Flume 来监听该应用程序:

# Sources, channels, and sinks are defined per # agent name, in this case 'tier1'.
a1.sources  = source1
a1.channels = channel1
a1.sinks    = sink1
a1.sources.source1.interceptors = i1 i2 
a1.sources.source1.interceptors.i1.type = host
a1.sources.source1.interceptors.i1.preserveExisting = false
a1.sources.source1.interceptors.i1.hostHeader = host
a1.sources.source1.interceptors.i2.type = timestamp

# For each source, channel, and sink, set # standard properties.
a1.sources.source1.type     = org.apache.flume.source.http.HTTPSource
a1.sources.source1.bind     = transacciones.misionempresarial.com/CSharpFlume
a1.sources.source1.port     = 80

# JSONHandler is the default for the httpsource # 
a1.sources.source1.handler = org.apache.flume.source.http.JSONHandler
a1.sources.source1.channels = channel1
a1.channels.channel1.type   = memory
a1.sinks.sink1.type         = hdfs
a1.sinks.sink1.hdfs.path = /monthSales
a1.sinks.sink1.hdfs.filePrefix = event-file-prefix-
a1.sinks.sink1.hdfs.round = false
a1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of # source, channel, or sink. In this case, we # specify the capacity of the memory channel.
a1.channels.channel1.capacity = 1000 

我正在使用 curl 发帖,这是我的尝试:

curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '["ano":"2010"]' http://transacciones.misionempresarial.com/CSharpFlume/api/SourceFlume/ObtenerValores

我只得到这个错误:

"Message":"Error."

我的问题是,什么是配置水槽以监听我的 web api 的 http 请愿的正确方法,我缺少什么?

【问题讨论】:

你看过 Flume 日志吗?在这里发帖会很有帮助。我没有使用 Flume 的 http 源,但可以建议使用 Kafka REST API github.com/confluentinc/kafka-rest 和 Flume Kafka 源,而不是将 HTTP 消息直接发送到 Flume。如果您可以灵活地进一步更改架构,请将 Kafka 替换为将输出流写入 HDFS 的 Spark Steaming。 【参考方案1】:

标准的 Flume 'HTTPSource' 及其默认的 JSONHandler 只会以特定的、以 Flume 为中心的格式处理事件。

该格式记录在 in the user manual 中,并且还在 JSONHandler source code 开头的 cmets 中。

总之,它期望接收一个 JSON 对象列表,每个对象包含 headers(键/值对,映射到 Flume 事件标头)和 body(一个简单的字符串,映射到 Flume 事件主体) )。

举个例子,如果你发送:

["headers": , "body": "\"ano\":\"2010\""]

我想你会得到你想要的。

如果您无法灵活更改您发送的内容,那么您可以使用org.apache.flume.source.http.BLOBHandler,具体取决于您尝试执行的处理(注意。手册中没有关于此的文档,只有对于org.apache.flume.sink.solr.morphline.BlobHandler - 它们不是一回事,但在FLUME-2718 中有一些注释),或者您可能需要提供自己的Flume 的HTTPSourceHandler 接口的实现。

附注:HTTP Source bind 选项需要主机名或 IP 地址。您可能很幸运,您的值被视为主机名,而路径被忽略。

【讨论】:

以上是关于如何配置 Flume 来监听 web api http 请愿的主要内容,如果未能解决你的问题,请参考以下文章

[Flume]使用 Flume 来传递web log 到 hdfs 的例子

flume简介与监听文件目录并sink至hdfs实战

实时监听hive日志文件,并将内容打印到控制台上

Flume配置参数详解

Flume NG 学习笔记Source配置

flume与kafka集成配置