pubSub Source:两次收到相同的消息

Posted

技术标签:

【中文标题】pubSub Source:两次收到相同的消息【英文标题】:pubSubSource: Receving the same message twice 【发布时间】:2021-05-26 15:04:02 【问题描述】:

说明

我在 Kafka Connect 分布式模式下有一个 pubSubSource 连接器,它只是从 PubSub 订阅中读取数据并写入 Kafka 主题。问题是,即使我将一条消息发布到 GCP PubSub,我也会在我的 Kafka 主题中两次写入这条消息。

如何重现

部署 Kafka 和 Kafka 连接

使用以下pubSubSource 配置创建连接器:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '
  "name": "pubSubSource",
  "config": 
    "connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max":"1",
    "cps.subscription":"pubsub-test-sub",
    "kafka.topic":"kafka-sub-topic",
    "cps.project":"test-project123",
    "gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json"
  
'

以下是 Kafka 连接配置:

"plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable": "false"
"value.converter.schemas.enable": "false"
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
"config.storage.replication.factor": "1"
"offset.storage.replication.factor": "1"
"status.storage.replication.factor": "1"

使用以下命令向 PubSub 主题发布消息:

gcloud pubsub topics publish test-topic --message='"someKey":"someValue"'

从目标 Kafka 主题读取消息:

/usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-topic --from-beginning

# Output
"someKey":"someValue"
"someKey":"someValue"

为什么会这样,是不是我做错了什么?

【问题讨论】:

【参考方案1】:

我在https://cloud.google.com/pubsub/docs/faq 找到了以下信息,看来您也面临同样的问题。能否尝试生成大消息,看看结果是否相同?

链接中的详细信息:

为什么有太多重复的消息? Pub/Sub 保证至少一次消息传递,这意味着偶尔会出现重复。但是,高重复率可能表明客户端未在配置的 ack_deadline_seconds 内确认消息,并且 Pub/Sub 正在重试消息传递。这可以在用于拉取订阅的监控指标 pubsub.googleapis.com/subscription/pull_ack_message_operation_count 和用于推送订阅的 pubsub.googleapis.com/subscription/push_request_count 中观察到。在 /response_code 中查找提升的过期或 webhook_timeout 值。如果有很多小消息,这种情况尤其可能发生,因为 Pub/Sub 可能会在内部对消息进行批处理,并且部分确认的批处理将被完全重新传递。

另一种可能性是订阅者没有确认某些消息,因为处理这些特定消息的代码路径失败,并且从未进行过 Acknowledge 调用;或者推送端点永远不会响应或响应错误。

如何检测重复消息? Pub/Sub 为每条消息分配一个唯一的 message_id,可用于检测订阅者收到的重复消息。但是,这将不允许您检测由对同一数据的多个发布请求产生的重复项。检测这些将需要发布者提供唯一的消息标识符。有关进一步讨论,请参阅 Pub/Sub I/O。

【讨论】:

以上是关于pubSub Source:两次收到相同的消息的主要内容,如果未能解决你的问题,请参考以下文章

JSON 数据作为 PubSub 的消息

使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”

PubSub 不确认消息

在 MessageReciever 之外确认 pubSub 消息

在存在接收已发布项目两次 ejabberd

使用 smack 读取 pubsub 中的传入消息数据包