Python。如何从队列/主题 ActiveMQ 中删除任何消息
Posted
技术标签:
【中文标题】Python。如何从队列/主题 ActiveMQ 中删除任何消息【英文标题】:Python. How to delete any msg from queue/topic ActiveMQ 【发布时间】:2017-06-09 05:01:40 【问题描述】:我使用 ActiveMQ 在主题 (topic
) 和队列 (queue
) 中发送消息。
我有两个问题:
-
如何
delete
(取消操作)将消息发送到队列或
话题。
如何remove
和purge
完全所有队列/主题。
使用通过协议STOMP stompy库组织的AMQ,但没有合适的functions
告诉我应该使用哪些库或解决方案本身。
非常感谢。
【问题讨论】:
如何使用 REST 删除消息? 【参考方案1】:如何删除我知道但仅理论上的消息(通过 WireShark 分析包流量的结果,工作浏览器在 AMQ 管理页面 ActiveMQ 中的页面 localhost:8161\admin 中),并且我无法以编程方式删除消息(Python)。
理论上我可以调用(我在packege中看到这个在删除时发送到\admin AMQ)deleteMessage()
在AMQ中使用param.[id,secret]
,
在哪里
id
- queue\topic中msg的唯一名称
secret
- 唯一编号(可能是一些“令牌”),每次更新时都会更改(例如 F5)\admin\browse 页面。我不能说它是什么....
请在我的回复帖子中查看图片:https://ru.***.com/q/618697/228254。
示例测试队列:
id: ID:#######NAME_SERVER######-44458-1485427798954-6:1:1:1:1 秘密:1dbd2916-337a-48cc-bce7-63b00d38ba3此时,我的想法变体是:获取队列中的所有味精,并确认我需要从队列中删除\删除的味精。
这是我的简单客户端代码:
from stompy import stomp
import json
s = stomp.Stomp(amq_ip, amq_port)
try:
s.connect(username=amq_user, password=amq_pass)
s.subscribe('destination': '%s' % amq_queue, 'ack': 'client')
except Exception as e:
print "ActiveMQ error\n %s" % e
while True:
try:
frame = s.receive_frame()
body = json.loads(frame.body)
# это сообщение для меня?
if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
print "Its for me. I receive it"
# Это сообщение для меня. Я его приму и обработаю
s.ack(frame)
else:
# Это сообщение предназначено для кого-то другого и мне не подходит
print "Its not for me"
except Exception as e:
print e
还添加了从队列中删除味精的实验测试代码(不起作用,不删除)
# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json
# Connection to ActiveMQ
BROKER_NAME = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"
LOGIN_EXEMPT_URLS = [
r'admin/'
]
LOGIN_URL = 'url_login'
LOGOUT_REDIRECT_URL = 'url_login'
if __name__ == '__main__':
user_agent = "curl/7.49.1"
headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
addition =
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*"
try:
headers.update(addition)
connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
manager = activemq_api.AMQManager(connect)
except Exception as e:
print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
else:
print(u'Соединение успешно установлено')
try:
id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
print(manager.removeMsgQueue("test", id))
except Exception as inst:
print inst
#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json
class Connection:
def __init__(self, amq_ip, amq_port, broker, header, postfix):
self.BROKER_NAME = broker
self.AMQ_IP = amq_ip
self.AMQ_PORT = amq_port
self.HEADERS = header
self.POSTFIX = postfix
class AMQManager():
def __init__(self, conn):
self.QUEUES =
self.QUEUES_COUNT = None
self.HEAP_MEMORY_USED = None
self.MEMORY_PERSENT_USED = None
self.CONNECTION = conn
self.update()
def rmQueue(self, queue_names):
REUQEST =
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"operation": "removeQueue(java.lang.String)",
"arguments": [queue_names]
return json.dumps(REUQEST)
def queueList(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute":"Queues"
return json.dumps(REUQEST)
def browseQueueSubscribers(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "QueueSubscribers"
return json.dumps(REUQEST)
def memoryPersentUsed(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
"attribute": "MemoryPercentUsage"
return json.dumps(REUQEST)
def heapMemoryUsed(self):
REUQEST =
"type": "read",
"mbean": "java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"path":"used"
return json.dumps(REUQEST)
def request(self, name, param):
http = urllib3.PoolManager()
body = ''
if name == "removeQueue":
body = self.rmQueue(param["QUEUE_NAME"])
elif name == "queueList":
body = self.queueList()
elif name == "browseQueueSubscribers":
body = self.browseQueueSubscribers()
elif name == "memoryPersentUsed":
body = self.memoryPersentUsed()
elif name == "heapMemoryUsed":
body = self.heapMemoryUsed()
url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data
def updateQueues(self):
res = json.loads(self.request("queueList", ))
# print res
data = []
for queue in res["value"]:
object =
queue["objectName"] = queue["objectName"].split(":")[1]
for key in queue["objectName"].split(","):
object.update(key.split("=")[0]: key.split("=")[1])
data.append(object)
self.QUEUES_COUNT = 0
self.QUEUES =
# print data
for queue in data:
self.QUEUES.update(queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION))
self.QUEUES_COUNT += 1
def updateHeapMem(self):
self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", ))["value"]
def updatePersMem(self):
self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", ))["value"]
Ars, [26.01.17 14:06]
## EXPORTABLE
def update(self):
self.updateQueues()
self.updateHeapMem()
self.updatePersMem()
## EXPORTABLE
def getQueues(self):
self.updateQueues()
data = []
for queue in self.QUEUES:
data.append(self.QUEUES[queue].getInfo())
return
"queues_count": self.QUEUES_COUNT,
"queues": data
## EXPORTABLE
def getQueueInfo(self, name):
return self.QUEUES[name].getInfo()
## EXPORTABLE
def browseQueue(self, name):
return self.QUEUES[name].browse()
## EXPORTABLE
def getMessage(self, name, msg_id):
return self.QUEUES[name].message(msg_id)
def getAllQueueMessages(self, name):
return self.QUEUES[name].messages()
## EXPORTABLE
def removeQueue(self, name):
param =
"QUEUE_NAME": name
return json.loads(self.request("removeQueue", param))
## EXPORTABLE
def clearQueue(self, name):
return self.QUEUES[name].clear()
# ARS
def removeMsgQueue(self,nameQueue, id):
return self.QUEUES[nameQueue].delete_msg(id)
class Queue():
def __init__(self, q_name, conn):
# научите обращаться к атрибутам суперкласса!
self.MESSAGES = []
self.QUEUE_NAME = q_name
self.ENQUEUE_COUNT = None
self.DEQUEUE_COUNT = None
self.CONSUMER_COUNT = None
self.QUEUE_SIZE = None
self.CONNECTION = conn
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
def queueEnqueueCount(self):
# MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "EnqueueCount"
return json.dumps(REUQEST)
def queueDequeueCount(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "DequeueCount"
return json.dumps(REUQEST)
def queueConsumerCount(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "ConsumerCount"
return json.dumps(REUQEST)
def queueSize(self):
REUQEST =
"type": "read",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"attribute": "QueueSize"
return json.dumps(REUQEST)
def browseMessages(self):
REUQEST =
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "browse()",
# "arguments": [""]
return json.dumps(REUQEST)
Ars, [26.01.17 14:06]
def purge(self):
REUQEST =
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "purge()"
return json.dumps(REUQEST)
#ARS
def deleteMsg(self, ID):
REUQEST =
"type": "exec",
"mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
% (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
"operation": "deleteMessage()",
"arguments": [ID, "11111111-1111-1111-1111-111111111111"]
return json.dumps(REUQEST)
def request(self, name, param):
http = urllib3.PoolManager()
if name == "queueEnqueueCount":
body = self.queueEnqueueCount()
elif name == "queueDequeueCount":
body = self.queueDequeueCount()
elif name == "queueConsumerCount":
body = self.queueConsumerCount()
elif name == "queueSize":
body = self.queueSize()
elif name == "browseMessages":
body = self.browseMessages()
elif name == "purge":
body = self.purge()
elif name == "delete_msg":
body = self.deleteMsg(param)
url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
return r.data
def updateEnCount(self):
try:
self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", ))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateDeCount(self):
try:
self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", ))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateCoCount(self):
try:
self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", ))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateQuSize(self):
try:
self.QUEUE_SIZE = json.loads(self.request("queueSize", ))["value"]
except Exception as inst:
self.ENQUEUE_COUNT = -1
def updateMessages(self):
self.MESSAGES = []
res = json.loads(self.request("browseMessages", ))["value"]
for msg in res:
data =
"id": msg["JMSMessageID"],
"data": msg["Text"],
"timestamp": msg["JMSTimestamp"],
"priority": msg["JMSPriority"]
self.MESSAGES.append(data)
def update(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
self.updateMessages()
def getInfo(self):
self.updateEnCount()
self.updateDeCount()
self.updateCoCount()
self.updateQuSize()
return
"queue_name": self.QUEUE_NAME,
"enqueue_count": self.ENQUEUE_COUNT,
"dequeue_count": self.DEQUEUE_COUNT,
"consumer_count": self.CONSUMER_COUNT,
"queue_size": self.QUEUE_SIZE
def browse(self):
self.updateMessages()
data = []
for msg in self.MESSAGES:
chunk =
"id": msg["id"],
"timestamp": msg["timestamp"],
"priority": msg["priority"]
data.append(chunk)
return data
Ars, [26.01.17 14:06]
def message(self, msg_id):
self.updateMessages()
for msg in self.MESSAGES:
if msg["id"] == msg_id:
return msg["data"]
# ARS
def messages(self):
self.updateMessages()
return self.MESSAGES
# ARS
def delete_msg(self, id):
return json.loads(self.request("delete_msg",id))
def clear(self):
return json.loads(self.request("purge", ))
【讨论】:
以上是关于Python。如何从队列/主题 ActiveMQ 中删除任何消息的主要内容,如果未能解决你的问题,请参考以下文章