有没有办法将融合模式注册表与 kafka-node 模块一起使用?

Posted

技术标签:

【中文标题】有没有办法将融合模式注册表与 kafka-node 模块一起使用?【英文标题】:Is there any way to use confluent schema registry with kafka-node module? 【发布时间】:2018-04-06 10:12:30 【问题描述】:

我在 node.js 中实现了 Avro 模式,模式与消息负载一起发送。它工作正常。我正在寻找是否有任何方法可以将模式注册表与 Kafka-node 模块一起使用。我已经探索过,但没有成功找到。

在每条消息中发送模式会增加消息大小?与使用模式注册表相比,它会影响性能吗?

我们将不胜感激。

【问题讨论】:

你找到解决办法了吗? 如果你搜索的话,Github 上有一些库。例如github.com/waldophotos/kafka-avro 【参考方案1】:

您可以使用“avro-schema-registry”模块。 它对我有用。我也是 Kafka 的新手,只是尝试一下。

const kafka = require('kafka-node');
const avroSchemaRegistry = require('avro-schema-registry');

/* Configuration */
const kafkaTopic = 'newkafkatopic';//'kafka.test';
const host =  'localhost:9092';
const schemaRegistry = 'http://localhost:8081';

const Consumer = kafka.Consumer;
const Client = kafka.KafkaClient;
const registry = avroSchemaRegistry(schemaRegistry);

var client = new Client(host);
var topics = [
  topic: kafkaTopic
];

var options = 
  autoCommit: false,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 1024 * 1024,
  encoding: 'buffer'
;

var consumer = new Consumer(client, topics, options);

consumer.on('message', function(rawMessage) 
  console.log("Raw Message", rawMessage);

  registry.decode(rawMessage.value)
    .then((msg) => 
      console.log(msg)
    )
    .catch(err=>console.log(err))
);

consumer.on('error', (e) => 
  console.log(e.message)
  consumer.close();
)

【讨论】:

是的,要扩展之前的评论,请添加示例输入和输出。 您好,只是想知道rawMessage.value 您收到的数据格式是什么?它的缓冲区还是其他什么?

以上是关于有没有办法将融合模式注册表与 kafka-node 模块一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

融合模式注册表持久性

如何以编程方式从 Python 中的融合模式注册表中获取模式

在 databricks 中使用具有基本身份验证的融合 kafka-schema-registry-client 和托管融合模式注册表

使用带有kafka引擎的clickhouse进行融合模式注册表身份验证

通过 kafka-avro-console-producer 和融合模式注册表使用 RecordNameStrategy

无法使用 kafka-node 向 kafka Producer 发送消息