005- Flume Source之Custom

Posted BearData

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了005- Flume Source之Custom相关的知识,希望对你有一定的参考价值。

      在一些业务场景中我们需要自定义Flume Source,Channel以及Sink,本篇我们将介绍如何自定义Source。

一,代码

1. pom.xml


<dependency>

<groupId>org.apache.flume</groupId>

<artifactId>flume-ng-core</artifactId>

<version>1.8.0</version>

</dependency>


2. 主类,CustomSource


package com.colin.beardata;

import org.apache.flume.Context;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.PollableSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.EventBuilder;

import org.apache.flume.source.AbstractSource;

import java.nio.charset.Charset;

import java.util.HashMap;

import java.util.Random;

/**

* Author: zhangxiong

* Date: 18-8-30 上午10:20

* Desc:

*/


public class CustomSource extends AbstractSource implements Configurable, PollableSource {

@Override

public long getBackOffSleepIncrement() {

// TODO Auto-generated method stub

return 0;

}

@Override

public long getMaxBackOffSleepInterval() {

// TODO Auto-generated method stub

return 0;

}

@Override

public Status process() throws EventDeliveryException {

Random random = new Random();

int randomNum = random.nextInt(100);

String text = "Hello beardata:" + random.nextInt(100);

HashMap<String, String> header = new HashMap<>();

header.put("id", Integer.toString(randomNum));

this.getChannelProcessor().processEvent(EventBuilder.withBody(text, Charset.forName("UTF-8"), header));

return Status.READY;

}

@Override

public void configure(Context arg0) {

}

}


3. 打包,拷贝到flume安装的lib目录下

mvn clean package

二,配置

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 指定Flume source

a1.sources.r1.type = com.colin.beardata.CustomSource

# 指定Flume sink

a1.sinks.k1.type = logger

# 指定Flume channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

# 绑定source和sink到channel上

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

#bin/flume-ng agent --conf conf --conf-file conf/custom_source.conf --name a1 -Dflume.root.logger=INFO,console

三,测试

我们启动自定义的Source后,在控制台输出了 Hello beardata+100以内的随机数

本篇我们主要介绍了Flume 自定义Source,下一篇我们将介绍Flume Memory Channel。



长按二维码,关注BearData

以上是关于005- Flume Source之Custom的主要内容,如果未能解决你的问题,请参考以下文章

Flume高级组件之Source Interceptors及项目实践

002- Flume Source之TailDir

004- Flume Source之Kafka

Flume之核心架构深入解析

Flume之核心架构深入解析

Hadoop实战-Flume之Source replicating(十四)