Custom Source and Sink
1) Add the pom dependency

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

2) Write the code

package com.baway.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // prefix for the generated events, read from the configuration file
    private String prefix;
    // suffix for the generated events, read from the configuration file
    private String suffix;
    // delay (ms) between generated events
    private Long delay;
    // number of events generated per process() call
    private int n;

    public Status process() throws EventDeliveryException {
        ChannelProcessor channelProcessor = getChannelProcessor();
        Status status;

        try {
            for (int i = 0; i < n; i++) {
                Event event = new SimpleEvent();
                event.setBody((prefix + i + suffix).getBytes());
                event.setHeaders(new HashMap<String, String>());
                channelProcessor.processEvent(event);
                Thread.sleep(delay);
            }
            status = Status.READY;
        } catch (Exception e) {
            status = Status.BACKOFF;
        }

        return status;
    }

    // increment added to the back-off sleep after each unsuccessful poll
    public long getBackOffSleepIncrement() {
        return 0;
    }

    // upper bound of the back-off sleep
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    public void configure(Context context) {
        prefix = context.getString("prefix", "Default");
        suffix = context.getString("suffix", "Default");
        delay = context.getLong("delay", 2000L);
        n = context.getInteger("count", 5);
    }
}

(A batching variant of this source is sketched at the end of this part.)

3) Testing

1. Package
Package the code into a jar and place it in Flume's lib directory (under /opt/module/flume).

2. Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.baway.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.prefix = baway
a1.sources.r1.count = 10

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the agent
[root@hadoop102 flume]$ pwd
/opt/module/flume
[root@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

4. Results
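As a side note (not part of the original example): process() above hands events to the channel one at a time via processEvent(). ChannelProcessor also offers processEventBatch(List<Event>), so a source can build a whole batch first and deliver it in a single call, reducing per-event channel overhead. A minimal sketch of such a variant, assuming the same prefix/suffix/delay/count parameters; the class name MyBatchSource is made up for illustration:

package com.baway.flume;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Illustrative batching variant of MySource; not part of the original article.
public class MyBatchSource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;
    private String suffix;
    private Long delay;
    private int n;

    public Status process() throws EventDeliveryException {
        try {
            // build the whole batch first ...
            List<Event> batch = new ArrayList<Event>();
            for (int i = 0; i < n; i++) {
                batch.add(EventBuilder.withBody((prefix + i + suffix).getBytes()));
            }
            // ... then hand it to the channel(s) in a single call
            getChannelProcessor().processEventBatch(batch);
            Thread.sleep(delay);
            return Status.READY;
        } catch (Exception e) {
            return Status.BACKOFF;
        }
    }

    // let the runner back off gradually (1s steps, 5s cap) when BACKOFF is returned
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    public long getMaxBackOffSleepInterval() {
        return 5000;
    }

    public void configure(Context context) {
        prefix = context.getString("prefix", "Default");
        suffix = context.getString("suffix", "Default");
        delay = context.getLong("delay", 2000L);
        n = context.getInteger("count", 5);
    }
}

If used with the configuration above, the only change would be a1.sources.r1.type = com.baway.flume.MyBatchSource; the batch size should stay at or below the channel's transactionCapacity.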
Custom Sink

1) Write the code

package com.bawei;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    // Logger used to print the processed events
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {

        // status returned to the sink runner
        Status status;

        // get the Channel this Sink is bound to
        Channel ch = getChannel();

        // get a transaction from the Channel
        Transaction txn = ch.getTransaction();

        // the event to be processed
        Event event;

        // start the transaction
        txn.begin();

        // take from the Channel until an event is returned
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            // process the event (print it)
            LOG.info(prefix + new String(event.getBody()) + suffix);

            // commit the transaction
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {

            // roll the transaction back on any error
            txn.rollback();
            status = Status.BACKOFF;
        } finally {

            // close the transaction
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {

        // read from the configuration file, with a default value
        prefix = context.getString("prefix", "hello:");

        // read from the configuration file, no default value
        suffix = context.getString("suffix");
    }
}

(A variant that drains a batch of events per transaction is sketched at the end of this part.)

2) Testing

1. Package
Package the code into a jar and place it in Flume's lib directory (under /opt/module/flume).

2. Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.bawei.MySink
#a1.sinks.k1.prefix = bawei:
a1.sinks.k1.suffix = :bawei

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the agent
[root@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[root@hadoop102 ~]$ nc localhost 44444
hello
OK
Flume
OK
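For reference (an illustrative variant, not from the original article): the while (true) loop in MySink spins when the channel is empty, and each transaction carries exactly one event. A sink can instead take up to a fixed number of events per transaction and return Status.BACKOFF when nothing is available, so the sink runner sleeps rather than busy-waits. A minimal sketch; the class name MyBatchSink and the batchSize property are invented for illustration:

package com.bawei;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative batching variant of MySink; not part of the original article.
public class MyBatchSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MyBatchSink.class);

    private String prefix;
    private String suffix;
    // hypothetical parameter: max events taken per transaction
    private int batchSize;

    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            int taken = 0;
            for (int i = 0; i < batchSize; i++) {
                Event event = ch.take();
                if (event == null) {
                    // channel is empty; stop taking for this transaction
                    break;
                }
                LOG.info(prefix + new String(event.getBody()) + suffix);
                taken++;
            }
            txn.commit();
            // BACKOFF tells the sink runner to sleep before calling process() again
            return taken == 0 ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
            // roll back on any error so the events stay in the channel
            txn.rollback();
            return Status.BACKOFF;
        } finally {
            txn.close();
        }
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "hello:");
        suffix = context.getString("suffix", "");
        // default 100 matches the memory channel's transactionCapacity above
        batchSize = context.getInteger("batchSize", 100);
    }
}

Used with the configuration above, this would only change a1.sinks.k1.type to com.bawei.MyBatchSink, plus an optional a1.sinks.k1.batchSize, which should not exceed the channel's transactionCapacity.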