python 龙卷风pubsub

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 龙卷风pubsub相关的知识,希望对你有一定的参考价值。


from tornado import ioloop
import json

class Subscriber:
    def __init__(self, io_loop=None, poll_interval=100, redis_ip=None, is_json=True):
        '''
        Parameters:
          io_loop: The io_loop to use, None by default
          redis_name: Which redis instance to connect to, defaults to None
          poll_interval: At what interval we should poll redis in milliseconds (100ms by default)
        '''
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.connection = redis.StrictRedis(redis_ip, port=6379)
        self.pubsub = self.connection.pubsub()
        self.callbacks = {}
        self.last_values = {}
        self.is_json = is_json
        self.poller = ioloop.PeriodicCallback(self.poll_redis, poll_interval, io_loop=self.io_loop)
        self.next_sub_id = 1
        self.max_dispatch = 10

    def list_topics(self, channel, topic_pattern=None):
        '''List topics (and their values) for a channel and specified topic pattern (wildcards supported) without opening a subscription.
        '''
        topics_and_values = self.connection.hscan_iter('state_'+channel, match=topic_pattern)
        
        return  [ (k, self._deserialize(v)) for k, v in topics_and_values ]

    def snap_value(self, channel, topic):
        '''Snaps the latest value for a channel/topic without opening a subscription.
        Returns None if there is no last value.
        '''
        last_value = self.connection.hget('state_'+channel, topic)
        
        return  self._deserialize(last_value)

    def subscribe(self, channel, topic, callback):
        '''Subscribe to channel and topic with a callback to be invoked on new messages.
        
        Parameters:
          channel: Name of a channel to subscribe
          topic: Topic name to subscribe, note wildcards are supported, so * will subscribe to all topics.
          callback: A callback function with a signature (channel, topic, message)

        Returns a subscription_id which you can use to unsubscribe.  Note callbacks are invoked via the
        tornado IOLoop.
        
        Notes: If you use a wildcard subscription then you won't get a lastValue
        '''
        channel_prefix = ('rt_'+channel+'_').encode('utf-8')
        full_topic_name = channel_prefix+topic.encode('utf-8')
        state_name = ('state_'+channel).encode('utf-8')

        # add callback to subscriptions dictionary
        was_subscribed = len(self.callbacks) > 0
        cbs = self.callbacks.get(full_topic_name)
        sub_id = self.next_sub_id
        self.next_sub_id += 1
        if cbs is None:
            if '*' in channel or '*' in topic:
                self.pubsub.psubscribe(full_topic_name)
            else:
                self.pubsub.subscribe(full_topic_name)
            cbs = self.callbacks[full_topic_name] = {sub_id: callback}
        else:
            cbs[sub_id] = callback
        is_subscribed = len(self.callbacks) > 0

        if not was_subscribed and is_subscribed:
            self.poller.start()

        # TODO- we should probably integrate this with the io_loop since we will block until
        # redis returns all matches
        if '*' in topic:
            last_values = list(self.connection.hscan_iter(state_name, match=topic))
        else:
            last_values = [(topic.encode('utf-8'), self.connection.hget(state_name, topic))]
        for k, v in last_values:
            self.last_values[channel_prefix+k] = v
            if not v: continue # no value for this topic
            self.io_loop.add_callback(callback, channel, k.decode('utf-8'), {'type': 'last_value', 'data': self._deserialize(v)})
        return sub_id

    def unsubscribe(self, channel, topic, sub_id):
        '''Unsubscribe from channel/topic given a subscription id
        '''
        full_topic_name = ('rt_'+channel+'_'+topic).encode('utf-8')
        cbs = self.callbacks.get(full_topic_name)
        if cbs.pop(sub_id, None) is not None:
            if len(cbs) == 0:
                self.pubsub.unsubscribe(full_topic_name)
                del self.callbacks[full_topic_name]
                self.last_values.pop(full_topic_name, None)
                if len(self.callbacks) == 0:
                    self.poller.stop()

    def callback_from_thread(self, result):
        # TODO- plug in redis threaded listener to invoke this function when we get a subscription message
        self.io_loop.add_callback(self.handle_sub_result, result)

    def handle_sub_result(self, result, verbose=False):
        if result is None:
            return False # Nothing more to do
        if result['type'] not in ('message', 'pmessage'):
            # skip redis control message like redis telling us that we have successfully subscribed?
            # TODO- Daniel- please look into why we get messages with type: 'subscribe' and if we need to do anything
            if verbose:
                print('Dropping', result)
            # nothing more to do
            return False
        full_topic_name = result['channel']
        pattern = result['pattern']
        if pattern is not None:
            cbs = self.callbacks.get(pattern)
        else:
            cbs = self.callbacks.get(full_topic_name)
        if cbs is not None:
            full_topic_name_str = full_topic_name.decode('utf-8')
            _, channel, topic = full_topic_name_str.split('_', 2)
            for cb in cbs.values():
                result['data'] = self._deserialize(result['data'])
                self.io_loop.add_callback(cb, channel, topic, result)
        return True

    def poll_redis(self):
        if len(self.callbacks) == 0:
            return
        for ndispatch in range(self.max_dispatch):
            result = self.pubsub.get_message()
            if not self.handle_sub_result(result):
                break

    def _deserialize(self, msg):
        if self.is_json:
            if type(msg) is bytes:
                msg = msg.decode('utf-8')
            if msg is None:
                return None
            else:
                return json.loads(msg)
        return msg

以上是关于python 龙卷风pubsub的主要内容,如果未能解决你的问题,请参考以下文章

python龙卷风中的SSL(https)

python 龙卷风

python 龙卷风的websocket样本

python 龙卷风异步爬虫演示

python 龙卷风AsyncHTTPSTestCase示例

python 龙卷风getattr