每次生产 Kafka 消息时都需要连接吗?
Posted
技术标签:
【中文标题】每次生产 Kafka 消息时都需要连接吗?【英文标题】:Do we need to connect everytime we producer Kafka message? 【发布时间】:2021-07-18 11:32:26 【问题描述】:我创建了这个类,以使代码在发送/生成 Kafka 消息时更加可重用和清洁。
我正在使用 Node 和使用 KafkaJS 的 Kafka。
我对此很陌生,我在互联网上的任何地方都找不到完美/好的方法来在生产应用程序中使用它。
QUESTION(简而言之):我们是否需要在每次生成新消息时都以生产者身份连接。我们不能以 Redis 或 NATS 的身份保持连接。
这是我迄今为止尝试过的:
用例
假设每次创建新用户时我都需要发送一条消息。
1。创建 afka 客户端
创建了 kafka 客户端,这样我们就不必每次都重新配置它了
import Kafka from 'kafkajs';
class KafkaClient
private _client: Kafka;
get client()
if (!this._client)
throw new Error('Cannot access client before initializing it');
else
return this._client;
connect(clientId: string, brokers: string[])
this._client = new Kafka(
clientId,
brokers,
);
export const producerClient = new KafkaClient();
2。创建抽象发布者类
为所有类型的生产者创建了kafka生产者抽象类
import Kafka, Message from 'kafkajs';
import Topics from './topics';
interface Event
topic: Topics;
data: Message;
export abstract class Publisher<T extends Event>
private client: Kafka;
abstract topic: Topics;
constructor(client: Kafka)
this.client = client;
async publish(data: T['data']): Promise<void>
const producer = this.client.producer();
await producer.connect();
await producer.send(
topic: this.topic,
messages: [data],
);
3。最后是用户创建的生产者(继承自基类)
所有生产者都有自己的这些类
import Publisher from './publisher';
import Topics from './topics';
interface Event
topic: Topics.USER_CREATED;
data:
value: string;
;
export class UserCreatedPublisher extends Publisher<Event>
topic: Topics = Topics.USER_CREATED;
PRODUCING 用户创建的事件/消息
在nodejs路由中使用
import Router from 'express';
import producerClient from './kafka-client';
import UserCreatedPublisher from './user-created-publisher';
const router = Router();
router.post('/create-user', async (req, res) =>
const email, password = req.body;
// send this to the other service using kafka
await new UserCreatedPublisher(producerClient.client).publish(
value: JSON.stringify( email, password ),
console.log('Message published');
res.status(201).send();
);
【问题讨论】:
您不应该按记录连接/断开连接。您是否从某个地方复制了此模式?如果在导出之前连接生产者会发生什么? 【参考方案1】:问:我们是否需要在每次生成新消息时都以生产者身份连接? 我的回答:没有
在这种情况下我要做的是创建一个单例类来共享同一个 kafka 生产者实例,并在我优雅地关闭我的应用程序时断开它。
例如为 kafka producer 创建一个类
import Kafka from 'kafkajs';
export class KafkaProducer
private static instance: KafkaProducer;
private _producer = null;
private _isConnected = false;
private constructor()
const kafka = new Kafka(
clientId: process.env.KAFKA_CLIENTID,
brokers: [process.env.KAFKA_SERVER],
);
this._producer = kafka.producer();
public static getInstance(): KafkaProducer
if (!KafkaProducer.instance)
KafkaProducer.instance = new KafkaProducer();
return KafkaProducer.instance;
public get isConnected()
return this._isConnected;
async connect(): Promise<void>
try
await this._producer.connect();
this._isConnected = true;
catch (err)
console.error(err);
get producer()
return this._producer;
并使用此生产者在其他代码位置发送消息(我创建了 isConnected() 方法,因为我找不到用于检查客户端是否已连接的内置函数)
let kafka = KafkaProducer.getInstance();
if (!kafka.isConnected)
await kafka.connect();
await kafka.producer.send(
topic: 'topicName',
messages: [
value: JSON.stringify(valueObj),
,
],
);
并在应用关闭前断开连接
try
let kafka = KafkaProducer.getInstance();
if (kafka.isConnected)
let producer = kafka.producer;
await producer.disconnect();
catch (err)
console.error(err);
【讨论】:
以上是关于每次生产 Kafka 消息时都需要连接吗?的主要内容,如果未能解决你的问题,请参考以下文章