Flume Configuration (Offline Data Warehouse Project)
Tier 1 Flume: source -> Kafka (Kafka acts as the channel)
1. The installed Flume version is 1.7 (installation steps are omitted).
The log files come in two kinds: start-up logs and event logs. A multiplexing selector routes the two kinds to different channels, and an interceptor filters out empty or malformed records.
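For orientation, here are hypothetical samples of the two line shapes the interceptor has to tell apart (field values are invented for illustration; only the overall shape matters). A start-up log is a single JSON object whose body contains the marker "start":

{"action":"1","ar":"MX","en":"start", ... }

An event log is a 13-digit millisecond timestamp, a '|' separator, and a JSON object:

1593089978858|{"cm":{"ln":"-75.0", ... },"ap":"app","et":[ ... ]}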
Let's start by writing the interceptor.
1. Create a Maven project with the following pom.xml.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.atguigu</groupId>
    <artifactId>ETLExceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
The two classes below are hand-written; note that this first version had some issues.
MyInterceptor
package com.atguigu.dw.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {

    // Collection of events that pass validation
    private List<Event> results = new ArrayList<Event>();
    private String startFlag = "start";

    public void initialize() {
    }

    // Core method: intercept a single event
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String bodyStr = new String(body, Charset.forName("utf-8"));

        // Add a key to the header so the channel selector can route the event
        Map<String, String> headers = event.getHeaders();
        boolean flag = true;
        if (bodyStr.contains(startFlag)) {
            // Looks like a start-up log
            headers.put("topic", "topic_start");
            flag = ETLUtils.validaStratLog(bodyStr);
        } else {
            // Otherwise treat it as an event log
            headers.put("topic", "topic_event");
            flag = ETLUtils.validEvent(bodyStr);
        }

        // Validation failed: drop the event
        if (!flag) {
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        // Clear results first, otherwise events from previous batches accumulate
        results.clear();
        for (Event event : events) {
            Event result = intercept(event);
            // intercept(event) returns null for events that fail validation
            if (result != null) {
                // Keep only the valid events
                results.add(result);
            }
        }
        return results;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
            // Read parameters from the Flume configuration file (none needed here)
        }
    }
}
ETLUtils
package com.atguigu.dw.flume;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;

public class ETLUtils {

    // Validate a start-up log: the body must be a complete JSON string,
    // i.e. start with { and end with }
    public static boolean validaStratLog(String source) {
        // 1. The body must not be empty
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // 2. Trim leading/trailing whitespace
        String trimStr = source.trim();
        // 3. Check JSON completeness: starts with { and ends with }
        if (trimStr.startsWith("{") && trimStr.endsWith("}")) {
            return true;
        }
        return false;
    }

    // Validate an event log. Expected shape (a single line):
    // 1593089978858|{"cm":{"ln":"-75.0","sv":"V2.3.8","os":"8.1.2", ... },"ap":"app","et":[{"ett":"1593072416695","en":"loading","kv":{ ... }}, ... ]}
    public static boolean validEvent(String source) {
        // 1. The body must not be empty
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // 2. Trim leading/trailing whitespace
        String trimStr = source.trim();
        // 3. Split into "timestamp|json"; '|' must be escaped in the regex
        String[] words = trimStr.split("\\|");
        if (words.length != 2) {
            return false;
        }
        // 4. The timestamp must be exactly 13 digits
        if (words[0].length() != 13 || !NumberUtils.isDigits(words[0])) {
            return false;
        }
        // 5. Check JSON completeness: starts with { and ends with }
        if (words[1].startsWith("{") && words[1].endsWith("}")) {
            return true;
        }
        return false;
    }
}
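To sanity-check the two validation methods before packaging, a minimal JUnit sketch like the following can be dropped into src/test/java (the test class name and the sample strings are mine, not part of the original project):

package com.atguigu.dw.flume;

import org.junit.Assert;
import org.junit.Test;

public class ETLUtilsTest {

    @Test
    public void startLogValidation() {
        // A well-formed start log is a single JSON object
        Assert.assertTrue(ETLUtils.validaStratLog("{\"en\":\"start\",\"action\":\"1\"}"));
        // Blank or non-JSON bodies are rejected
        Assert.assertFalse(ETLUtils.validaStratLog("   "));
        Assert.assertFalse(ETLUtils.validaStratLog("not json"));
    }

    @Test
    public void eventLogValidation() {
        // A well-formed event log is "13-digit timestamp|{json}"
        Assert.assertTrue(ETLUtils.validEvent("1593226324596|{\"cm\":{}}"));
        // A malformed timestamp fails validation
        Assert.assertFalse(ETLUtils.validEvent("abc|{\"cm\":{}}"));
    }
}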
Package the project as a jar and copy it into Flume's lib directory; a sketch of the commands follows.
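A rough sketch of the build-and-deploy step (the jar name follows Maven's artifactId-version convention; the Flume path matches the one used elsewhere in this post):

mvn clean package
cp target/ETLExceptor-1.0-SNAPSHOT.jar /opt/module/apache-flume-1.7.0-bin/lib/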
Flume conf file, example 1. Edit the config file below (this one is for testing).
# a1 is the agent name; a1 defines one source named r1 (separate multiple components with spaces)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# component.property = value
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
# File pattern of the data source
a1.sources.r1.filegroups.f1 = /tmp/logs/^app.+.log$
# Where the TAILDIR position JSON is kept
a1.sources.r1.positionFile = /opt/module/log_postition.json

# Interceptor (the class from the jar we placed in Flume's lib directory)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder

# Sink
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 100

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Wire the components: one source can feed multiple channels, but a sink reads from exactly one channel!
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
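As a quick way to confirm that the TAILDIR source is actually tracking the log file, you can peek at the position file configured above (its exact JSON content depends on what has been read so far):

cat /opt/module/log_postition.json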
Now open Flume's log4j.properties file:
# -Dflume.root.logger=DEBUG,console when launching flume. (Handy for troubleshooting.)
#flume.root.logger=DEBUG,console
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log

log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
log4j.logger.org.apache.hadoop.hive = ERROR

# Define the root logger to the system property "flume.root.logger".
log4j.rootLogger=${flume.root.logger}

# Stock log4j rolling file appender
# Default log rotation configuration
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n

# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.
# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# Add "DAILY" to flume.root.logger above if you want to use this
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n

# console
# Add "console" to flume.root.logger above if you want to use this
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
Start Flume:
/opt/module/apache-flume-1.7.0-bin/bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console
Once logs are being generated, you can watch Flume pick them up.
cd /tmp/logs
Run the lg.sh command (the log-generation script).
This brings up a side question: how do we generate the log files in the first place?
#!/bin/bash
# $1 and $2 are passed straight through to the home-made log-generation jar
for i in hadoop101 hadoop101
do
  ssh $i "source /etc/profile; java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain $1 $2 >/opt/module/test.log &"
  # Redirecting stdout to a file like this is of limited use.
done
As you can see in the generator code, the logs are written to the /tmp/logs directory.
Start Flume.
The following is the expected (healthy) startup output:

-2.7.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-2.7.3/share/hadoop/mapreduce/*:/opt/module/hadoop-2.7.3/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/opt/module/hadoop-2.7.3/lib/native org.apache.flume.node.Application --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/apache-flume-1.7.0-bin/lib/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/06/27 10:54:39 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
20/06/27 10:54:39 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/opt/module/apache-flume-1.7.0-bin/conf/flume-test.conf
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Processing:k1
20/06/27 10:54:39 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Creating channels
20/06/27 10:54:39 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Created channel c1
20/06/27 10:54:39 INFO source.DefaultSourceFactory: Creating instance of source r1, type TAILDIR
20/06/27 10:54:39 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
20/06/27 10:54:39 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
20/06/27 10:54:39 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:Taildir source: { positionFile: /opt/module/log_postition.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@736c21d4 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
20/06/27 10:54:39 INFO node.Application: Starting Channel c1
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/06/27 10:54:39 INFO node.Application: Starting Sink k1
20/06/27 10:54:39 INFO node.Application: Starting Source r1
20/06/27 10:54:39 INFO taildir.TaildirSource: r1 TaildirSource source starting with directory: {f1=/tmp/logs/^app.+.log$}
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: taildirCache: [{filegroup='f1', filePattern='/tmp/logs/^app.+.log$', cached=true}]
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: headerTable: {}
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: Opening file: /tmp/logs/app-2020-06-27.log, inode: 2643708, pos: 0
20/06/27 10:54:39 INFO taildir.ReliableTaildirEventReader: Updating position from position file: /opt/module/log_postition.json
20/06/27 10:54:39 INFO taildir.TailFile: Updated position, file: /tmp/logs/app-2020-06-27.log, inode: 2643708, pos: 5693224
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/06/27 10:54:39 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
The following is the DEBUG output produced by the logger sink:
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 36 7C 7B 22 1593226324596|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 37 7C 7B 22 1593226324597|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_event} body: 31 35 39 33 32 32 36 33 32 34 35 39 37 7C 7B 22 1593226324597|{" }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_start} body: 7B 22 61 63 74 69 6F 6E 22 3A 22 31 22 2C 22 61 {"action":"1","a }
20/06/27 10:52:10 INFO sink.LoggerSink: Event: { headers:{topic=topic_start} body: 7B 22 61 63 74 69 6F 6E 22 3A 22 31 22 2C 22 61 {"action":"1","a }
Next we test the Kafka channel (Kafka installation is omitted).
# List existing topics
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

# Create the topic_start topic (topic_event is created the same way; see the command below)
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 2 --partitions 1 --topic topic_start

# Delete a topic if needed
/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic topic_start
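For completeness, the matching create command for the event topic (same parameters as the start topic above):

/opt/module/kafka_2.11-1.0.1/bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 2 --partitions 1 --topic topic_event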
Start Kafka:
/opt/module/kafka_2.11-1.0.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-1.0.1/config/server.properties
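Since the ZooKeeper check below expects three brokers, the start command has to run on every node; one possible loop, assuming the same install path on hadoop101-103:

for host in hadoop101 hadoop102 hadoop103
do
  # Start the broker as a daemon on each node
  ssh $host "source /etc/profile; /opt/module/kafka_2.11-1.0.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-1.0.1/config/server.properties"
done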
Check broker registration in ZooKeeper:
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[30, 20, 10]

All three brokers are registered correctly.
Then start Flume with the updated configuration file (example 2 below), which channels the data into Kafka:
/opt/module/apache-flume-1.7.0-bin/bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/module/apache-flume-1.7.0-bin/conf/flume.conf -Dflume.root.logger=DEBUG,console
Example 2. Flume conf file test
Below is the Flume configuration file; data goes straight into Kafka channels, so there is no sink stage.
[root@hadoop101 conf]# more flume.conf
# a1 is the agent name; a1 defines one source named r1 (separate multiple components with spaces)
a1.sources = r1
a1.channels = c1 c2

# component.property = value
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.batchSize = 1000
a1.sources.r1.filegroups.f1 = /tmp/logs/^app.+.log$
# Where the TAILDIR position JSON is kept
a1.sources.r1.positionFile = /opt/module/log_postition.json

# Interceptor
a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

# Selector: route by the "topic" header set in the interceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# Channels: write straight into Kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false

# Wire the components: one source can feed multiple channels, but a sink reads from exactly one channel!
a1.sources.r1.channels = c1 c2
Check in Kafka whether the channeled data has arrived; a consumer command sketch follows.
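One way to verify is a console consumer on one of the topics (path and broker address follow the conventions used above; press Ctrl+C to stop):

/opt/module/kafka_2.11-1.0.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic_start --from-beginning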
Tier 2 Flume: Kafka -> HDFS (Kafka acts as the Kafka source)
flume-hdfs.conf