Kafka学习征途:.NET Core操作Kafka
Posted dotNET跨平台
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka学习征途:.NET Core操作Kafka相关的知识,希望对你有一定的参考价值。
【Kafka】| 总结/Edison Zhou
1可用的Kafka .NET客户端
作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。
因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:
接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。
2基于Confluent.Kafka的Sample
要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。
安装相关组件
在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:
PM>Install-Package Confluent.Kafka
编写KafkaService
编写IKafkaService接口:
namespace EDT.Kafka.Core
public interface IKafkaService
Task PublishAsync<T>(string topicName, T message) where T : class;
Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;
编写KafkaService实现类:
namespace EDT.Kafka.Core
public class KafkaService : IKafkaService
public static string KAFKA_SERVERS = "127.0.0.1:9091";
public async Task PublishAsync<T>(string topicName, T message) where T : class
var config = new ProducerConfig
BootstrapServers = KAFKA_SERVERS,
BatchSize = 16384, // 修改批次大小为16K
LingerMs = 20 // 修改等待时间为20ms
;
using (var producer = new ProducerBuilder<string, string>(config).Build())
await producer.ProduceAsync(topicName, new Message<string, string>
Key = Guid.NewGuid().ToString(),
Value = JsonConvert.SerializeObject(message)
); ;
public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class
var config = new ConsumerConfig
BootstrapServers = KAFKA_SERVERS,
GroupId = "Consumer",
EnableAutoCommit = false, // 禁止AutoCommit
Acks = Acks.Leader, // 假设只需要Leader响应即可
AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起
;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
consumer.Subscribe(topics);
try
while (true)
try
var consumeResult = consumer.Consume(cancellationToken);
Console.WriteLine($"Consumed message 'consumeResult.Message?.Value' at: 'consumeResult?.TopicPartitionOffset'.");
if (consumeResult.IsPartitionEOF)
Console.WriteLine($" - DateTime.Now:yyyy-MM-dd HH:mm:ss 已经到底了:consumeResult.Topic, partition consumeResult.Partition, offset consumeResult.Offset.");
continue;
T messageResult = null;
try
messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);
catch (Exception ex)
var errorMessage = $" - DateTime.Now:yyyy-MM-dd HH:mm:ss【Exception 消息反序列化失败,Value:consumeResult.Message.Value】 :ex.StackTrace?.ToString()";
Console.WriteLine(errorMessage);
messageResult = null;
if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
messageFunc(messageResult);
try
consumer.Commit(consumeResult);
catch (KafkaException e)
Console.WriteLine(e.Message);
catch (ConsumeException e)
Console.WriteLine($"Consume error: e.Error.Reason");
catch (OperationCanceledException)
Console.WriteLine("Closing consumer.");
consumer.Close();
await Task.CompletedTask;
为了方便后续的演示,在此项目中再创建一个类 EventData:
public class EventData
public string TopicName get; set;
public string Message get; set;
public DateTime EventTime get; set;
编写Producer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:
namespace EDT.Kafka.Demo.Producer
public class Program
static async Task Main(string[] args)
KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";
var kafkaService = new KafkaService();
for (int i = 0; i < 50; i++)
var eventData = new EventData
TopicName = "testtopic",
Message = $"This is a message from Producer, Index : i + 1",
EventTime = DateTime.Now
;
await kafkaService.PublishAsync(eventData.TopicName, eventData);
Console.WriteLine("Publish Done!");
Console.ReadKey();
编写Consumer
新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:
namespace EDT.Kafka.Demo.Consumer
public class Program
static async Task Main(string[] args)
KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";
var kafkaService = new KafkaService();
var topics = new List<string> "testtopic" ;
await kafkaService.SubscribeAsync<EventData>(topics, (eventData) =>
Console.WriteLine($" - eventData.EventTime: yyyy-MM-dd HH:mm:ss 【eventData.TopicName】- > 已处理");
);
测试Pub/Sub效果
将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。
3基于CAP项目的Sample
模拟场景说明
假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。
Catalog API
新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:
PM>Install Package DotNetCore.CAP
PM>Install Package DotNetCore.CAP.MongoDB
PM>Install Package DotNetCore.CAP.Kafka
在Startup中的ConfigureServices方法中注入CAP:
public void ConfigureServices(IServiceCollection services)
......
services.AddCap(x =>
x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");
x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");
);
新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:
namespace EDT.Demo.Catalog.API.Controllers
[ApiController]
[Route("[controller]")]
public class ProductController : ControllerBase
private static readonly IList<Product> Products = new List<Product>
new Product Id = "0001", Name = "电动牙刷A", Price = 99.90M, Introduction = "暂无介绍" ,
new Product Id = "0002", Name = "电动牙刷B", Price = 199.90M, Introduction = "暂无介绍" ,
new Product Id = "0003", Name = "洗衣机A", Price = 2999.90M, Introduction = "暂无介绍" ,
new Product Id = "0004", Name = "洗衣机B", Price = 3999.90M, Introduction = "暂无介绍" ,
new Product Id = "0005", Name = "电视机A", Price = 1899.90M, Introduction = "暂无介绍" ,
;
private readonly ICapPublisher _publisher;
private readonly IMapper _mapper;
public ProductController(ICapPublisher publisher, IMapper mapper)
_publisher = publisher;
_mapper = mapper;
[HttpGet]
public IList<ProductDTO> Get()
return _mapper.Map<IList<ProductDTO>>(Products); ;
[HttpPut]
public async Task<IActionResult> UpdatePrice(string id, decimal newPrice)
// 业务代码
var product = Products.FirstOrDefault(p => p.Id == id);
product.Price = newPrice;
// 发布消息
await _publisher.PublishAsync("ProductPriceChanged",
new ProductDTO Id = product.Id, Name = product.Name, Price = product.Price);
return NoContent();
Basket API
参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。
新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。
namespace EDT.Demo.Basket.API.Controllers
[ApiController]
[Route("[controller]")]
public class BasketController : ControllerBase
private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>
new MyBasketDTO UserId = "U001", Catalogs = new List<Catalog>
new Catalog Product = new ProductDTO Id = "0001", Name = "电动牙刷A", Price = 99.90M , Count = 2 ,
new Catalog Product = new ProductDTO Id = "0005", Name = "电视机A", Price = 1899.90M , Count = 1 ,
,
new MyBasketDTO UserId = "U002", Catalogs = new List<Catalog>
new Catalog Product = new ProductDTO Id = "0002", Name = "电动牙刷B", Price = 199.90M , Count = 2 ,
new Catalog Product = new ProductDTO Id = "0004", Name = "洗衣机B", Price = 3999.90M , Count = 1 ,
;
[HttpGet]
public IList<MyBasketDTO> Get()
return Baskets;
[NonAction]
[CapSubscribe("ProductPriceChanged")]
public async Task RefreshBasketProductPrice(ProductDTO productDTO)
if (productDTO == null)
return;
foreach (var basket in Baskets)
foreach (var catalog in basket.Catalogs)
if (catalog.Product.Id == productDTO.Id)
catalog.Product.Price = productDTO.Price;
break;
await Task.CompletedTask;
测试效果
同时启动Catalog API 和 Basket API两个项目。
首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。
然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。
最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。
End总结
本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。
参考资料
阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741
麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html
Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html
极客时间,胡夕《Kafka核心技术与实战》
B站,尚硅谷《Kafka 3.x入门到精通教程》
年终总结:Edison的2020年终总结
数字化转型:我在传统企业做数字化转型
C#刷题:C#刷剑指Offer算法题系列文章目录
.NET面试:.NET开发面试知识体系
.NET大会:2020年中国.NET开发者大会PDF资料
以上是关于Kafka学习征途:.NET Core操作Kafka的主要内容,如果未能解决你的问题,请参考以下文章