Kafka生产者中为消息类创建Schema

Posted cbyconquer

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka生产者中为消息类创建Schema相关的知识,希望对你有一定的参考价值。

        Kafka是当今主流的消息中间件,我们要用其实现(Kafka生产者)发送消息的功能,要先有定义消息的所有属性(字段)的消息类,然后才能用Kafka serializer来发送消息。

        而这个消息类要由Schema生成,所以我们要先创建Schema。

        本文以创建avro格式的schema为例,讲述为Kafka生产者创建schema的步骤,希望能帮助到刚开始作Kafka项目的朋友,也欢迎大家指正。

        步骤如下:

        1. 创建avro格式的schema文件,比如demoMessage.avsc,在其中定义“要把由该schema生成的消息类放到哪个package下面(通过定义namespace来定义)”、该schema的类型(type)、“要由该schema生成的消息类的类名(通过定义name来定义)、消息包含的字段(通过定义fields来定义)。例如:

       

        "namespace": "demo.kafka.model",

        "type": "record",

        "name": "DemoMessage",

        "fields": [

                "name": "category", "type": "string",

                "name": "product", "type": "string",

                "name": "vendor", "type": "string", "default": "ABC",

        ]

       

        2. 更新该项目的pom文件以支持avro插件,可参考https://dzone.com/articles/using-avros-code-generation,例如:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>$avro.version</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>      
      </goals>
    </execution>
  </executions>
</plugin>

...

<dependencies>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>$avro.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>$avro.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-compiler</artifactId>
    <version>$avro.version</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc</artifactId>
    <version>$avro.version</version>
  </dependency>
</dependencies>

        3. 在IDE(eclipse或IDEA)里,右击该schema文件demoMessage.avsc、选择"maven -> generate sources"就能够生成对应的消息类DemoMessage.class了。

每次生产 Kafka 消息时都需要连接吗?

【中文标题】每次生产 Kafka 消息时都需要连接吗?【英文标题】:Do we need to connect everytime we producer Kafka message? 【发布时间】:2021-07-18 11:32:26 【问题描述】:

我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和清洁。

我正在使用 Node 和使用 KafkaJS 的 Kafka

对此很陌生,我在互联网上的任何地方都找不到完美/好的方法来在生产应用程序中使用它。

QUESTION(简而言之):我们是否需要在每次生成新消息时都以生产者身份连接。我们不能以 Redis 或 NATS 的身份保持连接。

这是我迄今为止尝试过的:

用例

假设每次创建新用户时我都需要发送一条消息。

1。创建 afka 客户端

创建了 kafka 客户端,这样我们就不必每次都重新配置它了

import  Kafka  from 'kafkajs';

class KafkaClient 
  private _client: Kafka;

  get client() 
    if (!this._client) 
      throw new Error('Cannot access client before initializing it');
     else 
      return this._client;
    
  
  connect(clientId: string, brokers: string[]) 
    this._client = new Kafka(
      clientId,
      brokers,
    );
  


export const producerClient = new KafkaClient();

2。创建抽象发布者类

为所有类型的生产者创建了kafka生产者抽象类

import  Kafka, Message  from 'kafkajs';
import  Topics  from './topics';

interface Event 
  topic: Topics;
  data: Message;


export abstract class Publisher<T extends Event> 
  private client: Kafka;
  abstract topic: Topics;
  constructor(client: Kafka) 
    this.client = client;
  

  async publish(data: T['data']): Promise<void> 
    const producer = this.client.producer();
    await producer.connect();
    await producer.send(
      topic: this.topic,
      messages: [data],
    );
  

3。最后是用户创建的生产者(继承自基类)

所有生产者都有自己的这些类

import  Publisher  from './publisher';
import  Topics  from './topics';

interface Event 
  topic: Topics.USER_CREATED;
  data: 
    value: string;
  ;


export class UserCreatedPublisher extends Publisher<Event> 
  topic: Topics = Topics.USER_CREATED;

PRODUCING 用户创建的事件/消息

在nodejs路由中使用

import  Router  from 'express';
import  producerClient  from './kafka-client';
import  UserCreatedPublisher  from './user-created-publisher';

const router = Router();

router.post('/create-user', async (req, res) => 
  const  email, password  = req.body;

  // send this to the other service using kafka
    await new UserCreatedPublisher(producerClient.client).publish(
      value: JSON.stringify( email, password ),
    console.log('Message published');

  res.status(201).send();
);

【问题讨论】:

您不应该按记录连接/断开连接。您是否从某个地方复制了此模式?如果在导出之前连接生产者会发生什么? 【参考方案1】:

问:我们是否需要在每次生成新消息时都以生产者身份连接? 我的回答:没有

在这种情况下我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。

例如为 kafka producer 创建一个类

import  Kafka  from 'kafkajs';

export class KafkaProducer 
  private static instance: KafkaProducer;
  private _producer = null;
  private _isConnected = false;

  private constructor() 
    const kafka = new Kafka(
      clientId: process.env.KAFKA_CLIENTID,
      brokers: [process.env.KAFKA_SERVER],
    );
    this._producer = kafka.producer();
  

  public static getInstance(): KafkaProducer 
    if (!KafkaProducer.instance) 
      KafkaProducer.instance = new KafkaProducer();
    
    return KafkaProducer.instance;
  

  public get isConnected() 
    return this._isConnected;
  

  async connect(): Promise<void> 
    try 
      await this._producer.connect();
      this._isConnected = true;
     catch (err) 
      console.error(err);
    
  

  get producer() 
    return this._producer;
  

并使用此生产者在其他代码位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)

     let kafka = KafkaProducer.getInstance();
     if (!kafka.isConnected) 
       await kafka.connect();
     
     await kafka.producer.send(
       topic: 'topicName',
       messages: [
         
           value: JSON.stringify(valueObj),
         ,
       ],
     );

并在应用关闭前断开连接

try 
      let kafka = KafkaProducer.getInstance();
      if (kafka.isConnected) 
        let producer = kafka.producer;
        await producer.disconnect();
      
     catch (err) 
      console.error(err);
    

【讨论】:

以上是关于Kafka生产者中为消息类创建Schema的主要内容,如果未能解决你的问题,请参考以下文章

防止 Confluent Kafka 在生产时丢失消息

每次生产 Kafka 消息时都需要连接吗?

Kafka核心技术与实战——02 | 一篇文章带你快速搞定Kafka术语

中间件一文搞定kafka术语

Spring-Kafka 发送消息的两种写法

kafka源码分析 生产消息过程