1.5万字长文:从 C# 入门 Kafka

Posted 痴者工良

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.5万字长文:从 C# 入门 Kafka相关的知识,希望对你有一定的参考价值。

作者:痴者工良

个人网站:https://www.whuanle.cn

博客园:https://www.cnblogs.com/whuanle/

本教程地址:https://kafka.whuanle.cn/

本教程是关于 Kafka 知识的教程,从 C# 中实践编写 Kafka 程序,一边写代码一边了解 Kafka。

1, 搭建 Kafka 环境

本章的内容比较简单,我们将使用 Docker 快速部署一个单节点的 Kafka 或 Kafka 集群,在后面的章节中,将会使用已经部署好的 Kafka 实例做实验,然后我们通过不断地实验,逐渐了解 Kafka 的知识点以及掌握客户端的使用。

这里笔者给出了单机和集群两种部署方式,但是为了便于学习后面的章节,请以集群的方式部署 Kafka。

安装 docker-compose

使用 docker-compose 部署 Kafka 可以减少很多没必要的麻烦,一个脚本即可完成部署,省下折腾时间。

安装 docker-compose 也是挺简单的,直接下载二进制可执行文件即可。

INSTALLPATH=/usr/local/bin
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o $INSTALLPATH/docker-compose

sudo chmod +x $INSTALLPATH/docker-compose

docker-compose --version

如果系统没有映射 /usr/local/bin/ 路径,执行命令完成后,如果发现找不到 docker-compose 命令,请将文件下载到 /usr/bin,即替换 INSTALLPATH=/usr/local/binINSTALLPATH=/usr/bin

单节点 Kafka 的部署

创建一个 docker-compose.yml 文件,文件内容如下:

---
version: \'3\'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: \'zookeeper:2181\'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.156:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    volumes:
      - /data/kafka/broker/logs:/opt/kafka/logs
      - /var/run/docker.sock:/var/run/docker.sock

请替换 PLAINTEXT://192.168.3.156 中的 IP 。

然后执行命令开始部署应用:

docker-compose up -d

接着,安装 kafdrop,这是一个 Kafka 管理界面,可以很方便地查看一些信息。

docker run -d --rm  -p 9000:9000 \\
-e JVM_OPTS="-Xms32M -Xmx64M" \\
-e KAFKA_BROKERCONNECT=192.168.3.156:9092 \\
-e SERVER_SERVLET_CONTEXTPATH="/" \\
obsidiandynamics/kafdrop

Kafka 集群的部署

Kafka 集群的部署方法有很多,方法不尽相同,其中使用的配置参数(环境变量)也很多,这里笔者只给出自己在使用的快速部署参数,读者可以参阅官方文档,以便定制配置。

笔者的部署脚本中其中一些重要的环境变量说明如下:

  • KAFKA_BROKER_ID: 当前 Broker 实例的 id,Broker id 不能重复;
  • KAFKA_NUM_PARTITIONS:默认 Topic 的分区数量,默认为 1,如果设置了这个配置,自动创建的 Topic 会根据这个大小设置分区数量。
  • KAFKA_DEFAULT_REPLICATION_FACTOR:默认 Topic 分区的副本数;
  • KAFKA_ZOOKEEPER_CONNECT:Zookeeper 地址;
  • KAFKA_LISTENERS:Kafka Broker 实例监听的 ip;
  • KAFKA_ADVERTISED_LISTENERS:外部如何访问当前实例,用于 Zookeeper 监控;

创建一个 docker-compose.yml 文件,文件内容如下:

---
version: \'3\'
services:
 zookeeper:
   image: confluentinc/cp-zookeeper:7.3.0
   container_name: zookeeper
   environment:
     ZOOKEEPER_CLIENT_PORT: 2181
     ZOOKEEPER_TICK_TIME: 2000

 kafka1:
   image: confluentinc/cp-kafka:7.3.0
   container_name: broker1
   ports:
     - 19092:9092
   depends_on:
     - zookeeper
   environment:
     KAFKA_BROKER_ID: 1
     KAFKA_NUM_PARTITIONS: 3
     KAFKA_DEFAULT_REPLICATION_FACTOR: 2
     KAFKA_ZOOKEEPER_CONNECT: \'zookeeper:2181\'
     KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:19092
   volumes:
     - /data/kafka/broker1/logs:/opt/kafka/logs
     - /var/run/docker.sock:/var/run/docker.sock
     
 kafka2:
   image: confluentinc/cp-kafka:7.3.0
   container_name: broker2
   ports:
     - 29092:9092
   depends_on:
     - zookeeper
   environment:
     KAFKA_BROKER_ID: 2
     KAFKA_NUM_PARTITIONS: 3
     KAFKA_DEFAULT_REPLICATION_FACTOR: 2
     KAFKA_ZOOKEEPER_CONNECT: \'zookeeper:2181\'
     KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:29092
   volumes:
     - /data/kafka/broker2/logs:/opt/kafka/logs
     - /var/run/docker.sock:/var/run/docker.sock
     
 kafka3:
   image: confluentinc/cp-kafka:7.3.0
   container_name: broker3
   ports:
     - 39092:9092
   depends_on:
     - zookeeper
   environment:
     KAFKA_BROKER_ID: 3
     KAFKA_NUM_PARTITIONS: 3
     KAFKA_DEFAULT_REPLICATION_FACTOR: 2
     KAFKA_ZOOKEEPER_CONNECT: \'zookeeper:2181\'
     KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.158:39092
   volumes:
     - /data/kafka/broker3/logs:/opt/kafka/logs
     - /var/run/docker.sock:/var/run/docker.sock

