Custom Sinks and Interceptors in Flume
Posted by 飞机耳朵
SinkProcessor:
============================
    Failover:          // failover processor
    Load balancing:    // load-balancing processor
        1. round_robin    take the sinks in turn: 1-2-3-1-2-3-...
        2. random         pick a sink at random:  1-3-2-3-1-...


Custom Sink && Interceptor
=============================================
1. pom

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency>

2. Write the sink:

    package com.oldboy.flume;

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    public class MySink extends AbstractSink {

        public Status process() throws EventDeliveryException {
            // initial status
            Status result = Status.READY;
            // get the channel this sink is bound to
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            Event event = null;

            try {
                // begin the transaction
                transaction.begin();
                // take one event from the channel
                event = channel.take();

                if (event != null) {
                    // add a header to the event by hand, keeping any headers it already has
                    event.getHeaders().put("TimeStamp", System.currentTimeMillis() + "");
                    // read the body
                    byte[] body = event.getBody();
                    // read the header value back
                    String headVal = event.getHeaders().get("TimeStamp");

                    System.out.println("head: " + headVal + "\tbody: " + new String(body));
                } else {
                    // no event available, so back off
                    result = Status.BACKOFF;
                }
                // commit the transaction
                transaction.commit();
            } catch (Exception ex) {
                // roll back the transaction
                transaction.rollback();
                throw new EventDeliveryException("Failed to log event: " + event, ex);
            } finally {
                // close the transaction
                transaction.close();
            }
            return result;
        }
    }

3. Package the jar and put it in /soft/flume/lib

4. Use the custom sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = com.oldboy.flume.MySink

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


Custom interceptor: Interceptor (limits by event size)
================================
Custom rate-limiting interceptor:
    1. Define the parameter as a field
    2. Define a constructor that takes the parameter
    3. Create a Builder inner class and construct the object through the Builder
        1) configure reads the parameter value, or falls back to the default
        2) build constructs the Interceptor object
    4. Define the constant values in a Constants inner class

    package com.oldboy.flume;

    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    public class SpeedInterceptor implements Interceptor {

        // allowed speed in bytes per second, must be positive
        private final int speed;
        // time and body size of the previous event; -1 means no event seen yet
        private long lastTime = -1;
        private long lastBodySize = 0;

        public SpeedInterceptor(int speed) {
            if (speed <= 0) {
                throw new IllegalArgumentException("speed must be positive: " + speed);
            }
            this.speed = speed;
        }

        public void initialize() {
        }

        /**
         * Rate-limiting interceptor that works on single events.
         * speed = bodySize / time
         *
         * The first event only records its time and size. For every later
         * event the speed of the previous event is computed, and if it was
         * too fast the thread sleeps for the missing time.
         */
        public Event intercept(Event event) {
            // length of the body
            long bodySize = event.getBody().length;
            // current time
            long current = System.currentTimeMillis();

            // not the first event
            if (lastTime != -1) {
                long duration = current - lastTime;
                // time the previous event needs at the allowed speed;
                // multiply before dividing so small bodies do not truncate to 0
                long required = lastBodySize * 1000 / speed;
                // too fast: sleep for the rest of the required time
                if (duration < required) {
                    try {
                        Thread.sleep(required - duration);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            // always record the current event, whether or not we slept
            lastBodySize = bodySize;
            lastTime = System.currentTimeMillis();
            return event;
        }

        public List<Event> intercept(List<Event> events) {
            for (Event event : events) {
                intercept(event);
            }
            return events;
        }

        public void close() {
        }

        public static class Builder implements Interceptor.Builder {

            private int speed;

            public void configure(Context context) {
                // same as context.getInteger("speed", 1024);
                speed = context.getInteger(Constants.SPEED, Constants.SPEED_DEFAULT);
            }

            public Interceptor build() {
                return new SpeedInterceptor(this.speed);
            }
        }

        public static class Constants {
            public static final String SPEED = "speed";
            public static final int SPEED_DEFAULT = 1024;
        }
    }


Using the custom interceptor:
==============================
1. Write the code, package it and put the jar in /soft/flume/lib

2. Write the configuration file i_speed.conf

    # name the components of the agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = seq
    # name the interceptor
    a1.sources.r1.interceptors = i1
    # set the interceptor type
    a1.sources.r1.interceptors.i1.type = com.oldboy.flume.SpeedInterceptor$Builder
    a1.sources.r1.interceptors.i1.speed = 1
    # speed2 is not read by SpeedInterceptor, so it has no effect
    a1.sources.r1.interceptors.i1.speed2 = 10

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3. flume-ng agent -n a1 -f i_speed.conf


Note: SinkProcessor and ChannelSelector
    ChannelSelector: selects the channel ----> sink
    SinkProcessor:   selects the sink

    When configuring a SinkProcessor, leave the ChannelSelector at its default (do not configure it)


Managing Flume configuration with ZK:
============================================
1. Create the node /flume/a1 from the zk client    // note: the node name a1 is the agent name
    zkCli.sh -server s102:2181

2. Add the data to the /flume/a1 node; zooInspector is enough    // Chinese characters come out garbled; the idea plugin (zookeeper) can be used instead

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    a1.sinks.k1.type = logger

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3. Try to start flume
    flume-ng agent -n a1 -z s102:2181 -p /flume
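
The SpeedInterceptor above waits whenever the previous event went through faster than `speed` bytes per second. The arithmetic behind that wait can be checked without a Flume runtime. The sketch below is only an illustration: `ThrottleMath` and `sleepMillis` are hypothetical names that do not exist in Flume or in the original code, and the assumption is that the wait should bring the previous event's average rate down to the configured speed. It multiplies before dividing, because with integer arithmetic `lastBodySize / speed` is 0 for any body smaller than `speed`.

```java
/** Hypothetical helper, not part of Flume: the sleep calculation on its own. */
final class ThrottleMath {

    private ThrottleMath() {
    }

    /**
     * Returns how many milliseconds to sleep so that the previous event,
     * lastBodySize bytes long, averages at most speedBytesPerSec.
     * Returns 0 when enough time has already passed.
     */
    static long sleepMillis(long lastBodySize, int speedBytesPerSec, long elapsedMillis) {
        if (speedBytesPerSec <= 0) {
            throw new IllegalArgumentException("speed must be positive");
        }
        // Multiply before dividing so bodies smaller than the speed do not truncate to 0.
        long requiredMillis = lastBodySize * 1000 / speedBytesPerSec;
        return Math.max(0L, requiredMillis - elapsedMillis);
    }

    public static void main(String[] args) {
        // 2048 bytes at 1024 B/s need 2000 ms; 500 ms have already passed.
        System.out.println(sleepMillis(2048, 1024, 500));
        // 512 bytes at 1024 B/s need 500 ms; 100 ms have already passed.
        System.out.println(sleepMillis(512, 1024, 100));
        // 100 bytes at 1024 B/s need 97 ms; 5000 ms have already passed.
        System.out.println(sleepMillis(100, 1024, 5000));
    }
}
```

Keeping the calculation in a pure function like this makes it easy to unit test; the interceptor itself then only has to call `Thread.sleep` with the result when it is greater than zero.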