如何使用 Apache Kafka 修复 Python2.7 中的“AssertionError: Value must be bytes”错误

Posted

技术标签:

【中文标题】如何使用 Apache Kafka 修复 Python2.7 中的“AssertionError: Value must be bytes”错误【英文标题】:How to fix "AssertionError: Value must be bytes" error in Python2.7 with Apache Kafka 【发布时间】:2017-04-29 23:49:02 【问题描述】:

我是 Apache Kafka 技术的新手。我正在尝试使用 python 2.7 将消息作为 JSON 对象发送到 kafka 主题,但出现“AssertionError:值必须是字节”错误。我可以成功地将消息作为字符串发送,我可以使用 kafka-console-consumer.sh 查看我的消息。我正在使用 apache kafka 2.10-0.8.2.1 版本。我在下面给出我的代码。

from kafka import KafkaProducer
import yaml

producer = KafkaProducer(bootstap_servers="localhost:9092")
msg = yaml.safe_load('"id":1, "name":"oguz"')

producer.send("my-topic", msg)

感谢您的帮助。

【问题讨论】:

【参考方案1】:

yaml.safe_load() 返回一个字典,因此将其转换为字节需要做两件事——通过 JSON 将其序列化为字符串,然后将其编码为 UTF-8 字节。

取自示例in the kafka-python docs,您可以在实例化KafkaProducer 时使用value_serializer 关键字参数:

>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('my-topic', msg)

或者,您可以在调用send() 时手动对其进行序列化:

>>> producer.send('my-topic', json.dumps(msg).encode('utf-8'))

【讨论】:

以上是关于如何使用 Apache Kafka 修复 Python2.7 中的“AssertionError: Value must be bytes”错误的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试以修复 TopicExistsException 间歇性错误?

Spring for Apache Kafka 1.1.0 Milestone 2 发布

重磅!Apache Kafka 3.1.0发布!

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

如何监控 Apache Kafka 指标?

如何开始在开源 Apache Kafka 中运行 kafka 连接器?