由于三个 Broker 实例都在同一个虚拟机上面,因此这里通过暴露不同的端口,避免 Broker 冲突。

然后执行命令开始部署应用:

docker-compose up -d

接着部署 kafdrop:

docker run -d --rm  -p 9000:9000 \\
-e JVM_OPTS="-Xms32M -Xmx64M" \\
-e KAFKA_BROKERCONNECT=192.168.3.158:19092,192.168.3.158:29092,192.168.3.158:39092 \\
-e SERVER_SERVLET_CONTEXTPATH="/" \\
obsidiandynamics/kafdrop

现在,已经部署好了 Kafka 环境以及管理面板。

2, Kafka 概念

在本章中,笔者会介绍 Kafka 的一些基本概念,文中的内容是笔者个人理解总结,可能会有错误或其它问题,如有疑问,欢迎指出。

基本概念

一个简单的 生产消息 -> 保存到 Broker -> 消费消息 的结构图示例如下:

在这里,出现了四个对象:

生产者 Producer:产生 Message 的客户端;

消费者 Consumer :消费 Message 的客户端;

主题 Topic:逻辑上的东西;

消息 Message: 数据实体;

当然图中每一个对象本身都是很复杂的,这里为了便于学习,画了个简单的图,现在我们先从最简单的结构图开始了解这些东西。

这里的图比较简单,大概是这样的, Kafka 中有多个 Topic,Producer 可以向指定的 Topic 生产一条消息,而 Consumer 可以消费指定 Topic 的消息。

Producer 和 Consumer 都是客户端应用,只是在执行的功能上有所区分,理论上 Kafka 的客户端库都是将两者的代码写在同一个模块,例如 C# 的 confluent-kafka-dotnet,同时具有生产者和消费者的 API。

然后就是这个 Message 了,Message 主要结构是:

Key
Value 
其它元数据

其中 Value 是我们自定义消息内容的地方。

关于 Message,我们这里简单了解即可,在后面的章节中会继续深入介绍。

在 Kafka 中,每个 Kafka 实例称为 Broker,每个 Broker 中可以保存多个 Topic。每个 Topic 可以划分为多个分区,每个分区保存的数据是不一样的,这些分区可以在同一个 Broker 中,也可以在散布在不同的 Broker 中。

一个 Broker 可以存储不同 Topic 的不同分区,也可以存储同一个 Topic 的不同分区。

如果一个 Topic 有多个分区,一般来说其并发量会有所提高,通过增加分区数实现集群的负载均衡,一般情况下,分区均衡需要散布在不同的 Broker 才能合理地负载均衡,不然分区都在同一个 Broker 时,瓶颈在单个机器上。

如果 Broker 的实例比较少,但是 Topic 划分了多个分区,那么这些分区会被部署到同一个 Broker 上。

主题分区可以有效提高生产者或消费者的并发量,因为将消息分别存储到不同的分区中,可以同时往多个分区推送消息,会比只向一个分区推送消息的速度快。

前面提到,每个 Message 都有 Key 和 Value,Topic 可以根据 Message 的 Key 将一个 Message 存储到不同的分区。当然,我们也可以在生产消息的时候,指定向一个分区推送消息。

分区可以提高并发,但是如果一个 Broker 挂了,数据便会丢失,怎么办?

在 Kafka 中,分区可以设置多个分区副本,这些副本跟分区并不在同一个 Broker 上,这个当 Broker 挂了后,这些分区可以利用副本在其它 Broker 上复活。

[info] 提示

在 《Kafka权威指南(第2版)》 的 21 页中,指导了如何合理设置分区数量,以及分区的优势和缺点。

关于 Kafka 脚本工具

前面介绍了 Kafka 的一些简单概念,为了更加好地了解 Kafka,我们可以利用 Kafka 的脚本做一些实验。

打开其中一个 Kafka 容器(docker exec 命令进入容器),然后执行命令查看自带的二进制脚本:

 ls -lah /usr/bin/ | grep kafka

可以看到,里面有很多 CLI 工具,每种 CLI 工具说明文档可以到这里查看:

https://docs.cloudera.com/runtime/7.2.10/kafka-managing/topics/kafka-manage-basics.html

下面笔者介绍部分 CLI 工具的使用方法。

主题管理

kafka-topics 是用于主题管理的 CLI 工具,kafka-topics 提供基本操作如下所示:

  • 操作:
    • --create:创建主题;
    • --alter:变更这个主题,修改分区数等;
    • --config:修改主题相关的配置;
    • --delete:删除该主题;

在管理主题时,我们可以设置主题配置,主题配置存储时,其格式示例为 default.replication.factor ,如果用 CLI 工具操作,那么传递的参数示例为 --replication-factor,因此我们通过不同工具操作主题时,参数名称可能不同一样。

主题的所有配置参数可以查看官方文档:

https://kafka.apache.org/090/documentation.html

kafka-topics 一些常用参数:

  • --partitions :分区数量,该主题划分成多少个分区;

  • --replication-factor:副本数量,表示每个分区一共有多少个副本;副本数量需要小于或等于 Broker 的数量;

  • --replica-assignment:指定副本分配方案,不能与 --partitions--replication-factor 同时使用;

  • --list: 列出有效的主题;

  • --describe:查询该主题的信息信息。

下面是使用 CLI 手工创建主题的命令,创建主题时设置分区、分区副本。

