omniORBpy 通知服务

Posted

技术标签:

【中文标题】:omniORBpy 通知服务 【英文标题】:omniORBpy Notification service 【发布时间】:2020-08-13 17:54:01 【问题描述】:

我第一次尝试用 Python(omniORBpy)实现 CORBA 通知服务的消费者。在谷歌上搜索后,我找到了一段代码,并按自己的需求做了修改。我能够获取事件通道并成功订阅,但每当 push_structured_event 被调用时,日志中只出现 “Scavenger reduce idle count for strand”(Scavenger 减少 strand 的空闲计数)这样的错误信息,事件并没有显示出来。我已启用 traceLevel = 40,但仍然无法找出原因。希望得到帮助。

omniORB 4.3.0;omniORBpy 4.3.0;操作系统:Linux;Python 3.6;供应商:ECI Light NMS

#--------------------------------------------------------------
#      Code

#!/usr/bin/env python

from omniORB import CORBA
import CosNaming
import CosNotification
import CosNotifyComm
import CosNotifyComm__POA
import CosNotifyChannelAdmin
import CosNotifyFilter
import time
import sys
import signal
#-----------------------------------------------------------------------------
# Initialization class

class Initialization (CosNotifyComm__POA.StructuredPushConsumer):
    """Structured push consumer that logs in to the EMS and subscribes to events.

    The constructor resolves the EmsSessionFactory_I through the naming
    service, opens an EMS session, obtains the session's notification event
    channel, and connects this object to it as a structured push consumer
    behind an EXTENDED_TCL filter.

    Subclasses are expected to override handleEvent().

    NOTE(review): `emsSessionFactory`, `NmsSession_I` and `globaldefs` are
    referenced below but are neither imported nor defined in this file.
    They presumably come from the IDL-generated stubs / a local servant
    class and must be in scope before this class is instantiated.
    """

    def __init__(self, orb, evName, subFilter="1==1"):
        """Log in to the EMS and register this object as a push consumer.

        orb       -- an initialised CORBA ORB.
        evName    -- event type name used in the filter constraint.
        subFilter -- constraint expression for the filter; the default
                     "1==1" matches every event of the given type.

        Any unrecoverable setup failure is reported and terminates the
        process with exit status 1 (naming-service failures re-raise the
        CORBA exception instead).
        """
        # ....................................................................
        # Resolve the root naming context.  It is required by the lookup
        # below, so a failure here is reported and re-raised.
        try:
            obj = orb.resolve_initial_references("NameService")
            rootContext = obj._narrow(CosNaming.NamingContext)
            if rootContext is None:
                raise CORBA.OBJECT_NOT_EXIST(0, CORBA.COMPLETED_NO)
        except CORBA.Exception as ex:
            sys.stderr.write(
                "Failed to resolve the NameService root context: {}\n".format(ex))
            raise

        name = [CosNaming.NameComponent("TMF_MTNM", "Class"),
                CosNaming.NameComponent("ECI", "Vendor"),
                CosNaming.NameComponent("ECI:LightSoft_1", "EmsInstance"),
                CosNaming.NameComponent("3_5", "Version"),
                CosNaming.NameComponent("ECI:LightSoft_1", "EmsSessionFactory_I")]

        try:
            obj = rootContext.resolve(name)
        except CosNaming.NamingContext.NotFound as ex:
            print("Except->Name not found")
            print(ex)
            # Without the factory reference nothing below can work.
            sys.exit(1)

        # ....................................................................
        # Activate the root POA so that servants created below (the NMS
        # session and this consumer) can receive incoming calls.
        try:
            poa = orb.resolve_initial_references("RootPOA")
        except Exception as ex:
            print('POA initial References Error: {}'.format(ex))
            sys.exit(1)

        try:
            poaManager = poa._get_the_POAManager()
        except Exception as ex:
            print('POA get manager error : {}'.format(ex))
            sys.exit(1)

        try:
            poaManager.activate()
        except Exception as ex:
            print('POA manager Activation Error: {}'.format(ex))
            sys.exit(1)

        # ....................................................................
        # Narrow the resolved object to the session factory.
        ems_session = obj._narrow(emsSessionFactory.EmsSessionFactory_I)
        if ems_session is None:
            print("Object reference is not an EmsSessionFactory_I")
            sys.exit(1)

        # Create the NMS-side session object that is handed to the EMS.
        nms_session_i = NmsSession_I()
        nms_session_o = nms_session_i._this()
        if nms_session_o is None:
            print("Object reference is not an NmsSession_I")
            sys.exit(1)

        # ....................................................................
        # Open the EMS session.
        # NOTE(review): credentials are hard-coded; consider passing them in.
        try:
            session = ems_session.getEmsSession("usr", "password123",
                                                nms_session_o)
        except Exception as ex:
            print('System Error: {}'.format(ex))
            sys.exit(1)

        # ....................................................................
        # Obtain the notification event channel from the session.
        try:
            channel = session.getEventChannel()
        except globaldefs.ProcessingFailureException as ex:
            print('System Error: {}'.format(ex))
            # No channel was obtained, so only the session needs closing.
            session.endSession()
            sys.exit(1)

        channel = channel._narrow(CosNotifyChannelAdmin.EventChannel)
        if channel is None:
            print("Object reference is not a CosNotifyChannelAdmin.EventChannel")
            session.endSession()
            sys.exit(1)

        # ....................................................................
        # Create the consumer admin.
        self.cadmin, consID = channel.new_for_consumers(
            CosNotifyChannelAdmin.AND_OP)

        # ....................................................................
        # Create the proxy push supplier for structured events.
        psupp, prxID = self.cadmin.obtain_notification_push_supplier(
            CosNotifyChannelAdmin.STRUCTURED_EVENT)
        self.psupp = psupp._narrow(
            CosNotifyChannelAdmin.StructuredProxyPushSupplier)

        # ....................................................................
        # Create a filter from the channel's default filter factory and
        # give it a single constraint built from evName / subFilter.
        ffp = channel._get_default_filter_factory()
        self.filter = ffp.create_filter("EXTENDED_TCL")

        exp = [
            CosNotifyFilter.ConstraintExp(
                [CosNotification.EventType("SPW", evName)],
                subFilter,
            )
        ]
        self.cis1 = self.filter.add_constraints(exp)

        # ....................................................................
        # Attach the filter to the consumer admin and register this object
        # as the push consumer.
        self.cadmin.add_filter(self.filter)
        self.psupp.connect_structured_push_consumer(self._this())

    def disconnect(self):
        """Remove the filter, disconnect from the proxy supplier and destroy the admin."""
        self.cadmin.remove_all_filters()
        self.filter.destroy()
        self.psupp.disconnect_structured_push_supplier()
        self.cadmin.destroy()

    # -------------------------------------------------------------------------
    #    Functions

    def handleEvent(self, event):
        """Default event hook; subclasses should override it."""
        print('You forgot to override handleEvent.')

    def push_structured_event(self, event):
        """Forward a pushed structured event to handleEvent()."""
        self.handleEvent(event)

    def disconnect_structured_push_consumer(self):
        """Consumer disconnect callback; nothing to clean up here."""
        pass

    def offer_change(self, added, removed):
        """Offer-change callback; changes in offered event types are ignored."""
        pass

