干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?

Posted 国产消息中间件

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?相关的知识,希望对你有一定的参考价值。

一,背景

  1. ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

  2. ActiveMQ支持多种消息存储方式:

    队列存储

    采取先进先出模式,同一时间,消息只会发送给某一个消费者,只有当该消息被消费并告知已收到时,它才能在代理的存储中被删除。对于持久性订阅来说,每一个消费者都会获取消息的拷贝。为了节约空间,代理的存储介质中只存储了一份消息,存储介质的持久订阅对象为其以后的被存储的消息维护了一个指针,消费者消费时,从存储介质中复制一个消息。消息被所有订阅者获取后才能删除。

    KahaDB存储

    从ActiveMQ5.3后,推荐使用KahaDB存储普通用途的消息。KahaDB是一种基于文件的消息存储机制,为了提高消息存储的可靠性和可恢复性,它整合了一个事务日志。KahaDB拥有高性能和可扩展性等特点。由于KahaDB使用的是基于文件的存储,所以不需要使用第三方数据库。KahaDB技术架构是为高速存取消息而设计的。数据的块存数在journal file中(数据日志文件)所有的Broker事件可以持续不断的增加进来,特别是消息也是存储在数据文件中。

    KahaDB消息存储机制为所有目的地使用一个索引,其索引使用一个事务日志。

    KahaDB被使用在10000个活动连接的产品环境,每个连接都拥有一个分割的队列。

    AMQ消息存储

    AMQ消息存储与KahaDB消息存储类似,由一个提供可靠持续性的事务日志以及高效索引组成,当一个应用中,消息吞吐量是主要需求时,AMQ是最好的选择。但由于它为每个索引使用了两个分隔文件,而每个目的地都有一个索引,所以它不能被使用于每个代理拥有成百上千个队列的情况。ActiveMQ代理没有被完全关闭时,索引的覆盖也会很慢。这是因为所有的索引都需要被重建。

    JDBC消息存储

    该消息存储方式支持第三方数据库。

    本文以mysql为例,将用于消息存储的各数据库表和全部SQL语句摘录出来,供有兴趣的读者备案查阅。

二,ActiveMQ使用数据库表存储消息:库表设计

1,ACTIVEMQ_MSGS(消息主表)

ID

BIGINT

NOT NULL

PRIMARY KEY

CONTAINER

VARCHAR(250)

MSGID_PROD

VARCHAR(250)

MSGID_SEQ

BIGINT

EXPIRATION

BIGINT

MSG

VARCHAR(250) / BLOB

PRIORITY

BIGINT

XID

VARCHAR(250)

CREATE INDEX

ACTIVEMQ_MSGS_MIDX

MSGID_PROD

MSGID_SEQ

ACTIVEMQ_MSGS_CIDX

CONTAINER

ACTIVEMQ_MSGS_EIDX

EXPIRATION

ACTIVEMQ_MSGS_PIDX

PRIORITY

ACTIVEMQ_MSGS_XIDX

XID

2,ACTIVEMQ_ACKS(消息确认表)

CONTAINER

VARCHAR(250)

NOT NULL

PRIMARY KEY

SUB_DEST

VARCHAR(250)

CLIENT_ID

VARCHAR(250)

NOT NULL

PRIMARY KEY

SUB_NAME

VARCHAR(250)

NOT NULL

PRIMARY KEY

SELECTOR

VARCHAR(250)

LAST_ACKED_ID

BIGINT

PRIORITY

BIGINT

DEFAULT 5

NOT NULL

PRIMARY KEY

XID

VARCHAR(250)

CREATE INDEX

ACTIVEMQ_ACKS__XIDX

XID

3,ACTIVEMQ_LOCK(全局锁)

ID

BIGINT

NOT NULL

PRIMARY KEY

TIME

BIGINT

BROKER_NAME

VARCHAR(250)

INSERT INTOACTIVEMQ_LOCK (ID) VALUES (1)

三,ActiveMQ使用数据库表存储消息:SQL语句

AddMessage

INSERT INTO

ACTIVEMQ_MSGS(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID )

VALUES( ?,?, ?, ?, ?, ?, ?, ? )

UpdateMessage

UPDATE

  ACTIVEMQ_MSGS

SET

  MSG = ?

WHERE

  MSGID_PROD = ?

  AND MSGID_SEQ = ?

  AND CONTAINER = ?

RemoveMessage

DELETE

FROM

  ACTIVEMQ_MSGS

WHERE

  ID = ?

FindMessageSequenceID

SELECT

  ID,PRIORITY

FROM

  ACTIVEMQ_MSGS

WHERE

  MSGID_PROD=?

  AND MSGID_SEQ=?

  AND CONTAINER=?