kafka-topics --create --bootstrap-server 192.168.3.158:19092 \\
--replication-factor 3 \\
--partitions 3 \\
--topic hello-topic

使用 CLI 时,可以通过 --bootstrap-server 配置连接到一个 Kafka 实例,或者通过 --zookeeper 连接到 Zookeeper,然后 CLI 自动找到 Kafka 实例执行命令。

查看主题的详细信息:

kafka-topics --describe --bootstrap-server 192.168.3.158:19092 --topic hello-topic
Topic: hello-topic	TopicId: r3IlKv8BSMaaoaT4MYG8WA	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: hello-topic	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: hello-topic	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: hello-topic	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

可以看到,创建的分区会被均衡分布到不同的 Broker 实例中;对于 Replicas 这些东西,我们后面的章节再讨论。

也可以打开 kafdrop 查看主题的信息。

如果一个 Topic 的分区数量大于 Broker 数量呢?前面笔者已经提到,如果分区数量比较大时,部分 Broker 中会存在同一个主题的多个分区。

下面我们来实验验证一下:

kafka-topics --create --bootstrap-server 192.168.3.158:19092 \\
--replication-factor 2 \\
--partitions 4 \\
--topic hello-topic1

可以看到,Broker 2,分到了 hello-topic1 的两个分区。

使用 C# 创建分区

客户端库中可以利用接口管理主题,如 C# 的 confluent-kafka-dotnet,使用 C# 代码创建 Topic 的示例如下:

    static async Task Main()
    
        var config = new AdminClientConfig
        
            BootstrapServers = "192.168.3.158:19092"
        ;

        using (var adminClient = new AdminClientBuilder(config).Build())
        
            try
            
                await adminClient.CreateTopicsAsync(new TopicSpecification[] 
                    new TopicSpecification  Name = "hello-topic2", ReplicationFactor = 3, NumPartitions = 2  );
            
            catch (CreateTopicsException e)
            
                Console.WriteLine($"An error occured creating topic e.Results[0].Topic: e.Results[0].Error.Reason");
            
        
    

在 AdminClient 中还有很多方法可以探索。

分区与复制

在前面,我们创建了一个名为 hello-topic 的主题,并且为其设置三个分区,三个副本。

接着,使用 kafka-topics --describe 命令查看一个 Topic 的信息,可以看到:

Topic: hello-topic	TopicId: r3IlKv8BSMaaoaT4MYG8WA	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: hello-topic	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: hello-topic	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: hello-topic	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 这些数字都是指 Broker ID,Broker ID 可以是数字也可以是有英文。

主题的每个分区都有至少一个副本,也就是 --replication-factor 参数必须设置大于大于 1。副本分为 leader 和 follwer 两种,每个副本都需要消耗一个存储空间,leader 对外提供读写消息,而 follwer 提供冗余备份,leader 会及时将消息增量同步到所有 follwer 中。

Partition: 0 Leader: 3 Replicas: 3,1,2 表示分区 0 的副本分布在 ID 为 312 的 Kafka broker 中。

hello-topic 主题中,当分区只有一个副本时,或只关注 leader 副本时,leader 副本对应的 Broker 节点位置如下:

Kafka 分配分区到不同的节点有一定的规律,感兴趣的读者可参考 《Kafka 权威指南》第二版或官方文档。

如果设置了多个副本( --replication-factor=3 ) 时,leader 副本和 follwer 副本的位置如下所示:

分区的副本数量不能大于 Broker 数量,每个 Broker 只能有此分区的一个副本,副本数量范围必须在[1,Broker数量] 中。也就是说,如果集群只有三个 Broker,那么创建的分区,其副本数量必须在 [1,3] 范围内。

在不同的副本中,只有 leader 副本能够进行读写,follwer 接收从 leader 推送过来的数据,做好冗余备份。

一个分区的所有副本统称为 AR(Assigned Repllicas),当 leader 接收到消息时,需要推送到 follwer 中,理想情况下,分区的所有副本的数据都是一致的。

但是 leader 同步到 follwer 的过程中可能会因为网络拥堵、故障等,导致 follwer 在一定时间内未能与 leader 中的数据一致(同步滞后),那么这些副本称为 OSR( Out-Sync Relipcas)。

如果副本中的数据为最新的数据,在给定的时间内同步没有出现滞后,那么这些副本称为 ISR。

AR = ISR + OSR

如果 leader 故障,那么剩下的 follwer 会重新选举 一个 leader;但是如果 leader 接收到生产者的消息后还没有同步到 follwer 就故障了,那么这些消息就会丢失。为了避免这种情况,需要生产者设置合理的 ACK,在第四章中会讨论这个问题。

生产者消费者

kafka-console-producer 可以给指定的主题发送消息:

kafka-console-producer --bootstrap-server 192.168.3.158:19092 --topic hello-topic

kafka-console-consumer 则可以从指定主题接收消息:

kafka-console-consumer --bootstrap-server 192.168.3.158:19092 --topic hello-topic \\
--group hello-group \\
--from-beginning

订阅主题时,消费者需要指定消费者组。可以通过 --group 指定;如果不指定,脚本会自动为我们创建一个消费者组。

kafka-consumer-groups 则可以为我们管理消费者组,例如查看所有的消费者组:

kafka-consumer-groups --bootstrap-server 192.168.3.158:19092 --list

查看消费者组详细信息:

kafka-consumer-groups --bootstrap-server 192.168.3.158:19092 --describe --group hello-group

