Distributed ELK + Kafka Log Collection with docker-compose

Posted by gblfy


I. Install docker-compose
1. Download docker-compose
curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
2. Make it executable
chmod +x /usr/local/bin/docker-compose
II. Set up the ELK + Kafka environment

At least 4 GB of memory is recommended.

2.1. Write the docker-compose file
cd /app/
mkdir mayiktelkkafka

Upload docker-compose.yml:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: 192.168.122.128:2181
    ports:
      # kafka-manager listens on port 9000 inside the container
      - "9001:9000"
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.5.4
    restart: always
    container_name: elasticsearch
    ports:
      - "9200:9200"
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://192.168.122.128:9200
    depends_on:
      - elasticsearch
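2.2. Start docker-compose
docker-compose up

If this command fails with an error, check whether there are extra spaces after it; delete them and run it again.

Screenshot after a successful start.

After a successful start, 5 containers should be running. If some are missing, look up the container ID and check its logs. I was running in a virtual machine, and the es container failed to start. Its log showed:
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
The fix is to raise vm.max_map_count on the host to at least 262144, for example by running sysctl -w vm.max_map_count=262144 as root, and then start the containers again.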

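2.3. Verify the result

ZooKeeper: 192.168.122.128:2181 (ZooKeeper is not an HTTP service, so a browser will not show a page; check that the port accepts connections)

Elasticsearch: http://192.168.122.128:9200

Kibana: http://192.168.122.128:5601/app/kibana#/home?_g=()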
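
Before opening the URLs, it can help to confirm that each published port accepts TCP connections at all. The following is a minimal sketch, not part of the original setup: PortProbe and isOpen are hypothetical names, the 2-second timeout is an arbitrary choice, and the host and port list are taken from the compose file above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the host IP used throughout this article
        String host = args.length > 0 ? args[0] : "192.168.122.128";
        int[] ports = {2181, 9092, 9200, 5601}; // ZooKeeper, Kafka, Elasticsearch, Kibana
        for (int port : ports) {
            System.out.println(host + ":" + port + " open=" + isOpen(host, port, 2000));
        }
    }
}
```

An open port only shows that the container is listening; Elasticsearch and Kibana should still be checked through their URLs.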

2.4. Install Logstash

Install a JDK first, because Logstash requires one: https://blog.csdn.net/weixin_40816738/article/details/108532702


Upload or download logstash-6.4.3.tar.gz to the server:

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz

Extract it:

tar -zxvf logstash-6.4.3.tar.gz

Install the plugins:

cd logstash-6.4.3
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch


Write the configuration file:

cd config
vim elk-kafka.conf

with the following content:

input {
  kafka {
    bootstrap_servers => "192.168.122.128:9092"
    topics => ["mayikt-log"]
  }
}

filter {
  # Only matched data are sent to output.
}

output {
  elasticsearch {
    action => "index"                 # The operation to perform on ES
    hosts  => "192.168.122.128:9200"  # Elasticsearch host; can be an array
    index  => "mayikt_logs"           # The index to write data to
  }
}

Start Logstash (from the config directory):

cd ../bin
./logstash -f ../config/elk-kafka.conf
III. Send messages from the WeChat project to Kafka
3.1. Add Kafka to the WeChat project
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3.2. Configure Kafka

bootstrap.yml

spring:
  kafka:
    bootstrap-servers: 192.168.122.128:9092 # Kafka server address; for a cluster, list several separated by commas
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group # consumer group ID
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.3. AOP interception
package com.mayikt.api.impl.elk.log;

import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * Collects request logs for ELK + Kafka.
 */
@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private LogContainer logContainer;

    // Declare a pointcut; the value is an execution expression
    @Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")
    private void serviceAspect() {
    }

    // Log the request before the method runs
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature().toString());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Deliver the log entry to Kafka
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    // Log the return value after the method completes
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturning(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        respJSONObject.put("response", jsonObject);
        // Deliver the log entry to Kafka
        logContainer.putLog(respJSONObject.toJSONString());
    }

    /**
     * Exception advice: logs the request together with the thrown exception.
     *
     * @param point the intercepted join point
     * @param e     the exception thrown by the method
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void methodAfterThrowing(JoinPoint point, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", point.getSignature().toString());
        jsonObject.put("request_args", Arrays.toString(point.getArgs()));
        jsonObject.put("error", e.toString());
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "error");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Deliver the log entry to Kafka
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    public static String getIpAddr(HttpServletRequest request) {
        // X-Forwarded-For (XFF) is the HTTP request header that identifies the original IP address
        // of a client that reaches the web server through an HTTP proxy or load balancer.
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                // Use the IP configured on the local network interface;
                // keep the loopback address if the lookup fails
                try {
                    ipAddress = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
            }
        }
        // When the request passed through several proxies, the first IP is the real client IP;
        // the IPs are separated by ','
        if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length() = 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}

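The getIpAddr method only splits the X-Forwarded-For value when it is longer than 15 characters, which assumes IPv4-sized addresses. As a rough sketch of an alternative, the helper below takes the first usable entry regardless of length. ForwardedFor and firstClientIp are hypothetical names, not part of the project, and skipping "unknown" entries inside the list is an extra assumption beyond what the original code does.

```java
/** Hypothetical helper: extracts the client IP from an X-Forwarded-For header value. */
final class ForwardedFor {

    /**
     * Returns the first entry that is neither empty nor "unknown",
     * or null if the header is missing or has no usable entry.
     */
    static String firstClientIp(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        // Entries are comma-separated; the leftmost one is the original client
        for (String part : headerValue.split(",")) {
            String candidate = part.trim();
            if (!candidate.isEmpty() && !"unknown".equalsIgnoreCase(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(firstClientIp("203.0.113.7, 10.0.0.1")); // 203.0.113.7
        System.out.println(firstClientIp("unknown, 10.0.0.2"));     // 10.0.0.2
        System.out.println(firstClientIp(null));                    // null
    }
}
```

The header is set by the client or by proxies, so the value is only trustworthy when the request came through a proxy you control.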
3.4. Message delivery
package com.mayikt.api.impl.elk.log;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingDeque;

@Component
public class LogContainer {
    private LogThread logThread;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public LogContainer() {
        logThread = new LogThread();
        logThread.start();
    }

    private static LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();

    /**
     * Puts one log message into the concurrent queue.
     *
     * @param log the log message
     */
    public void putLog(String log) {
        logs.offer(log);
    }

    /**
     * Asynchronous log thread that reads from the queue as messages arrive.
     */
    class LogThread extends Thread {
        @Override
        public void run() {
            while (true) {
                /*
                 * Possible optimization: have this thread fetch several log
                 * messages at once and deliver them to Kafka as a batch.
                 */
                try {
                    // take() blocks until a message is available, so the loop does not spin on an empty queue
                    String log = logs.take();
                    if (!StringUtils.isEmpty(log)) {
                        // Deliver the message to Kafka
                        kafkaTemplate.send("mayikt-log", log);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop the thread
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

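The comment in LogThread mentions fetching several messages at once as a possible optimization. One way this might look, as a sketch only: wait briefly for the first entry, then drain whatever else is already queued up to a limit. LogBatcher, nextBatch, and the batch size of 3 are hypothetical and chosen for illustration; the Kafka send is left out so the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: pulls log entries from a queue in bounded batches. */
final class LogBatcher {
    private final BlockingQueue<String> queue;
    private final int maxBatchSize;

    LogBatcher(BlockingQueue<String> queue, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.queue = queue;
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Waits up to timeoutMillis for the first entry, then drains entries that are
     * already queued, up to maxBatchSize in total. Returns an empty list on timeout.
     */
    List<String> nextBatch(long timeoutMillis) throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        String first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (first == null) {
            return batch; // nothing arrived in time
        }
        batch.add(first);
        // drainTo does not block; it moves at most the given number of entries
        queue.drainTo(batch, maxBatchSize - 1);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> logs = new LinkedBlockingDeque<>();
        for (int i = 1; i <= 5; i++) {
            logs.offer("log-" + i);
        }
        LogBatcher batcher = new LogBatcher(logs, 3);
        System.out.println(batcher.nextBatch(100)); // [log-1, log-2, log-3]
        System.out.println(batcher.nextBatch(100)); // [log-4, log-5]
    }
}
```

In LogThread, each entry of a batch would still be passed to kafkaTemplate.send("mayikt-log", ...). The Kafka producer also groups records on its own according to its batch.size and linger.ms settings, so queue-side batching mainly reduces wake-ups of the log thread.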
3.5. Test interface
package com.mayikt.api.weixin;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

public interface WeChatService {

    /**
     * Feign RPC remote call 405
     *
     * @param a
     * @return
     */
    @GetMapping("/getWeChat")
    String getWeChat(@RequestParam("a") Integer a);
}

3.6. Send a request with ApiPost
http://localhost:9000/getWeChat?a=123456888

3.7. View the logs in Kibana