FindMessage

SELECT

  MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  MSGID_PROD=?

  AND MSGID_SEQ=?

FindMessageById

SELECT

  MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  ID=?

FindXidById

SELECT

  XID

FROM

  ACTIVEMQ_MSGS

WHERE

  ID=?

FindAllMessages

SELECT

  ID, MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  CONTAINER=?

ORDER BY ID

FindAllMessagesId

SELECT

  ID, MSGID_PROD, MSGID_SEQ

FROM

  ACTIVEMQ_MSGS

ORDER BY IDDESC

FindLastSequenceIdInMsgs

SELECT

  MAX(ID)

FROM

  ACTIVEMQ_MSGS

FindLastProducerSequenceId

SELECT

  MAX(MSGID_SEQ)

FROM

  ACTIVEMQ_MSGS

WHERE

  MSGID_PROD=?

FindLastSequenceIdInAcks

SELECT

  MAX(LAST_ACKED_ID)

FROM

  ACTIVEMQ_MSGS

CreateDurableSub

INSERT INTO

ACTIVEMQ_ACKS(CONTAINER,CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY )

VALUES (?, ?, ?, ?, ?,?, ?)

FindDurableSub

SELECT

  SELECTOR,SUB_DEST

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

FindAllDurableSubs

SELECT

  SELECTOR,SUB_NAME, CLIENT_ID, SUB_DEST

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

  AND PRIORITY=0

UpdateLastPriorityAckRowOfDurableSub

UPDATE

  ACTIVEMQ_ACKS

SET

  LAST_ACKED_ID=?

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

  AND PRIORITY=?

DeleteSubscription

DELETE

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

FindAllDurableSubMessages

SELECT

  M.ID,M.MSG

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?

  AND M.CONTAINER=D.CONTAINER

  AND M.ID > D.LAST_ACKED_ID

ORDER BY M.PRIORITYDESC, M.ID

FindDurableSubMessages

SELECT

  M.ID,M.MSG

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?

  AND M.XID IS NULL

  AND M.CONTAINER=D.CONTAINER

  AND M.ID > D.LAST_ACKED_ID

  AND M.ID > ?

ORDER BY M.ID

FindDurableSubMessagesByPriority

SELECT

  M.ID,M.MSG

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?
  AND M.XID IS NULL
  AND M.CONTAINER=D.CONTAINER
  AND M.PRIORITY=D.PRIORITY

  AND M.ID > D.LAST_ACKED_ID
  AND M.ID > ?

  AND M.PRIORITY = ?

ORDER BY M.ID

FindAllDurableSubMessages

SELECT

  M.ID,M.MSG

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?
  AND M.CONTAINER=D.CONTAINER

  AND M.ID > D.LAST_ACKED_ID

ORDER BY M.ID

FindNextDurableSubscriberMessage

SELECT

  M.ID,M.MSG

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?

  AND M.CONTAINER=D.CONTAINER

  AND M.ID > ?

ORDER BY M.ID

DurableSubscriberMessageCount

SELECT

  COUNT(*)

FROM

  ACTIVEMQ_MSGS M,

  ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?

  AND M.CONTAINER=D.CONTAINER

  AND M.ID >

  (

    SELECT

      LAST_ACKED_ID

    FROM

      ACTIVEMQ_ACKS

    WHERE

      CONTAINER=D.CONTAINER

      AND CLIENT_ID=D.CLIENT_ID
      AND SUB_NAME=D.SUB_NAME

  )

DurableSubscriberMessageCountWithPriority

SELECT COUNT(*)

FROM

ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D,

WHERE

  D.CONTAINER=?

  AND D.CLIENT_ID=?

  AND D.SUB_NAME=?

  AND M.CONTAINER=D.CONTAINER

  AND M.PRIORITY=D.PRIORITY

  AND M.ID > D.LAST_ACKED_ID

FindAllDestinations

SELECT

  DISTINCTCONTAINER

FROM

  ACTIVEMQ_MSGS

UNION

SELECT

  DISTINCTCONTAINER

FROM

  ACTIVEMQ_ACKS

RemoveAllMessages

DELETE

FROM

  ACTIVEMQ_MSGS

WHERE

  CONTAINER=?

RemoveAllSubscriptions

DELETE

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

DeleteOldMessagesWithPriority

DELETE

FROM

  ACTIVEMQ_ACKS

WHERE

  (

   PRIORITY=?

   ANDID <=

    (

      SELECT

       min(ACTIVEMQ_ACKS.LAST_ACKED_ID)

      FROM

       ACTIVEMQ_ACKS

      WHERE

        ACTIVEMQ_ACKS.CONTAINER

        = ACTIVEMQ_MSGS.CONTAINER
       AND

        ACTIVEMQ_ACKS.PRIORITY=?

    )

  )

