FurionLoki查询实现请求信息和日志的获取

Posted sysu_lluozh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FurionLoki查询实现请求信息和日志的获取相关的知识,希望对你有一定的参考价值。

Furion平台中fagent上报日志和请求sample信息到Loki中【Furion】Loki日志系统

现在需要不使用Grafana展示数据,直接获取Loki的数据进行展示

一、Loki API

Loki提供了一系列开放的API支持数据的查询HTTP API,接下来看看符合使用的query_range接口的使用

GET /loki/api/v1/query_range

/loki/api/v1/query_range is used to do a query over a range of time andaccepts the following query parameters in the URL:

  • query: The LogQL query to perform
  • limit: The max number of entries to return
  • start: The start time for the query as a nanosecond Unix epoch. Defaults to one hour ago
  • end: The start time for the query as a nanosecond Unix epoch. Defaults to now
  • step: Query resolution step width in seconds. Defaults to a dynamic value based on start and end
  • direction: Determines the sort order of logs. Supported values are forward or backward. Defaults to backward

Requests against this endpoint require Loki to query the index store in order tofind log streams for particular labels. Because the index store is spread out bytime, the time span covered by start and end, if large, may cause additionalload against the index server and result in a slow query

In microservices mode, /loki/api/v1/query_range is exposed by the querier

Response:


  "status": "success",
  "data": 
    "resultType": "matrix" | "streams",
    "result": [<matrix value>] | [<stream value>]
  

Where <matrix value> is:


  "metric": 
    <label key-value pairs>
  ,
  "values": [
    <number: nanosecond unix epoch>,
    <string: value>
  ]

And <stream value> is:


  "stream": 
    <label key-value pairs>
  ,
  "values": [
    [
      <string: nanosecond unix epoch>,
      <string: log line>
    ],
    ...
  ]

Examples

$ curl -G -s  "http://localhost:3100/loki/api/v1/query_range" --data-urlencode 'query=job="varlogs"' | jq

  "status": "success",
  "data": 
    "resultType": "streams",
    "result": [
      
        "stream": 
          "filename": "/var/log/myproject.log",
          "job": "varlogs",
          "level": "info"
        ,
        "values": [
          
            "1569266497240578000",
            "foo"
          ,
          
            "1569266492548155000",
            "bar"
          
        ]
      
    ]
  

二、查询语句

查看loki的配置信息

data:
  config.yml: |-
    server:
      http_listen_port: 9080
      grpc_listen_port: 0
    positions:
      filename: /tmp/positions.yaml
    clients:
      - url: http://loki-swqa-dev.test.seewo.com/loki/api/v1/push
    scrape_configs:
      - job_name: jmxlog
        static_configs:
        - targets:
          - localhost
          labels:
            job: jmxlogs
            __path__: /app/jmxs/loki/logs/*.log
      - job_name: jmxsample
        static_configs:
        - targets:
          - localhost
          labels:
            job: jmxsamples
            __path__: /app/jmxs/loki/sample/*.csv

现在需要的是解析得到接口请求的sample信息列表和各个节点的日志信息

2.1 查询sample信息列表的语句

  • 对应job
    从配置信息中可知查询对应的job = "jmxsamples"

  • 区分任务搜索
    sample信息列表需要区分不同的任务执行的信息,解决方案是不同的执行压测任务的sample生成在不同文件中,文件名中带有此次执行的任务id
    所以在查询语句中需要查询对应的文件名且需要模糊查询,文件名搜索如filename = ".+taskId.+.csv"

所以对应的搜索语句

job = "jmxsamples", filename = ".+taskId.+.csv"

2.2 查询日志信息的语句

  • 对应job
    从配置信息中可知查询对应的job = "jmxlogs"
  • 区分agent搜索
    日志需要获取到不同的任务中不同的agent的执行日志信息,解决方案是不同的执行压测任务的日志生成在不同的文件中,且在文件名中带有此次执行的任务id以及该agnet的ip地址
    所以在查询语句中需要查询对应的文件名且需要模糊查询,文件名搜索如filename = ".+agentIp+taskId+.+.csv"

所以对应的搜索语句

job = "jmxlogs", filename = ".+agentIp+taskId.+.csv"

三、sample数据

3.1 接口请求

通过http请求获取对应的信息

OkHttp3Client httpClient = new OkHttp3Client();
// loki请求的url信息
String lokiUrl = lokiHost + "/loki/api/v1/query_range";
// 请求的查询语句
String queryParam = "job=\\"jmxsamples\\", filename=~\\".+"+ reportId +".+.csv\\"";
// http请求
String responseJson = httpClient.get(lokiUrl)
            .param("direction", "BACKWARD")
            .param("limit", limit)
            .param("query", URLEncoder.encode(queryParam, "utf-8"))
            .param("start", startTimestamp)
            .param("end", endTimestamp)
            .execute().convertString();

3.2 解析数据

根据请求loki的响应数据,解析获得需要的数据格式

// 定义获取报告列表的变量
List<ReportSampleReqDto> reportSamples = new ArrayList<>();

// 接口请求的数据格式化处理
ReportLokiDto res = JSONObject.parseObject(responseJson, ReportLokiDto.class);

// 解析响应的数据
res.getData().getResult().forEach(it->it.getValues().stream().map(val->val.get(1)).forEach(str -> 
              List<String> ele = Arrays.asList(str.split(","));
              ReportSampleReqDto sampleDto = new ReportSampleReqDto();
              sampleDto.setReportId(reportId);
              sampleDto.setTimestamp(ele.get(0));
              sampleDto.setElapsed(Long.valueOf(ele.get(1)));
              sampleDto.setLabel(ele.get(2));
              sampleDto.setResponseCode(ele.get(3));
              sampleDto.setResponseMessage(ele.get(4));
              sampleDto.setThreadName(ele.get(5));
              sampleDto.setSuccess(ele.get(7));
              sampleDto.setFailureMessage(ele.get(8));
              sampleDto.setBytes(Long.valueOf(ele.get(9)));
              sampleDto.setUrl(ele.get(13));
              sampleDto.setConnect(Long.valueOf(ele.get(16)));
              // 将解析的数据写入到列表中
              reportSamples.add(sampleDto);
          
));

3.3 批量写入

将解析的数据批量写入数据库中

ReportSampleDao.java

/**
 * 批量插入报告详情
 */
