Flume+Kafka+HDFS综合运用
Posted 皓洲
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume+Kafka+HDFS综合运用相关的知识,希望对你有一定的参考价值。
Flume+Kafka+HDFS综合运用
实验内容
如下图所示:
在某一实际应用中,有一个的数据源(可用Source类型为Exec Source或NetCat Source的Agent a1来用模拟),
为方便后期数据分析,需要记录事件的产生IP、时间(格式:年月日时分秒)以及事件类型(事件类型根据事件Body中包含WARNING:、ERROR:、**INFO:**来确定为WARNING、ERROR、INFO,如不包含,则无需记录事件类型)事件经处理后汇总到Agent a2。
Agent a2根据事件类型,将事件分别送入不同通道进行下一步处理,并将事件类型为WARNING、ERROR写入Kafka集群,并最终被消费者和Agent a3进行消费。
步骤分析
先分析a1,a1需要输出产生IP、时间、事件类型,这里就需要用到拦截器,产生IP使用Host Interceptor,产生时间使用Timestamp Interceptor,产生事件类型需要我们自定义拦截器。
然后分析a2,a2要将正常信息输出,把WARNING、ERROR作为生产者消息传到Kafka集群中,这里需要用到多路 Channel选择器
最后分析a3,a3的Source要作为消费者接收Kafka的消息,再把消息输出到HDFS
还要另外写一个消费者,把错误信息输出到控制台中。
a1的配置信息
自定义拦截器
package flume.zhz.com;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @author zhz
* @date 2021/4/29 16:32
* 备注:
*/
public class MyFlumeInterceptor implements Interceptor {
//自定义属性hostIP
private String hostIP=null;
private MyFlumeInterceptor(String hostIP){
this.hostIP=hostIP;
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String[] statuses = {"WARNING","ERROR","INFO"};
/**
* 处理body
*/
StringBuilder builder = new StringBuilder();
//获得body体字节数组
byte[] byteBody = event.getBody();
//将body体转化为字符串
String body = new String(byteBody, Charsets.UTF_8);
//获得headers
Map<String,String> headers = event.getHeaders();
hostIP = headers.get("host");
//时间
String timestamp = headers.get("timestamp");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String datetime = sdf.format(new Date(Long.valueOf(timestamp)));
//事件类型
String status = "NULL";
for(String str : statuses) {
if(body.indexOf(str) == 0) {
status = str;
}
}
//拼接IP地址与body体,形成新body
builder.append("ip:"+hostIP);
builder.append(";datetime:"+datetime);
builder.append(";status:"+status);
builder.append(";body:"+body);
byte[] newBody = builder.toString().trim().getBytes();
//重新设置body体
event.setBody(newBody);
System.out.println("body信息:"+builder.toString().trim());
/**
* 处理headers
*/
String error = "ERROR:";
String warning = "WARNING:";
String info = "INFO:";
StringBuilder str = new StringBuilder();
boolean flag = true;
for(int i=0;i<body.length()&&i<8;i++){
str.append(body.charAt(i));
if (str.equals(error)||str.equals(warning)||str.equals(info)){
headers.put(str.toString(),body);
flag = false;
break;
}
}
if (flag)headers.put("INFO:",body);
event.setHeaders(headers);
return event;
}
/**
* 定义内部类MyBuilder,用于构建自定义拦截器MyFlumeInterceptor的实例,
* 并获取Flume配置文件中自定义的拦截器属性值,将值传给自定义类MyFlumeInterceptor
*/
public static class MyBuilder implements Interceptor.Builder{
private String hostIP=null;
public void configure(Context context){
//获取Flume配置文件中设置的自定义属性值
//字符串“hostIP”需与配置文件中设置的属性hostIP一致
String hostIP = context.getString("hostIP");
this.hostIP = hostIP;
}
public Interceptor build(){
//实例化自定义拦截器并传入自定义属性
return new MyFlumeInterceptor(hostIP);
}
}
@Override
public List<Event> intercept(List<Event> list) {
for(Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
}
a1配置信息
# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source属性配置信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 拦截器名称
a1.sources.r1.interceptors = i1 i2 i3
# 时间拦截器
a1.sources.r1.interceptors.i1.type = timestamp
# 主机名拦截器
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.useIP = true
# 自定义
a1.sources.r1.interceptors.i3.type = flume.zhz.com.MyFlumeInterceptor$MyBuilder
# sink属性配置信息
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = centos02
a1.sinks.k1.port = 23570
# channel属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定source和sink到channel上
a1.sources.r1.channels= c1
a1.sinks.k1.channel = c1
a2配置信息
sink输出到kafka参考官方文档:https://flume.liyifeng.org/#kafka-sink
# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# source属性配置信息
a1.sources.r1.type = avro
a1.sources.r1.bind = centos02
a1.sources.r1.port = 23570
# 选择器配置信息
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = status
a1.sources.r1.selector.mapping.NULL = c1
a1.sources.r1.selector.mapping.INFO = c1
a1.sources.r1.selector.mapping.ERROR = c2
a1.sources.r1.selector.mapping.WARNING = c2
# sink1属性配置信息(控制台输出)
a1.sinks.k1.type = logger
# sink2属性配置信息(输出到kafka)
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topictest
a1.sinks.k2.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sinks.k2.serializer.class = kafka.serializer.StringEncoder
# channel属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# channe2属性配置信息
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# 绑定source和sink到channel上
a1.sources.r1.channels= c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a3配置信息
# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source属性配置信息
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sources.r1.kafka.topics = topictest
a1.sources.r1.serializer.class = kafka.serializer.StringEncoder
# sink属性配置信息
# 组件类型为hdfs
a1.sinks.k1.type = hdfs
#将文件放入按年分类的文件夹中
a1.sinks.k1.hdfs.path = hdfs://centos01:8020/flume/%Y-%m
# 使用时间戳作为文件前缀
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H
# 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .log
# 使用服务器时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 文件块副本数
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 86400
a1.sinks.k1.hdfs.rollSize = 1000000
a1.sinks.k1.hdfs.rollCount = 10000
# channe1属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# 绑定source和sink到channel上
a1.sources.r1.channels= c1
a1.sinks.k1.channel = c1
运行
开启zookeeper集群,hdfs HA集群,kafka集群和flume,节点二的生产者,节点三的消费者
测试语句:
正常信息显示到了centos02的控制台上:
消费者捕捉错误信息,并上传到hdfs上:
以上是关于Flume+Kafka+HDFS综合运用的主要内容,如果未能解决你的问题,请参考以下文章