Oracle Database Advance Queue

Posted Jane Chiu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Oracle Database Advance Queue相关的知识,希望对你有一定的参考价值。

AQ 实例

授权

GRANT EXECUTE ON DBMS_AQ TO APPS;
GRANT EXECUTE ON DBMS_AQADM TO APPS;

BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY', 'APPS', FALSE);
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY', 'APPS', FALSE);
END;

Create payload type

-- ============= system role =======================
-- create payload type
CREATE OR REPLACE TYPE CUXWMS_CMD_TYPE AS OBJECT
(
  cuxwms_cmd_id  NUMBER,
  cuxwms_cmd     VARCHAR2(1000),
  transaction_id VARCHAR2(20),
  datalength     VARCHAR2(20)
);

--
GRANT ALL ON SYSTEM.CUXWMS_CMD_TYPE TO APPS;

Create queue table

-- ==============apps role =======================
-- Create queue table
BEGIN
  sys.dbms_aqadm.create_queue_table(queue_table        => 'APPS.CUXWMS_CMD_QT',
                                    queue_payload_type => 'SYSTEM.CUXWMS_CMD_TYPE',
                                    multiple_consumers => TRUE);
END;

Create queue

-- ==============apps role =======================
--create queue
BEGIN
  sys.dbms_aqadm.create_queue(queue_name     => 'APPS.CUXWMS_CMD_Q',
                              queue_table    => 'APPS.CUXWMS_CMD_QT',
                              queue_type     => sys.dbms_aqadm.normal_queue,
                              max_retries    => 5,
                              retry_delay    => 0,
                              retention_time => 0);
END;

--查看
SELECT * from dba_objects t WHERE t.OBJECT_NAME LIKE '%CUXWMS_CMD%';

SELECT * from dba_queues t WHERE t.name LIKE '%CUXWMS_CMD%';

Start queue

-- ==============apps role =======================
-- start queue
BEGIN
  sys.dbms_aqadm.start_queue(queue_name => 'APPS.CUXWMS_CMD_Q');
END;

Enqueue

-- 入列
DECLARE
  recipients         DBMS_AQ.aq$_recipient_list_t;
  enqopt    dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;
  enq_msgid RAW(16);

BEGIN
  recipients(1) := sys.aq$_agent('AGENT1', 'APPS.CUXWMS_CMD_Q', NULL);
  recipients(2) := sys.aq$_agent('AGENT2', 'APPS.CUXWMS_CMD_Q', NULL);
  
  mprop.recipient_list := recipients;

  dbms_aq.enqueue(queue_name         => 'APPS.CUXWMS_CMD_Q',
                  enqueue_options    => enqopt,
                  message_properties => mprop,
                  payload            => system.cuxwms_cmd_type_2(1,
                                                            'test message',
                                                            10001,
                                                            12),
                  msgid              => enq_msgid);
  COMMIT;
END;

SELECT * from AQ$CUXWMS_CMD_QT;

单消费者多消费者多消费者
QUEUECUXWMS_CMD_QCUXWMS_CMD_Q_2CUXWMS_CMD_Q_2
MSG_ID4E0A2F92F8B82488E05318F0640AE8534E09B39276D12486E05318F0640AE80F4E09B39276D12486E05318F0640AE80F
CORR_ID
MSG_PRIORITY111
MSG_STATEREADYREADYREADY
DELAY
DELAY_TIMESTAMP
EXPIRATION
ENQ_TIME2017/4/26 11:10:432017/4/26 11:15:032017/4/26 11:15:03
ENQ_TIMESTAMP26-APR-17 11.10.43.387722 AM26-APR-17 11.15.03.656672 AM26-APR-17 11.15.03.656672 AM
ENQ_USER_IDAPPSAPPSAPPS
ENQ_TXN_ID16.6.3645222.13.3128422.13.31284
DEQ_TIME
DEQ_TIMESTAMP
DEQ_USER_ID
DEQ_TXN_ID
RETRY_COUNT0
EXCEPTION_QUEUE_OWNER
EXCEPTION_QUEUE
USER_DATA.CUXWMS_CMD_ID111
USER_DATA.CUXWMS_CMDtest messagetest messagetest message
USER_DATA.TRANSACTION_ID100011000110001
USER_DATA.DATALENGTH121212
PROPAGATED_MSGID
SENDER_NAME
SENDER_ADDRESS
SENDER_PROTOCOL
ORIGINAL_MSGID
ORIGINAL_QUEUE_NAME
ORIGINAL_QUEUE_OWNER
EXPIRATION_REASON
CONSUMER_NAMEAGENT1AGENT2
ADDRESS
PROTOCOL

对于多消费者,当全部消费者均出队列之后,该信息的所有记录才会被清除。

Dequeue

-- 出列
DECLARE
  deqopt  dbms_aq.dequeue_options_t;
  mprop   dbms_aq.message_properties_t;
  msgid   RAW(16);
  payload system.cuxwms_cmd_type;
BEGIN
  deqopt.consumer_name := 'AGENT1';
  deqopt.navigation    := dbms_aq.first_message;
  deqopt.wait          := 0;
  dbms_aq.dequeue(queue_name         => 'APPS.CUXWMS_CMD_Q',
                  dequeue_options    => deqopt,
                  message_properties => mprop,
                  payload            => payload,
                  msgid              => msgid);
  dbms_output.put_line('payload.cuxwms_cmd:' || payload.cuxwms_cmd);
  COMMIT;
END;

Edition 特性

Oracle Database 从11g 版本之后,启用Edtion 特性

主要控制层次在用户层:

USERNAMESYSTEMPOAPPSAR
USER_ID57117364
PASSWORD
ACCOUNT_STATUSOPENOPENOPENOPEN
LOCK_DATE
EXPIRY_DATE
DEFAULT_TABLESPACESYSTEMAPPS_TS_TX_DATAAPPS_TS_TX_DATAAPPS_TS_TX_DATA
TEMPORARY_TABLESPACETEMPTEMPTEMPTEMP
CREATED2000/5/12 14:29:072000/5/14 18:18:222000/5/14 18:23:062000/5/14 18:18:21
PROFILEDEFAULTDEFAULTDEFAULTDEFAULT
INITIAL_RSRC_CONSUMER_GROUPSYS_GROUPDEFAULT_CONSUMER_GROUPDEFAULT_CONSUMER_GROUPDEFAULT_CONSUMER_GROUP
EXTERNAL_NAME
PASSWORD_VERSIONS10G 11G10G 11G10G 11G10G 11G
EDITIONS_ENABLEDNYYY

edition控制对象:

因此,TYPE 是edition控制对象, QUEUE_TABLE 不是edition 控制对象。如果在edition 用户创建 type, queue_type 时,会报错:

因此,建议的部署方案是:在非edtion用户(比如system),创建type, 并授权给业务用户使用,在业务用户下创建queue_type, queue.

以上是关于Oracle Database Advance Queue的主要内容,如果未能解决你的问题,请参考以下文章

clone database and rename

Oracle的多实例集群有啥特征,能够为企业用户带来啥

[Q&A]yum报错‘database disk image is malformed’

OPatch从 Oracle Database 19.3 升级到 Oracle Database

Oracle Database Gateways 下载

Oracle LiveLabs实验:Manage Database Instance and Memory for Oracle Database 21c