自定义Source sink

Posted xjqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义Source sink相关的知识,希望对你有一定的参考价值。

  1 导入pom依赖
  2 <dependencies>
  3     <dependency>
  4         <groupId>org.apache.flume</groupId>
  5         <artifactId>flume-ng-core</artifactId>
  6         <version>1.7.0</version>
  7 </dependency>
  8 
  9 </dependencies>
 10 编写代码
 11 package com.baway.flume;
 12 
 13 import org.apache.flume.Context;
 14 import org.apache.flume.Event;
 15 import org.apache.flume.EventDeliveryException;
 16 import org.apache.flume.PollableSource;
 17 import org.apache.flume.channel.ChannelProcessor;
 18 import org.apache.flume.conf.Configurable;
 19 import org.apache.flume.event.SimpleEvent;
 20 import org.apache.flume.source.AbstractSource;
 21 
 22 import java.util.HashMap;
 23 
 24 
 25 public class MySource extends AbstractSource implements Configurable, PollableSource {
 26 
 27     //前缀参数,从配置文件中获取
 28     private String prefix;
 29     //后缀参数,从配置文件中获取
 30     private String suffix;
 31     //数据生成延迟时间参数
 32     private Long delay;
 33 
 34     //数据生成条数参数
 35     private int n;
 36 
 37     public Status process() throws EventDeliveryException {
 38         ChannelProcessor channelProcessor = getChannelProcessor();
 39         Status status;
 40 
 41         try {
 42             for(int i = 0; i < n; i++){
 43                 Event event = new SimpleEvent();
 44                 event.setBody((prefix + i + suffix).getBytes());
 45                 event.setHeaders(new HashMap<String, String>());
 46                 channelProcessor.processEvent(event);
 47                 Thread.sleep(delay);
 48             }
 49             status = Status.READY;
 50         } catch (Exception e){
 51             status = Status.BACKOFF;
 52         }
 53 
 54         return status;
 55     }
 56 
 57     //设置每次回滚等待增加的时间
 58     public long getBackOffSleepIncrement() {
 59         return 0;
 60     }
 61 
 62     //设置回滚等待时间上限
 63     public long getMaxBackOffSleepInterval() {
 64         return 0;
 65     }
 66 
 67     public void configure(Context context) {
 68         prefix = context.getString("prefix", "Default");
 69         suffix = context.getString("suffix", "SDfault");
 70 
 71         delay = context.getLong("delay",2000L);
 72         n = context.getInteger("count", 5);
 73 
 74     }
 75 }
 76 
 77 5)测试
 78 1.打包
 79 将写好的代码打包,并放到flume的lib目录(/opt/module/flume/lib)下。
 80 2.配置文件
 81 # Name the components on this agent
 82 a1.sources = r1
 83 a1.sinks = k1
 84 a1.channels = c1
 85 
 86 # Describe/configure the source
 87 a1.sources.r1.type = com.baway.flume.MySource
 88 a1.sources.r1.delay = 1000
 89 a1.sources.r1.prefix = baway
 90 a1.sources.r1.count = 10
 91 
 92 # Describe the sink
 93 a1.sinks.k1.type = logger
 94 
 95 # Use a channel which buffers events in memory
 96 a1.channels.c1.type = memory
 97 a1.channels.c1.capacity = 1000
 98 a1.channels.c1.transactionCapacity = 100
 99 
100 # Bind the source and sink to the channel
101 a1.sources.r1.channels = c1
102 a1.sinks.k1.channel = c1
103 3.开启任务
104 [root@hadoop102 flume]$ pwd
105 /opt/module/flume
106 [root@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
107 4.结果展示
  1 package com.bawei;
  2 
  3 import org.apache.flume.*;
  4 import org.apache.flume.conf.Configurable;
  5 import org.apache.flume.sink.AbstractSink;
  6 import org.slf4j.Logger;
  7 import org.slf4j.LoggerFactory;
  8 
  9 public class MySink extends AbstractSink implements Configurable {
 10 
 11     //创建Logger对象
 12     private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
 13 
 14     private String prefix;
 15     private String suffix;
 16 
 17     @Override
 18     public Status process() throws EventDeliveryException {
 19 
 20         //声明返回值状态信息
 21         Status status;
 22 
 23         //获取当前Sink绑定的Channel
 24         Channel ch = getChannel();
 25 
 26         //获取事务
 27         Transaction txn = ch.getTransaction();
 28 
 29         //声明事件
 30         Event event;
 31 
 32         //开启事务
 33         txn.begin();
 34 
 35         //读取Channel中的事件,直到读取到事件结束循环
 36         while (true) {
 37             event = ch.take();
 38             if (event != null) {
 39                 break;
 40             }
 41         }
 42         try {
 43             //处理事件(打印)
 44             LOG.info(prefix + new String(event.getBody()) + suffix);
 45 
 46             //事务提交
 47             txn.commit();
 48             status = Status.READY;
 49         } catch (Exception e) {
 50 
 51             //遇到异常,事务回滚
 52             txn.rollback();
 53             status = Status.BACKOFF;
 54         } finally {
 55 
 56             //关闭事务
 57             txn.close();
 58         }
 59         return status;
 60     }
 61 
 62     @Override
 63     public void configure(Context context) {
 64 
 65         //读取配置文件内容,有默认值
 66         prefix = context.getString("prefix", "hello:");
 67 
 68         //读取配置文件内容,无默认值
 69         suffix = context.getString("suffix");
 70     }
 71 }
 72 4)测试
 73 1.打包
 74 将写好的代码打包,并放到flume的lib目录(/opt/module/flume/lib)下。
 75 2.配置文件
 76 # Name the components on this agent
 77 a1.sources = r1
 78 a1.sinks = k1
 79 a1.channels = c1
 80 
 81 # Describe/configure the source
 82 a1.sources.r1.type = netcat
 83 a1.sources.r1.bind = localhost
 84 a1.sources.r1.port = 44444
 85 
 86 # Describe the sink
 87 a1.sinks.k1.type = com.bawei.MySink
 88 #a1.sinks.k1.prefix = bawei:
 89 a1.sinks.k1.suffix = :bawei
 90 
 91 # Use a channel which buffers events in memory
 92 a1.channels.c1.type = memory
 93 a1.channels.c1.capacity = 1000
 94 a1.channels.c1.transactionCapacity = 100
 95 
 96 # Bind the source and sink to the channel
 97 a1.sources.r1.channels = c1
 98 a1.sinks.k1.channel = c1
 99 3.开启任务
100 [root@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
101 [root@hadoop102 ~]$ nc localhost 44444
102 hello
103 OK
104 Flume
105 OK

 

以上是关于自定义Source sink的主要内容,如果未能解决你的问题,请参考以下文章

自定义Source sink

flink02------1.自定义source

FLINK 基于1.15.2的Java开发-自定义Source端

Flink sql 实现 -connection-clickhouse的 source和 sink

005- Flume Source之Custom

大数据Flume自定义类型