如何使用 Micrometer Timer 记录异步方法的持续时间(返回 Mono 或 Flux)

Posted

技术标签:

【中文标题】如何使用 Micrometer Timer 记录异步方法的持续时间(返回 Mono 或 Flux)【英文标题】:How to use Micrometer Timer to record duration of async method (returns Mono or Flux) 【发布时间】:2018-08-24 23:23:52 【问题描述】:

我想使用 Micrometer 记录异步方法最终发生时的执行时间。有推荐的方法吗?

示例:Kafka 回复模板。我想记录实际执行 sendAndReceive 调用所需的时间(发送关于请求主题的消息并接收关于回复主题的响应)。

    public Mono<String> sendRequest(Mono<String> request) 
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> 
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            )
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

有点像

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))

在这里不起作用;它只记录创建Supplier 所需的时间,而不是实际执行时间。

【问题讨论】:

它如何与recordCallable() 一起工作? 【参考方案1】:

您可以只使用 Mono/Flux() 中的 metrics()(在此处查看 metrics():https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) 那么你可以做类似的事情

public Mono<String> sendRequest(Mono<String> request) 
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> 
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        )
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr)).name("my-metricsname").metrics()

例如在石墨中,您将看到测量此调用的延迟(您可以在此处查看更多信息:How to use Micrometer timer together with webflux endpoints)

【讨论】:

【参考方案2】:

你可以使用reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer 
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() 
        Mono.fromCallable(() -> 
            Thread.sleep(1234);
            return true;
        ).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) 
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    

【讨论】:

【参考方案3】:

您可以执行以下操作:

// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));

有关示例文档,请参见此处:http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code

对于更好的方法,您还可以查看弹性 4j,他们通过变换装饰单声道:https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor

【讨论】:

【参考方案4】:

我使用了以下内容:

private <T> Publisher<T> time(String metricName, Flux<T> publisher) 
  return Flux.defer(() -> 

  long before = System.currentTimeMillis();
  return publisher.doOnNext(next -> Metrics.timer(metricName)
        .record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
  );

所以要在实践中使用它:

Flux.just(someValue)
  .flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val))
  .subscribe(val -> )

【讨论】:

【参考方案5】:

你可以使用metrics(),计算时间间隔b/w subscribe()onComplete()的方法。你可以这样做,

 .metrics().elapsed().doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms")).map(Tuple2::getT2);

【讨论】:

【参考方案6】:

如果您考虑使用metrics(),请理解即使您调用Mono.name(),它也不会创建新的仪表。

根据你的情况,你有三个选择。

    使用metrics() 好吧,如果您考虑使用metrics(),请理解即使您调用Mono.name(),它也不会创建新的仪表。 在doOnNext 中记录时间并计算时间。 使用Alexander Pankin 规定的subscriptionContext

就个人而言,我想使用方法3

【讨论】:

【参考方案7】:

看起来像 Brian Clozel 建议的 recordCallable 就是答案。我写了一个快速测试来验证这一点:

import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;

public class Capitalizer 

    private final Timer timer;

    public Capitalizer(Timer timer) 
        this.timer = timer;
    

    public Mono<String> capitalize(Mono<String> val) 
        return val.flatMap(v -> 
            try 
                return timer.recordCallable(() -> toUpperCase(v));
             catch (Exception e) 
                e.printStackTrace();
                return null;
            
        ).filter(r -> r != null);
    

    private Mono<String> toUpperCase(String val) throws InterruptedException 
        Thread.sleep(1000);
        return Mono.just(val.toUpperCase());
    

并对此进行测试:

import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

public class CapitalizerTest 

    private static final Logger logger =
        LoggerFactory.getLogger(CapitalizerTest.class);

    private Capitalizer capitalizer;
    private Timer timer;

    @Before
    public void setUp() 
        timer = new SimpleMeterRegistry().timer("test");
        capitalizer = new Capitalizer(timer);
    

    @Test
    public void testCapitalize() 
        String val = "Foo";
        Mono<String> inputMono = Mono.just(val);
        Mono<String> mono = capitalizer.capitalize(inputMono);
        mono.subscribe(v -> logger.info("Capitalized  to ", val, v));
        assertEquals(1, timer.count());
        logger.info("Timer executed in  ms",
            timer.totalTime(TimeUnit.MILLISECONDS));
        assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
    

计时器报告执行时间大约为 1004 毫秒,延迟 1000 毫秒,没有延迟 4 毫秒。

【讨论】:

为什么recordCallable 会抛出一个经过检查但通用的Exception 该测试仅“有效”,因为 Thread.sleep(1000); 阻塞。计时器仍然不记录返回的单声道的持续时间。 timer.recordCallable 只计算toUpperCase 的执行时间,而不是Mono 的执行时间。

以上是关于如何使用 Micrometer Timer 记录异步方法的持续时间(返回 Mono 或 Flux)的主要内容,如果未能解决你的问题,请参考以下文章

PromQL:rate() 函数的用途是啥?

如何在 io.micrometer 中启用 DiskSpaceMetrics

如何使用 Spring Boot 在运行时配置 Micrometer 的监控系统

如何根据标签更新 MicroMeter 量规

如何使用 Micrometer 和 Alertmanager 在 Prometheus 中提醒 JVM 内存使用情况

如何配置 spring-micrometer 标签