
Posted 有文化的技术人







def delete_v2(self, context, job_id, resources): cctxt = self.client.prepare(version='1.0') cctxt.cast(context, 'delete_v2', job_id=job_id, resources=resources)


def cast(self, ctxt, method, **kwargs): """Invoke a method and return immediately. See RPCClient.cast().""" msg = self._make_message(ctxt, method, kwargs) msg_ctxt = self.serializer.serialize_context(ctxt)
try: self.transport._send(self.target, msg_ctxt, msg, retry=self.retry, transport_options=self.transport_options) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex)


def _make_message(self, ctxt, method, args): msg = dict(method=method)
msg['args'] = dict() for argname, arg in args.items(): msg['args'][argname] = self.serializer.serialize_entity(ctxt, arg)
if self.target.namespace is not None: msg['namespace'] = self.target.namespace if self.target.version is not None: msg['version'] = self.target.version
return msg




target = messaging.Target(topic=self.topic, server=self.host)endpoints = [RemoveClass()]endpoints.extend(self.manager.additional_endpoints)access_policy = dispatcher.DefaultRPCAccessPolicyserver = messaging.get_rpc_server(TRANSPORT, target, endpoints, executor='eventlet', access_policy=access_policy)server.start()



def start(self, override_pool_size=None): """Start handling incoming messages.
This method causes the server to begin polling the transport for incoming messages and passing them to the dispatcher. Message processing will continue until the stop() method is called.
The executor controls how the server integrates with the applications I/O handling strategy - it may choose to poll for messages in a new process, thread or co-operatively scheduled coroutine or simply by registering a callback with an event loop. Similarly, the executor may choose to dispatch messages in a new thread, coroutine or simply the current thread. """ if self._started: LOG.warning('The server has already been started. Ignoring ' 'the redundant call to start().') return
self._started = True
executor_opts = {}
if self.executor_type in ("threading", "eventlet"): executor_opts["max_workers"] = ( override_pool_size or self.conf.executor_thread_pool_size ) self._work_executor = self._executor_cls(**executor_opts)
try: self.listener = self._create_listener() except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex)




def _listen(self, target, batch_size, batch_timeout): if not (target.topic and target.server): raise exceptions.InvalidTarget('A server\'s target must have ' 'topic and server names specified', target) return self._driver.listen(target, batch_size, batch_timeout)


amqpdriver.pydef listen(self, target, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = RpcAMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic=target.topic, callback=listener) conn.declare_topic_consumer(exchange_name=self._get_exchange(target), topic='%s.%s' % (target.topic, target.server), callback=listener) conn.declare_fanout_consumer(target.topic, listener)
return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout)


super(PollStyleListenerAdapter, self).__init__( batch_size, batch_timeout, poll_style_listener.prefetch_size ) self._poll_style_listener = poll_style_listener self._listen_thread = threading.Thread(target=self._runner) self._listen_thread.daemon = True self._started = False


def _runner(self): while self._started: incoming = self._poll_style_listener.poll( batch_size=self.batch_size, batch_timeout=self.batch_timeout)
if incoming: self.on_incoming_callback(incoming)
# listener is stopped but we need to process all already consumed # messages while True: incoming = self._poll_style_listener.poll( batch_size=self.batch_size, batch_timeout=self.batch_timeout)
if not incoming: return self.on_incoming_callback(incoming)
def poll(self, timeout=None): stopwatch = timeutils.StopWatch(duration=timeout).start()
while not self._shutdown.is_set(): self._message_operations_handler.process()
if self.incoming: return self.incoming.pop(0)
left = stopwatch.leftover(return_none=True) if left is None: left = self._current_timeout if left <= 0: return None
try: self.conn.consume(timeout=min(self._current_timeout, left)) except rpc_common.Timeout: self._current_timeout = max(self._current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
# NOTE(sileht): listener is stopped, just processes remaining messages # and operations self._message_operations_handler.process() if self.incoming: return self.incoming.pop(0)


def start(self, on_incoming_callback): super(PollStyleListenerAdapter, self).start(on_incoming_callback) self._started = True self._listen_thread.start()




def _on_incoming(self, incoming): """Handles on_incoming event
:param incoming: incoming request. """ self._work_executor.submit(self._process_incoming, incoming)


def _process_incoming(self, incoming): message = incoming[0]
# TODO(sileht): We should remove that at some point and do # this directly in the driver try: message.acknowledge() except Exception: LOG.exception("Can not acknowledge message. Skip processing") return
failure = None try: res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: failure = e.exc_info LOG.debug(u'Expected exception during message handling (%s)', e) except Exception: # current sys.exc_info() content can be overridden # by another exception raised by a log handler during # LOG.exception(). So keep a copy and delete it later. failure = sys.exc_info() LOG.exception('Exception during message handling')
try: if failure is None: message.reply(res) else: message.reply(failure=failure) except Exception: LOG.exception("Can not send reply for message") finally: # NOTE(dhellmann): Remove circular object reference # between the current stack frame and the traceback in # exc_info. del failure



def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method.
:param incoming: incoming message :type incoming: IncomingMessage :raises: NoSuchMethod, UnsupportedVersion """ message = incoming.message ctxt = incoming.ctxt
method = message.get('method') args = message.get('args', {}) namespace = message.get('namespace') version = message.get('version', '1.0')
# NOTE(danms): This event and watchdog thread are used to send # call-monitoring heartbeats for this message while the call # is executing if it runs for some time. The thread will wait # for the event to be signaled, which we do explicitly below # after dispatching the method call. completion_event = threading.Event() watchdog_thread = threading.Thread(target=self._watchdog, args=(completion_event, incoming)) if incoming.client_timeout: # NOTE(danms): The client provided a timeout, so we start # the watchdog thread. If the client is old or didn't send # a timeout, we just never start the watchdog thread. watchdog_thread.start()
found_compatible = False for endpoint in self.endpoints: target = getattr(endpoint, 'target', None) if not target: target = self._default_target
if not (self._is_namespace(target, namespace) and self._is_compatible(target, version)): continue
if hasattr(endpoint, method): if self.access_policy.is_allowed(endpoint, method): try: return self._do_dispatch(endpoint, method, ctxt, args) finally: completion_event.set() if incoming.client_timeout: watchdog_thread.join()
found_compatible = True
if found_compatible: raise NoSuchMethod(method) else: raise UnsupportedVersion(version, method=method)


def _do_dispatch(self, endpoint, method, ctxt, args): ctxt = self.serializer.deserialize_context(ctxt) new_args = dict() for argname, arg in args.items(): new_args[argname] = self.serializer.deserialize_entity(ctxt, arg) func = getattr(endpoint, method) result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result)



C++ 代码片段执行

在执行其他方法或代码之前完成 RPC 调用?

在 GWT-RPC 中将 ArrayList 作为参数发送

Gwt rpc AsyncCallbak 之后的代码不会被执行?


紧急:win32 Error.code:1722.RPC 服务器不可用!是怎么回事,有知道的麻烦给知道一下,多谢!