当然,也可以从 Kafdrop 界面中查看消费者组的信息。

这些参数我们现在可以先跳过。

C# 部分并没有重要的内容要说,代码可以参考:

    static async Task Main()
    
        var config = new AdminClientConfig
        
            BootstrapServers = "192.168.3.158:19092"
        ;

        using (var adminClient = new AdminClientBuilder(config).Build())
        
            var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10));
            foreach (var item in groups)
            
                Console.WriteLine(item.Group);
            
        
    

对于消费者组来说,我们需要关注以下参数:

  • state:消费者组的状态;

  • members:消费者组成员;

  • offsets: ACK 偏移量;

修改配置

可以使用 kafka-configs 工具设置、描述或删除主题属性。

查看主题属性描述:

kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --describe
kafka-configs --bootstrap-server 192.168.3.158:19092 --entity-type topics --entity-name hello-topic --describe

使用 --alter 参数后,可以添加、修改或删除主题属性,命令格式:

kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --alter --add-config [PROPERTY NAME]=[VALUE]
kafka-configs --bootstrap-server [HOST:PORT] --entity-type topics --entity-name [TOPIC] --alter --delete-config [PROPERTY_NAME]

例如 Kafka 默认限制发送的消息最大为 1MB,为了修改这个限制,可以使用以下命令:

kafka-configs --bootstrap-server  192.168.3.158:19092 --entity-type topics --entity-name hello-topic --alter --add-config \'max.message.bytes=1048576\'

其中还有很多参数,请参考:

https://kafka.apache.org/10/documentation.html#topicconfigs

此外,我们还可以通过 kafka-configs 查看 Broker 的配置:

kafka-configs --bootstrap-server 192.168.3.158:19092 --describe --broker 1

3, Kafka .NET 基础

在第一章中,笔者介绍了如何部署 Kafka;在第二章中,笔者介绍了 Kafka 的一些基础知识;在本章中,笔者将介绍如何使用 C# 编写程序连接 kafka,完成生产和消费过程。

在第二章的时候,我们已经使用到了 confluent-kafka-dotnet ,通过 confluent-kafka-dotnet 编写代码调用 Kafka 的接口,去管理主题。

confluent-kafka-dotnet 其底层使用了一个 C 语言编写的库 librdkafka,其它语言编写的 Kafka 客户端库也是基于 librdkafka 的,基于 librdkafka 开发客户端库,官方可以统一维护底层库,不同的编程语言可以复用代码,还可以利用 C 语言编写的库提升性能。

此外,因为不同的语言都使用了相同的底层库,也使用了相同的接口,因此其编写的客户端库接口看起来也会十分接近。大多数情况下,Java 和 C# 使用 Kafka 的代码是比较相近的。

接着说一下 confluent-kafka-dotnet,Github 仓库中对这个库的其中一个特点介绍是:

  • High performance : confluent-kafka-dotnet 是一个轻量级的程序包装器,它包含了一个精心调优的 C 语言写的 librdkafka 库。

Library dkafka 是 Apache Kafka 协议的 C 库实现,提供了 Producer、 Consumer 和 Admin 客户端。它的设计考虑到信息传递的可靠性和高性能,目前的性能超过 100万条消息/秒 的生产和 300万条消息/秒 的消费能力(原话是:current figures exceed 1 million msgs/second for the producer and 3 million msgs/second for the consumer)。

现在,这么牛逼的东西,到 nuget 直接搜索 Confluent.Kafka 即可使用。

回归正题,下面笔者将会介绍如果使用 C# 编写生产者、消费者程序。在本章中,我们只需要学会怎么用就行,大概了解过程,而不必深究参数配置,也不必细究代码的功能或作用,在后面的章节中,笔者会详细介绍的。

生产者

编写生产者程序大概可以分为两步,第一步是定义 ProducerConfig 配置,里面是关于生产者的各种配置,例如 Broker 地址、发布消息重试次数、缓冲区大小等;第二步是定义发布消息的过程。例如要发布什么内容、如何记录错误消息、如何拦截异常、自定义消息分区等。

下面是生产者代码的示例:

using Confluent.Kafka;
using System.Net;

public class Program

    static void Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "host1:9092",
            ...
        ;

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        
            ...
        
    

如果要将消息推送到 Kafka,那么代码是这样写的:

var result = await producer.ProduceAsync("weblog", new Message<Null, string>  Value="a log message" );

Value 就是消息的内容。其实一条消息的结构比较复杂的,除了 Value ,还有 Key 和各种元数据,这个在后面的章节中我们再讨论。

下面是发布一条消息的实际代码示例:

using Confluent.Kafka;
using System.Net;

public class Program

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.156:9092"
        ;

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        
            var result = await producer.ProduceAsync("weblog", new Message<Null, string>  Value = "a log message" );
        
    

运行这段代码后,可以打开 kafdrop 面板查看主题信息。

如果我们断点调试 ProduceAsync 后的内容,可以看到有比较多的信息,例如:

这些信息记录了当前消息是否被 Broker 接收并确认(ACK),该条消息被推送到哪个 Broker 的哪个分区中,消息偏移量数值又是什么。

当然,这里暂时不需要关注这个。

批量生产

这一节中,我们来了解如何通过代码批量推送消息到 Broker。

下面是代码示例:

using Confluent.Kafka;
using System.Net;

