干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?
Posted 国产消息中间件
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?相关的知识,希望对你有一定的参考价值。
一,背景
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ支持多种消息存储方式:
队列存储
采取先进先出模式,同一时间,消息只会发送给某一个消费者,只有当该消息被消费并告知已收到时,它才能在代理的存储中被删除。对于持久性订阅来说,每一个消费者都会获取消息的拷贝。为了节约空间,代理的存储介质中只存储了一份消息,存储介质的持久订阅对象为其以后的被存储的消息维护了一个指针,消费者消费时,从存储介质中复制一个消息。消息被所有订阅者获取后才能删除。
KahaDB存储
从ActiveMQ5.3后,推荐使用KahaDB存储普通用途的消息。KahaDB是一种基于文件的消息存储机制,为了提高消息存储的可靠性和可恢复性,它整合了一个事务日志。KahaDB拥有高性能和可扩展性等特点。由于KahaDB使用的是基于文件的存储,所以不需要使用第三方数据库。KahaDB技术架构是为高速存取消息而设计的。数据的块存数在journal file中(数据日志文件)所有的Broker事件可以持续不断的增加进来,特别是消息也是存储在数据文件中。
KahaDB消息存储机制为所有目的地使用一个索引,其索引使用一个事务日志。
KahaDB被使用在10000个活动连接的产品环境,每个连接都拥有一个分割的队列。
AMQ消息存储
AMQ消息存储与KahaDB消息存储类似,由一个提供可靠持续性的事务日志以及高效索引组成,当一个应用中,消息吞吐量是主要需求时,AMQ是最好的选择。但由于它为每个索引使用了两个分隔文件,而每个目的地都有一个索引,所以它不能被使用于每个代理拥有成百上千个队列的情况。ActiveMQ代理没有被完全关闭时,索引的覆盖也会很慢。这是因为所有的索引都需要被重建。
JDBC消息存储
该消息存储方式支持第三方数据库。
本文以mysql为例,将用于消息存储的各数据库表和全部SQL语句摘录出来,供有兴趣的读者备案查阅。
二,ActiveMQ使用数据库表存储消息:库表设计
1,ACTIVEMQ_MSGS(消息主表)
ID |
BIGINT |
NOT NULL |
PRIMARY KEY |
CONTAINER |
VARCHAR(250) |
||
MSGID_PROD |
VARCHAR(250) |
||
MSGID_SEQ |
BIGINT |
||
EXPIRATION |
BIGINT |
||
MSG |
VARCHAR(250) / BLOB |
||
PRIORITY |
BIGINT |
||
XID |
VARCHAR(250) |
CREATE INDEX
ACTIVEMQ_MSGS_MIDX |
MSGID_PROD |
MSGID_SEQ |
|
ACTIVEMQ_MSGS_CIDX |
CONTAINER |
||
ACTIVEMQ_MSGS_EIDX |
EXPIRATION |
||
ACTIVEMQ_MSGS_PIDX |
PRIORITY |
||
ACTIVEMQ_MSGS_XIDX |
XID |
2,ACTIVEMQ_ACKS(消息确认表)
CONTAINER |
VARCHAR(250) |
NOT NULL |
PRIMARY KEY |
SUB_DEST |
VARCHAR(250) |
||
CLIENT_ID |
VARCHAR(250) |
NOT NULL |
PRIMARY KEY |
SUB_NAME |
VARCHAR(250) |
NOT NULL |
PRIMARY KEY |
SELECTOR |
VARCHAR(250) |
||
LAST_ACKED_ID |
BIGINT |
||
PRIORITY |
BIGINT |
DEFAULT 5 NOT NULL |
PRIMARY KEY |
XID |
VARCHAR(250) |
CREATE INDEX
ACTIVEMQ_ACKS__XIDX |
XID |
3,ACTIVEMQ_LOCK(全局锁)
ID |
BIGINT |
NOT NULL |
PRIMARY KEY |
TIME |
BIGINT |
||
BROKER_NAME |
VARCHAR(250) |
INSERT INTOACTIVEMQ_LOCK (ID) VALUES (1)
三,ActiveMQ使用数据库表存储消息:SQL语句
AddMessage
INSERT INTO
ACTIVEMQ_MSGS(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID )
VALUES( ?,?, ?, ?, ?, ?, ?, ? )
UpdateMessage
UPDATE
ACTIVEMQ_MSGS
SET
MSG = ?
WHERE
MSGID_PROD = ?
AND MSGID_SEQ = ?
AND CONTAINER = ?
RemoveMessage
DELETE
FROM
ACTIVEMQ_MSGS
WHERE
ID = ?
FindMessageSequenceID
SELECT
ID,PRIORITY
FROM
ACTIVEMQ_MSGS
WHERE
MSGID_PROD=?
AND MSGID_SEQ=?
AND CONTAINER=?
FindMessage
SELECT
MSG
FROM
ACTIVEMQ_MSGS
WHERE
MSGID_PROD=?
AND MSGID_SEQ=?
FindMessageById
SELECT
MSG
FROM
ACTIVEMQ_MSGS
WHERE
ID=?
FindXidById
SELECT
XID
FROM
ACTIVEMQ_MSGS
WHERE
ID=?
FindAllMessages
SELECT
ID, MSG
FROM
ACTIVEMQ_MSGS
WHERE
CONTAINER=?
ORDER BY ID
FindAllMessagesId
SELECT
ID, MSGID_PROD, MSGID_SEQ
FROM
ACTIVEMQ_MSGS
ORDER BY IDDESC
FindLastSequenceIdInMsgs
SELECT
MAX(ID)
FROM
ACTIVEMQ_MSGS
FindLastProducerSequenceId
SELECT
MAX(MSGID_SEQ)
FROM
ACTIVEMQ_MSGS
WHERE
MSGID_PROD=?
FindLastSequenceIdInAcks
SELECT
MAX(LAST_ACKED_ID)
FROM
ACTIVEMQ_MSGS
CreateDurableSub
INSERT INTO
ACTIVEMQ_ACKS(CONTAINER,CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY )
VALUES (?, ?, ?, ?, ?,?, ?)
FindDurableSub
SELECT
SELECTOR,SUB_DEST
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
FindAllDurableSubs
SELECT
SELECTOR,SUB_NAME, CLIENT_ID, SUB_DEST
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
AND PRIORITY=0
UpdateLastPriorityAckRowOfDurableSub
UPDATE
ACTIVEMQ_ACKS
SET
LAST_ACKED_ID=?
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
AND PRIORITY=?
DeleteSubscription
DELETE
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
FindAllDurableSubMessages
SELECT
M.ID,M.MSG
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.CONTAINER=D.CONTAINER
AND M.ID > D.LAST_ACKED_ID
ORDER BY M.PRIORITYDESC, M.ID
FindDurableSubMessages
SELECT
M.ID,M.MSG
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.XID IS NULL
AND M.CONTAINER=D.CONTAINER
AND M.ID > D.LAST_ACKED_ID
AND M.ID > ?
ORDER BY M.ID
FindDurableSubMessagesByPriority
SELECT
M.ID,M.MSG
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.XID IS NULL
AND M.CONTAINER=D.CONTAINER
AND M.PRIORITY=D.PRIORITY
AND M.ID > D.LAST_ACKED_ID
AND M.ID > ?
AND M.PRIORITY = ?
ORDER BY M.ID
FindAllDurableSubMessages
SELECT
M.ID,M.MSG
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.CONTAINER=D.CONTAINER
AND M.ID > D.LAST_ACKED_ID
ORDER BY M.ID
FindNextDurableSubscriberMessage
SELECT
M.ID,M.MSG
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.CONTAINER=D.CONTAINER
AND M.ID > ?
ORDER BY M.ID
DurableSubscriberMessageCount
SELECT
COUNT(*)
FROM
ACTIVEMQ_MSGS M,
ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.CONTAINER=D.CONTAINER
AND M.ID >
(
SELECT
LAST_ACKED_ID
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=D.CONTAINER
AND CLIENT_ID=D.CLIENT_ID
AND SUB_NAME=D.SUB_NAME
)
DurableSubscriberMessageCountWithPriority
SELECT COUNT(*)
FROM
ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D,
WHERE
D.CONTAINER=?
AND D.CLIENT_ID=?
AND D.SUB_NAME=?
AND M.CONTAINER=D.CONTAINER
AND M.PRIORITY=D.PRIORITY
AND M.ID > D.LAST_ACKED_ID
FindAllDestinations
SELECT
DISTINCTCONTAINER
FROM
ACTIVEMQ_MSGS
UNION
SELECT
DISTINCTCONTAINER
FROM
ACTIVEMQ_ACKS
RemoveAllMessages
DELETE
FROM
ACTIVEMQ_MSGS
WHERE
CONTAINER=?
RemoveAllSubscriptions
DELETE
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
DeleteOldMessagesWithPriority
DELETE
FROM
ACTIVEMQ_ACKS
WHERE
(
PRIORITY=?
ANDID <=
(
SELECT
min(ACTIVEMQ_ACKS.LAST_ACKED_ID)
FROM
ACTIVEMQ_ACKS
WHERE
ACTIVEMQ_ACKS.CONTAINER
= ACTIVEMQ_MSGS.CONTAINER
AND
ACTIVEMQ_ACKS.PRIORITY=?
)
)
LockCreate
SELECT *
FROM
ACTIVEMQ_LOCK
WHERE
ID = 1
FOR UPDATE
LeaseObtain
UPDATE
ACTIVEMQ_LOCK
SET
BROKER_NAME = ?,
TIME = ?
WHERE
(
TIME IS NULL
OR TIME < ?
)
AND ID = 1;
CurrentDateTime
SELECT
CURRENT_TIMESTAMP
FROM
ACTIVEMQ_LOCK
LeaseUpdate
UPDATE
ACTIVEMQ_LOCK
SET
BROKER_NAME = ?,
TIME = ?
WHERE
BROKER_NAME = ? AND ID = 1
LeaseOwner
SELECT
BROKER_NAME,TIME
FROM
ACTIVEMQ_LOCK
WHERE
ID = 1
LockUpdate
UPDATE
ACTIVEMQ_LOCK
SET
TIME = ?
WHERE
ID = 1
DestinationMessageCount
SELECT
ID,MSG
FROM
ACTIVEMQ_MSGS
WHERE
CONTAINER=?
AND ID > ?
AND XID IS NULL
ORDER BY ID
FindNextMessagesByPriority
SELECT
ID,MSG
FROM
ACTIVEMQ_MSGS
WHERE
CONTAINER=?
AND XID IS NULL
AND
(
(
ID> ?
AND PRIORITY = ?
)
OR
PRIORITY< ?
)
ORDER BY PRIORITY DESC,ID
LastAckedDurableSubscriberMessage
SELECT
MAX(LAST_ACKED_ID)
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
SelectDurablePriorityAck
SELECT
LAST_ACKED_ID
FROM
ACTIVEMQ_ACKS
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
AND PRIORITY = ?
InsertDurablePriorityAck
UPDATE
ACTIVEMQ_ACKS
SET
LAST_ACKED_ID=?,
XID= NULL
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
UpdateDurableLastAckInTx
UPDATE
ACTIVEMQ_ACKS
SET
XID=?
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
UpdateDurableLastAckWithPriority
UPDATE
ACTIVEMQ_ACKS
SET
LAST_ACKED_ID=?, XID = NULL
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
AND PRIORITY=?
UpdateDurableLastAckWithPriorityInTx
UPDATE
ACTIVEMQ_ACKS
SET
XID=?
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
AND PRIORITY=?
ClearDurableLastAckInTx
UPDATE
ACTIVEMQ_ACKS
SET
XID= NULL
WHERE
CONTAINER=?
AND CLIENT_ID=?
AND SUB_NAME=?
AND PRIORITY=?
FindOpsPendingOutcome
SELECT
ID,XID, MSG
FROM
ACTIVEMQ_MSGS
WHERE
XID IS NOT NULL
ORDER BY ID
FindAcksPendingOutcome
SELECT
XID,CONTAINER,CLIENT_ID, SUB_NAME
FROM
ACTIVEMQ_ACKS
WHERE
XID IS NOT NULL
UpdateXidFlag
UPDATE
ACTIVEMQ_MSGS
SET
XID= ?
WHERE
ID =?
ClearXidFlag
UPDATE
ACTIVEMQ_MSGS
SET
XID= NULL
WHERE
ID =?
四,结束语
本文为老谭原创,将ActiveMQ通过JDBC方式进行消息存储的各数据库表和全部SQL语句摘录列举出来(以MySQL为例),供有兴趣的读者备案查阅。
基于本文内容,后续会对各SQL语句的设计优劣和对性能的影响,以及其他消息存储方式进行进一步研究和分享,敬请关注。
如您发现不妥之处,请您关注公共号并留言,在此老谭先行谢过。
如何关注?
回文章顶部,点击蓝色字体“国产消息中间件”进行订阅;
国产消息中间件第一品牌:TongLINK/Q
我们致力于最先进的数据传输技术研究,并创造全球领先的数据传输产品,为中国的信息化建设提供最好的数据传输服务。
我们了解消息中间件,因此我们可以创造出最好的消息中间件!
以上是关于干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?的主要内容,如果未能解决你的问题,请参考以下文章