Flume学习系列---- Custom Interceptors(自定义拦截器)
Posted Java不睡觉
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume学习系列---- Custom Interceptors(自定义拦截器)相关的知识,希望对你有一定的参考价值。
我又是前言: 说一下学习自定义拦截器方法吧,导入flume的源码后,看flume-ng-core-xxx.jar
下的org.apache.flume.interceptor
包,里面有flume为我们写好的一些拦截器,我们只需要仿照这些类去写即可,同时可以查看接口的javadoc知道要重写的方法是什么作用。
一、流程
①搭建flume开发环境(巧妇难为无米之炊,你没开发环境怎么玩,程序都不知道你写的类是个啥)
②新建一个类,实现Interceptor
接口,重写intercept(Event event)
方法
③新建一个类,实现Interceptor.Builder
接口,重写configure(Context context)
和build()
方法
④打成jar包放到flume的lib目录下
⑤编写相应的flume.conf文件,将type值使用类的全限定名指定我们的拦截器。如果有自定义属性,需要配置该自定义属性。
二、搭建开发环境
新建一个maven工程,在pom.xml中添加如下依赖(我这里的版本号和我的flume版本号一致):
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency></dependencies>
二、编写自定义Interceptor和自定义Builder
代码不难,希望大家不要有恐惧心理,一看到代码段就脑壳疼。
package com.zhb.flume;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.google.common.base.Charsets;
public class MyInterceptor implements Interceptor {
private String ipAddress = null;
// 自定义属性 serviceId
public MyInterceptor(String ipAddress) {
this.ipAddress = ipAddress;
}
public void initialize() {
// TODO Auto-generated method stub
}
//拦截器的核心
public Event intercept(Event event) {
//获得body的内容
String eventBody = new String(event.getBody(), Charsets.UTF_8);
String fmt = "%s %s";
// 添加ipAddress 到event的开头
event.setBody(String.format(fmt, ipAddress, eventBody).getBytes());
return event;
}
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
public void close() {
// TODO Auto-generated method stub
}
package com.zhb.flume;
import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;
public class AppendIPBuilder implements Interceptor.Builder {
private String ipAddress = null;
public void configure(Context context) {
// set argument serviceId
String configServiceId = context.getString("ipAddress");
ipAddress = configServiceId;
}
public Interceptor build() {
return new MyInterceptor(ipAddress);
}
}
三、编写flume的配置文件appendIP.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhb.flume.AppendIPBuilder
#这里配置的值和Builder里的变量名字要一样
a1.sources.r1.interceptors.i1.ipAddress= 192.168.1.101
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100
四、运行flume进行测试
首先进入到flume的bin目录下,执行./flume-ng agent -c ../conf -f ../conf/appendIP.conf -Dflume.root.logger=INFO,console -n a1
成功启动后,新开一个终端输入echo "AppendIPAddress" | nc 127.0.0.1 50000
这时,flume启动的终端的小齿轮转了起来,会心一笑,一定是成功了。
结果如下:
以上是关于Flume学习系列---- Custom Interceptors(自定义拦截器)的主要内容,如果未能解决你的问题,请参考以下文章