为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?相关的知识,希望对你有一定的参考价值。
我正在尝试使用kafkajs管理员客户端获取我的kafka经纪人主题的元数据。我已经用Node.js + express.js编写了服务器。
这是我的index.js
文件,它是npm的入口点。
'use strict';
const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');
const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
const adminConfig = new Admin({
clientId: 'admin-client-4981',
brokers: ['localhost:9092']
})
const admin = adminConfig.getAdmin();
.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.
//This is where the error is
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
const topic = req.body;
const topicmetadata = new TopicMetaData(admin);
topicmetadata.setTopicConfig(topic);
topicmetadata.commit(req, res);
});
app.listen(4040);
这是create-admin-client.js
文件,用于检索管理对象。
'use strict';
const { Kafka } = require('kafkajs');
class Admin {
constructor(kafkaConfig) {
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
}
getAdmin() {
return this.kafka.admin();
};
}
module.exports = Admin;
这是topics-metadata-fetch.js
文件,用于获取主题的元数据。
'use strict';
class TopicMetaData {
constructor(admin) {
this.admin = admin;
}
setTopicConfig(topicConfig) {
this.topic = topicConfig;
}
commit(req, res) {
this.admin.fetchTopicMetadata({
topics: this.topic
})
.then((topics) => {
console.log("Topic MetaData Fetched Successfully!");
res.status(200).send({
topics
});
})
.catch((err) => {
console.error(err);
res.status(500).send(err);
})
}
}
module.exports = TopicMetaData;
[每当我发送POST请求以获取主题的元数据时(例如'SERVICE-TYPES',我已经成功创建了主题),并且req.body
为
{
"topic": "SERVICE-TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}
它返回TypeError: topics.forEach is not a function
错误。我哪里出错了?
答案
我发现我的http请求正文应具有以下结构:
{
"topics": [{
"topic": "SERVICE_TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}]
}
和我的路线处理程序应该像:
app.get('/api/v1/dev/admin/topicmetadata', (req, res) => {
//topic should be array of the topics
const topic = req.body.topics;
const topicmetadata = new TopicMetaData(admin);
topicmetadata.setTopicConfig(topic);
topicmetadata.commit(req, res);
});
所以我传递了错误的请求正文结构。
以上是关于为什么我的kafkajs客户端(Node.js / express.js)在获取主题元数据时抛出'TypeError:topic.forEach不是函数?的主要内容,如果未能解决你的问题,请参考以下文章
什么参数对 Node js 的 Google Document AI 客户端库无效?
node.js - 向不同客户端发送不同数据的良好实现是啥?