Stream data to InfluxDB Cloud via Telegraf

【Posted】2020-06-09 11:22:41 【Question】:

Can anyone help me stream Telegraf data into InfluxDB Cloud? I followed this tutorial: a Python script runs on my local machine and pushes notifications to RabbitMQ (a rough sketch of such a publisher is included after the configuration below), and Telegraf subscribes to RabbitMQ with the following configuration:


# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true
  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000
  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
  ## output, and will flush this buffer on a successful write. Oldest metrics
  ## are dropped first when this buffer fills.
  ## This buffer only fills when writes fail to output plugin(s).
  metric_buffer_limit = 10000
  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"
  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s.
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  precision = ""
  ## Logging configuration:
  ## Run telegraf with debug log messages.
  debug = true
  ## Run telegraf in quiet mode (error log messages only).
  quiet = false
  ## Specify the log file name. The empty string means to log to stderr.
  logfile = ""
  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false
[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ## urls exp: http://127.0.0.1:9999
  urls = ["https://eu-central-1-1.aws.cloud2.influxdata.com"]
  ## Token for authentication.
  token = "$INFLUX_TOKEN"
  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "some@gmail.com"
  ## Destination bucket to write into.
  bucket = "two"
[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = true
  ## Whether to report total system cpu stats or not
  totalcpu = true
  ## If true, collect raw CPU time metrics.
  collect_cpu_time = false
  ## If true, compute and report the sum of all non-idle CPU states.
  report_active = false
[[inputs.disk]]
  ## By default stats will be gathered for all mount points.
  ## Set mount_points will restrict the stats to only the specified mount points.
  # mount_points = ["/"]
  ## Ignore mount points by filesystem type.
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.mem]]
[[inputs.net]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]
# # Reads metrics from RabbitMQ servers via the Management Plugin
[[inputs.rabbitmq]]
#   ## Management Plugin url. (default: http://localhost:15672)
url = "http://localhost:15672"
#   ## Tag added to rabbitmq_overview series; deprecated: use tags
#   # name = "rmq-server-1"
#   ## Credentials
username = "guest"
password = "guest"
#
#   ## Optional TLS Config
#   # tls_ca = "/etc/telegraf/ca.pem"
#   # tls_cert = "/etc/telegraf/cert.pem"
#   # tls_key = "/etc/telegraf/key.pem"
#   ## Use TLS but skip chain & host verification
#   # insecure_skip_verify = false
#
#   ## Optional request timeouts
#   ##
#   ## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
#   ## for a server's response headers after fully writing the request.
header_timeout = "3s"
#   ##
#   ## client_timeout specifies a time limit for requests made by this client.
#   ## Includes connection time, any redirects, and reading the response body.
client_timeout = "4s"
#
#   ## A list of nodes to gather as the rabbitmq_node measurement. If not
#   ## specified, metrics for all nodes are gathered.
#   # nodes = ["rabbit@node1", "rabbit@node2"]
#
#   ## A list of queues to gather as the rabbitmq_queue measurement. If not
#   ## specified, metrics for all queues are gathered.
#   # queues = ["telegraf"]
#
#   ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
#   ## specified, metrics for all exchanges are gathered.
[[inputs.mqtt_consumer]]
 name_prefix = "influx"
 servers = ["tcp://rabbitmq:1883"]
 qos = 0
 connection_timeout = "30s"
 topics = [
   "crypto/btc",
   # "crypto/eth",
 ]
 persistent_session = false
 client_id = ""
 data_format = "json"
 json_string_fields
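
For context, the publisher side of the pipeline is roughly a paho-mqtt script along the following lines. This is a minimal sketch, not the tutorial's actual code: the broker address, the guest/guest credentials (RabbitMQ's MQTT plugin defaults) and the payload fields are assumptions.

# Minimal publisher sketch (paho-mqtt 1.x style client); values below are assumptions.
import json
import time

import paho.mqtt.client as mqtt

client = mqtt.Client()
client.username_pw_set("guest", "guest")   # RabbitMQ MQTT plugin default credentials (assumed)
client.connect("localhost", 1883)          # broker address as seen from the local script (assumed)
client.loop_start()

while True:
    # Numeric fields only: Telegraf's "json" data format keeps numeric values
    # unless json_string_fields lists additional string keys.
    payload = {"price": 9150.42, "volume": 12.3}
    client.publish("crypto/btc", json.dumps(payload), qos=0)
    time.sleep(10)

Note that with data_format = "json" and name_prefix = "influx", these messages should end up in a measurement whose name contains "mqtt_consumer" (with the prefix prepended), not in a measurement named after the "crypto/btc" topic.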

The logs show that data is being written to InfluxDB Cloud:

2020-02-25T16:01:53Z I! Starting Telegraf 1.13.3
2020-02-25T16:01:53Z I! Loaded inputs: mqtt_consumer disk diskio net system rabbitmq cpu mem processes swap
2020-02-25T16:01:53Z I! Loaded aggregators: 
2020-02-25T16:01:53Z I! Loaded processors: 
2020-02-25T16:01:53Z I! Loaded outputs: influxdb_v2
2020-02-25T16:01:53Z I! Tags enabled: host=dos4dev
2020-02-25T16:01:53Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"dos4dev", Flush Interval:10s
2020-02-25T16:01:53Z D! [agent] Initializing plugins
2020-02-25T16:01:53Z D! [agent] Connecting outputs
2020-02-25T16:01:53Z D! [agent] Attempting connection to [outputs.influxdb_v2]
2020-02-25T16:01:53Z D! [agent] Successfully connected to outputs.influxdb_v2
2020-02-25T16:01:53Z D! [agent] Starting service inputs
2020-02-25T16:02:00Z D! [inputs.mqtt_consumer] Connecting [tcp://rabbitmq:1883]
2020-02-25T16:02:10Z D! [inputs.mqtt_consumer] Connecting [tcp://rabbitmq:1883]
2020-02-25T16:02:10Z D! [outputs.influxdb_v2] Wrote batch of 78 metrics in 595.462779ms
2020-02-25T16:02:10Z D! [outputs.influxdb_v2] Buffer fullness: 83 / 10000 metrics
2020-02-25T16:02:20Z D! [inputs.mqtt_consumer] Connecting [tcp://rabbitmq:1883]
2020-02-25T16:02:20Z D! [outputs.influxdb_v2] Wrote batch of 83 metrics in 344.265787ms
2020-02-25T16:02:20Z D! [outputs.influxdb_v2] Buffer fullness: 83 / 10000 metrics
2020-02-25T16:02:30Z D! [inputs.mqtt_consumer] Connecting [tcp://rabbitmq:1883]

But I cannot find the data in InfluxDB Cloud.

【Comments】:

【Answer 1】:

Judging by the log messages from Telegraf, it looks like the data is being written. Have you tried following the documentation to explore your data? https://v2.docs.influxdata.com/v2.0/visualize-data/explore-metrics/
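
For example, a quick programmatic check of the bucket can be done with a Flux query through the influxdb-client Python package. This is a minimal sketch: the URL, organization and bucket are taken from the configuration in the question, the token is a placeholder, and the filter assumes the MQTT metrics land in a measurement whose name contains "mqtt_consumer".

# Minimal query sketch using the influxdb-client package (pip install influxdb-client).
from influxdb_client import InfluxDBClient

client = InfluxDBClient(
    url="https://eu-central-1-1.aws.cloud2.influxdata.com",
    token="<INFLUX_TOKEN>",   # placeholder token
    org="some@gmail.com",
)

flux = '''
from(bucket: "two")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement =~ /mqtt_consumer/)
'''

for table in client.query_api().query(flux):
    for record in table.records:
        print(record.get_time(), record.get_measurement(),
              record.get_field(), record.get_value())

Dropping the filter line shows everything written to the bucket in the last hour, which helps tell apart "nothing is arriving at all" from "the data is arriving under an unexpected measurement name".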

【Discussion】:
