Python。如何从队列/主题 ActiveMQ 中删除任何消息

Posted

技术标签:

【中文标题】Python。如何从队列/主题 ActiveMQ 中删除任何消息【英文标题】:Python. How to delete any msg from queue/topic ActiveMQ 【发布时间】:2017-06-09 05:01:40 【问题描述】:

我使用 ActiveMQ 在主题 (topic) 和队列 (queue) 中发送消息。

我有两个问题:

    1. 如何删除(撤销)已经发送到队列或主题中的某条消息? 2. 如何彻底清空(remove/purge)整个队列/主题中的所有消息?

我通过 STOMP 协议、使用 stompy 库与 ActiveMQ (AMQ) 交互,但该库中没有合适的函数 (functions)。

告诉我应该使用哪些库或解决方案本身。

非常感谢。

【问题讨论】:

如何使用 REST 删除消息? 【参考方案1】:

我只在理论上知道如何删除消息(这是用 WireShark 分析浏览器在 ActiveMQ 管理页面 localhost:8161\admin 上操作时的网络流量得出的结果),但我还无法以编程方式(Python)删除消息。

理论上,我可以在 AMQ 中调用 deleteMessage(),参数为 [id, secret](我在抓到的数据包 (package) 中看到,删除消息时会向 AMQ 的 \admin 发送这个请求),其中:

id —— 消息 (msg) 在 queue\topic 中的唯一标识;secret —— 一个唯一的值(可能是某种“令牌”),每次刷新(例如按 F5)\admin\browse 页面时都会变化。我说不清它到底是什么……

请在我的回复帖子中查看图片:https://ru.***.com/q/618697/228254。

示例测试队列:

id: ID:#######NAME_SERVER######-44458-1485427798954-6:1:1:1:1 秘密:1dbd2916-337a-48cc-bce7-63b00d38ba3

目前我想到的一种办法是:接收队列中的所有消息 (msg),然后只确认 (ack) 那些需要从队列中删除的消息。

这是我的简单客户端代码:

from stompy import stomp
import json


# STOMP consumer sketch: receive every frame from the queue and acknowledge
# only the ones addressed to this client.
# NOTE: amq_ip, amq_port, amq_user, amq_pass and amq_queue are not defined in
# this snippet; they must be provided by the surrounding program.
s = stomp.Stomp(amq_ip, amq_port)

try:
    s.connect(username=amq_user, password=amq_pass)
    # The subscription options must be passed as one dict (the braces were
    # missing in the posted snippet, which made this line a syntax error).
    # 'ack': 'client' means frames are acknowledged explicitly via s.ack().
    s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
    # NOTE(review): execution continues into the receive loop even when the
    # connection failed; the loop will then just keep printing errors.
    print("ActiveMQ error\n %s" % e)

while True:
    try:
        frame = s.receive_frame()
        body = json.loads(frame.body)

        # Is this message meant for me?
        if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
            print("Its for me. I receive it")
            # The message is for me: accept it by acknowledging the frame.
            s.ack(frame)
        else:
            # The message is intended for someone else; it is left
            # unacknowledged.
            print("Its not for me")
    except Exception as e:
        print(e)

另外附上用于从队列中删除消息 (msg) 的实验性测试代码(不起作用,消息没有被删除):

# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json


# Connection to ActiveMQ
BROKER_NAME = "localhost"
# Host of the ActiveMQ HTTP API.  The posted snippet used AMQ_IP without ever
# defining it (NameError); "localhost" is an assumption based on the admin
# console address mentioned in the post -- adjust for your deployment.
AMQ_IP = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"

# The settings below are not referenced anywhere in this script.
LOGIN_EXEMPT_URLS = [
    r'admin/'
]

LOGIN_URL = 'url_login'

LOGOUT_REDIRECT_URL = 'url_login'

