每次生产 Kafka 消息时都需要连接吗?

Posted

技术标签:

【中文标题】每次生产 Kafka 消息时都需要连接吗?【英文标题】:Do we need to connect everytime we producer Kafka message? 【发布时间】:2021-07-18 11:32:26 【问题描述】:

我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和清洁。

我正在使用 Node 和使用 KafkaJS 的 Kafka

对此很陌生,我在互联网上的任何地方都找不到完美/好的方法来在生产应用程序中使用它。

QUESTION(简而言之):我们是否需要在每次生成新消息时都以生产者身份连接。我们不能以 Redis 或 NATS 的身份保持连接。

这是我迄今为止尝试过的:

用例

假设每次创建新用户时我都需要发送一条消息。

1。创建 afka 客户端

创建了 kafka 客户端,这样我们就不必每次都重新配置它了

import  Kafka  from 'kafkajs';

class KafkaClient 
  private _client: Kafka;

  get client() 
    if (!this._client) 
      throw new Error('Cannot access client before initializing it');
     else 
      return this._client;
    
  
  connect(clientId: string, brokers: string[]) 
    this._client = new Kafka(
      clientId,
      brokers,
    );
  


export const producerClient = new KafkaClient();

2。创建抽象发布者类

为所有类型的生产者创建了kafka生产者抽象类

import  Kafka, Message  from 'kafkajs';
import  Topics  from './topics';

interface Event 
  topic: Topics;
  data: Message;


export abstract class Publisher<T extends Event> 
  private client: Kafka;
  abstract topic: Topics;
  constructor(client: Kafka) 
    this.client = client;
  

  async publish(data: T['data']): Promise<void> 
    const producer = this.client.producer();
    await producer.connect();
    await producer.send(
      topic: this.topic,
      messages: [data],
    );
  

3。最后是用户创建的生产者(继承自基类)

所有生产者都有自己的这些类

import  Publisher  from './publisher';
import  Topics  from './topics';

interface Event 
  topic: Topics.USER_CREATED;
  data: 
    value: string;
  ;


export class UserCreatedPublisher extends Publisher<Event> 
  topic: Topics = Topics.USER_CREATED;

PRODUCING 用户创建的事件/消息

在nodejs路由中使用

import  Router  from 'express';
import  producerClient  from './kafka-client';
import  UserCreatedPublisher  from './user-created-publisher';

const router = Router();

router.post('/create-user', async (req, res) => 
  const  email, password  = req.body;

  // send this to the other service using kafka
    await new UserCreatedPublisher(producerClient.client).publish(
      value: JSON.stringify( email, password ),
    console.log('Message published');

  res.status(201).send();
);

【问题讨论】:

您不应该按记录连接/断开连接。您是否从某个地方复制了此模式?如果在导出之前连接生产者会发生什么? 【参考方案1】:

问:我们是否需要在每次生成新消息时都以生产者身份连接? 我的回答:没有

在这种情况下我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。

例如为 kafka producer 创建一个类

import  Kafka  from 'kafkajs';

export class KafkaProducer 
  private static instance: KafkaProducer;
  private _producer = null;
  private _isConnected = false;

  private constructor() 
    const kafka = new Kafka(
      clientId: process.env.KAFKA_CLIENTID,
      brokers: [process.env.KAFKA_SERVER],
    );
    this._producer = kafka.producer();
  

  public static getInstance(): KafkaProducer 
    if (!KafkaProducer.instance) 
      KafkaProducer.instance = new KafkaProducer();
    
    return KafkaProducer.instance;
  

  public get isConnected() 
    return this._isConnected;
  

  async connect(): Promise<void> 
    try 
      await this._producer.connect();
      this._isConnected = true;
     catch (err) 
      console.error(err);
    
  

  get producer() 
    return this._producer;
  

并使用此生产者在其他代码位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)

     let kafka = KafkaProducer.getInstance();
     if (!kafka.isConnected) 
       await kafka.connect();
     
     await kafka.producer.send(
       topic: 'topicName',
       messages: [
         
           value: JSON.stringify(valueObj),
         ,
       ],
     );

并在应用关闭前断开连接

try 
      let kafka = KafkaProducer.getInstance();
      if (kafka.isConnected) 
        let producer = kafka.producer;
        await producer.disconnect();
      
     catch (err) 
      console.error(err);
    

【讨论】:

以上是关于每次生产 Kafka 消息时都需要连接吗?的主要内容,如果未能解决你的问题,请参考以下文章

swoole soa 每次修改server 端的方法时都需要重启server吗

Kafka生产者

为啥搭建Kafka需要zookeeper

kafka内置的zookeeper

深入理解Kafka必知必会

Odoo 在每次页面加载时都会给我连接丢失消息