public class Program

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.156:9092"
        ;

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        
            for (int i = 0; i < 10; ++i)
            
                producer.Produce("my-topic", new Message<Null, string>  Value = i.ToString() , handler);
            
        
        // 帮忙程序自动退出
        Console.ReadKey();
    

    public static void handler(DeliveryReport<Null, string> r)
    
        Console.WriteLine(!r.Error.IsError
            ? $"Delivered message to r.TopicPartitionOffset"
            : $"Delivery Error: r.Error.Reason");
    

可以看到,这里批量推送消息使用了 Produce,而之前我们使用的异步代码用了 ProduceAsync

其实两者都是异步的,但是 Product 方法更直接地映射到底层的 librdkafka API,能够利用 librdkafka 中高性能的接口批量推送消息。而 ProduceAsync 则是 C# 实现的异步,相对来说Product 的开销小一些,但是 ProduceAsync 仍然非常高性能——在典型的硬件上每秒能够产生数十万条消息

如果说最最直观的差异,那么就是两者的返回结果。

从定义来看:

Task<DeliveryResult<TKey, TValue>> ProduceAsync(string topic, Message<TKey, TValue> message, ...);

void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);

ProduceAsync 可以直接获得 Task,然后通过等待 Task 获取响应结果。

Produce 并不能直接获得结果,而是通过回调方式获取推送结果,由 librdkafka 执行回调。

由于 Produce 是框架底层异步的,但是没有 Task,所以不能 await ,为了避免在批量消息处理完成之前,producer 生命周期结束了,所以需要使用 producer.Flush(TimeSpan.FromSeconds(10)) 这样的代码等待批量消息完成推送。

调用 Flush 方法可使所有缓冲记录立即可用于发送,并在与这些记录关联的请求完成时发生阻塞。

Flush 有两个重载:

int Flush(TimeSpan timeout);
void Flush(CancellationToken cancellationToken = default(CancellationToken));

int Flush() 会等待指定的时间,如果时间到了,队列中的消息只发送一部分,那么会返回没成功发送的消息数量

示例代码如下:

using Confluent.Kafka;
using System.Net;

public class Program

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.156:9092"
        ;

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        
            for (int i = 0; i < 10; ++i)
            
                producer.Produce("my-topic", new Message<Null, string>  Value = i.ToString() , handler);
            
            // 只等待 10s
            var count = producer.Flush(TimeSpan.FromSeconds(10));
            // 或者使用
            // void Flush(CancellationToken cancellationToken = default(CancellationToken));
        
        // 不让程序自动退出
        Console.ReadKey();
    

    public static void handler(DeliveryReport<Null, string> r)
    
        Console.WriteLine(!r.Error.IsError
            ? $"Delivered message to r.TopicPartitionOffset"
            : $"Delivery Error: r.Error.Reason");
    

如果将 Kafka 服务停止,客户端肯定是不能推送消息的,那么我们在使用批量推送代码时会有什么现象呢?

这里可以停止所有 Broker 或者给 BootstrapServers 参数设置一个错误的地址,然后启动程序,会发现 producer.Flush(TimeSpan.FromSeconds(10)); 会等待 10s,但是此时 handler 不会起效。

可以看到,如果使用批量消息,需要注意使用 Flush,即使连接不上 Broker,程序也不会报错。

所以我们使用批量消息时,一定要注意与 Broker 的连接状态,以及处理 Flush 返回的失败数量。

            var result = producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine(result);

使用 Tasks.WhenAll

前面提到了使用 Produce 方法来批量推送消息,除了框架本身的批量提交,我们也可以利用 Tasks.WhenAll 来实现批量提交获取返回结果,不过性能并没有 produce - Flush 好。

示例代码如下:

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        
            List<Task> tasks = new();
            for (int i = 0; i < 10; ++i)
            
                var task = producer.ProduceAsync("my-topic", new Message<Null, string>  Value = i.ToString() );
                tasks.Add(task);
            
            await Task.WhenAll(tasks.ToArray());
        

如何进行性能测试

produce - Flush 的性能到底有多好呢?

我们可以使用 BenchmarkDotNet 做性能测试,来评估推送不同消息数量时,消耗的时间和内存。由于不同服务器的 CPU、内存、磁盘速度,以及客户端与服务器之间的网络带宽、时延都是影响消息吞吐量的重要因素,因此有必要编写代码来进行性能测试,来评估客户端以及服务器需要多高的性能来运行程序。

示例代码如下:

using Confluent.Kafka;
using System.Net;
using System.Security.Cryptography;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using BenchmarkDotNet.Jobs;

public class Program

    static void Main()
    
        var summary = BenchmarkRunner.Run<KafkaProduce>();
    


[SimpleJob(RuntimeMoniker.Net70)]
[SimpleJob(RuntimeMoniker.NativeAot70)]
[RPlotExporter]
public class KafkaProduce

    // 每批消息数量
    [Params(1000, 10000,100000)]
    public int N;

    private ProducerConfig _config;
    
    
    [GlobalSetup]
    public void Setup()
    
        _config = new ProducerConfig
        
            BootstrapServers = "192.168.3.156:9092"
        ;
    

    [Benchmark]
    public async Task UseAsync()
    
        using (var producer = new ProducerBuilder<Null, string>(_config).Build())
        
            List<Task> tasks = new();
            for (int i = 0; i < N; ++i)
            
                var task = producer.ProduceAsync("ben1-topic", new Message<Null, string>  Value = i.ToString() );
                tasks.Add(task);
            
            await Task.WhenAll(tasks);
        
    

    [Benchmark]
    public void UseLibrd()
    
        using (var producer = new ProducerBuilder<Null, string>(_config).Build())
        
            for (int i = 0; i < N; ++i)
            
                producer.Produce("ben2-topic", new Message<Null, string>  Value = i.ToString() , null);
            
            producer.Flush(TimeSpan.FromSeconds(60));
        
    

