无法在 Pulsar 中创建生产者

Posted

技术标签:

【中文标题】无法在 Pulsar 中创建生产者【英文标题】:Cannot create a producer in Pulsar 【发布时间】:2021-12-10 02:29:09 【问题描述】:

我目前正在本地 Minikube 实例上运行 Pulsar。我正在尝试连接到实例并使用 Python 创建生产者。安装/启动 Pulsar 后,我得到以下连接信息:

|-----------|-------------------|-------------|---------------------------|
| NAMESPACE |       NAME        | TARGET PORT |            URL            |
|-----------|-------------------|-------------|---------------------------|
| pulsar    | pulsar-mini-proxy | http/80     | http://192.168.49.2:31183 |
|           |                   | pulsar/6650 | http://192.168.49.2:30841 |
|-----------|-------------------|-------------|---------------------------|
????  Starting tunnel for service pulsar-mini-proxy.
|-----------|-------------------|-------------|------------------------|
| NAMESPACE |       NAME        | TARGET PORT |          URL           |
|-----------|-------------------|-------------|------------------------|
| pulsar    | pulsar-mini-proxy |             | http://127.0.0.1:50069 |
|           |                   |             | http://127.0.0.1:50070 |
|-----------|-------------------|-------------|------------------------|

我已尝试在我的代码中使用上述所有 URL 进行连接。无论我使用哪个 URL,在尝试创建生产者时都会收到连接被拒绝或超时。请注意,失败的不是实际的连接步骤,而是失败的 producer = client... 步骤。我的代码/输出如下:

代码(http://192.168.49.2:31183):

import pulsar

# create a producer
client = pulsar.Client('http://192.168.49.2:31183')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

输出(超时):

2021-10-24 00:19:12.908 INFO  [0x70000ca06000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut

代码(http://192.168.49.2:30841):

import pulsar

# create a producer
client = pulsar.Client('http://192.168.49.2:30841')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

输出(超时):

2021-10-24 00:21:58.313 INFO  [0x7000108b5000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut

代码(http://127.0.0.1:50069):

import pulsar

# create a producer
client = pulsar.Client('http://127.0.0.1:50069')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

输出(连接错误):

2021-10-24 00:23:54.336 INFO  [0x7000103da000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:23:54.337 ERROR [0x7000103da000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:23:54.337 ERROR [0x7000103da000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError

代码(http://127.0.0.1:50070):

import pulsar

# create a producer
client = pulsar.Client('http://127.0.0.1:50070')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)

输出(连接错误):

2021-10-24 00:27:00.336 INFO  [0x700011650000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:27:00.337 ERROR [0x700011650000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:27:00.337 ERROR [0x700011650000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError

【问题讨论】:

【参考方案1】:

我对 Pulsar 代理没有太多经验,我也不确定 helm chart 是如何设置它的,但通常你会在端口 6650(6651 用于 TLS)上连接到 Pulsar,或者管理服务,端口 8080(8443 用于 TLS。) 参考:https://pulsar.apache.org/docs/en/reference-configuration/

您使用的那些端口号看起来像是 Kubernetes 部署的内部端口。通常,要连接到 Kubernetes 内部的服务,您需要创建一个 NodePort 或 LoadBalancer。 (在生产中,通常您会使用 Ingress 来设置 LoadBalancer。) 使用 minikube,有一些细微的差别,因为您需要使用 minikube 隧道来公开 LoadBalancer。这里解释一下:https://minikube.sigs.k8s.io/docs/handbook/accessing/

我建议查看由 helm chart 创建的对象并熟悉它们的工作原理,尤其是那些涉及网络的对象,并确保您了解它们实际为您配置的内容。

【讨论】:

【参考方案2】:

事实证明,Pulsar 的 helm 安装确实创建了一个 LoadBalancer 对象。 LoadBalancer 服务对象称为pulsar-mini-proxy。到达LoadBalancer服务的正确地址是:

pulsar://127.0.0.1:6650

我需要做的唯一其他步骤是:

minikube tunnel

【讨论】:

以上是关于无法在 Pulsar 中创建生产者的主要内容,如果未能解决你的问题,请参考以下文章

Pulsar 消息概念1

Spring Boot整合Pulsar生产和消费消息 简单示例代码

Spring Boot整合Pulsar生产和消费消息 简单示例代码

Pulsar 中的意外积压大小

“Where query”在生产环境中创建错误的 SQL

06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费