LockCreate

SELECT *

FROM

  ACTIVEMQ_LOCK

WHERE

  ID = 1

FOR UPDATE

LeaseObtain

UPDATE

  ACTIVEMQ_LOCK

SET

  BROKER_NAME = ?,

  TIME = ?

WHERE

  (

   TIME IS NULL

   OR TIME < ?

  )

  AND ID = 1;

CurrentDateTime

SELECT

  CURRENT_TIMESTAMP

FROM

  ACTIVEMQ_LOCK

LeaseUpdate

UPDATE

  ACTIVEMQ_LOCK

SET

  BROKER_NAME = ?,

  TIME = ?

WHERE

  BROKER_NAME = ? AND ID = 1

LeaseOwner

SELECT

  BROKER_NAME,TIME

FROM

  ACTIVEMQ_LOCK

WHERE

  ID = 1

LockUpdate

UPDATE

  ACTIVEMQ_LOCK

SET

  TIME = ?

  WHERE

  ID = 1

DestinationMessageCount

SELECT

  ID,MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  CONTAINER=?

  AND ID > ?

  AND XID IS NULL

ORDER BY ID

FindNextMessagesByPriority

SELECT

  ID,MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  CONTAINER=?
  AND XID IS NULL
  AND

   (

    (

     ID> ?

     AND PRIORITY = ?

    )

    OR

     PRIORITY< ?

   )

ORDER BY PRIORITY DESC,ID

LastAckedDurableSubscriberMessage

SELECT

  MAX(LAST_ACKED_ID)

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

SelectDurablePriorityAck

SELECT

  LAST_ACKED_ID

FROM

  ACTIVEMQ_ACKS

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

  AND PRIORITY = ?

InsertDurablePriorityAck

UPDATE

  ACTIVEMQ_ACKS

SET

  LAST_ACKED_ID=?,

  XID= NULL

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

UpdateDurableLastAckInTx

UPDATE

  ACTIVEMQ_ACKS

SET

  XID=?

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

UpdateDurableLastAckWithPriority

UPDATE

  ACTIVEMQ_ACKS

SET

  LAST_ACKED_ID=?, XID = NULL

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

  AND PRIORITY=?

UpdateDurableLastAckWithPriorityInTx

UPDATE

  ACTIVEMQ_ACKS

SET

  XID=?

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

  AND PRIORITY=?

ClearDurableLastAckInTx

UPDATE

  ACTIVEMQ_ACKS

SET

  XID= NULL

WHERE

  CONTAINER=?

  AND CLIENT_ID=?

  AND SUB_NAME=?

  AND PRIORITY=?

FindOpsPendingOutcome

SELECT

  ID,XID, MSG

FROM

  ACTIVEMQ_MSGS

WHERE

  XID IS NOT NULL

ORDER BY ID

FindAcksPendingOutcome

SELECT

  XID,CONTAINER,CLIENT_ID, SUB_NAME

FROM

  ACTIVEMQ_ACKS

WHERE

  XID IS NOT NULL

UpdateXidFlag

UPDATE

  ACTIVEMQ_MSGS

SET

  XID= ?

WHERE

  ID =?

ClearXidFlag

UPDATE

  ACTIVEMQ_MSGS

SET

  XID= NULL

WHERE

  ID =?

四,结束语

  1. 本文为老谭原创,将ActiveMQ通过JDBC方式进行消息存储的各数据库表和全部SQL语句摘录列举出来(以MySQL为例),供有兴趣的读者备案查阅。

  2. 基于本文内容,后续会对各SQL语句的设计优劣和对性能的影响,以及其他消息存储方式进行进一步研究和分享,敬请关注。

  3. 如您发现不妥之处,请您关注公共号并留言,在此老谭先行谢过。

  

如何关注?

  • 回文章顶部,点击蓝色字体“国产消息中间件”进行订阅;

国产消息中间件第一品牌:TongLINK/Q

  • 我们致力于最先进的数据传输技术研究,并创造全球领先的数据传输产品,为中国的信息化建设提供最好的数据传输服务。

  • 我们了解消息中间件,因此我们可以创造出最好的消息中间件!

以上是关于干货ActiveMQ:如何使用数据库表进行消息存储和消息操作?的主要内容,如果未能解决你的问题,请参考以下文章

如何避免activeMQ数据丢失

消息如何从 ActiveMQ 读取并传递到相应的队列?

ActiveMQ消息持久化存储策略

消息中间件ActiveMQRabbitMQRocketMQZeroMQKafka如何选型?

ActiveMQ中持久化协议 高可用集群

ActiveMQ的学习(ActiveMQ的持久化)