读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)
Posted
技术标签:
【中文标题】读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)【英文标题】:Read a kafka topic and expose the data via Rest API for prometheus to scrape ( Nodejs) 【发布时间】:2021-06-27 09:14:22 【问题描述】:我使用 kafkajs 公开从 kafka 主题读取的数据,以便通过 http 端点公开,以便 prometheus 抓取数据。但我无法公开来自 kafka 主题的数据。我写过这样的生产者和消费者
Producer.js
// import the `Kafka` instance from the kafkajs library
const
Kafka,
logLevel
= require("kafkajs")
const fs = require("fs");
const path = require("path");
// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"
// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka(
clientId,
brokers,
// logLevel: logLevel.INFO
)
const producer = kafka.producer()
// we define an async function that writes a new message each second
const produce = async () =>
await producer.connect()
// after the produce has connected, we start an interval timer
try
// send a message to the configured topic with
// the key and value formed from the current value of `i`
await producer.send(
topic,
acks: 1,
messages: [
key: "metrics on premise",
value: fs.readFileSync(path.join(__dirname,'metrics.txt'), 'utf8'),
, ],
)
// if the message is written successfully, log it and increment `i`
console.log("writes: #####################")
catch (err)
console.error("could not write message " + err)
module.exports = produce
索引.js
const produce = require("./produce")
const consume = require("./consume")
const fs = require("fs");
const path = require("path");
const express = require('express')
const app = express()
const port = 3003
app.get('/metrics', async (req, res) =>
//res.send(fs.readFileSync(path.join(__dirname,'topic_message.txt'), 'utf8'))
consume(res).catch(err =>
console.error("Error in consumer: ", err)
)
)
app.listen(port, () =>
console.log(`Example app listening at http://localhost:$port`)
)
// call the `produce` function and log an error if it occurs
produce().catch((err) =>
console.error("error in producer: ", err)
)
下面是消费者 Consumer.js
const
Kafka,
logLevel
= require("kafkajs")
const fs = require("fs");
const path = require("path");
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"
const kafka = new Kafka(
clientId,
brokers,
// logCreator: customLogger,
// logLevel: logLevel.DEBUG,
)
const consumer = kafka.consumer(
groupId: clientId,
minBytes: 5,
maxBytes: 1e6,
// wait for at most 3 seconds before receiving new data
maxWaitTimeInMs: 3000,
);
const consume = async (res) =>
// first, we wait for the client to connect and subscribe to the given topic
let myString = "";
await consumer.connect()
await consumer.subscribe(
topic,
fromBeginning: true
)
await consumer.run(
// this function is called every time the consumer gets a new message
eachMessage: (
message
) =>
console.log("Message received ###############################################################################");
res.send(message.value);
,
)
setTimeout(async () =>
await consumer.disconnect();
, 2000);
module.exports = consume
当我点击 api 时,我无法将消费的消息发送到 API
【问题讨论】:
请参阅***.com/help/how-to-ask - 我们至少需要一些代码、错误消息、您认为应该发生的事情与正在发生的事情。 【参考方案1】:除非您以某种方式通过流式 HTTP 响应或使用 websockets(您不在此代码中)进行抓取,否则我不确定这是一个好方法。
如果您真的想将 Kafka 记录发送到 Prometheus,请通过消费者的 PushGateway 发送它们,而不是使用同步 HTTP 抓取
【讨论】:
以上是关于读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)的主要内容,如果未能解决你的问题,请参考以下文章
如何在使用 REST API 创建 Kafka 连接器时定义模式
我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?
从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题
如何在 Spring Boot 中从 Mongodb 读取集合数据并定期发布到 kafka 主题中
使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?