为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?相关的知识,希望对你有一定的参考价值。

我正在尝试使用kafkajs管理员客户端获取我的kafka经纪人主题的元数据。我已经用Node.js + express.js编写了服务器。

这是我的index.js文件,它是npm的入口点。

'use strict';

const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');

const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());

const adminConfig = new Admin({
    clientId: 'admin-client-4981',
    brokers: ['localhost:9092']
})

const admin = adminConfig.getAdmin();




.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.




//This is where the error is
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
    const topic = req.body;
    const topicmetadata = new TopicMetaData(admin);

    topicmetadata.setTopicConfig(topic);
    topicmetadata.commit(req, res);
});

app.listen(4040);

这是create-admin-client.js文件,用于检索管理对象。

'use strict';

const { Kafka } = require('kafkajs');

class Admin {
    constructor(kafkaConfig) {
         this.kafka = new Kafka({
            clientId: kafkaConfig.clientId,
            brokers: kafkaConfig.brokers
        });
    }

    getAdmin() {
        return this.kafka.admin();
    };
}

module.exports = Admin;

这是topics-metadata-fetch.js文件,用于获取主题的元数据。

'use strict';

class TopicMetaData {

    constructor(admin) {
        this.admin = admin;
    }

    setTopicConfig(topicConfig) {
        this.topic = topicConfig;
    }

    commit(req, res) {
        this.admin.fetchTopicMetadata({
            topics: this.topic
        })
        .then((topics) => {
            console.log("Topic MetaData Fetched Successfully!");
            res.status(200).send({
                topics
            });
        })
        .catch((err) => {
            console.error(err);
            res.status(500).send(err);
        })
    }
}

module.exports = TopicMetaData;

[每当我发送POST请求以获取主题的元数据时(例如'SERVICE-TYPES',我已经成功创建了主题),并且req.body

{
    "topic": "SERVICE-TYPES",
    "partitions": [{
        "partitionErrorCode": 0,
        "partitionId": 0,
        "leader": 0,
        "replicas": [0],
        "isr": [0]
    }]
}

它返回TypeError: topics.forEach is not a function错误。我哪里出错了?

答案

我发现我的http请求正文应具有以下结构:

{
    "topics": [{
        "topic": "SERVICE_TYPES",
        "partitions": [{
            "partitionErrorCode": 0,
            "partitionId": 0,
            "leader": 0,
            "replicas": [0],
            "isr": [0]
        }]
    }]
}

和我的路线处理程序应该像:

app.get('/api/v1/dev/admin/topicmetadata', (req, res) => {
        //topic should be array of the topics
        const topic = req.body.topics;
        const topicmetadata = new TopicMetaData(admin);

        topicmetadata.setTopicConfig(topic);
        topicmetadata.commit(req, res);
});

所以我传递了错误的请求正文结构。

以上是关于为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?的主要内容,如果未能解决你的问题,请参考以下文章

nodejs中的kafkajs,消费顺序,不重复消费

nodejs中的kafkajs,消费顺序,不重复消费

什么参数对 Node js 的 Google Document AI 客户端库无效?

node.js - 向不同客户端发送不同数据的良好实现是啥?

node.js + MySQL & JSON-result - 回调问题 & 对客户端无响应

Node.js xmpp 客户端收到“BAD_REGISTRATION”错误作为消息响应