在示例代码中,笔者除了记录时间速度外,也开启了 GC 记录。

Ping 服务器的结果以及 BenchmarkDotNet 性能测试结果如下:

正在 Ping 192.168.3.156 具有 32 字节的数据:
来自 192.168.3.156 的回复: 字节=32 时间=1ms TTL=64
来自 192.168.3.156 的回复: 字节=32 时间=2ms TTL=64
来自 192.168.3.156 的回复: 字节=32 时间=2ms TTL=64
来自 192.168.3.156 的回复: 字节=32 时间=1ms TTL=64
Method Job Runtime N Mean Error StdDev Gen0 Gen1 Gen2 Allocated
UseAsync .NET 7.0 .NET 7.0 1000 125.1 ms 2.21 ms 2.17 ms - - - 1055.43 KB
UseLibrd .NET 7.0 .NET 7.0 1000 124.7 ms 2.26 ms 2.12 ms - - - 359.18 KB
UseAsync NativeAOT 7.0 NativeAOT 7.0 1000 124.8 ms 1.83 ms 1.62 ms - - - 1055.43 KB
UseLibrd NativeAOT 7.0 NativeAOT 7.0 1000 125.1 ms 1.76 ms 1.64 ms - - - 359.18 KB
UseAsync .NET 7.0 .NET 7.0 10000 143.9 ms 3.70 ms 10.86 ms 1250.0000 750.0000 250.0000 10577.22 KB
UseLibrd .NET 7.0 .NET 7.0 10000 140.6 ms 2.74 ms 4.80 ms 250.0000 - - 3523.29 KB
UseAsync NativeAOT 7.0 NativeAOT 7.0 10000 145.7 ms 3.25 ms 9.59 ms 1250.0000 750.0000 250.0000 10577.22 KB
UseLibrd NativeAOT 7.0 NativeAOT 7.0 10000 140.6 ms 2.78 ms 5.56 ms 250.0000 - - 3523.29 KB
UseAsync .NET 7.0 .NET 7.0 100000 407.3 ms 7.17 ms 9.58 ms 13000.0000 7000.0000 2000.0000 105185.91 KB
UseLibrd .NET 7.0 .NET 7.0 100000 259.7 ms 5.72 ms 16.78 ms 4000.0000 - - 35164.82 KB
UseAsync NativeAOT 7.0 NativeAOT 7.0 100000 419.8 ms 8.31 ms 13.19 ms 14000.0000 8000.0000 2000.0000 105194.3 KB
UseLibrd NativeAOT 7.0 NativeAOT 7.0 100000 255.3 ms 6.31 ms 18.62 ms 4000.0000 - - 35164.72 KB

可以看到使用了 librdkafka 批量推送,比使用 Task.WhenAll 性能要好一些,特别是消息数量比较大的情况下。

不过这个性能测试的结果意义也不大,主要是让读者了解如何使用 BenchmarkDotNet 进行性能测试,客户端推送消息到 Broker,能够实现每秒多大的负载,以此评估在当前环境下可以承载多大的流量。

消费

生产消息后,接着编写消费者程序处理消息,消费的代码分为 ConsumerConfig 配置和消费两步,其示例代码如下:

using System.Collections.Generic;
using Confluent.Kafka;

...

var config = new ConsumerConfig

    // 这些配置后面的章节中笔者会介绍,这里跳过。
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Earliest
;

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())

    ...

消费者配置默认会自动提交确认(ACK),所以消费后不需要编写代码确认消息,所以笔者编写的消费者示例代码如下:

using Confluent.Kafka;
using System.Net;

public class Program

    static void Main()
    
        var config = new ConsumerConfig
        
            BootstrapServers = "192.168.3.156:9092",
            GroupId = "test1",
            AutoOffsetReset = AutoOffsetReset.Earliest
        ;

        CancellationTokenSource source = new CancellationTokenSource();
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        
            // 订阅主题
            consumer.Subscribe("my-topic");
            
            // 循环消费
            while (!source.IsCancellationRequested)
            
                var consumeResult = consumer.Consume(source.Token);
                Console.WriteLine(consumeResult.Message.Value);
            

            consumer.Close();
        
    

在本章中,关于 Kafka .NET 的基础就到这里,接下来笔者会详细讲解生产者和消费者的代码编写方法以及各种参数配置的使用方法。

4,生产者

在第三章中,我们学习到了 Kafka C# 客户端的一些使用方法,学习了如何编写生产者程序。

在本章中,笔者将会详细介绍生产者程序的参数配置、接口使用方法,以便在项目中更加好地应用 Kafka,以及应对可能发生的故障。

下图是一个生产者推送消息的流程:

使用客户端库编写生产者是比较简单的,但是消息推送过程是比较复杂的,从上图中可以看到生产者推送消息时,客户端库会先用序列化器将消息序列化为二进制,然后通过分区器算出 Topic 的消息需要推送到哪个 Broker 、哪个分区中 。

接着,如果推送消息失败,那么客户端库还要确认是否重试,重试次数、时间间隔等。

所以说,推送消息虽然很简单,但是怎么处理故障,确保消息不会丢失,还有生产者的配置,这些都需要开发者根据场景考虑,设计合理的生产者程序逻辑。

