C# 无法使用 Kafka 主题的消息?
Posted
技术标签:
【中文标题】C# 无法使用 Kafka 主题的消息?【英文标题】:C# Unable to consume message on Kafka topic? 【发布时间】:2019-01-11 09:46:17 【问题描述】:我一直在查看 Confluent.Kafka 客户端 (https://github.com/confluentinc/confluent-kafka-dotnet/) 的几个示例,虽然我可以成功让生产者将消息推送到 Kafka,但我无法将任何消息拉回消费者.
通过 UI 我可以看到主题已创建并且消息正在进入该主题(当前有 10 个分区和 3 条消息),但我的消费者总是报告“分区结束”,没有任何消息的消费( 3 留在主题上,“OnMessage”永远不会触发)。
但是消费者肯定在访问主题,并且可以在其中一个分区上看到 3 条消息:
分区结束:dotnet-test-topic [6] @3
它只是不消耗消息并触发 OnMessage()。有什么想法吗?
var conf = new Dictionary<string, object>
"group.id", Guid.NewGuid().ToString() ,
"bootstrap.servers", "mykafkacluster:9094" ,
"sasl.mechanisms", "SCRAM-SHA-256" ,
"security.protocol", "SASL_SSL" ,
"sasl.username", "myuser" ,
"sasl.password", "mypass"
;
using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
.ContinueWith(result =>
var msg = result.Result;
if (msg.Error.Code != ErrorCode.NoError)
Console.WriteLine($"failed to deliver message: msg.Error.Reason");
else
Console.WriteLine($"delivered to: result.Result.TopicPartitionOffset");
);
producer.Flush(TimeSpan.FromSeconds(10));
using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
consumer.Subscribe("dotnet-test-topic");
consumer.OnConsumeError += (_, err)
=> Console.WriteLine($"consume error: err.Error.Reason");
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"consumed: msg.Value");
consumer.OnPartitionEOF += (_, tpo)
=> Console.WriteLine($"end of partition: tpo");
while (true)
consumer.Poll(TimeSpan.FromMilliseconds(100));
【问题讨论】:
【参考方案1】:如果没有提供以下配置,似乎 OnMessage 事件不会触发:
"auto.offset.reset", "smallest"
添加后,我能够阅读有关该主题的消息。
【讨论】:
以上是关于C# 无法使用 Kafka 主题的消息?的主要内容,如果未能解决你的问题,请参考以下文章
即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]