Kafka生产者中为消息类创建Schema
Posted cbyconquer
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者中为消息类创建Schema相关的知识,希望对你有一定的参考价值。
Kafka是当今主流的消息中间件,我们要用其实现(Kafka生产者)发送消息的功能,要先有定义消息的所有属性(字段)的消息类,然后才能用Kafka serializer来发送消息。
而这个消息类要由Schema生成,所以我们要先创建Schema。
本文以创建avro格式的schema为例,讲述为Kafka生产者创建schema的步骤,希望能帮助到刚开始作Kafka项目的朋友,也欢迎大家指正。
步骤如下:
1. 创建avro格式的schema文件,比如demoMessage.avsc,在其中定义“要把由该schema生成的消息类放到哪个package下面(通过定义namespace来定义)”、该schema的类型(type)、“要由该schema生成的消息类的类名(通过定义name来定义)、消息包含的字段(通过定义fields来定义)。例如:
"namespace": "demo.kafka.model",
"type": "record",
"name": "DemoMessage",
"fields": [
"name": "category", "type": "string",
"name": "product", "type": "string",
"name": "vendor", "type": "string", "default": "ABC",
]
2. 更新该项目的pom文件以支持avro插件,可参考https://dzone.com/articles/using-avros-code-generation,例如:
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>$avro.version</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> </execution> </executions> </plugin> ... <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>$avro.version</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>$avro.version</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-compiler</artifactId> <version>$avro.version</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>$avro.version</version> </dependency> </dependencies>
3. 在IDE(eclipse或IDEA)里,右击该schema文件demoMessage.avsc、选择"maven -> generate sources"就能够生成对应的消息类DemoMessage.class了。
每次生产 Kafka 消息时都需要连接吗?
【中文标题】每次生产 Kafka 消息时都需要连接吗?【英文标题】:Do we need to connect everytime we producer Kafka message? 【发布时间】:2021-07-18 11:32:26 【问题描述】:我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和清洁。
我正在使用 Node 和使用 KafkaJS 的 Kafka。
我对此很陌生,我在互联网上的任何地方都找不到完美/好的方法来在生产应用程序中使用它。
QUESTION(简而言之):我们是否需要在每次生成新消息时都以生产者身份连接。我们不能以 Redis 或 NATS 的身份保持连接。
这是我迄今为止尝试过的:
用例
假设每次创建新用户时我都需要发送一条消息。
1。创建 afka 客户端
创建了 kafka 客户端,这样我们就不必每次都重新配置它了
import Kafka from 'kafkajs';
class KafkaClient
private _client: Kafka;
get client()
if (!this._client)
throw new Error('Cannot access client before initializing it');
else
return this._client;
connect(clientId: string, brokers: string[])
this._client = new Kafka(
clientId,
brokers,
);
export const producerClient = new KafkaClient();
2。创建抽象发布者类
为所有类型的生产者创建了kafka生产者抽象类
import Kafka, Message from 'kafkajs';
import Topics from './topics';
interface Event
topic: Topics;
data: Message;
export abstract class Publisher<T extends Event>
private client: Kafka;
abstract topic: Topics;
constructor(client: Kafka)
this.client = client;
async publish(data: T['data']): Promise<void>
const producer = this.client.producer();
await producer.connect();
await producer.send(
topic: this.topic,
messages: [data],
);
3。最后是用户创建的生产者(继承自基类)
所有生产者都有自己的这些类
import Publisher from './publisher';
import Topics from './topics';
interface Event
topic: Topics.USER_CREATED;
data:
value: string;
;
export class UserCreatedPublisher extends Publisher<Event>
topic: Topics = Topics.USER_CREATED;
PRODUCING 用户创建的事件/消息
在nodejs路由中使用
import Router from 'express';
import producerClient from './kafka-client';
import UserCreatedPublisher from './user-created-publisher';
const router = Router();
router.post('/create-user', async (req, res) =>
const email, password = req.body;
// send this to the other service using kafka
await new UserCreatedPublisher(producerClient.client).publish(
value: JSON.stringify( email, password ),
console.log('Message published');
res.status(201).send();
);
【问题讨论】:
您不应该按记录连接/断开连接。您是否从某个地方复制了此模式?如果在导出之前连接生产者会发生什么? 【参考方案1】:问:我们是否需要在每次生成新消息时都以生产者身份连接? 我的回答:没有
在这种情况下我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。
例如为 kafka producer 创建一个类
import Kafka from 'kafkajs';
export class KafkaProducer
private static instance: KafkaProducer;
private _producer = null;
private _isConnected = false;
private constructor()
const kafka = new Kafka(
clientId: process.env.KAFKA_CLIENTID,
brokers: [process.env.KAFKA_SERVER],
);
this._producer = kafka.producer();
public static getInstance(): KafkaProducer
if (!KafkaProducer.instance)
KafkaProducer.instance = new KafkaProducer();
return KafkaProducer.instance;
public get isConnected()
return this._isConnected;
async connect(): Promise<void>
try
await this._producer.connect();
this._isConnected = true;
catch (err)
console.error(err);
get producer()
return this._producer;
并使用此生产者在其他代码位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)
let kafka = KafkaProducer.getInstance();
if (!kafka.isConnected)
await kafka.connect();
await kafka.producer.send(
topic: 'topicName',
messages: [
value: JSON.stringify(valueObj),
,
],
);
并在应用关闭前断开连接
try
let kafka = KafkaProducer.getInstance();
if (kafka.isConnected)
let producer = kafka.producer;
await producer.disconnect();
catch (err)
console.error(err);
【讨论】:
以上是关于Kafka生产者中为消息类创建Schema的主要内容,如果未能解决你的问题,请参考以下文章