就 “避免消息丢失” 这个话题来说,除了生产者需要关注消息是否已经推送到 Broker,还要关注 leader 副本是否及时与 follwer 副本同步。否则即使客户端已经将消息推送到 Broker,Broker 的 leader 还没有同步最新的消息到 follwer 副本就挂了,那么此条消息还是会丢失的,所以客户端还需要设置合理的 ACK。

说明了消息会不会丢失,不仅跟生产者的状态有关,还跟 Broker 状态有关。

下面笔者将详细介绍生产者推送消息时,一些日常开发中会遇到的配置以及细节。

连接 Broker

生产者连接 Broker,需要定义 ProducerConfig ,首先是 BootstrapServers 属性,填写所有 Broker 的服务器地址,格式如下:

host1:9092,host2:9092,...
using Confluent.Kafka;
using System.Net;

public class Program

    static void Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "host1:9092",
            ...
        ;
        ... ...
    

如果需要通过加密连接,ProducerConfig 可以参考下面的代码:

        var config = new ProducerConfig
        
            BootstrapServers = "<your-IP-port-pairs>",
            SslCaLocation = "/Path-to/cluster-ca-certificate.pem",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.ScramSha256,
            SaslUsername = "ickafka",
            SaslPassword = "yourpassword",
        ;

客户端并不需要填写所有 Broker 的地址,因为生产者在建立连接之后,便可以从已连接的 Broker 中查找集群信息,获取到所有 Broker 地址。但是建议至少填写两个 Broker 地址,因为如果第一个 Broker 地址不可用,客户端还可以从其它 Broker 中获取当前集群的信息,不至于完全连不上服务器。

例如服务器有三个 Broker,客户端只填写了一个 BootstrapServers 地址,然后客户端推送消息,这些消息还是会被自动推送到对应的分区中的。

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.158:19092"
        ;
        using (var producer = new ProducerBuilder<string, string>(config).Build())
        
            var r1 = await producer.ProduceAsync("hello-topic", new Message<string, string>  Key = "a", Value = "a log message" );
            var r2 = await producer.ProduceAsync("hello-topic", new Message<string, string>  Key = "b", Value = "a log message" );
            var r3 = await producer.ProduceAsync("hello-topic", new Message<string, string>  Key = "c", Value = "a log message" );
            var r4 = await producer.ProduceAsync("hello-topic", new Message<string, string>  Key = "d", Value = "a log message" );
            Console.WriteLine($"""
                r1  Status:r1.Status,Partition:r1.Partition
                r2  Status:r2.Status,Partition:r2.Partition
                r3  Status:r3.Status,Partition:r3.Partition
                r4  Status:r4.Status,Partition:r4.Partition
                """);
        
    

可以看到,即使只填写一个 Broker,消息依然可以被正确分区。

Key 分区

本节会介绍 Key 的使用方法。

提前创建了一个 hello-topic 主题,并设置了 3 个分区,3 个副本,其创建命令如下所示:

kafka-topics --create --bootstrap-server 192.168.3.158:19092 \\
--replication-factor 23 \\
--partitions 3 \\
--topic hello-topic

在前面的章节中,笔者介绍了如何编写生产者以及推送消息,但是代码比较简单,只设置了 Value

new Message<Null, string>  Value = "a log message" 

然后是关于分区的问题。

首先是分区器,分区器决定将当前消息推送到哪个分区,而分区器位于客户端

推送消息时,我们可以在客户端显示指定将消息推送到哪个分区,如果没有显式指定分区位置,那么就会由分区器基于 Key 决定将消息推送到哪个分区中。

如果一个消息没有设置 Key,即 Keynull,那么这些没有 Key 的消息,会被均衡分布到各个分区上,按照 p0 => p1 => p2 => p0 这样的顺序推送消息。

接下来,笔者介绍 Key 使用。

创建主题后,我们来看一下 C# 代码中的生产者构造器以及 Message<TKey, TValue> 的定义。

ProducerBuilder<TKey, TValue> Message<TKey, TValue> 两者都具有相同的泛型参数。

public class ProducerBuilder<TKey, TValue>
    public class Message<TKey, TValue> : MessageMetadata
    
        //
        // 摘要:
        //     Gets the message key value (possibly null).
        public TKey Key  get; set; 

        //
        // 摘要:
        //     Gets the message value (possibly null).
        public TValue Value  get; set; 
    

然后,在编写代码时,我们需要为 Key 和 Value 设置对应的类型。

生产者的代码示例如下:

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.158:19092,192.168.3.158:29092,192.168.3.158:39092"
        ;
        using (var producer = new ProducerBuilder<int, string>(config).Build())
        
            var r1 = await producer.ProduceAsync("hello-topic", new Message<int, string>  Key = 1, Value = "a log message" );
            var r2 = await producer.ProduceAsync("hello-topic", new Message<int, string>  Key = 2, Value = "a log message" );
            var r3 = await producer.ProduceAsync("hello-topic", new Message<int, string>  Key = 3, Value = "a log message" );
            var r4 = await producer.ProduceAsync("hello-topic", new Message<int, string>  Key = 4, Value = "a log message" );
            Console.WriteLine($"""
                r1  Status:r1.Status,Partition:r1.Partition
                r2  Status:r2.Status,Partition:r2.Partition
                r3  Status:r3.Status,Partition:r3.Partition
                r4  Status:r4.Status,Partition:r4.Partition
                """);
        
    

