005- Flume Source之Custom
Posted BearData
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了005- Flume Source之Custom相关的知识,希望对你有一定的参考价值。
在一些业务场景中我们需要自定义Flume Source,Channel以及Sink,本篇我们将介绍如何自定义Source。
一,代码
1. pom.xml
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
2. 主类,CustomSource
package com.colin.beardata;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Random;
/**
* Author: zhangxiong
* Date: 18-8-30 上午10:20
* Desc:
*/
public class CustomSource extends AbstractSource implements Configurable, PollableSource {
@Override
public long getBackOffSleepIncrement() {
// TODO Auto-generated method stub
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
// TODO Auto-generated method stub
return 0;
}
@Override
public Status process() throws EventDeliveryException {
Random random = new Random();
int randomNum = random.nextInt(100);
String text = "Hello beardata:" + random.nextInt(100);
HashMap<String, String> header = new HashMap<>();
header.put("id", Integer.toString(randomNum));
this.getChannelProcessor().processEvent(EventBuilder.withBody(text, Charset.forName("UTF-8"), header));
return Status.READY;
}
@Override
public void configure(Context arg0) {
}
}
3. 打包,拷贝到flume安装的lib目录下
mvn clean package
二,配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = com.colin.beardata.CustomSource
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#bin/flume-ng agent --conf conf --conf-file conf/custom_source.conf --name a1 -Dflume.root.logger=INFO,console
三,测试
我们启动自定义的Source后,在控制台输出了 Hello beardata+100以内的随机数
本篇我们主要介绍了Flume 自定义Source,下一篇我们将介绍Flume Memory Channel。
长按二维码,关注BearData
以上是关于005- Flume Source之Custom的主要内容,如果未能解决你的问题,请参考以下文章