#-----------------------------------------------------------------------------
#
class LogListen (Initialization):
    """Push consumer for 'Log' events that prints each event's filterable data."""

    def __init__(self, orb):
        """Subscribe to events of type 'Log' using the default filter."""
        Initialization.__init__(self, orb, 'Log')

    # -------------------------------------------------------------------------
    #
    def handleEvent(self, event):
        """Print the event's filterable data merged over a set of defaults.

        Every name/value pair in event.filterable_data overrides (or adds
        to) the defaults below; the resulting dict is printed.
        """
        # Default value for each expected field.
        # NOTE(review): `globaldefs`, `notifications` and
        # `transmissionParameters` are not imported in this file; they
        # presumably come from the IDL-generated stubs and must be in scope.
        evdata = {
            "notificationId": 'Unknown',
            "objectName": globaldefs.NamingAttributes_T,
            "nativeEMSName": 'Unknown',
            "nativeProbableCause": 'Unknown',
            "objectType": notifications.ObjectType_T,
            "objectTypeQualifier": 'Unknown',
            "emsTime": globaldefs.Time_T,
            "neTime": globaldefs.Time_T,
            "isClearable": 'Unknown',
            "layerRate": transmissionParameters.LayerRate_T,
            "probableCause": 'Unknown',
            "probableCauseQualifier": 'Unknown',
            "perceivedSeverity": notifications.PerceivedSeverity_T,
            "serviceAffecting": notifications.ServiceAffecting_T,
            "affectedTPList": globaldefs.NamingAttributesList_T,
            "additionalText": 'Unknown',
        }

        # Each field value is unwrapped with .value() before being stored.
        for field in event.filterable_data:
            evdata[field.name] = field.value.value()

        print(evdata)
        
#---------------------------------------------------------------------------
#     Handler

def handler(signum, frame):
    """Signal handler: disconnect the listener, then shut the ORB down.

    Uses the module-level `listener` and `orb` objects.  `signum` and
    `frame` are supplied by the signal module and are not used.
    """
    try:
        listener.disconnect()
    finally:
        # Shut the ORB down even when the disconnect fails; this is the only
        # place in the script that calls orb.shutdown().
        orb.shutdown(1)

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Script entry: initialise the ORB, create the listener, then run the ORB.
# `listener` and `orb` must stay module-level names because handler() reads them.

# ORB arguments: supply the initial reference for "NameService".
argv = ["-ORBInitRef", "NameService=corbaloc:iiop:1.2@NMS1-GJ-NMS:5075/NameService"] 

orb=CORBA.ORB_init(argv, CORBA.ORB_ID)

poa = orb.resolve_initial_references("RootPOA")

# Constructing the listener performs the whole login / subscription sequence.
listener = LogListen(orb)

poaManager = poa._get_the_POAManager()
poaManager.activate()

# On SIGINT (Ctrl-C), handler() disconnects the listener and shuts the ORB down.
signal.signal(signal.SIGINT, handler)

# Run the ORB; handler() calls orb.shutdown() to stop it.
orb.run()

【问题讨论】:

【参考方案1】:

有2种方式获取eventChannel

    1. 从 ems_session 获取
    2. 通过解析 eventChannel 的 NamingComponent 获取

看起来你正在混合这两种类型

类型2获取事件通道的方法:https://www.omniorb-support.com/pipermail/omniorb-list/2006-January/027404.html

【讨论】:

以上是关于 omniORBpy 通知服务的主要内容,如果未能解决你的问题,请参考以下文章

omn​​iORB C++ 服务器,运行应用程序的 Java 客户端问题

text omn​​ifocus.taskpaper

omn​​et++ 中的 TKenv 无法运行

omn​​iORB::MaxMessageSize 在omni 4

omn​​ipay paypal express 没有返回地址

omn​​ikey 3121 windows 动态 emv 读卡器和写卡器