Python API操作RocketMQ
Posted 京城小筑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python API操作RocketMQ相关的知识,希望对你有一定的参考价值。
- 背景:
- 开发背景:公司相关报表需求需要将订单业务数据同步至RocketMQ中,由于需要保证开发的一致性(多个部门协同开发),所以采用读取Hive离线数据的方式通过PythonAPI写入RocketMQ中,便于其他开发同事调用~
- 开发环境:
- 本地调试
系统 Mac Python 3.7.5 rocketmq 0.4.4 (Python模块) rocketmq-client-python 2.0.0 (Python模块)
- 服务器
系统 CentOS Linux release 7.4.1708 Python 3.7.3 rocketmq 0.4.4 (Python模块) rocketmq-client-python 2.0.0 (Python模块)
- 本地调试
- Python相关代码
- Produce端
import json from rocketmq.client import Producer, Message #BDP_Process_GroupID:groupid #max_message_size:消息字节长度限制,此处为1M,如果消息过大,可以修改此处参数 producer = Producer('BDP_Process',max_message_size=1024*1024) producer.set_namesrv_addr('xxxx:9876')#ip:prot #producer.set_name_server_address('xxxx:9876')#linux和mac代码不通,此处为mac系统书写格式 #producer.set_namesrv_addr('xxxx:9876,xxxx:9876,xxxx:9876')#集群模式可以这样写 producer.start() msg_body = { "id":212331, "orderId":"320106004011202105318032", "storeId":"X12N", "riqi":"2020-12-12 22:12:32", "processTime":"1595311611000" } #将字典封装成消息并修改消息编码,此处不修改编码可能会中文乱码 data = json.dumps(msg_body,ensure_ascii=False).encode('utf-8') msg = Message('YOUR-TOPIC')#消息主题 msg.set_keys(msg_body.get('orderId'))#消息TAG,用于消息过滤对消息的整体分类 msg.set_tags('BPD_Process')#Message索引键 msg.set_body(data)#消息主体 ret = producer.send_sync(msg)#发送异步消息 print(ret.status, ret.msg_id, ret.offset) producer.shutdown()
- PushConsumer端
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS #BDP_Process_GroupID:groupid consumer = PushConsumer('BDP_Process_GroupID') consumer.set_name_server_address('xxxx:9876')#ip:prot consumer.subscribe('YOUR-TOPIC', callback)#Topic consumer.start() while True: time.sleep(3600) consumer.shutdown()
- Produce端读取Hive
import re import sys import time import json import logging from pyhive import hive,presto #qry为查询hive的SQL def Hive(self,qry): try: connect = hive.Connection( host=hive_host, port=hive_port, database=hive_database, username=hive_username, password=hive_password, auth='LDAP',#网络通信的方式 configuration={"mapreduce.job.queuename":"root.EXTRACT"}#队列 ) cursor = connect.cursor() cursor.execute(qry) fetchall = cursor.fetchall() result = [list(i) for i in fetchall] return result except Exception as e: if (str(e) == "No result set"): logger.info("该sql无返回值:\\t%s" % qry) logger.info(e) else: logger.warning("Hive Connect ERROR\\n") logger.info("该sql有误:%s" % qry) logger.info(e)
- Produce端
- 开发过程中遇到的问题整理
- Python开发RocketMQ代码前,需要先安装对应的轻量级client,链接地址https://github.com/apache/rocketmq-client-python
- rocketmq-client-python2.0.0目前只支持Mac、Linux系统
- 开发前期请先确认rocketMq相关访问的端口(9876、10911(VIP消息端口))、Topic是否创建,以及网络是否可以访问
- Linux和Mac代码有不同的地方,例如set_name_server_address和set_namesrv_addr,消费端也有不同的地方,但是因为没有涉及到消费端的代码调试,此处就不一一列出了,这个是很容易被误导的地方。
- 总结
- 在着手代码开发前,先把网络相关的端口、IP梳理清楚,是否可以直接访问,这样会省很多开发时间
- 开发过程中尽量与其他同事沟通好,看双方消息协议、数据类型等是否都一致,都需要确认好,这样会少走很多弯路。
- 开发中不反对百度,但是百度很可能解决不了你的问题,你可以在官网看看是否有相关报错的issue,或者本地自己一步步Debug。这样你下次会更快的定位到问题的根本原因。
博客到此就告一段落了,如果代码或者书写哪里有问题,大家可以评论留言,博主看到后会修改代码,也谢谢各位啦~
以上是关于Python API操作RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章
rocketMQ安装配置+与java交互API操作+集群搭建+高级特性
Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段