if __name__ == '__main__':
    # Build the HTTP headers (basic auth + form content type) handed to the
    # activemq_api connection object.
    user_agent = "curl/7.49.1"
    headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
    addition = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*"
    }
    # Stays None when the connection attempt below fails, so the removal step
    # can be skipped instead of failing on an unbound name.
    manager = None
    try:
        headers.update(addition)
        connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
        manager = activemq_api.AMQManager(connect)
    except Exception as e:
        # Message text (Russian): "maximum number of connection attempts to
        # ActiveMQ exceeded".
        print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
    else:
        # Message text (Russian): "connection established successfully".
        print(u'Соединение успешно установлено')

    if manager is not None:
        try:
            # JMS message id of the message to delete.  The web console's
            # per-page "secret" value is not passed to removeMsgQueue, so it
            # is not defined here.
            msg_id = "ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
            print(manager.removeMsgQueue("test", msg_id))
        except Exception as inst:
            print(inst)


#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json

class Connection:
    """Plain holder for the settings needed to reach the ActiveMQ HTTP API.

    Each constructor argument is stored unchanged on the instance:
    ``amq_ip`` -> ``AMQ_IP``, ``amq_port`` -> ``AMQ_PORT``,
    ``broker`` -> ``BROKER_NAME``, ``header`` -> ``HEADERS`` and
    ``postfix`` -> ``POSTFIX``.
    """

    def __init__(self, amq_ip, amq_port, broker, header, postfix):
        # Endpoint location: host, port and URL path suffix.
        self.AMQ_IP, self.AMQ_PORT = amq_ip, amq_port
        self.POSTFIX = postfix
        # HTTP headers sent with every request.
        self.HEADERS = header
        # Broker name interpolated into the MBean names.
        self.BROKER_NAME = broker

