群组成员支持的协议与现有成员的协议不兼容

Posted

技术标签:

【中文标题】群组成员支持的协议与现有成员的协议不兼容【英文标题】:The group member's supported protocols are incompatible with those of existing members 【发布时间】:2018-12-31 09:19:10 【问题描述】:

我遇到了一个与 Kafka 相关的问题。

我正在使用我当前的服务 (Producer) 将消息发送到 Kafka 主题 (events)。该服务使用kafka_2.12 v1.0.0,用Java编写。

我正在尝试将它与spark-streaming 的示例项目集成为Consumer 服务(here 使用 kafka_2.11 v0.10.0,用 Scala 编写)

消息从Producer成功发送到Kafka主题。但是,我总是收到以下错误堆栈:

Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)    at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
    at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)     at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)   at scala.App$class.main(App.scala:76)
    at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
    at com.jj.streaming.ItemApp.main(ItemApp.scala)

我不知道根本原因。我该如何解决这个问题?

【问题讨论】:

Tin Nguyen,您找到解决方案了吗?我也面临同样的问题 嗨@Satish,不,我没有。 【参考方案1】:

当我尝试将消费者添加到使用与以前不同的分区分配策略的集群时,会在我的配置中发生这种情况。

例如:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor

混合或默认为:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

【讨论】:

为了确定这是否也是您的问题,请使用 kafka-consumer-groups --bootstrap-server SERVER:9092 --describe --group GROUP --state 在结果中查找 ASSIGNMENT-STRATEGY。【参考方案2】:

正如@john Cairns 和@Iraj Hedyati 所指出的,检查分配给消费者组的分配策略。不同的客户端创建具有不同默认策略的消费者组。例如

当我使用 kafka 命令行客户端 ( java ) 时,它使用“范围”策略创建了一个消费者组。

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   range                Stable          1

而当我使用 go 客户端使用 sarama 库创建消费者组时,它使用的是“循环策略”。

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   roundrobin           Stable          1

因此,如果该组已经存在并且具有不同的策略,则报告 InconsistentGroupProtocolException

【讨论】:

以上是关于群组成员支持的协议与现有成员的协议不兼容的主要内容,如果未能解决你的问题,请参考以下文章

数据读取器与指定的 [X] 不兼容。类型的成员

mysql udf函数怎么调用

JGroups 入门实践(转)

Websocket原理

winsocket UDP “使用了与请求的协议不兼容的地址”错误

数通面试私房菜之组播专题第三期: IGMP各版本间区别