玩转Kafka海量日志收集实战之架构介绍与日志输出
Posted 怎能止步于此
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了玩转Kafka海量日志收集实战之架构介绍与日志输出相关的知识,希望对你有一定的参考价值。
海量日志收集实战
话不多说,看架构图:
最左侧,是Beats,它主要是用于收集日志的,比如这个Filebeat它的底层是用erlang语言写的,性能非常好,其实我们的系统中打印出来的日志,都会用这个Filebeat给我们抓取出来。
在我们这个架构中,Filebeat的主要作用就是把我们的日志搜集出来并转储到Kafka。
然后看Kafka的右边,是Logstash,它主要是用日志做一个过滤,然后它会把你过滤后的数据发送到Elasticsearch里。
最后利用Kibana进行展示。
接下来,我们就来说一下我们要做的事情,先看下这张图:
我们这了采用的是Log4j2,为什么不采用SpringBoot默认集成的logback?
这是因为Log4j2性能会更好一些,因为它的底层是基于无锁并行框架Disruptor做的,关于Disruptor大家可以看看这篇文章《java高阶编程之无锁并行计算框架——Disruptor初识》,当然,它想要发挥出威力还是有一定前提的,就是要求你配置一定要高,因为Disruptor比较耗内存和CPU,所以你想要日志收集的更实时,那对应的你应用服务的性能和配置也要高一些,并且也要经过一些压测。
这里我们看一下图中两个粉色的app.log与error.log,这里我们把它分成两个对应的log日志文件,app.log在这里我们可以存储一个全量的log日志,也就是说无论我们的业务打什么样级别的日志,都会存储到app.log里。然后error.log,只要出现异常就会存储在error.log里。这样的的目的主要是用于我们后续做一些告警和分析,所以我们后续的一些告警会通过对这个error.log做一些手段,比如定时任务做定时抓取。
最后,我们把这些日志通过filebeat抓取出来,转储到Kafka中,然后Kafka收到消息后,他对应的消费者也就是Logstash,它得到消息后在发送给Elasticsearch。
以上就是这个架构的整体设计。
日志输出
我们直接开拔,打开Idea,每天coding十小时,健康工作一百年。
首先引入依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies>
工具类:
这是Dubbo底层用到的一个工具类,可以拿来直接使用。
public class NetUtil
public static String normalizeAddress(String address)
String[] blocks = address.split("[:]");
if (blocks.length > 2)
throw new IllegalArgumentException(address + " is invalid");
String host = blocks[0];
int port = 80;
if (blocks.length > 1)
port = Integer.valueOf(blocks[1]);
else
address += ":" + port; //use default 80
String serverAddr = String.format("%s:%d", host, port);
return serverAddr;
public static String getLocalAddress(String address)
String[] blocks = address.split("[:]");
if (blocks.length != 2)
throw new IllegalArgumentException(address + " is invalid address");
String host = blocks[0];
int port = Integer.valueOf(blocks[1]);
if ("0.0.0.0".equals(host))
return String.format("%s:%d", NetUtil.getLocalIp(), port);
return address;
private static int matchedIndex(String ip, String[] prefix)
for (int i = 0; i < prefix.length; i++)
String p = prefix[i];
if ("*".equals(p)) //*, assumed to be IP
if (ip.startsWith("127.") ||
ip.startsWith("10.") ||
ip.startsWith("172.") ||
ip.startsWith("192."))
continue;
return i;
else
if (ip.startsWith(p))
return i;
return -1;
public static String getLocalIp(String ipPreference)
if (ipPreference == null)
ipPreference = "*>10>172>192>127";
String[] prefix = ipPreference.split("[> ]+");
try
Pattern pattern = Pattern.compile("[0-9]+\\\\.[0-9]+\\\\.[0-9]+\\\\.[0-9]+");
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
String matchedIp = null;
int matchedIdx = -1;
while (interfaces.hasMoreElements())
NetworkInterface ni = interfaces.nextElement();
Enumeration<InetAddress> en = ni.getInetAddresses();
while (en.hasMoreElements())
InetAddress addr = en.nextElement();
String ip = addr.getHostAddress();
Matcher matcher = pattern.matcher(ip);
if (matcher.matches())
int idx = matchedIndex(ip, prefix);
if (idx == -1) continue;
if (matchedIdx == -1)
matchedIdx = idx;
matchedIp = ip;
else
if (matchedIdx > idx)
matchedIdx = idx;
matchedIp = ip;
if (matchedIp != null) return matchedIp;
return "127.0.0.1";
catch (Exception e)
return "127.0.0.1";
public static String getLocalIp()
return getLocalIp("*>10>172>192>127");
public static String remoteAddress(SocketChannel channel)
SocketAddress addr = channel.socket().getRemoteSocketAddress();
String res = String.format("%s", addr);
return res;
public static String localAddress(SocketChannel channel)
SocketAddress addr = channel.socket().getLocalSocketAddress();
String res = String.format("%s", addr);
return addr == null ? res : res.substring(1);
public static String getPid()
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName();
int index = name.indexOf("@");
if (index != -1)
return name.substring(0, index);
return null;
public static String getLocalHostName()
try
return (InetAddress.getLocalHost()).getHostName();
catch (UnknownHostException uhe)
String host = uhe.getMessage();
if (host != null)
int colon = host.indexOf(':');
if (colon > 0)
return host.substring(0, colon);
return "UnknownHost";
这个是FastJson通用的工具类:
public class FastJsonConvertUtil
private static final SerializerFeature[] featuresWithNullValue = SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty ;
/**
* <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
* <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
* @param data JSON字符串
* @param clzss 转换对象
* @return T
*/
public static <T> T convertJSONToObject(String data, Class<T> clzss)
try
T t = JSON.parseObject(data, clzss);
return t;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
* <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
* @param data JSONObject对象
* @param clzss 转换对象
* @return T
*/
public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss)
try
T t = JSONObject.toJavaObject(data, clzss);
return t;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
* <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
* @param data JSON字符串数组
* @param clzss 转换对象
* @return List<T>集合对象
*/
public static <T> List<T> convertJSONToArray(String data, Class<T> clzss)
try
List<T> t = JSON.parseArray(data, clzss);
return t;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
* <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
* @param data List<JSONObject>
* @param clzss 转换对象
* @return List<T>集合对象
*/
public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss)
try
List<T> t = new ArrayList<T>();
for (JSONObject jsonObject : data)
t.add(convertJSONToObject(jsonObject, clzss));
return t;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B>将对象转为JSON字符串<BR>
* <B>概要说明:</B>将对象转为JSON字符串<BR>
* @param obj 任意对象
* @return JSON字符串
*/
public static String convertObjectToJSON(Object obj)
try
String text = JSON.toJSONString(obj);
return text;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B>将对象转为JSONObject对象<BR>
* <B>概要说明:</B>将对象转为JSONObject对象<BR>
* @param obj 任意对象
* @return JSONObject对象
*/
public static JSONObject convertObjectToJSONObject(Object obj)
try
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
return jsonObject;
catch (Exception e)
e.printStackTrace();
return null;
/**
* <B>方法名称:</B><BR>
* <B>概要说明:</B><BR>
* @param obj
* @return
*/
public static String convertObjectToJSONWithNullValue(Object obj)
try
String text = JSON.toJSONString(obj, featuresWithNullValue);
return text;
catch (Exception e)
e.printStackTrace();
return null;
然后,我们再写一个工具类:
@Componet
public class InputMDC implements EnvironmentAware
private static Environment environment;
@Override
public void setEnvironment(Environment environment)
InputMDC.environment = environment;
public static void putMDC()
MDC.put("hostName",NetUtil.getLocalHostName());
MDC.put("ip",NetUtil.getLocalIp());
MDC.put("applicationName",environment.getProperty(以上是关于玩转Kafka海量日志收集实战之架构介绍与日志输出的主要内容,如果未能解决你的问题,请参考以下文章