Oracle AQ c#并在c#中从oracle队列中使消息出队

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Oracle AQ c#并在c#中从oracle队列中使消息出队相关的知识,希望对你有一定的参考价值。

我正在使用Oracle 12.01,ODP.NET x64版本4,并在.net Framework 4.6中引用Oracle.DataAccess.dll。当我尝试在Visual Studio中使消息出队时出现问题。我的用户已授予出队权,并且该队列是客户端另一个模式的一部分。在c#代码中,队列名称等于X_SHEMA.X.QUEUE_NAME。

在SQL Developer中成功执行的sql脚本:

DECLARE
  queueopts dbms_aq.dequeue_options_t;
  msgprops  dbms_aq.message_properties_t;
  msg_id    RAW(16);
  message   sys.aq$_jms_text_message;
  msg_text  CLOB;
  msg_line  VARCHAR2(255);
  msg_count INTEGER;

  no_subscribers EXCEPTION;
  no_messages    EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_subscribers, -24033);
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  queueopts.wait          := DBMS_AQ.NO_WAIT;
  queueopts.navigation    := DBMS_AQ.FIRST_MESSAGE;
  queueopts.dequeue_mode  := DBMS_AQ.LOCKED;
  queueopts.consumer_name := '&receiver';
  msg_count := 0;
  WHILE (queueopts.navigation = DBMS_AQ.FIRST_MESSAGE OR msg_id IS NOT NULL) LOOP
    BEGIN
      dbms_aq.dequeue(queue_name         => '&queue',
                      dequeue_options    => queueopts,
                      message_properties => msgprops,
                      payload            => message,
                      msgid              => msg_id);
      message.get_text(msg_text);
    EXCEPTION
      WHEN no_subscribers THEN
        -- Ignorieren.
        msg_text := NULL;
        msg_id := NULL;
      WHEN no_messages THEN
        -- Fertig.
        msg_text := NULL;
        msg_id := NULL;
    END;
    IF msg_id IS NULL THEN
      dbms_output.put_line('---------------==========##+##==========---------------');
      dbms_output.put_line(to_char(msg_count) || ' message(s) received');
    ELSE
      msg_count := msg_count + 1;
      dbms_output.put_line('---------------==========##+##==========---------------');
      dbms_output.put_line(':msg_nb     = ' || to_char(msg_count));
      dbms_output.put_line(':msg_id     = ' || RAWTOHEX(msg_id));
      dbms_output.put_line(':attempts   = ' || msgprops.attempts);
      dbms_output.put_line(':nl_msgtype = ' || message.get_string_property('NL_MSGTYPE'));
      dbms_output.put_line(':sender     = ' || message.get_string_property('SENDER'));
      dbms_output.put_line(':msg_text   = #' || length(msg_text));
      WHILE (length(msg_text) > 0) LOOP
        msg_line := substr(msg_text||chr(10),1,instr(msg_text||chr(10),chr(10)));
        msg_text := substr(msg_text,length(msg_line)+1);
        dbms_output.put_line(substr(msg_line,1,length(msg_line)-1));
      END LOOP;
    END IF;
    queueopts.navigation := DBMS_AQ.NEXT_MESSAGE;
  END LOOP;
  rollback;
END;
/

c#代码:

private Response ReceiveFromQueue()
        {
            #region data

            Response response = new Response ();

            OracleAQDequeueOptions options = new OracleAQDequeueOptions
            {
                DequeueMode = OracleAQDequeueMode.Locked,
                Wait = 0,
                NavigationMode = OracleAQNavigationMode.FirstMessage,
                ConsumerName = string.Empty,
                MessageId = new byte[16],
                ProviderSpecificType = true
            };

            string _connString = "data source=(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = X_IP_ADDRESS)(PORT = X_PORT)) (CONNECT_DATA = (ORACLE_SID = X_SID)));User Id=X_USER;Password=X_PASSWORD;";

            OracleAQQueue queue = new OracleAQQueue(queueName)
            {
                MessageType = OracleAQMessageType.Raw,
                DequeueOptions = new OracleAQDequeueOptions
                {
                    Visibility = OracleAQVisibilityMode.OnCommit,
                    DequeueMode = OracleAQDequeueMode.Locked,
                    NavigationMode = OracleAQNavigationMode.FirstMessage,
                    ConsumerName = string.Empty,
                    Wait = 0,
                    MessageId = new byte[16],
                    ProviderSpecificType = true,
                },
            };

            #endregion

            try
            {
                OracleConnection conn = new OracleConnection(_connString);
                conn.Open();

                queue.Connection = conn;

                OracleTransaction tnx = conn.BeginTransaction();

                OracleAQMessage deqMsg = queue.Dequeue(options);

                tnx.Commit();

                conn.Close();
                conn.Dispose();
                conn = null;
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }

            return response;
        }

出队引发异常ORA-25215:user_data类型和队列类型不匹配

当我更改选项时:

OracleAQQueue queue = new OracleAQQueue(queueName)
            {
                MessageType = OracleAQMessageType.Udt,
                DequeueOptions = new OracleAQDequeueOptions
                {
                    Visibility = OracleAQVisibilityMode.OnCommit,
                    DequeueMode = OracleAQDequeueMode.Locked,
                    NavigationMode = OracleAQNavigationMode.FirstMessage,
                    ConsumerName = string.Empty,
                    Wait = 0,
                    MessageId = new byte[16],
                    ProviderSpecificType = true,
                },
                UdtTypeName = "sys.aq$_jms_text_message"
            };

我得到下一个异常OCI-22303:键入“ sys”。找不到“ aq $ _jms_text_message”

我想知道您是否有想法(上面的C#代码,出队方法,队列等),如何解决并出队消息,我应该联系数据库管理员,还是有其他建议?

最诚挚的问候

答案

我认为您可能需要使用指定的SYS.AQ$_JMS_TEXT_MESSAGE类型创建一个C#类。您将需要一个Factory类:

[OracleCustomTypeMapping("SYS.AQ$_JMS_TEXT_MESSAGE")]
public class OraclePayloadFactory : IOracleCustomTypeFactory
{
    public IOracleCustomType CreateObject()
    {
        return new OraclePayload();
    }
}

然后是如下类型:

public class OraclePayload : IOracleCustomType, INullable
{
    [OracleObjectMapping("FIELD")]
    public string Field{ get; set; }

    public void FromCustomObject(OracleConnection con, IntPtr pUdt)
    {
        OracleUdt.SetValue(con, pUdt, "FIELD", this.Field);
    }

    public void ToCustomObject(OracleConnection con, IntPtr pUdt)
    {
        if (OracleUdt.GetValue(con, pUdt, "FIELD") != null)
            Field = OracleUdt.GetValue(con, pUdt, "FIELD").ToString();
    }
}

显然,您需要将oracle类型的元素匹配到C#等效项。

以上是关于Oracle AQ c#并在c#中从oracle队列中使消息出队的主要内容,如果未能解决你的问题,请参考以下文章

使用带有 MSGID 的 ODP.NET 使 Oracle AQ 出队

Oracle AQ订阅注册错误?

从哪里获取 11.2.0.3 的 Oracle jar 文件 - xdb、aq、i18n、xmlparser

如何代表不同的用户执行 oracle DMBS_AQ.REGISTER?

无法将大于 2000 字节的 LOB 作为缓冲消息排入 Oracle AQ

以编程方式检查Oracle AQ队列是否存在