Python API操作RocketMQ

Posted 京城小筑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python API操作RocketMQ相关的知识,希望对你有一定的参考价值。

  1. 背景
    1. 开发背景:公司相关报表需求需要将订单业务数据同步至RocketMQ中,由于需要保证开发的一致性(多个部门协同开发),所以采用读取Hive离线数据的方式通过PythonAPI写入RocketMQ中,便于其他开发同事调用~
    2. 开发环境:
      1. 本地调试
        系统                    Mac
        Python                 3.7.5
        rocketmq               0.4.4 (Python模块)    
        rocketmq-client-python 2.0.0 (Python模块) 
      2. 服务器
        系统                    CentOS Linux release 7.4.1708
        Python                 3.7.3
        rocketmq               0.4.4 (Python模块)    
        rocketmq-client-python 2.0.0 (Python模块) 
  2. Python相关代码
    1. Produce端
      import json
      from rocketmq.client import Producer, Message
      
      #BDP_Process_GroupID:groupid 
      #max_message_size:消息字节长度限制,此处为1M,如果消息过大,可以修改此处参数
      producer = Producer('BDP_Process',max_message_size=1024*1024)
      producer.set_namesrv_addr('xxxx:9876')#ip:prot
      #producer.set_name_server_address('xxxx:9876')#linux和mac代码不通,此处为mac系统书写格式
      #producer.set_namesrv_addr('xxxx:9876,xxxx:9876,xxxx:9876')#集群模式可以这样写
      
      producer.start()
      
      msg_body = {
                  "id":212331,
                  "orderId":"320106004011202105318032",
                  "storeId":"X12N",
                  "riqi":"2020-12-12 22:12:32",
                  "processTime":"1595311611000"
              }
      #将字典封装成消息并修改消息编码,此处不修改编码可能会中文乱码
      data = json.dumps(msg_body,ensure_ascii=False).encode('utf-8')
      msg = Message('YOUR-TOPIC')#消息主题
      msg.set_keys(msg_body.get('orderId'))#消息TAG,用于消息过滤对消息的整体分类
      msg.set_tags('BPD_Process')#Message索引键
      msg.set_body(data)#消息主体
      ret = producer.send_sync(msg)#发送异步消息
      print(ret.status, ret.msg_id, ret.offset)
      producer.shutdown()
    2. PushConsumer端
      import time
      
      from rocketmq.client import PushConsumer, ConsumeStatus
      
      
      def callback(msg):
          print(msg.id, msg.body)
          return ConsumeStatus.CONSUME_SUCCESS
      
      #BDP_Process_GroupID:groupid 
      consumer = PushConsumer('BDP_Process_GroupID')
      consumer.set_name_server_address('xxxx:9876')#ip:prot
      consumer.subscribe('YOUR-TOPIC', callback)#Topic
      consumer.start()
      
      while True:
          time.sleep(3600)
      
      consumer.shutdown()
    3. Produce端读取Hive
      import re
      import sys
      import time
      import json
      import logging
      from pyhive import hive,presto
      #qry为查询hive的SQL
      def Hive(self,qry):
              try:
                  connect = hive.Connection(
                      host=hive_host,
                      port=hive_port,
                      database=hive_database,
                      username=hive_username,
                      password=hive_password,
                      auth='LDAP',#网络通信的方式
                      configuration={"mapreduce.job.queuename":"root.EXTRACT"}#队列
                  )
                  cursor = connect.cursor()
                  cursor.execute(qry)
                  fetchall = cursor.fetchall()
                  result = [list(i) for i in fetchall]
                  return result
      
              except Exception as e:
      
                  if (str(e) == "No result set"):
                      logger.info("该sql无返回值:\\t%s" % qry)
                      logger.info(e)
                  else:
                      logger.warning("Hive Connect ERROR\\n")
                      logger.info("该sql有误:%s" % qry)
                      logger.info(e)
  3. 开发过程中遇到的问题整理
    1. Python开发RocketMQ代码前,需要先安装对应的轻量级client,链接地址https://github.com/apache/rocketmq-client-python
    2. rocketmq-client-python2.0.0目前只支持Mac、Linux系统
    3. 开发前期请先确认rocketMq相关访问的端口(9876、10911(VIP消息端口))、Topic是否创建,以及网络是否可以访问
    4. Linux和Mac代码有不同的地方,例如set_name_server_address和set_namesrv_addr,消费端也有不同的地方,但是因为没有涉及到消费端的代码调试,此处就不一一列出了,这个是很容易被误导的地方。
  4. 总结
    1. 在着手代码开发前,先把网络相关的端口、IP梳理清楚,是否可以直接访问,这样会省很多开发时间
    2. 开发过程中尽量与其他同事沟通好,看双方消息协议、数据类型等是否都一致,都需要确认好,这样会少走很多弯路。
    3. 开发中不反对百度,但是百度很可能解决不了你的问题,你可以在官网看看是否有相关报错的issue,或者本地自己一步步Debug。这样你下次会更快的定位到问题的根本原因。

博客到此就告一段落了,如果代码或者书写哪里有问题,大家可以评论留言,博主看到后会修改代码,也谢谢各位啦~

以上是关于Python API操作RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ安装配置+与java交互API操作+集群搭建+高级特性

关于RocketMQ的基础API操作——这一篇就够了

关于RocketMQ的基础API操作——这一篇就够了

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段

Apache RocketMQ:使用api实现对Topic的基本操作

linux 运行python 操作rocketmq