无法在 Pulsar 中创建生产者
Posted
技术标签:
【中文标题】无法在 Pulsar 中创建生产者【英文标题】:Cannot create a producer in Pulsar 【发布时间】:2021-12-10 02:29:09 【问题描述】:我目前正在本地 Minikube 实例上运行 Pulsar。我正在尝试连接到实例并使用 Python 创建生产者。安装/启动 Pulsar 后,我得到以下连接信息:
|-----------|-------------------|-------------|---------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|-------------------|-------------|---------------------------|
| pulsar | pulsar-mini-proxy | http/80 | http://192.168.49.2:31183 |
| | | pulsar/6650 | http://192.168.49.2:30841 |
|-----------|-------------------|-------------|---------------------------|
???? Starting tunnel for service pulsar-mini-proxy.
|-----------|-------------------|-------------|------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|-------------------|-------------|------------------------|
| pulsar | pulsar-mini-proxy | | http://127.0.0.1:50069 |
| | | | http://127.0.0.1:50070 |
|-----------|-------------------|-------------|------------------------|
我已尝试在我的代码中使用上述所有 URL 进行连接。无论我使用哪个 URL,在尝试创建生产者时都会收到连接被拒绝或超时。请注意,失败的不是实际的连接步骤,而是失败的 producer = client...
步骤。我的代码/输出如下:
代码(http://192.168.49.2:31183):
import pulsar
# create a producer
client = pulsar.Client('http://192.168.49.2:31183')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)
输出(超时):
2021-10-24 00:19:12.908 INFO [0x70000ca06000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:31183/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:19:42.909 ERROR [0x70000ca06000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut
代码(http://192.168.49.2:30841):
import pulsar
# create a producer
client = pulsar.Client('http://192.168.49.2:30841')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)
输出(超时):
2021-10-24 00:21:58.313 INFO [0x7000108b5000] HTTPLookupService:237 | Curl Lookup Request sent for http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] HTTPLookupService:270 | Response failed for url http://192.168.49.2:30841/admin/v2/persistent/public/default/my-topic/partitions. Error Code 28
2021-10-24 00:22:28.314 ERROR [0x7000108b5000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- TimeOut
代码(http://127.0.0.1:50069):
import pulsar
# create a producer
client = pulsar.Client('http://127.0.0.1:50069')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)
输出(连接错误):
2021-10-24 00:23:54.336 INFO [0x7000103da000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:23:54.337 ERROR [0x7000103da000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50069/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:23:54.337 ERROR [0x7000103da000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError
代码(http://127.0.0.1:50070):
import pulsar
# create a producer
client = pulsar.Client('http://127.0.0.1:50070')
producer = client.create_producer('persistent://public/default/my-topic', block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=10)
输出(连接错误):
2021-10-24 00:27:00.336 INFO [0x700011650000] HTTPLookupService:237 | Curl Lookup Request sent for http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions
2021-10-24 00:27:00.337 ERROR [0x700011650000] HTTPLookupService:262 | Response failed for url http://127.0.0.1:50070/admin/v2/persistent/public/default/my-topic/partitions. Error Code 7
2021-10-24 00:27:00.337 ERROR [0x700011650000] ClientImpl:181 | Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/my-topic -- ConnectError
【问题讨论】:
【参考方案1】:我对 Pulsar 代理没有太多经验,我也不确定 helm chart 是如何设置它的,但通常你会在端口 6650(6651 用于 TLS)上连接到 Pulsar,或者管理服务,端口 8080(8443 用于 TLS。) 参考:https://pulsar.apache.org/docs/en/reference-configuration/
您使用的那些端口号看起来像是 Kubernetes 部署的内部端口。通常,要连接到 Kubernetes 内部的服务,您需要创建一个 NodePort 或 LoadBalancer。 (在生产中,通常您会使用 Ingress 来设置 LoadBalancer。) 使用 minikube,有一些细微的差别,因为您需要使用 minikube 隧道来公开 LoadBalancer。这里解释一下:https://minikube.sigs.k8s.io/docs/handbook/accessing/
我建议查看由 helm chart 创建的对象并熟悉它们的工作原理,尤其是那些涉及网络的对象,并确保您了解它们实际为您配置的内容。
【讨论】:
【参考方案2】:事实证明,Pulsar 的 helm 安装确实创建了一个 LoadBalancer
对象。 LoadBalancer
服务对象称为pulsar-mini-proxy
。到达LoadBalancer
服务的正确地址是:
pulsar://127.0.0.1:6650
我需要做的唯一其他步骤是:
minikube tunnel
【讨论】:
以上是关于无法在 Pulsar 中创建生产者的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot整合Pulsar生产和消费消息 简单示例代码
Spring Boot整合Pulsar生产和消费消息 简单示例代码
06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费