flume1.7.0自定义拦截器使用完整示例
Posted 数港
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume1.7.0自定义拦截器使用完整示例相关的知识,希望对你有一定的参考价值。
flume版本:flume1.7.0
Flume
拦截器(interceptor
)的主要作用,当Source
读取events
时:
– 在events header
中加入一些有用的信息
– 对events
的内容(header
或body
)进行过滤,完成初步的数据清洗。
拦截器是简单的插件式组件,设置在source
和channel
之间,在source
接收到数据写入channel
之前,拦截器都可以对event
进行转换或者删除。每个拦截器只处理同一个source
接收到的event
。flume
官方实现了一些拦截器也可以自定义拦截器,这在实际业务场景中非常有用。本文主要需求是通过自定义拦截器,实现向每条数据开头增加个标识字段,方便后续处理数据时判断数据的来源(在event
的body
的开始部分增加识别符)。
自定义拦截器只需要实现Interceptor
的继承类。本文使用maven
开发,具体步骤如下:
第一,导入flume-core包,引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
第二,创建一个实现Interceptor接口的类:
import java.nio.charset.Charset;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.HostInterceptor;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Interceptor class that appends a static, pre-configured body to all events.
*
* Properties:<p>
*
* key: Key fen ge fu.
* (default is "key")<p>
*
* value: Value content.
* (default is "value")<p>
*
* preserveExisting: Whether to preserve an existing value for 'key'
* (default is true)<p>
*
* Sample config:<p>
*
* <code>
* agent.sources.r1.channels = c1<p>
* agent.sources.r1.type = SEQ<p>
* agent.sources.r1.interceptors = i1<p>
* agent.sources.r1.interceptors.i1.type = body<p>
* agent.sources.r1.interceptors.i1.key = datacenter<p>
* agent.sources.r1.interceptors.i1.value= NYC_01<p>
* </code>
*
*/
public class BodyInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory.getLogger(BodyInterceptor.class);
private final String key;
private final String value;
/**
* Only {@link HostInterceptor.Builder} can build me
*/
private BodyInterceptor(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void initialize() {
// no-op
}
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), Charset.forName("UTF-8"));
try{
// body为原始数据,newBody为处理后的数据,value:增加的识别符,key:增加的分隔符
String newBody = value + key + body;
event.setBody(newBody.getBytes());
}catch (Exception e){
e.printStackTrace();
}
return event;
}
/**
* Delegates to {@link #intercept(Event)} in a loop.
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
// no-op
}
/**
* Builder which builds new instance of the BodyInterceptor.
*/
public static class Builder implements Interceptor.Builder {
private String key;
private String value;
@Override
public void configure(Context context) {
key = context.getString(Constants.KEY, Constants.KEY_DEFAULT);
value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
}
@Override
public Interceptor build() {
logger.info(String.format(
"Creating BodyInterceptor: key=%s,value=%s",key, value));
return new BodyInterceptor(key, value);
}
}
public static class Constants {
public static final String KEY = "key";
public static final String KEY_DEFAULT = "key";
public static final String VALUE = "value";
public static final String VALUE_DEFAULT = "value";
}
}
主要代码说明:BodyInterceptor
类用来拦截event
的body
信息并拼接相应信息,内部类Builder
类用来启动这个拦截器。
第三,将上面的代码打成一个jar包,上传至flume所在服务器,放置于$FLUME_HOME/lib
中。
第四,配置flume agent的配置文件,增加interceptor配置,红色部分,如下所示:
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources = filedir
#source配置
agent.sources.filedir.type = spooldir
agent.sources.filedir.spoolDir = /home/hadoop/bodyinterceptortest
agent.sources.filedir.deletePolicy= immediate
agent.sources.filedir.decodeErrorPolicy= IGNORE
agent.sources.filedir.includePattern= ^.*.*\\.txt$
agent.sources.filedir.ignorePattern= ^(.)*\\.tmp$
agent.sources.filedir.channels = memoryChannel
agent.sources.filedir.interceptors = i1
agent.sources.filedir.interceptors.i1.type = cn.com.bonc.flume.BodyInterceptor$Builder
agent.sources.filedir.interceptors.i1.key = |
agent.sources.filedir.interceptors.i1.value= bodyinterceptor
# channel配置
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 250000
agent.channels.memoryChannel.transactionCapacity = 200000
agent.channels.memoryChannel.byteCapacityBufferPercentage = 20
agent.channels.memoryChannel.byteCapacity = 524288000
#sink配置-cs_calllog_acess
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = xxx
agent.sinks.kafkaSink.flumeBatchSize = 50000
agent.sinks.kafkaSink.kafka.bootstrap.servers = xxx:9092
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
第五,启动flume,interceptor生效,启动命令如下:
bin/flume-ng agent --conf conf -f conf/bodyinterceptor.conf -n agent -Dflume.root.logger=INFO,console
第六,验证。
数据源
106468759xxxx|250848|31|2018-05-19 15:32:07.0240096|6|dbscar.com|long.dbscar.com
106468759xxxx|250848|31|2018-05-19 15:32:04.2311649|6|baidu.com|map.baidu.com
106469884xxxx|250762|21|2018-05-19 15:32:05.5038380|6|mlizhi.com|admin.mlizhi.com
106468759xxxx|250848|31|2018-05-19 15:32:16.3832167|6|baidu.com|map.baidu.com
106468759xxxx|250848|31|2018-05-19 15:32:11.6441823|6|baidu.com|map.baidu.com
106464042xxxx|260304|31|2018-05-19 15:33:11.6937589|6|amap.com|restapi.amap.com
106468698xxxx|250040|32|2018-05-19 15:31:23.5020435|6|gepush.com|talk.gepush.com
106468759xxxx|250848|31|2018-05-19 15:32:11.7572139|6|baidu.com|map.baidu.com
106467913xxxx|258834|31|2018-05-19 15:31:22.4791450|6|tianxing.com|open.tianxing.com
106461499xxxx|253284|11|2018-05-19 15:33:44.4231527|6|huya.com|streamhls.huya.com
106468759xxxx|250848|31|2018-05-19 15:32:13.2241204|6|baidu.com|map.baidu.com
106468759xxxx|250848|31|2018-05-19 15:32:18.6443484|6|baidu.com|map.baidu.com
106464087xxxx|258497|31|2018-05-19 15:32:58.2904478|6|sinaimg.cn|n.sinaimg.cn
106468695xxxx|253231|21|2018-05-19 15:33:44.5599505|6|amap.com|mps.amap.com
结果
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:07.0240096|6|dbscar.com|long.dbscar.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:04.2311649|6|baidu.com|map.baidu.com
bodyinterceptor|106469884xxxx|250762|21|2018-05-19 15:32:05.5038380|6|mlizhi.com|admin.mlizhi.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:16.3832167|6|baidu.com|map.baidu.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:11.6441823|6|baidu.com|map.baidu.com
bodyinterceptor|106464042xxxx|260304|31|2018-05-19 15:33:11.6937589|6|amap.com|restapi.amap.com
bodyinterceptor|106468698xxxx|250040|32|2018-05-19 15:31:23.5020435|6|gepush.com|talk.gepush.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:11.7572139|6|baidu.com|map.baidu.com
bodyinterceptor|106467913xxxx|258834|31|2018-05-19 15:31:22.4791450|6|tianxing.com|open.tianxing.com
bodyinterceptor|106461499xxxx|253284|11|2018-05-19 15:33:44.4231527|6|huya.com|streamhls.huya.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:13.2241204|6|baidu.com|map.baidu.com
bodyinterceptor|106468759xxxx|250848|31|2018-05-19 15:32:18.6443484|6|baidu.com|map.baidu.com
bodyinterceptor|106464087xxxx|258497|31|2018-05-19 15:32:58.2904478|6|sinaimg.cn|n.sinaimg.cn
bodyinterceptor|106468695xxxx|253231|21|2018-05-19 15:33:44.5599505|6|amap.com|mps.amap.com
结论,达到了预期的目的:向每一条数据的开头都增加了标识字符bodyinterceptor
,程序稳定性还有待进一步测试。
但是这里有一个前提:不建议通过对event的body解析来设置header,因为flume就是一个水槽,水槽是不会在中间对水进行加工的,要加工,等水流出去了再加工。
同步更新:http://www.smartleon.net/archives/457
以上是关于flume1.7.0自定义拦截器使用完整示例的主要内容,如果未能解决你的问题,请参考以下文章
Groovy自定义 Xml 生成器 BuilderSupport ( 创建 XmlNode 节点 | 管理 XmlNode 节点并将根节点转为 Xml 信息 | 完整代码示例 )
Springboot中使用自定义参数注解获取 token 中用户数据
Android Gradle 插件Gradle 自定义 Plugin 插件 ⑥ ( 在 buildSrc 模块中依赖 Android Gradle 插件 | 完整代码示例 )
Android Gradle 插件Gradle 自定义 Plugin 插件 ⑥ ( 在 buildSrc 模块中依赖 Android Gradle 插件 | 完整代码示例 )
GroovyMOP 元对象协议与元编程 ( 使用 Groovy 元编程进行函数拦截 | 实现 GroovyInterceptable 接口 | 重写 invokeMethod 方法 )