分布式ELK+KAFKA日志采集 docker-compose
Posted gblfy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式ELK+KAFKA日志采集 docker-compose相关的知识,希望对你有一定的参考价值。
文章目录
一、安装docker-compose插件
1. 下载docker-compose插件
curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
2. 赋予权限
chmod +x /usr/local/bin/docker-compose
二、搭建ELK+KAFKA环境
内存建议4g及以上
2.1. 编写docker-compose
cd /app/
mkdir mayiktelkkafka
上传docker-compose.yml
# docker-compose stack for the log pipeline: ZooKeeper, Kafka, kafka-manager,
# Elasticsearch and Kibana. Replace 192.168.122.128 with the address of your Docker host.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    volumes:
      # Share the host's timezone file with the container.
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      # Address and port that clients outside the compose network use to reach the broker.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      # ZooKeeper connection string; the port is spelled out instead of relying on the default.
      ZK_HOSTS: 192.168.122.128:2181
    ports:
      # The image serves its web UI on container port 9000; it is published on host port 9001.
      - "9001:9000"
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.5.4
    restart: always
    container_name: elasticsearch
    ports:
      - "9200:9200"
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      # The Kibana 6.x image only maps upper-case variable names to settings
      # (ELASTICSEARCH_URL -> elasticsearch.url); the lower-case form is ignored.
      - ELASTICSEARCH_URL=http://192.168.122.128:9200
    depends_on:
      - elasticsearch
2.2. 启动docker-compose
docker-compose up
如果启动时报错,请检查命令末尾是否有多余的空格,删除后重新运行即可
启动成功后的效果图
成功启动后有5个容器。如果容器个数不够,可根据容器ID查看日志。我使用的是虚拟机,启动后es容器启动失败,查看日志后发现如下异常:
异常信息+解决方案->跳转:max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
2.3. 验证效果
验证zk:192.168.122.128:2181(ZooKeeper 不提供 HTTP 页面,浏览器无法直接访问,可用 telnet 或 zkCli 连接该端口进行验证)
访问es:http://192.168.122.128:9200
访问kibana:http://192.168.122.128:5601/app/kibana#/home?_g=()
2.4. 安装logstash
logstash 依赖 JDK,需要提前安装 jdk 环境,安装教程参考:https://blog.csdn.net/weixin_40816738/article/details/108532702
上传或者下载logstash-6.4.3.tar.gz到服务器中
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
解压
tar -zxvf logstash-6.4.3.tar.gz
安装插件
cd logstash-6.4.3
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch
编写配置文件
cd config
vim elk-kafka.conf
内容如下
# Pipeline: consume log messages from Kafka and index them into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "192.168.122.128:9092"
    topics => ["mayikt-log"]
  }
}
filter {
  # Only matched data are sent to output.
}
output {
  elasticsearch {
    action => "index"                  # The operation performed on ES.
    hosts => "192.168.122.128:9200"    # Elasticsearch host; can be an array.
    index => "mayikt_logs"             # The index to write data to.
  }
}
启动logstash
cd bin
./logstash -f ../config/elk-kafka.conf
三、微信项目投递消息kafka
3.1. 微信集成kafka
<!-- Spring for Apache Kafka. No <version> is declared here, so it has to be
     supplied by dependency management elsewhere in the build (TODO confirm). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2. 配置kafka
