无法通过 websockets 发布到 aws mqtt 代理
Posted
技术标签:
【中文标题】无法通过 websockets 发布到 aws mqtt 代理【英文标题】:Cant publish to aws mqtt broker over websockets 【发布时间】:2021-10-08 18:23:36 【问题描述】:我正在关注aws api 通过 websockets 连接到 mqtt。以下是我的代码:
credentials_provider = AwsCredentialsProvider.new_static(
access_key_id = auth_response_dictionary['user']['accessKeyId'],
secret_access_key = auth_response_dictionary['user']['secretKey'],
session_token = auth_response_dictionary['user']['sessionToken']
)
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=auth_response_dictionary['user']['iotEndpoint'],
region=auth_response_dictionary['user']['region'],
credentials_provider=credentials_provider,
client_bootstrap=client_bootstrap,
client_id=clientId
)
print("Connecting to aws")
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
print('connect_future ' + str(connect_future))
x= connect_future.result()
print('connect_future ' + str(x))
print("Connected!")
future, packet_id = mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
future, packet_id = mqtt_connection.publish(topic='test/po', payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
print('future ' + str(future))
print('future ' + str(packet_id))
print('Publish End')
我在连接和发布时没有收到任何错误,但是当我在“测试”部分订阅该主题时,我没有在我的 aws mqtt 代理上收到任何消息。
我认为我在 credentials_provider
或 client_bootstrap
或两者中都配置了错误,但不知道是什么。
这是打印出来的日志
Connecting to aws
connect_future<Future at 0x7f605f942af0 state=pending>
connect_future'session_present': False
Connected!
future <Future at 0x7f605f8e54f0 state=pending>
future 3
Publish End
有人可以帮忙吗?
【问题讨论】:
【参考方案1】:mqtt_connection.subscribe(...)
用于订阅 AWS IoT 消息的 MQTT 主题,我在您的代码中的任何地方都看不到。
mqtt_connection.subscribe
如下调用,包含主题名称、服务质量级别和回调。
received_count = 0
received_all_event = threading.Event()
...
topic='test/po'
print("Subscribing to topic ''...".format(topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)
subscribe_result = subscribe_future.result()
print("Subscribed with ".format(str(subscribe_result['qos'])))
on_message_received
可以是这样的:
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
print("Received message from topic '': ".format(topic, payload))
global received_count
received_count += 1
# Number of messages to wait for
if received_count = 10:
received_all_event.set()
然后在你的 main 方法中,你可以等到收到 10 条消息:
# Wait for all messages to be received.
# This waits forever if count was set to 0.
if not received_all_event.is_set():
print("Waiting for all messages to be received...")
received_all_event.wait()
print(" message(s) received.".format(received_count))
sample code provided by AWS 真的很不错,我建议你看看。
【讨论】:
我已在 aws iot 控制台的“测试”部分订阅了我发布的主题。问题是我在那里没有收到任何东西 在哪里?您需要订阅该主题 - 无论test
在哪里,都不是您的问题以上是关于无法通过 websockets 发布到 aws mqtt 代理的主要内容,如果未能解决你的问题,请参考以下文章
无法使用 Forge、nginx 和 AWS 连接到 Laravel Websocket
如何通过 CloudFront 访问 aws websocket
AWS EB:WebSocket 握手期间出错:意外响应代码:400
通过 HTTP 集成从 AWS Api Gateway 中的 websocket 提取 websocket 连接 ID 和其他详细信息