响应结果中可以看到消息被推送到哪个分区中。

接下来还有一个疑问,如果向 Broker 推送具有相同值的 Key,那么会覆盖之前的消息?

正常情况下应该不会

主题有个 cleanup.policy 参数,设置日志保留策略,如果保留策略是compact(压实),那么只为每个 key 保留最新的值。

下面我们可以来做使用,首先向 Broker 推送 20 条消息,一共有 10 个 Key,两两重复。

    static async Task Main()
    
        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.158:19092",
        ;
        using (var producer = new ProducerBuilder<string, string>(config)
            .Build())
        
            int i = 1;
            while (i <= 10)
            
                var r1 = await producer.ProduceAsync("same-hello", new Message<string, string>  Key = i.ToString(), Value = "1" );
                Console.WriteLine($"id:r1.Key,status:r1.Status");
                i++;
            

            i = 1;
            while (i <= 10)
            
                var r1 = await producer.ProduceAsync("same-hello", new Message<string, string>  Key = i.ToString(), Value = "2" );
                Console.WriteLine($"id:r1.Key,status:r1.Status");
                i++;
            
        
    

或者:

         int i = 1;
         while (i <= 10)
         
             var r1 = await producer.ProduceAsync("same-hello", new Message<string, string>  Key = i.ToString(), Value = "1" );
             Console.WriteLine($"id:r1.Key,status:r1.Status");
             var r2 = await producer.ProduceAsync("same-hello", new Message<string, string>  Key = i.ToString(), Value = "2" );
             Console.WriteLine($"id:r1.Key,status:r2.Status");
             i++;
         

然后打开 kafdrop,查看每个分区的消息数量,。

可以看到,消息数量总数为 20 条,虽然部分 key 重复,但是消息还在,不会丢失。

接着打开其中一个分区,会发现分区器依然是正常工作,相同的 key 依然会被划分到同一个分区中。

所以我们并不需要担心 Key 为空,以及相同的 Key 覆盖消息。

评估消息发送时间

下面是推送一条消息的步骤。

这里的批量指的是缓冲区。

客户端库里面设计到了好几个时间配置,在《Kafka权威指南(第2版)》,给出了一个时间公式:

delivery.timeout.ms >= linger.ms + retry.backoff.ms + request.timeout.ms

delivery.timeout.ms 设置将消息放到缓冲区、推送消息到 Broker、获得 Ack、以及重试的总时间不能超过这个范围,否则视为超时。

在 C# 中没有这么详细的时间配置,然后这些时间的配置验证比较麻烦,因此这里笔者只给出简单的说明,详细每个时间配置,读者可以参考 《Kafka权威指南(第2版)》 的 41 页。

生产者配置

本节主要参考文章:

https://towardsdatascience.com/10-configs-to-make-your-kafka-producer-more-resilient-ec6903c63e3f

部分图片来源于此文章。

参考资料还包括 《Kafka权威指南(第2版)》。

本节介绍生产者的以下配置:

  • acks
  • bootstrap.servers
  • retries
  • enable.idempotence
  • max.in.flight.requests.per.connection
  • buffer.memory
  • max.block.ms
  • linger.ms
  • batch.size
  • compression.type

查看 ProducerConfig 的源码可以发现,每个属性字段都对应了一个 Kafka 配置项。

完整的生产者配置文档:https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#

接下来笔者对日常开发中比较容易用到的配置项进行一一说明。

acks

C# 中对应的枚举如下:

    public enum Acks
    
        None = 0,
        Leader = 1,
        All = -1
    

使用示例:

        var config = new ProducerConfig
        
            BootstrapServers = "192.168.3.158:9092",
            Acks = Acks.Leader
        ;

默认值是 Acks.Leader

acks 指定了生产者推送消息时,需要多少个分区副本全部收到消息的情况下,才会认为消息写入成功。

在默认情况下,在首领副本收到消息后,即可向客户端回应消息已写入成功,这有助于控制发送的消息的持久性。

下面是 akcs 配置的说明:

  • acks=0: 这意味着该记录将立即添加到套接字缓冲区并被视为已发送,如果网络故障或其它原因消息没有推送到 Broker,那么抱歉,这个消息就会被丢弃;
  • acks=1: 只要生产者收到 Leader 副本的确认,它就会将其视为成功的提交。不过在 Leader 副本发生崩溃的情况下,消息还是有可能丢失的;
  • acks=all: 消息提交后必须等待来自该主题的所有副本的确认,它提供了最强大的可用消息持久性,但是耗时会增加。

在第二章和第三章都提到过这个 leader 和 follwer 的情况。

acks 的默认值为 1,这意味着只要生产者从该主题的 Leader 副本收到 ack,它就会将其视为成功的提交并继续下一条消息。

acks= all 将确保生产者从该主题的所有同步副本中获得 acks 才会认为消息已经提交,它提供了最强的消息持久性,但是它也需要较长的时间,从而导致较高的延迟。

下图是 acks=1acks=all 的区别。

acks=all 也可以写成 acks=-1

【图源:1.5万字长文:从 C# 入门 Kafka(Kafka基础知识)

1.5万字长文:从 C# 入门 Kafka(Kafka .NET 基础)

2 万字长文深入详解 Kafka,从源码到架构全部讲透

java基础入门黑马程序员第二版,万字长文!

2 万字长文深入详解 Kafka,从源码到架构全部讲透

Vue学习之从入门到神经(万字长文 建议收藏)