使用 pika 读取 rabbitmq 消息的 gzipped 有效负载

Posted

技术标签:

【中文标题】使用 pika 读取 rabbitmq 消息的 gzipped 有效负载【英文标题】:Reading gzipped payload of a rabbitmq message with pika 【发布时间】:2021-06-23 09:55:34 【问题描述】:

在谷歌两天后感到绝望。 我正在使用来自 rabbitmq 的 pika 消息。 我知道它是 base64 并且 content_encoding 是 gzip。 我开始将字节正文编码为 utf-8。 现在我仍然需要以某种方式解压缩它。到目前为止的代码如下:

    1 def callback(channel, method, properties, body):
    2    if properties.content_encoding == 'gzip':
    3    
    4        decoded_body  = base64.b64encode(bytes(str(body), 'utf-8'))
    5        #eliminating the leading 'b
    6        decoded_body = decoded_body.decode()
    7
    8        #this snippet is just copied from another solution
    9        buf = io.StringIO(decoded_body)
    10       f = gzip.GzipFile(fileobj=buf)
    11       decompressed_body = f.read()
    12       print(decompressed_body)

以下错误信息出现在第 11 行。

 return self._buffer[read:] + \
 TypeError: can't concat str to bytes

显然 gzip 文件中有一个“\”,我需要将其转换为 b“\”。 但我不知道怎么做。 我是在正确的轨道上还是更容易解决?

非常感谢您的帮助!

这是在 rabbitmq 中打开消息时的有效负载: AFN3oPUfiwgAAAAAAAQAhU89T8MwEP0ryGJrHJ3t2mo8AqqK+BhIqGC8JKc0UuKUxBmqqv+di4pYGFhOenfv495ZvNA0YUOv2JPw4n77ke4Ix1gSRpGIPY1T OwS+qFQzLtqe8oj9kTcatJJgpDaFWnvrPOg0g40Ca1agPADz30P7NdPjA9OzUpFzTktnMyfXWNYSyVmZmUpXCg2ylhX5MI/V8srTcPgc5tCkGzAgTZrnMn+W AFYttALHhqLwYe66RLwRp0xxybkuflrdDfVJ+LO4jafjn3rJDUO24rnDUHf/h/5Kt+MSGCo2FwD+2vZy+QYahuynTgEAAA==

这是我在没有任何编码的情况下打印正文时得到的: b"\x00Sw\xa0\xf5\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x00\x85O=O\xc30\x10\xfd+\xc8bk\x1c\x9d\xed\xdaj\xbc$\xa74R\xe2\x94\xc4\x19\xaa\xaa\xff\x9d\x8b\x8aX\x18XNzw\xef\xe3\xdeY\xbc\xd04aC\xaf\xd8\x93\xf0\xe2~\xfb\x91\xee\x08\xc7X\x12F\x91\x88=\x8dS;\x04\xbe\xa8T3.\xda\x9e\xf2\x88\xfd\x917\x1a\xb4\x92\xa46\x85Z\xeb

【问题讨论】:

能否请您告诉我您是如何解压缩数据的。 在回调函数中我跳过了不必要的字节def callback(channel, method, properties, body): if properties.content_encoding == 'gzip': for count, bite in enumerate(body): if body[count] == 31 and body[count + 1] == 139: break try: decompressed_body = gzip.decompress(body[count:]) print("gzipped: \n".format(decompressed_body.decode())) except: print("unzipped: \n".format(body)) 感谢您的回复。 【参考方案1】:

您的消息中有五个字节位于 gzip 流前面。你需要在1f 8b开始gzip解码。

【讨论】:

你好马克,谢谢你的推荐。我会试试的,然后回来找你。你怎么知道 gzip 流从哪里开始? gzip 流始终以 1f 8b 开头。 亲爱的 Mark,现在我可以开始解压缩,但它导致我出现 EOF 错误:raise EOFError("Compressed file ends before the " 代码是:gzip.decompress(body[5:-1 ]) 是不是因为我一开始就剪掉了字节? 没有。 -1 正在删除最后一个字节。你想要body[5:] 亲爱的马克,非常感谢。现在它起作用了。解压缩消息后,我还可以在不解码 base64 的情况下以纯文本形式读取有效负载。【参考方案2】:

这里有很多不必要的编码和解码。

这行得通吗?

buf = io.BytesIO(body)
f = gzip.GzipFile(fileobj=buf)
decompressed_body = f.read()
print(decompressed_body)

【讨论】:

很遗憾没有。阅读 f.read() 时它说: gzip.BadGzipFile: Not a gzipped file (b'\x00S') @m1ch4 在您的问题中将消息作为文本发布(而不是图像)。 @m1ch4 我怀疑邮件已损坏。如果将二进制文件写入文件,它仍然不是 gzip 压缩文件。但我可能忽略了所有这些编码中的某些内容。 我无法读取存储在 rabbitmq 上的任何 gzip 压缩消息。我不希望一切都被破坏。至少我不希望如此 :) 也许还有另一种方法可以在不进行所有这些我不知道的编码的情况下获得纯文本正文。

以上是关于使用 pika 读取 rabbitmq 消息的 gzipped 有效负载的主要内容,如果未能解决你的问题,请参考以下文章

当我尝试使用 pika (python) 向 RabbitMQ 确认消息时出现错误“未知的传递标签”

如何删除rabbitmq(pika)中的消息

RabbitMQ Python Pika-多个消息的连接处理

RabbitMQ发送消息+python

celery+rabbitmq分布式消息队列的使用

rabbitmq消息订阅发布