玩转Kafka海量日志收集实战之架构介绍与日志输出

Posted 怎能止步于此

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了玩转Kafka海量日志收集实战之架构介绍与日志输出相关的知识,希望对你有一定的参考价值。

海量日志收集实战

话不多说,看架构图:

最左侧,是Beats,它主要是用于收集日志的,比如这个Filebeat它的底层是用erlang语言写的,性能非常好,其实我们的系统中打印出来的日志,都会用这个Filebeat给我们抓取出来。
在我们这个架构中,Filebeat的主要作用就是把我们的日志搜集出来并转储到Kafka。

然后看Kafka的右边,是Logstash,它主要是用日志做一个过滤,然后它会把你过滤后的数据发送到Elasticsearch里。

最后利用Kibana进行展示。

接下来,我们就来说一下我们要做的事情,先看下这张图:

我们这了采用的是Log4j2,为什么不采用SpringBoot默认集成的logback?
这是因为Log4j2性能会更好一些,因为它的底层是基于无锁并行框架Disruptor做的,关于Disruptor大家可以看看这篇文章《java高阶编程之无锁并行计算框架——Disruptor初识》,当然,它想要发挥出威力还是有一定前提的,就是要求你配置一定要高,因为Disruptor比较耗内存和CPU,所以你想要日志收集的更实时,那对应的你应用服务的性能和配置也要高一些,并且也要经过一些压测。

这里我们看一下图中两个粉色的app.log与error.log,这里我们把它分成两个对应的log日志文件,app.log在这里我们可以存储一个全量的log日志,也就是说无论我们的业务打什么样级别的日志,都会存储到app.log里。然后error.log,只要出现异常就会存储在error.log里。这样的的目的主要是用于我们后续做一些告警和分析,所以我们后续的一些告警会通过对这个error.log做一些手段,比如定时任务做定时抓取。

最后,我们把这些日志通过filebeat抓取出来,转储到Kafka中,然后Kafka收到消息后,他对应的消费者也就是Logstash,它得到消息后在发送给Elasticsearch。

以上就是这个架构的整体设计。

日志输出

我们直接开拔,打开Idea,每天coding十小时,健康工作一百年。

首先引入依赖:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/>
    </parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>
    </dependencies>

工具类:
这是Dubbo底层用到的一个工具类,可以拿来直接使用。

public class NetUtil 

    public static String normalizeAddress(String address) 
        String[] blocks = address.split("[:]");
        if (blocks.length > 2) 
            throw new IllegalArgumentException(address + " is invalid");
        
        String host = blocks[0];
        int port = 80;
        if (blocks.length > 1) 
            port = Integer.valueOf(blocks[1]);
         else 
            address += ":" + port; //use default 80
        
        String serverAddr = String.format("%s:%d", host, port);
        return serverAddr;
    

    public static String getLocalAddress(String address) 
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) 
            throw new IllegalArgumentException(address + " is invalid address");
        
        String host = blocks[0];
        int port = Integer.valueOf(blocks[1]);

        if ("0.0.0.0".equals(host)) 
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        
        return address;
    

    private static int matchedIndex(String ip, String[] prefix) 
        for (int i = 0; i < prefix.length; i++) 
            String p = prefix[i];
            if ("*".equals(p))  //*, assumed to be IP
                if (ip.startsWith("127.") ||
                        ip.startsWith("10.") ||
                        ip.startsWith("172.") ||
                        ip.startsWith("192.")) 
                    continue;
                
                return i;
             else 
                if (ip.startsWith(p)) 
                    return i;
                
            
        

        return -1;
    

    public static String getLocalIp(String ipPreference) 
        if (ipPreference == null) 
            ipPreference = "*>10>172>192>127";
        
        String[] prefix = ipPreference.split("[> ]+");
        try 
            Pattern pattern = Pattern.compile("[0-9]+\\\\.[0-9]+\\\\.[0-9]+\\\\.[0-9]+");
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            String matchedIp = null;
            int matchedIdx = -1;
            while (interfaces.hasMoreElements()) 
                NetworkInterface ni = interfaces.nextElement();
                Enumeration<InetAddress> en = ni.getInetAddresses();
                while (en.hasMoreElements()) 
                    InetAddress addr = en.nextElement();
                    String ip = addr.getHostAddress();
                    Matcher matcher = pattern.matcher(ip);
                    if (matcher.matches()) 
                        int idx = matchedIndex(ip, prefix);
                        if (idx == -1) continue;
                        if (matchedIdx == -1) 
                            matchedIdx = idx;
                            matchedIp = ip;
                         else 
                            if (matchedIdx > idx) 
                                matchedIdx = idx;
                                matchedIp = ip;
                            
                        
                    
                
            
            if (matchedIp != null) return matchedIp;
            return "127.0.0.1";
         catch (Exception e) 
            return "127.0.0.1";
        
    

    public static String getLocalIp() 
        return getLocalIp("*>10>172>192>127");
    

    public static String remoteAddress(SocketChannel channel) 
        SocketAddress addr = channel.socket().getRemoteSocketAddress();
        String res = String.format("%s", addr);
        return res;
    

    public static String localAddress(SocketChannel channel) 
        SocketAddress addr = channel.socket().getLocalSocketAddress();
        String res = String.format("%s", addr);
        return addr == null ? res : res.substring(1);
    

    public static String getPid() 
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) 
            return name.substring(0, index);
        
        return null;
    

    public static String getLocalHostName() 
        try 
            return (InetAddress.getLocalHost()).getHostName();
         catch (UnknownHostException uhe) 
            String host = uhe.getMessage();
            if (host != null) 
                int colon = host.indexOf(':');
                if (colon > 0) 
                    return host.substring(0, colon);
                
            
            return "UnknownHost";
        
    

