Inspecting Zeebe workflow data with the Zeebe DebugHttpExporter

Posted rongfengliang

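Zeebe ships a DebugHttpExporter that makes it easy to inspect deployments and the records produced by running workflows.
Below is a simple trial run; it also integrates Prometheus and adds a basic Grafana dashboard.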

Environment setup

  • docker-compose file
 
version: "3"
services:
    operate:
        image: camunda/operate:1.1.0
        ports:
            - "8080:8080"
        volumes:
            - "./application.yml:/usr/local/operate/config/application.yml"
    grafana:
        image: grafana/grafana
        ports:
            - "3000:3000"
    nginx-grpc-lb:
        image: openresty/openresty:alpine
        volumes: 
        - "./nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf"
        ports:
        - "27500:26500"
    prometheus:
        image: prom/prometheus
        volumes:
            - "./prometheus.yml:/etc/prometheus/prometheus.yml"
        ports:
            - "9090:9090"
    broker-1:
        image: camunda/zeebe:${ZEEBE_VERSION:-latest}
        ports:
            - 26500:26500
            - 26501:26501
            - 5701:5701
            - 8000:8000
            - 9600:9600
        environment:
            - ZEEBE_LOG_LEVEL=${ZEEBE_LOG_LEVEL:-debug}
            - ZEEBE_NODE_ID=0
            - ZEEBE_PARTITIONS_COUNT=1
            - ZEEBE_CLUSTER_SIZE=1
            - ZEEBE_REPLICATION_FACTOR=1
        volumes:
            - ./broker_1:/usr/local/zeebe/data
            - ./zeebe.cfg.toml:/usr/local/zeebe/conf/zeebe.cfg.toml
    elasticsearch:
        image: elasticsearch:6.7.1
        container_name: elasticsearch
        environment:
            - "discovery.type=single-node"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        ports:
            - 9200:9200
            - 9300:9300 # required for Performance Analyzer
 
 
  • Operate configuration
    application.yml file
 
# Operate configuration file

camunda.operate:
  # ELS instance to store Operate data
  elasticsearch:
    # Cluster name
    clusterName: elasticsearch
    # Host
    host: elasticsearch
    # Transport port
    port: 9200
  # Zeebe instance
  zeebe:
    # Broker contact point
    brokerContactPoint: broker-1:26500
  # ELS instance to export Zeebe data to
  zeebeElasticsearch:
    # Cluster name
    clusterName: elasticsearch
    # Host
    host: elasticsearch
    # Transport port
    port: 9200
    # Index prefix, configured in Zeebe Elasticsearch exporter
    prefix: zeebe-record
logging:
  level:
    ROOT: INFO
    org.camunda.operate: DEBUG
#Spring Boot Actuator endpoints to be exposed
management.endpoints.web.exposure.include: health,info,conditions,configprops,prometheus
  • Zeebe configuration
    zeebe.cfg.toml file
 
# Zeebe broker configuration file
 
# Overview -------------------------------------------
 
# This file contains a complete list of available configuration options.
 
# Default values:
#
# When the default value is used for a configuration option, the option is
# commented out. You can learn the default value from this file
 
# Conventions:
#
# Byte sizes
# For buffers and others must be specified as strings and follow the following
# format: "10U" where U (unit) must be replaced with K = Kilobytes, M = Megabytes or G = Gigabytes.
# If unit is omitted then the default unit is simply bytes.
# Example:
# sendBufferSize = "16M" (creates a buffer of 16 Megabytes)
#
# Time units
# Timeouts, intervals, and the likes, must be specified as strings and follow the following
# format: "VU", where:
# - V is a numerical value (e.g. 1, 1.2, 3.56, etc.)
# - U is the unit, one of: ms = Millis, s = Seconds, m = Minutes, or h = Hours
#
# Paths:
# Relative paths are resolved relative to the installation directory of the
# broker.
 
# ----------------------------------------------------
 
[gateway]
# Enable the embedded gateway to start on broker startup.
# This setting can also be overridden using the environment variable ZEEBE_EMBED_GATEWAY.
# enable = true
 
[gateway.network]
# Sets the host the embedded gateway binds to.
# This setting can be specified using the following precedence:
# 1. setting the environment variable ZEEBE_GATEWAY_HOST
# 2. setting gateway.network.host property in this file
# 3. setting the environment variable ZEEBE_HOST
# 4. setting network.host property in this file
# host = "0.0.0.0"
 
# Sets the port the embedded gateway binds to.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_PORT.
# port = 26500
 
[gateway.cluster]
# Sets the broker the gateway should initially contact.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CONTACT_POINT.
# contactPoint = "127.0.0.1:26501"
 
# Sets the size of the transport buffer used to send and receive messages between the gateway and the broker cluster.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_TRANSPORT_BUFFER.
# transportBuffer = "128M"
 
# Sets the timeout of requests sent to the broker cluster
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_REQUEST_TIMEOUT.
# requestTimeout = "15s"
 
[gateway.threads]
# Sets the number of threads the gateway will use to communicate with the broker cluster
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_MANAGEMENT_THREADS.
# managementThreads = 1
 
[gateway.monitoring]
# Enables the metrics collection in the gateway
enabled = true
 
[network]

# This section contains the network configuration. Particularly, it allows to
# configure the hosts and ports the broker should bind to. The broker exposes three sockets:
# 1. command: the socket which is used for gateway-to-broker communication
# 2. internal: the socket which is used for broker-to-broker communication
# 3. monitoring: the socket which is used to monitor the broker
 
# Controls the default host the broker should bind to. Can be overwritten on a
# per binding basis for client, management and replication
#
# This setting can also be overridden using the environment variable ZEEBE_HOST.
# host = "0.0.0.0"
 
# If a port offset is set it will be added to all ports specified in the config
# or the default values. This is a shortcut to not always specifying every port.
#
# The offset will be added to the second last position of the port, as Zeebe
# requires multiple ports. As example a portOffset of 5 will increment all ports
# by 50, i.e. 26500 will become 26550 and so on.
#
# This setting can also be overridden using the environment variable ZEEBE_PORT_OFFSET.
# portOffset = 0
 
[network.commandApi]
# Overrides the host used for gateway-to-broker communication
# host = "localhost"
 
# Sets the port used for gateway-to-broker communication
# port = 26501
 
# Sets the size of the buffer used for buffering outgoing messages
# sendBufferSize = "16M"
 
[network.internalApi]
# Overrides the host used for internal broker-to-broker communication
# host = "localhost"
 
# Sets the port used for internal broker-to-broker communication
# port = 26502
 
[network.monitoringApi]
# Overrides the host used for exposing monitoring information
  host = "0.0.0.0"
 
# Sets the port used for exposing monitoring information
  port = 9600
 
[data]

# This section allows to configure Zeebe's data storage. Data is stored in
# "partition folders". A partition folder has the following structure:
#
# partition-0 (root partition folder)
# ├── partition.json (metadata about the partition)
# ├── segments (the actual data as segment files)
# │   ├── 00.data
# │   └── 01.data
# └── state (stream processor state and snapshots)
#     └── stream-processor
#         ├── runtime
#         └── snapshots
 
# Specify a list of directories in which data is stored. Using multiple
# directories makes sense in case the machine which is running Zeebe has
# multiple disks which are used in a JBOD (just a bunch of disks) manner. This
# allows to get greater throughput in combination with a higher io thread count
# since writes to different disks can potentially be done in parallel.
#
# This setting can also be overridden using the environment variable ZEEBE_DIRECTORIES.
# directories = [ "data" ]
 
# The size of data log segment files.
# logSegmentSize = "512M"
 
# How often we take snapshots of streams (time unit)
# snapshotPeriod = "15m"
 
# The maximum number of snapshots kept (must be a positive integer). When this 
# limit is passed the oldest snapshot is deleted.
# maxSnapshots = "3"
#
# How often follower partitions will check for new snapshots to replicate from
# the leader partitions. Snapshot replication enables faster failover by
# reducing how many log entries must be reprocessed in case of leader change.
# snapshotReplicationPeriod = "5m"
 
[cluster]

# This section contains all cluster-related configuration for setting up a Zeebe cluster
 
# Specifies the unique id of this broker node in a cluster.
# The id should be between 0 and number of nodes in the cluster (exclusive).
#
# This setting can also be overridden using the environment variable ZEEBE_NODE_ID.
# nodeId = 0
 
# Controls the number of partitions, which should exist in the cluster.
#
# This can also be overridden using the environment variable ZEEBE_PARTITIONS_COUNT.
# partitionsCount = 1
 
# Controls the replication factor, which defines the count of replicas per partition.
# The replication factor cannot be greater than the number of nodes in the cluster.
#
# This can also be overridden using the environment variable ZEEBE_REPLICATION_FACTOR.
# replicationFactor = 1
 
# Specifies the zeebe cluster size. This value is used to determine which broker
# is responsible for which partition.
#
# This can also be overridden using the environment variable ZEEBE_CLUSTER_SIZE.
# clusterSize = 1
 
# Allows to specify a list of known other nodes to connect to on startup
# The contact points of the internal network configuration must be specified.
# The format is [HOST:PORT]
# Example:
# initialContactPoints = [ "192.168.1.22:26502", "192.168.1.32:26502" ]
#
# This setting can also be overridden using the environment variable ZEEBE_CONTACT_POINTS
# specifying a comma-separated list of contact points.
#
# To guarantee the cluster can survive network partitions, all nodes must be specified
# as initial contact points.
#
# Default is empty list:
# initialContactPoints = []
 
# Allows to specify a name for the cluster
# This setting can also be overridden using the environment variable ZEEBE_CLUSTER_NAME
# Example:
# clusterName = "zeebe-cluster"
 
[threads]
 
# Controls the number of non-blocking CPU threads to be used. WARNING: You
# should never specify a value that is larger than the number of physical cores
# available. Good practice is to leave 1-2 cores for ioThreads and the operating
# system (it has to run somewhere). For example, when running Zeebe on a machine
# which has 4 cores, a good value would be 2.
#
# The default value is 2.
#cpuThreadCount = 2
 
# Controls the number of io threads to be used. These threads are used for
# workloads that write data to disk. While writing, these threads are blocked
# which means that they yield the CPU.
#
# The default value is 2.
#ioThreadCount = 2
 
# Configure exporters below; note that configuration parsing conventions do not apply to exporter
# arguments, which will be parsed as normal TOML.
#
# Each exporter should be configured following this template:
#
# id:
# property should be unique in this configuration file, as it will serve as the exporter
# ID for loading/unloading.
# jarPath:
# path to the JAR file containing the exporter class. JARs are only loaded once, so you can define
# two exporters that point to the same JAR, with the same class or a different one, and use args
# to parametrize its instantiation.
# className:
# entry point of the exporter, a class which *must* extend the io.zeebe.exporter.Exporter
# interface.
#
# A nested table as [exporters.args] will allow you to inject arbitrary arguments into your
# class through the use of annotations.
#
# Enable the following debug exporter to log the exported records to console
# This exporter can also be enabled using the environment variable ZEEBE_DEBUG, the pretty print
# option will be enabled if the variable is set to "pretty".
#
# [[exporters]]
# id = "debug-log"
# className = "io.zeebe.broker.exporter.debug.DebugLogExporter"
# [exporters.args]
# logLevel = "debug"
# prettyPrint = false
#
# Enable the following debug exporter to start a http server to inspect the exported records
#

# Enable the HTTP exporter used to inspect the exported records
[[exporters]]
  id = "debug-http"
  className = "io.zeebe.broker.exporter.debug.DebugHttpExporter"
  [exporters.args]
   port = 8000
   limit = 1024
#
#
# An example configuration for the elasticsearch exporter:
# Elasticsearch exporter configuration
[[exporters]]
id = "elasticsearch"
className = "io.zeebe.exporter.ElasticsearchExporter"
  [exporters.args]
  url = "http://elasticsearch:9200"
#
  [exporters.args.bulk]
  delay = 5
  size = 1_000
#
  [exporters.args.authentication]
# username = elastic
# password = changeme
#
  [exporters.args.index]
  prefix = "zeebe-record"
# createTemplate = true
  command = false
  event = true
  rejection = false
  deployment = true
  error = true
  incident = true
  job = true
  jobBatch = false
  message = false
  messageSubscription = false
  variable = true
  variableDocument = false
  workflowInstance = true
  workflowInstanceCreation = false
  workflowInstanceSubscription = false
   # If true, the exporter updates its position after publishing the record to Hazelcast.
   # Otherwise, it never updates its position. On broker start, it will always start from the beginning of the log.
   # CAUTION! The broker can't delete data and may run out of disk space if set to false.
 
 
  • Prometheus configuration
    prometheus.yml file
 
scrape_configs:
  - job_name: brokers
    metrics_path: /metrics
    scrape_interval: 10s
    scrape_timeout: 10s
    static_configs:
      - targets: ['broker-1:9600']
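
To check by hand that the scrape target configured above is actually serving metrics, one option is to fetch the endpoint and count the sample lines. The following is only a sketch: `count_samples` is a hypothetical helper name, and the usage line assumes port 9600 of broker-1 is published on localhost, as in the docker-compose file.

```shell
#!/bin/sh
# Hypothetical helper: count Prometheus samples read from stdin.
# In the text exposition format, lines starting with '#' are HELP/TYPE
# comments; every other non-empty line is one sample.
count_samples() {
    grep -v '^#' | grep -c '[^[:space:]]'
}

# Usage (assumes port 9600 of broker-1 is published on localhost):
#   curl -s http://localhost:9600/metrics | count_samples
```

A count of zero, or a failing curl, suggests the monitoring API is not reachable on that port.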
 
 
  • nginx configuration
    Mainly uses nginx to proxy gRPC, acting as a load balancer
worker_processes 2;
user root;  
events {
    worker_connections 65536;
}
http {
    include mime.types;
    default_type application/octet-stream;
    sendfile on;
    lua_code_cache off;
    lua_need_request_body on;
    gzip on;
    resolver 127.0.0.11 ipv6=off;          
    real_ip_header X-Forwarded-For;
    real_ip_recursive on;
    gzip_min_length 2k;
    gzip_buffers 4 16k;
    log_format compression '$remote_addr - $remote_user [$time_local] '
                       '"$request" $status $bytes_sent '
                       '"$http_referer" "$http_user_agent" "$gzip_ratio"';

    gzip_comp_level 4;
    gzip_types text/plain text/css image/png application/javascript image/jpeg image/gif;
    upstream grpcservers {
      server broker-1:26500;
    }
    server {
        listen 26500 http2;
        location / {
            grpc_pass grpc://grpcservers;
            error_page 502 = /error502grpc;
        }
        location = /error502grpc {
            internal;
            default_type application/grpc;
            add_header grpc-status 14;
            add_header grpc-message "unavailable";
            return 204;
        }
    }
    server {
        listen 8080;
        server_name _;
        charset utf-8;
        default_type text/html;
        location / {
           default_type text/plain;
           index index.html index.htm;
        }
        location = /favicon.ico {
            root /opt/app/static;
        }
        location = /empty {
            empty_gif;
        }
        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}
  • Flow definition
    flow.bpmn file
 
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="1.5.0-nightly">
  <bpmn:process id="demoProcess" isExecutable="true">
    <bpmn:startEvent id="start" name="start">
      <bpmn:outgoing>SequenceFlow_1sz6737</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="SequenceFlow_1sz6737" sourceRef="start" targetRef="taskA" />
    <bpmn:sequenceFlow id="SequenceFlow_06ytcxw" sourceRef="taskA" targetRef="taskB" />
    <bpmn:sequenceFlow id="SequenceFlow_1oh45y7" sourceRef="taskB" targetRef="taskC" />
    <bpmn:endEvent id="end" name="end">
      <bpmn:incoming>SequenceFlow_148rk2p</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="SequenceFlow_148rk2p" sourceRef="taskC" targetRef="end" />
    <bpmn:serviceTask id="taskA" name="task A">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="foo" />
      </bpmn:extensionElements>
      <bpmn:incoming>SequenceFlow_1sz6737</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_06ytcxw</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:serviceTask id="taskB" name="task B">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="bar" />
      </bpmn:extensionElements>
      <bpmn:incoming>SequenceFlow_06ytcxw</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1oh45y7</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:serviceTask id="taskC" name="task C">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="foo" />
      </bpmn:extensionElements>
      <bpmn:incoming>SequenceFlow_1oh45y7</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_148rk2p</bpmn:outgoing>
    </bpmn:serviceTask>
  </bpmn:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="demoProcess">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="start">
        <dc:Bounds x="173" y="102" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="180" y="138" width="22" height="12" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_1sz6737_di" bpmnElement="SequenceFlow_1sz6737">
        <di:waypoint xsi:type="dc:Point" x="209" y="120" />
        <di:waypoint xsi:type="dc:Point" x="310" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="260" y="105" width="0" height="0" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_06ytcxw_di" bpmnElement="SequenceFlow_06ytcxw">
        <di:waypoint xsi:type="dc:Point" x="410" y="120" />
        <di:waypoint xsi:type="dc:Point" x="502" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="456" y="105" width="0" height="0" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_1oh45y7_di" bpmnElement="SequenceFlow_1oh45y7">
        <di:waypoint xsi:type="dc:Point" x="602" y="120" />
        <di:waypoint xsi:type="dc:Point" x="694" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="648" y="105" width="0" height="0" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="EndEvent_0gbv3sc_di" bpmnElement="end">
        <dc:Bounds x="867" y="102" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="876" y="138" width="18" height="12" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_148rk2p_di" bpmnElement="SequenceFlow_148rk2p">
        <di:waypoint xsi:type="dc:Point" x="794" y="120" />
        <di:waypoint xsi:type="dc:Point" x="867" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="831" y="105" width="0" height="0" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ServiceTask_09m0goq_di" bpmnElement="taskA">
        <dc:Bounds x="310" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ServiceTask_0sryj72_di" bpmnElement="taskB">
        <dc:Bounds x="502" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ServiceTask_1xu4l3g_di" bpmnElement="taskC">
        <dc:Bounds x="694" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>
 
 

Start and test

  • Build the images
docker-compose build
  • Start the services
docker-compose up -d
  • DebugHttpExporter in action

Deploy a flow

./zbctl.darwin --insecure --address 127.0.0.1:27500 deploy flow.bpmn

Create an instance

./zbctl.darwin --insecure --address 127.0.0.1:27500 create instance demoProcess

Or create instances in bulk from the CLI

sh app.sh

Contents of app.sh

#!/bin/sh

while true
do
 ./zbctl.darwin --insecure --address 127.0.0.1:27500 create instance demoProcess
done
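
The loop above runs until it is interrupted. For a repeatable test, a bounded variant can be handier. The sketch below is not part of the original demo: `create_instances`, `ZBCTL`, and `ZEEBE_ADDRESS` are hypothetical names introduced for illustration, with defaults taken from the commands shown earlier.

```shell
#!/bin/sh
# Bounded variant of app.sh: create a fixed number of instances.
# ZBCTL and ZEEBE_ADDRESS are hypothetical override variables; the
# defaults mirror the zbctl commands used earlier in this post.
ZBCTL="${ZBCTL:-./zbctl.darwin}"
ZEEBE_ADDRESS="${ZEEBE_ADDRESS:-127.0.0.1:27500}"

create_instances() {
    count="$1"
    i=0
    while [ "$i" -lt "$count" ]; do
        # Stop at the first failure instead of looping forever.
        "$ZBCTL" --insecure --address "$ZEEBE_ADDRESS" create instance demoProcess || return 1
        i=$((i + 1))
    done
}

# Usage: create_instances 100
```

Returning on the first failed call keeps a stopped broker from turning the loop into a stream of errors.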
 

[Screenshot: DebugHttpExporter page]
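
Besides the browser view, the exported records can probably be pulled from the command line. The sketch below rests on an assumption: that the exporter's HTTP server (port 8000 in zeebe.cfg.toml) serves the records as JSON under `/records.json`. That path is not confirmed by this post and may differ between Zeebe versions; `records_url` and `fetch_records` are hypothetical helper names.

```shell
#!/bin/sh
# Hypothetical helpers for reading the DebugHttpExporter output.
# Assumption: the records are served as JSON under /records.json.
records_url() {
    host="${1:-localhost}"
    port="${2:-8000}"
    printf 'http://%s:%s/records.json\n' "$host" "$port"
}

fetch_records() {
    # -f makes curl fail on HTTP errors; -sS hides progress but shows errors.
    curl -fsS "$(records_url "$@")"
}

# Usage: fetch_records localhost 8000
```

If the request returns an error, opening http://localhost:8000/ in a browser shows which resources the server actually exposes.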

 

 


Grafana dashboard
[Screenshot: Grafana dashboard]

 

 

References

https://docs.zeebe.io/reference/exporters.html
https://github.com/rongfengliang/zeebe-debughttp-exporter-demo
