Apache Beam Counter/Metrics 在 Flink WebUI 中不可用

Posted

技术标签:

【中文标题】Apache Beam Counter/Metrics 在 Flink WebUI 中不可用【英文标题】:Apache Beam Counter/Metrics not available in Flink WebUI 【发布时间】:2018-08-07 09:47:22 【问题描述】:

我正在使用 Flink 1.4.1 和 Beam 2.3.0,想知道是否可以在 Flink WebUI(或任何地方)中提供指标,就像在 Dataflow WebUI 中一样?

我使用过类似的计数器:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();

但我在 Flink WebUI 中的任何地方(任务指标或累加器)都找不到 "elements_read" 计数。我认为在BEAM-773 之后这会很简单。

【问题讨论】:

【参考方案1】:

在仪表板中选择作业后,您将看到该作业的 DAG,并且 DAG 下方有一个选项卡列表。

    点击“任务指标”标签 点击你的 DAG 框 单击“添加指标”按钮,以显示该运算符指标

【讨论】:

试过了,但没有运气。我的计数器不在指标列表中。您是如何创建 Beam 计数器/指标的? 嗯...你能在累加器标签中看到你的计数器吗? @robosoul,有什么进展吗?我也面临同样的问题:我能看到的只是标准指标,没有我的自定义指标的迹象。 @diegoreico .. 我可以在 Accumulators 选项卡中看到指标,但在 Metrics 选项卡中看不到。我正在使用 Flink 版本:1.12.0 .. 使用最新的 Apache Beam Master 分支代码..跨度> 【参考方案2】:

如果您的管道在分离模式下运行,则不支持查询指标。参考this。

public class FlinkDetachedRunnerResult implements PipelineResult 

  FlinkDetachedRunnerResult() 

  @Override
  public State getState() 
    return State.UNKNOWN;
  

  @Override
  public MetricResults metrics() 
    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
  

  @Override
  public State cancel() throws IOException 
    throw new UnsupportedOperationException("Cancelling is not yet supported.");
  

  @Override
  public State waitUntilFinish() 
    return State.UNKNOWN;
  

  @Override
  public State waitUntilFinish(Duration duration) 
    return State.UNKNOWN;
  

  @Override
  public String toString() 
    return "FlinkDetachedRunnerResult";
  

但是,我可以使用 slf4j reporter 查看指标

【讨论】:

@zorro 您如何通过 slf4j 记者查看指标?【参考方案3】:
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
import csv
import logging

GAME_DATA = [
'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
'robot1_team1,team1,9000,1447697463000,2015-11-16 18:11:03.955',
'robot2_team2,team2,1,1447697463000,2015-11-16 20:11:03.955',
'robot2_team2,team2,9000,1447697463000,2015-11-16 21:11:03.955',
'robot1_team1,1000,2447697463000,2915-11-16 21:11:03.955',
'robot2_team2,9000,1447697463000,2015-11-16 21:11:03.955']

class ParseGameEventFn(beam.DoFn):
    def __init__(self):
        super(ParseGameEventFn, self).__init__()
    self.game_events = Metrics.counter(self.__class__, 'game_events')

    def process(self, element, *args, **kwargs):
        try:
            self.game_events.inc()
            row = list(csv.reader([element]))[0]
            if int(row[2]) < 5:
               return
            yield 
                'user': row[0],
                'team': row[1],
                'score': int(row[2]),
                'timestamp': int(row[3]) / 1000.0,
            
        except Exception as ex:
            logging.error('Parse error on : '.format(element, ex))

with beam.Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Create" >> beam.Create(GAME_DATA)
        | "Parsing" >> beam.ParDo(ParseGameEventFn())
        | "AddEventTimestamps" >> beam.Map(
             lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | "Print" >> beam.Map(print))

metric_results = pipeline.result.metrics().query(MetricsFilter().with_name('game_events'))
outputs_user_counter = metric_results['counters'][0]
print(outputs_user_counter.committed)

conf/flink-conf.yaml 中 Prometheus 的 Flink 配置

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

我可以在 Accumulators 选项卡中看到指标,但在 Metrics 选项卡中看不到。我使用的是 Flink 版本:1.12.0。使用最新的 Apache Beam 主分支代码。

【讨论】:

以上是关于Apache Beam Counter/Metrics 在 Flink WebUI 中不可用的主要内容,如果未能解决你的问题,请参考以下文章

如何运行 Apache Beam 集成测试?

Python 上的 Apache Beam 将 beam.Map 调用相乘

Apache Beam - 跳过管道步骤

什么是 Apache Beam? [关闭]

apache beam ElasticSearchIO 遇到异常后job中断执行 自己定制beam IO

数据流管道上的 Apache Beam StatusRuntimeException