读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)



【中文标题】读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)【英文标题】:Read a kafka topic and expose the data via Rest API for prometheus to scrape ( Nodejs) 【发布时间】:2021-06-27 09:14:22 【问题描述】:

我使用 kafkajs 公开从 kafka 主题读取的数据,以便通过 http 端点公开,以便 prometheus 抓取数据。但我无法公开来自 kafka 主题的数据。我写过这样的生产者和消费者


 // import the `Kafka` instance from the kafkajs library
 = require("kafkajs")
const fs = require("fs");
const path = require("path");

// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"

// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka(
    // logLevel: logLevel.INFO
const producer = kafka.producer()

// we define an async function that writes a new message each second
const produce = async () => 
    await producer.connect()
    // after the produce has connected, we start an interval timer

        // send a message to the configured topic with
        // the key and value formed from the current value of `i`
        await producer.send(
            acks: 1,
            messages: [
                key: "metrics on premise",
                value: fs.readFileSync(path.join(__dirname,'metrics.txt'), 'utf8'),
            , ],

        // if the message is written successfully, log it and increment `i`
        console.log("writes:  #####################")
     catch (err) 
        console.error("could not write message " + err)

module.exports = produce


const produce = require("./produce")
const consume = require("./consume")
const fs = require("fs");
const path = require("path");

const express = require('express')
const app = express()
const port = 3003

app.get('/metrics', async (req, res) => 
    //res.send(fs.readFileSync(path.join(__dirname,'topic_message.txt'), 'utf8'))

    consume(res).catch(err => 
        console.error("Error in consumer: ", err)

app.listen(port, () => 
    console.log(`Example app listening at http://localhost:$port`)

// call the `produce` function and log an error if it occurs
produce().catch((err) => 
    console.error("error in producer: ", err)

下面是消费者 Consumer.js

 = require("kafkajs")
const fs = require("fs");
const path = require("path");
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"

const kafka = new Kafka(
    // logCreator: customLogger,
    // logLevel: logLevel.DEBUG,
const consumer = kafka.consumer(
    groupId: clientId,
    minBytes: 5,
    maxBytes: 1e6,
    // wait for at most 3 seconds before receiving new data
    maxWaitTimeInMs: 3000,

const consume = async (res) => 
    // first, we wait for the client to connect and subscribe to the given topic

    let myString = "";
    await consumer.connect()
    await consumer.subscribe(
        fromBeginning: true
    await consumer.run(
        // this function is called every time the consumer gets a new message
        eachMessage: (
        ) => 
            console.log("Message received ###############################################################################");

    setTimeout(async () => 
        await consumer.disconnect();
    , 2000);

module.exports = consume

当我点击 api 时,我无法将消费的消息发送到 API


请参阅***.com/help/how-to-ask - 我们至少需要一些代码、错误消息、您认为应该发生的事情与正在发生的事情。 【参考方案1】:

除非您以某种方式通过流式 HTTP 响应或使用 websockets(您不在此代码中)进行抓取,否则我不确定这是一个好方法。

如果您真的想将 Kafka 记录发送到 Prometheus,请通过消费者的 PushGateway 发送它们,而不是使用同步 HTTP 抓取


以上是关于读取 kafka 主题并通过 Rest API 公开数据以供 prometheus 抓取(Nodejs)的主要内容,如果未能解决你的问题,请参考以下文章

如何在使用 REST API 创建 Kafka 连接器时定义模式

我可以使用 Spring WebFlux 实现通过 Kafka 请求/响应主题获取数据的 REST 服务吗?

从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题

如何在 Spring Boot 中从 Mongodb 读取集合数据并定期发布到 kafka 主题中

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?