class AMQManager():
    """Broker-level client for the ActiveMQ HTTP management endpoint.

    Requests are JSON documents with ``type`` / ``mbean`` / ``attribute`` or
    ``operation`` keys, POSTed to ``http://<AMQ_IP>:<AMQ_PORT><POSTFIX>`` of
    the given connection object.  Constructing a manager immediately calls
    ``update()``, which performs HTTP requests.

    Attributes:
        QUEUES: dict mapping destination name -> ``Queue`` object.
        QUEUES_COUNT: number of queue entries in the last queue listing.
        HEAP_MEMORY_USED: ``value`` of the last heap-memory read.
        MEMORY_PERSENT_USED: ``value`` of the last memory-percent read.
        CONNECTION: the connection settings object passed to ``__init__``.
    """

    def __init__(self, conn):
        self.QUEUES = {}
        self.QUEUES_COUNT = None
        self.HEAP_MEMORY_USED = None
        self.MEMORY_PERSENT_USED = None
        self.CONNECTION = conn
        self.update()

    def _brokerMBean(self):
        """Return the broker MBean name for the configured broker."""
        return "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME

    def _readBrokerAttribute(self, attribute):
        """Return the JSON body of a ``read`` request for a broker attribute."""
        REUQEST = {
            "type": "read",
            "mbean": self._brokerMBean(),
            "attribute": attribute
        }
        return json.dumps(REUQEST)

    def rmQueue(self, queue_names):
        """Return the JSON body that executes ``removeQueue`` on the broker."""
        REUQEST = {
            "type": "exec",
            "mbean": self._brokerMBean(),
            "operation": "removeQueue(java.lang.String)",
            "arguments": [queue_names]
        }
        return json.dumps(REUQEST)

    def queueList(self):
        """Return the JSON body that reads the broker's ``Queues`` attribute."""
        return self._readBrokerAttribute("Queues")

    def browseQueueSubscribers(self):
        """Return the JSON body that reads ``QueueSubscribers``."""
        return self._readBrokerAttribute("QueueSubscribers")

    def memoryPersentUsed(self):
        """Return the JSON body that reads ``MemoryPercentUsage``."""
        return self._readBrokerAttribute("MemoryPercentUsage")

    def heapMemoryUsed(self):
        """Return the JSON body that reads ``HeapMemoryUsage`` / ``used``."""
        REUQEST = {
            "type": "read",
            "mbean": "java.lang:type=Memory",
            "attribute":"HeapMemoryUsage",
            "path":"used"
        }
        return json.dumps(REUQEST)

    def request(self, name, param=None):
        """POST the request selected by ``name`` and return the raw response.

        Args:
            name: one of "removeQueue", "queueList", "browseQueueSubscribers",
                "memoryPersentUsed", "heapMemoryUsed".  Any other name sends
                an empty body, as the original code did.
            param: dict of request parameters; only "removeQueue" reads it
                (key "QUEUE_NAME").  Optional, because the other requests
                take no parameters.

        Returns:
            The response body (``r.data``) exactly as returned by urllib3.
        """
        body = ''
        if name == "removeQueue":
            body = self.rmQueue(param["QUEUE_NAME"])
        elif name == "queueList":
            body = self.queueList()
        elif name == "browseQueueSubscribers":
            body = self.browseQueueSubscribers()
        elif name == "memoryPersentUsed":
            body = self.memoryPersentUsed()
        elif name == "heapMemoryUsed":
            body = self.heapMemoryUsed()

        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        http = urllib3.PoolManager()
        r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    @staticmethod
    def _parseObjectName(object_name):
        """Split ``domain:k1=v1,k2=v2`` into ``{"k1": "v1", "k2": "v2"}``."""
        properties = object_name.split(":", 1)[1]
        parsed = {}
        for pair in properties.split(","):
            key, _, value = pair.partition("=")
            parsed[key] = value
        return parsed

    def updateQueues(self):
        """Re-read the broker's queue list and rebuild ``QUEUES``.

        Every entry of the response's ``value`` list must carry an
        ``objectName`` whose properties include ``destinationName``.
        """
        res = json.loads(self.request("queueList", {}))
        entries = [self._parseObjectName(queue["objectName"]) for queue in res["value"]]
        queues = {}
        for entry in entries:
            name = entry["destinationName"]
            queues[name] = Queue(name, self.CONNECTION)
        self.QUEUES = queues
        # Counts listing entries (not distinct names), as the original did.
        self.QUEUES_COUNT = len(entries)

    def updateHeapMem(self):
        """Refresh ``HEAP_MEMORY_USED`` from the broker."""
        self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

    def updatePersMem(self):
        """Refresh ``MEMORY_PERSENT_USED`` from the broker."""
        self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

    ## EXPORTABLE
    def update(self):
        """Refresh the queue list and both memory figures."""
        self.updateQueues()
        self.updateHeapMem()
        self.updatePersMem()

    ## EXPORTABLE
    def getQueues(self):
        """Refresh the queue list and return a summary of all queues.

        Returns:
            dict with "queues_count" and "queues" (a list of the per-queue
            ``getInfo()`` dicts).
        """
        self.updateQueues()
        data = [self.QUEUES[queue].getInfo() for queue in self.QUEUES]
        return {
            "queues_count": self.QUEUES_COUNT,
            "queues": data
        }

    ## EXPORTABLE
    def getQueueInfo(self, name):
        """Return ``getInfo()`` of the queue called ``name`` (KeyError if unknown)."""
        return self.QUEUES[name].getInfo()

    ## EXPORTABLE
    def browseQueue(self, name):
        """Return ``browse()`` of the queue called ``name``."""
        return self.QUEUES[name].browse()

    ## EXPORTABLE
    def getMessage(self, name, msg_id):
        """Return the data of message ``msg_id`` in queue ``name``."""
        return self.QUEUES[name].message(msg_id)

    def getAllQueueMessages(self, name):
        """Return all messages of queue ``name``."""
        return self.QUEUES[name].messages()

    ## EXPORTABLE
    def removeQueue(self, name):
        """Ask the broker to remove queue ``name``; return the decoded reply."""
        param = {
            "QUEUE_NAME": name
        }
        return json.loads(self.request("removeQueue", param))

    ## EXPORTABLE
    def clearQueue(self, name):
        """Purge queue ``name``; return the decoded reply."""
        return self.QUEUES[name].clear()

    # ARS
    def removeMsgQueue(self,nameQueue, id):
        """Delete message ``id`` from queue ``nameQueue``; return the decoded reply."""
        return self.QUEUES[nameQueue].delete_msg(id)



