连接kafka并发送数据
Posted 测试—LDY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了连接kafka并发送数据相关的知识,希望对你有一定的参考价值。
1 import json 2 3 from kafka import KafkaProducer 4 5 """ 6 安装模块:pip install kafka-python 7 8 """ 9 class handle_kafka: 10 11 def __init__(self, server_ip=\'ip:port\'): 12 self.producer = KafkaProducer(bootstrap_servers=server_ip) 13 14 def send_message(self, topic, msg_dict): 15 """ 16 :param topic: 埋点打入的topic名称 17 :param msg_dict: 埋点数据 18 """ 19 # 最新版本的库必须使用 bytes 类型进行数据的发送,所以使用 encode()方法进行编码 20 msg_dict = json.dumps(msg_dict).encode() 21 self.producer.send(topic=topic, value=msg_dict) 22 23 def close(self): 24 self.producer.close() 25 26 27 if __name__ == \'__main__\': 28 hk = handle_kafka() 29 msg_dict = { 30 "sleep_time": 10, 31 "db_config": { 32 "database": "test_1", 33 "host": "xxxx", 34 "user": "root", 35 "password": "root" 36 }, 37 "table": "msg", 38 "msg": "Hello World" 39 } 40 hk.send_message(\'qa_wisdom_kp_diag\', msg_dict) 41 hk.close()
以上是关于连接kafka并发送数据的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?