bootstrap.yml
spring:
  kafka:
    # Kafka server address; for a cluster, list several addresses separated by commas.
    bootstrap-servers: 192.168.122.128:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumer group ID.
      group-id: default_consumer_group
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.3. aop拦截
package com.mayikt.api.impl.elk.log;
import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
*
* elk+kafka采集
*/
@Aspect
@Component
public class AopLogAspect
@Value("$server.port")
private String serverPort;
@Autowired
private LogContainer logContainer;
// 申明一个切点 里面是 execution表达式
@Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")
private void serviceAspect()
//
// 请求method前打印内容
@Before(value = "serviceAspect()")
public void methodBefore(JoinPoint joinPoint)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("request_time", df.format(new Date()));
jsonObject.put("request_url", request.getRequestURL().toString());
jsonObject.put("request_method", request.getMethod());
jsonObject.put("signature", joinPoint.getSignature());
jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
JSONObject requestJsonObject = new JSONObject();
requestJsonObject.put("request", jsonObject);
jsonObject.put("request_time", df.format(new Date()));
jsonObject.put("log_type", "info");
// 将日志信息投递到kafka中
String log = requestJsonObject.toJSONString();
logContainer.putLog(log);
//
// 在方法执行完结后打印返回内容
@AfterReturning(returning = "o", pointcut = "serviceAspect()")
public void methodAfterReturing(Object o)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject respJSONObject = new JSONObject();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("response_time", df.format(new Date()));
jsonObject.put("response_content", JSONObject.toJSONString(o));
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
jsonObject.put("log_type", "info");
respJSONObject.put("response", jsonObject);
// 将日志信息投递到kafka中
// kafkaTemplate.send("mayikt-log",respJSONObject.toJSONString());
logContainer.putLog(respJSONObject.toJSONString());
//
//
/**
* 异常通知
*
* @param point
*/
@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
public void serviceAspect(JoinPoint point, Exception e)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("request_time", df.format(new Date()));
jsonObject.put("request_url", request.getRequestURL().toString());
jsonObject.put("request_method", request.getMethod());
jsonObject.put("signature", point.getSignature());
jsonObject.put("request_args", Arrays.toString(point.getArgs()));
jsonObject.put("error", e.toString());
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
jsonObject.put("log_type", "error");
JSONObject requestJsonObject = new JSONObject();
requestJsonObject.put("request", jsonObject);
// 将日志信息投递到kafka中
String log = requestJsonObject.toJSONString();
logContainer.putLog(log);
//
public static String getIpAddr(HttpServletRequest request)
//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
String ipAddress = request.getHeader("x-forwarded-for");
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress))
ipAddress = request.getHeader("Proxy-Client-IP");
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress))
ipAddress = request.getHeader("WL-Proxy-Client-IP");
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress))
ipAddress = request.getRemoteAddr();
if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1"))
//根据网卡取本机配置的IP
InetAddress inet = null;
try
inet = InetAddress.getLocalHost();
catch (UnknownHostException e)
e.printStackTrace();
ipAddress = inet.getHostAddress();
//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
if (ipAddress != null && ipAddress.length() > 15) //"***.***.***.***".length() = 15
if (ipAddress.indexOf(",") > 0)
ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
return ipAddress;
3.4. 消息投递
package com.mayikt.api.impl.elk.log;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingDeque;
@Component
public class LogContainer
private LogThread logThread;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public LogContainer()
logThread = new LogThread();
logThread.start();
private static LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();
/**
* 存入一条日志消息到并发队列中
*
* @param log
*/
public void putLog(String log)
logs.offer(log);
/**
* 异步日志线程 实时从队列中获取内容
*/
class LogThread extends Thread
@Override
public void run()
while (true)
/**
* 代码的优化
* 当前线程批量获取多条日志消息 投递kafka 批量
*
*/
String log = logs.poll();
if (!StringUtils.isEmpty(log))
/// 将该消息投递到kafka中 批量形式投递kafka
kafkaTemplate.send("mayikt-log", log);
3.5. 测试接口
package com.mayikt.api.weixin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
public interface WeChatService
/**
* feign rpc远程调用 405
* @param a
* @return
*/
@GetMapping("/getWeChat")
String getWeChat( @RequestParam("a")Integer a);
3.6. apipost 发送请求
http://localhost:9000/getWeChat?a=123456888
3.7. kibana 查看日志
以上是关于分布式ELK+KAFKA日志采集 docker-compose的主要内容,如果未能解决你的问题,请参考以下文章
「视频小课堂」ELK和Kafka是怎么就玩在一起成了日志采集解决方案文字版