这个是FastJson通用的工具类:

public class FastJsonConvertUtil 

    private static final SerializerFeature[] featuresWithNullValue =  SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
            SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty ;

    /**
     * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
     * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
     * @param data JSON字符串
     * @param clzss 转换对象
     * @return T
     */
    public static <T> T convertJSONToObject(String data, Class<T> clzss) 
        try 
            T t = JSON.parseObject(data, clzss);
            return t;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

    /**
     * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
     * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
     * @param data JSONObject对象
     * @param clzss 转换对象
     * @return T
     */
    public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) 
        try 
            T t = JSONObject.toJavaObject(data, clzss);
            return t;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

    /**
     * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
     * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
     * @param data JSON字符串数组
     * @param clzss 转换对象
     * @return List<T>集合对象
     */
    public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) 
        try 
            List<T> t = JSON.parseArray(data, clzss);
            return t;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

    /**
     * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
     * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
     * @param data List<JSONObject>
     * @param clzss 转换对象
     * @return List<T>集合对象
     */
    public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) 
        try 
            List<T> t = new ArrayList<T>();
            for (JSONObject jsonObject : data) 
                t.add(convertJSONToObject(jsonObject, clzss));
            
            return t;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

    /**
     * <B>方法名称:</B>将对象转为JSON字符串<BR>
     * <B>概要说明:</B>将对象转为JSON字符串<BR>
     * @param obj 任意对象
     * @return JSON字符串
     */
    public static String convertObjectToJSON(Object obj) 
        try 
            String text = JSON.toJSONString(obj);
            return text;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

    /**
     * <B>方法名称:</B>将对象转为JSONObject对象<BR>
     * <B>概要说明:</B>将对象转为JSONObject对象<BR>
     * @param obj 任意对象
     * @return JSONObject对象
     */
    public static JSONObject convertObjectToJSONObject(Object obj)
        try 
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
            return jsonObject;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    


    /**
     * <B>方法名称:</B><BR>
     * <B>概要说明:</B><BR>
     * @param obj
     * @return
     */
    public static String convertObjectToJSONWithNullValue(Object obj) 
        try 
            String text = JSON.toJSONString(obj, featuresWithNullValue);
            return text;
         catch (Exception e) 
            e.printStackTrace();
            return null;
        
    

然后,我们再写一个工具类:

@Componet
public class InputMDC implements EnvironmentAware 

    private static Environment environment;
    @Override
    public void setEnvironment(Environment environment) 
        InputMDC.environment = environment;
    

    public static void putMDC() 
        MDC.put("hostName",NetUtil.getLocalHostName());
        MDC.put("ip",NetUtil.getLocalIp());
        MDC.put("applicationName",environment.getProperty(

以上是关于玩转Kafka海量日志收集实战之架构介绍与日志输出的主要内容,如果未能解决你的问题,请参考以下文章

Go实现海量日志收集系统

海量日志下的日志架构优化:filebeat+logstash+kafka+ELK

ELK 日志采集框架:架构设计

玩转Flume+Kafka原来也就那点事儿

JavaWeb项目架构之Kafka分布式日志队列

JavaWeb项目架构之Kafka分布式日志队列