class Queue():
    """Client for a single ActiveMQ queue's MBean over the HTTP endpoint.

    Request bodies are JSON documents (``type`` / ``mbean`` / ``attribute``
    or ``operation``) POSTed to ``http://<AMQ_IP>:<AMQ_PORT><POSTFIX>`` of
    the connection object.  The constructor immediately refreshes the four
    counters, which performs HTTP requests.

    Attributes:
        MESSAGES: list of message dicts from the last ``updateMessages()``.
        QUEUE_NAME: destination name this object addresses.
        ENQUEUE_COUNT, DEQUEUE_COUNT, CONSUMER_COUNT, QUEUE_SIZE: last read
            counter values; -1 when the corresponding read failed.
        CONNECTION: the connection settings object passed to ``__init__``.
    """

    def __init__(self, q_name, conn):
        # (original author's note, translated) teach how to access the
        # attributes of the superclass!
        self.MESSAGES = []
        self.QUEUE_NAME = q_name
        self.ENQUEUE_COUNT = None
        self.DEQUEUE_COUNT = None
        self.CONSUMER_COUNT = None
        self.QUEUE_SIZE = None
        self.CONNECTION = conn
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()

    def _queueMBean(self):
        """Return the MBean name of this queue on the configured broker."""
        return "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
            % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME)

    def _readAttribute(self, attribute):
        """Return the JSON body of a ``read`` request for a queue attribute."""
        REUQEST = {
            "type": "read",
            "mbean": self._queueMBean(),
            "attribute": attribute
        }
        return json.dumps(REUQEST)

    def queueEnqueueCount(self):
        """Return the JSON body that reads ``EnqueueCount``."""
        return self._readAttribute("EnqueueCount")

    def queueDequeueCount(self):
        """Return the JSON body that reads ``DequeueCount``."""
        return self._readAttribute("DequeueCount")

    def queueConsumerCount(self):
        """Return the JSON body that reads ``ConsumerCount``."""
        return self._readAttribute("ConsumerCount")

    def queueSize(self):
        """Return the JSON body that reads ``QueueSize``."""
        return self._readAttribute("QueueSize")

    def browseMessages(self):
        """Return the JSON body that executes the no-argument ``browse()``."""
        REUQEST = {
            "type": "exec",
            "mbean": self._queueMBean(),
            "operation": "browse()",
        }
        return json.dumps(REUQEST)

    def purge(self):
        """Return the JSON body that executes ``purge()`` on the queue."""
        REUQEST = {
            "type": "exec",
            "mbean": self._queueMBean(),
            "operation": "purge()"
        }
        return json.dumps(REUQEST)

    #ARS
    def deleteMsg(self, ID):
        """Return the JSON body that removes message ``ID`` from the queue.

        The posted code called ``deleteMessage()`` with a second "secret"
        argument copied from the web console's delete link.  The queue MBean
        operation for this is ``removeMessage(java.lang.String)``, which
        takes only the JMS message id, so the secret is not sent.
        """
        REUQEST = {
            "type": "exec",
            "mbean": self._queueMBean(),
            "operation": "removeMessage(java.lang.String)",
            "arguments": [ID]
        }
        return json.dumps(REUQEST)

    def request(self, name, param=None):
        """POST the request selected by ``name`` and return the raw response.

        Args:
            name: one of "queueEnqueueCount", "queueDequeueCount",
                "queueConsumerCount", "queueSize", "browseMessages",
                "purge", "delete_msg".
            param: the message id for "delete_msg"; ignored otherwise.

        Returns:
            The response body (``r.data``) exactly as returned by urllib3.

        Raises:
            ValueError: if ``name`` is not one of the names above (the
                original failed with an unbound local variable instead).
        """
        builders = {
            "queueEnqueueCount": self.queueEnqueueCount,
            "queueDequeueCount": self.queueDequeueCount,
            "queueConsumerCount": self.queueConsumerCount,
            "queueSize": self.queueSize,
            "browseMessages": self.browseMessages,
            "purge": self.purge,
        }
        if name == "delete_msg":
            body = self.deleteMsg(param)
        elif name in builders:
            body = builders[name]()
        else:
            raise ValueError("unknown request name: %r" % (name,))

        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        http = urllib3.PoolManager()
        r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    # The four counter refreshers are deliberately best-effort: any failure
    # (network, bad JSON, missing "value") stores -1 in the counter concerned.

    def updateEnCount(self):
        """Refresh ``ENQUEUE_COUNT``; store -1 on any failure."""
        try:
            self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
        except Exception:
            self.ENQUEUE_COUNT = -1

    def updateDeCount(self):
        """Refresh ``DEQUEUE_COUNT``; store -1 on any failure."""
        try:
            self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
        except Exception:
            # Fixed: the original reset ENQUEUE_COUNT here (copy-paste slip).
            self.DEQUEUE_COUNT = -1

    def updateCoCount(self):
        """Refresh ``CONSUMER_COUNT``; store -1 on any failure."""
        try:
            self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
        except Exception:
            # Fixed: the original reset ENQUEUE_COUNT here (copy-paste slip).
            self.CONSUMER_COUNT = -1

    def updateQuSize(self):
        """Refresh ``QUEUE_SIZE``; store -1 on any failure."""
        try:
            self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
        except Exception:
            # Fixed: the original reset ENQUEUE_COUNT here (copy-paste slip).
            self.QUEUE_SIZE = -1

    def updateMessages(self):
        """Browse the queue and rebuild ``MESSAGES``.

        Each browsed message must provide ``JMSMessageID``, ``Text``,
        ``JMSTimestamp`` and ``JMSPriority``; a missing key raises KeyError.
        """
        res = json.loads(self.request("browseMessages", {}))["value"]
        self.MESSAGES = []
        for msg in res:
            data = {
                "id": msg["JMSMessageID"],
                "data": msg["Text"],
                "timestamp": msg["JMSTimestamp"],
                "priority": msg["JMSPriority"]
            }
            self.MESSAGES.append(data)

    def update(self):
        """Refresh all four counters and the message list."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        self.updateMessages()

    def getInfo(self):
        """Refresh the counters and return them as a dict."""
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        return {
            "queue_name": self.QUEUE_NAME,
            "enqueue_count": self.ENQUEUE_COUNT,
            "dequeue_count": self.DEQUEUE_COUNT,
            "consumer_count": self.CONSUMER_COUNT,
            "queue_size": self.QUEUE_SIZE
        }

    def browse(self):
        """Refresh the messages and return id/timestamp/priority per message."""
        self.updateMessages()
        data = []
        for msg in self.MESSAGES:
            chunk = {
                "id": msg["id"],
                "timestamp": msg["timestamp"],
                "priority": msg["priority"]
            }
            data.append(chunk)
        return data

    def message(self, msg_id):
        """Return the data of the message with id ``msg_id``.

        Returns None when no browsed message has that id.
        """
        self.updateMessages()
        for msg in self.MESSAGES:
            if msg["id"] == msg_id:
                return msg["data"]
        return None

    # ARS
    def messages(self):
        """Refresh and return the full message list."""
        self.updateMessages()
        return self.MESSAGES

    # ARS
    def delete_msg(self, id):
        """Remove message ``id`` from the queue; return the decoded reply."""
        return json.loads(self.request("delete_msg",id))

    def clear(self):
        """Purge the queue; return the decoded reply."""
        return json.loads(self.request("purge", {}))

【讨论】:

以上是关于Python。如何从队列/主题 ActiveMQ 中删除任何消息的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ之队列和主题发布订阅实例

使用Java编写ActiveMQ的队列模式和主题模式

防止在ActiveMQ中创建临时队列自动主题

ActiveMQ队列主题模式区别

如何在 Mule 4 中通过 ActiveMQ 传递属性

ActiveMQ--模式(队列模式/主题模式)