void batchAddSample(@Param("samples") List<ReportSampleReqDto> samples);

ReportSampleMapper.xml

<insert id="batchAddSample">
            INSERT INTO report_sample
                (`report_id`, `timestamp`, `elapsed`, `label`, `response_code`, `response_message`, `thread_name`, `success`, `failure_message`, `bytes`, `url`, `connect`)
            VALUES
                    <foreach collection="samples" item="sample" index="index" separator=",">
                    (
                        #sample.reportId,
                        #sample.timestamp,
                        #sample.elapsed,
                        #sample.label,
                        #sample.responseCode,
                        #sample.responseMessage,
                        #sample.threadName,
                        #sample.success,
                        #sample.failureMessage,
                        #sample.bytes,
                        #sample.url,
                        #sample.connect
                    )
                </foreach>
        </insert>

批量写入数据库

@Resource
ReportSampleDao reportSampleDao;
    
reportSampleDao.batchAddSample(reportSamples);

四、日志数据

4.1 接口请求

通过http请求获取对应的信息

OkHttp3Client httpClient = new OkHttp3Client();
// loki请求的url信息
String lokiUrl = lokiHost + "/loki/api/v1/query_range";
// 请求的查询语句
String queryParam = "job=\\"jmxlogs\\", filename=~\\".+-"+ agentIp + "-" + reportId +".log\\"";
// http请求
String responseJson = httpClient.get(lokiUrl)
            .param("direction", "BACKWARD")
            .param("limit", limit)
            .param("query", URLEncoder.encode(queryParam, "utf-8"))
            .param("start", startTimestamp)
            .param("end", endTimestamp)
            .execute().convertString();

4.2 解析数据

根据请求loki的响应数据,解析获得需要的数据格式

// 定义获取日志的变量
StringBuffer buf=new StringBuffer();

// 接口请求的数据格式化处理
ReportLokiDto res = JSONObject.parseObject(responseJson, ReportLokiDto.class);

// 解析响应的数据
res.getData().getResult().forEach(it->it.getValues().stream().map(val->val.get(1)).forEach(str -> 
         buf.append(str).append("\\n");
     
));

// 将获得的日志转换成string格式
buf.toString()

以上是关于FurionLoki查询实现请求信息和日志的获取的主要内容,如果未能解决你的问题,请参考以下文章

FurionLoki查询之LogQL语句

从Nginx的access日志统计PVUV和热点资源

如何在 Linux 中获取 Apache 的“每秒请求数”?

获取请求的信息对象

MySQL 元数据:获取查询语句影响的记录数

抖音获取视频点赞数播放数获取用户粉丝列表